1use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::sync::Arc;
26
27use _serde::TableMetadataEnum;
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use serde_repr::{Deserialize_repr, Serialize_repr};
31use uuid::Uuid;
32
33use super::snapshot::SnapshotReference;
34pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
35use super::{
36 DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
37 SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
38 TableProperties, parse_metadata_file_compression,
39};
40use crate::catalog::MetadataLocation;
41use crate::compression::CompressionCodec;
42use crate::error::{Result, timestamp_ms_to_utc};
43use crate::io::FileIO;
44use crate::spec::EncryptedKey;
45use crate::{Error, ErrorKind};
46
/// Name of the reserved branch ref that tracks the current table state.
static MAIN_BRANCH: &str = "main";
/// Out-of-order tolerance (milliseconds) used when validating that
/// snapshot/metadata log entries are chronological.
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

/// Serialized sentinel meaning "no current snapshot"; normalized to `None`.
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
/// Sequence number before any commit; also the fixed value for v1 tables.
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// Starting `next_row_id` for metadata that predates row lineage.
pub const INITIAL_ROW_ID: u64 = 0;
/// Lowest format version that supports row lineage (v3).
pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference-counted, shareable table metadata handle.
pub type TableMetadataRef = Arc<TableMetadata>;
59
/// In-memory representation of Iceberg table metadata.
///
/// Deserialized via the version-specific `TableMetadataEnum` (see the
/// `try_from` container attribute) so that v1/v2/v3 documents all converge
/// on this one normalized form.
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
pub struct TableMetadata {
    // Format version (v1/v2/v3) this metadata was loaded as.
    pub(crate) format_version: FormatVersion,
    // Unique table identifier.
    pub(crate) table_uuid: Uuid,
    // Base location of the table (trailing '/' stripped by try_normalize).
    pub(crate) location: String,
    // Highest assigned sequence number; always 0 for v1 tables.
    pub(crate) last_sequence_number: i64,
    // Timestamp (ms) of the last metadata update.
    pub(crate) last_updated_ms: i64,
    // Highest assigned column id across all schemas.
    pub(crate) last_column_id: i32,
    // All schemas, keyed by schema id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    // Id of the active schema; must exist in `schemas`.
    pub(crate) current_schema_id: i32,
    // All partition specs, keyed by spec id.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    // The active partition spec (also present in `partition_specs`).
    pub(crate) default_spec: PartitionSpecRef,
    // Struct type produced by binding `default_spec` to the current schema.
    pub(crate) default_partition_type: StructType,
    // Highest assigned partition field id.
    pub(crate) last_partition_id: i32,
    // Free-form table properties.
    pub(crate) properties: HashMap<String, String>,
    // Id of the current snapshot, if any; must exist in `snapshots` when set.
    pub(crate) current_snapshot_id: Option<i64>,
    // All snapshots, keyed by snapshot id.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    // Chronological log of (timestamp, snapshot id) entries.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    // Chronological log of previous metadata file locations.
    pub(crate) metadata_log: Vec<MetadataLog>,

    // All sort orders, keyed by order id.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    // Id of the default sort order; must exist in `sort_orders`.
    pub(crate) default_sort_order_id: i64,
    // Named snapshot references (branches/tags), including "main".
    pub(crate) refs: HashMap<String, SnapshotReference>,
    // Table statistics files, keyed by snapshot id.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    // Partition statistics files, keyed by snapshot id.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    // Encryption keys, keyed by key id (v3 only; empty otherwise).
    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
    // Next row id to assign for row lineage (v3; INITIAL_ROW_ID otherwise).
    pub(crate) next_row_id: u64,
}
139
impl TableMetadata {
    /// Consume this metadata and return a builder seeded with it.
    ///
    /// `current_file_location` is the metadata file this state was read from,
    /// if known; it is forwarded to the builder.
    #[must_use]
    pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
        TableMetadataBuilder::new_from_metadata(self, current_file_location)
    }

    /// Returns `true` if any partition spec (current or historical) has a
    /// partition field with the given name.
    #[inline]
    pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
        self.partition_specs
            .values()
            .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
    }

    /// Returns `true` if any schema (current or historical) resolves a field
    /// with the given name.
    #[inline]
    pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
        self.schemas
            .values()
            .any(|schema| schema.field_by_name(name).is_some())
    }

    /// Returns the table's format version.
    #[inline]
    pub fn format_version(&self) -> FormatVersion {
        self.format_version
    }

    /// Returns the table's UUID.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        self.table_uuid
    }

    /// Returns the table's base location.
    #[inline]
    pub fn location(&self) -> &str {
        self.location.as_str()
    }

    /// Returns the last assigned sequence number.
    #[inline]
    pub fn last_sequence_number(&self) -> i64 {
        self.last_sequence_number
    }

    /// Returns the sequence number for the next commit.
    ///
    /// v1 tables do not track sequence numbers, so they always use the
    /// initial sequence number.
    #[inline]
    pub fn next_sequence_number(&self) -> i64 {
        match self.format_version {
            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
            _ => self.last_sequence_number + 1,
        }
    }

    /// Returns the highest assigned column id.
    #[inline]
    pub fn last_column_id(&self) -> i32 {
        self.last_column_id
    }

    /// Returns the highest assigned partition field id.
    #[inline]
    pub fn last_partition_id(&self) -> i32 {
        self.last_partition_id
    }

    /// Returns the last-updated timestamp as UTC.
    ///
    /// # Errors
    /// Fails if `last_updated_ms` is not a representable timestamp.
    #[inline]
    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.last_updated_ms)
    }

    /// Returns the last-updated timestamp in milliseconds.
    #[inline]
    pub fn last_updated_ms(&self) -> i64 {
        self.last_updated_ms
    }

    /// Iterates over all schemas (unordered).
    #[inline]
    pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
        self.schemas.values()
    }

    /// Looks up a schema by id.
    #[inline]
    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
        self.schemas.get(&schema_id)
    }

    /// Returns the current schema.
    ///
    /// # Panics
    /// Panics if `current_schema_id` is not present in `schemas`; construction
    /// and normalization guarantee it is.
    #[inline]
    pub fn current_schema(&self) -> &SchemaRef {
        self.schema_by_id(self.current_schema_id)
            .expect("Current schema id set, but not found in table metadata")
    }

    /// Returns the current schema id.
    #[inline]
    pub fn current_schema_id(&self) -> SchemaId {
        self.current_schema_id
    }

    /// Iterates over all partition specs (unordered).
    #[inline]
    pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
        self.partition_specs.values()
    }

    /// Looks up a partition spec by id.
    #[inline]
    pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
        self.partition_specs.get(&spec_id)
    }

    /// Returns the default (active) partition spec.
    #[inline]
    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
        &self.default_spec
    }

    /// Returns the struct type of the default partition spec bound to the
    /// current schema.
    #[inline]
    pub fn default_partition_type(&self) -> &StructType {
        &self.default_partition_type
    }

    /// Returns the id of the default partition spec.
    #[inline]
    pub fn default_partition_spec_id(&self) -> i32 {
        self.default_spec.spec_id()
    }

    /// Iterates over all snapshots (unordered).
    #[inline]
    pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
        self.snapshots.values()
    }

    /// Looks up a snapshot by id.
    #[inline]
    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
        self.snapshots.get(&snapshot_id)
    }

    /// Returns the snapshot history log.
    #[inline]
    pub fn history(&self) -> &[SnapshotLog] {
        &self.snapshot_log
    }

    /// Returns the metadata file history log.
    #[inline]
    pub fn metadata_log(&self) -> &[MetadataLog] {
        &self.metadata_log
    }

    /// Returns the current snapshot, if one is set.
    ///
    /// # Panics
    /// Panics if the stored current snapshot id does not exist in
    /// `snapshots`; normalization guarantees it does.
    #[inline]
    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
        self.current_snapshot_id.map(|s| {
            self.snapshot_by_id(s)
                .expect("Current snapshot id has been set, but doesn't exist in metadata")
        })
    }

    /// Returns the current snapshot id, if any.
    #[inline]
    pub fn current_snapshot_id(&self) -> Option<i64> {
        self.current_snapshot_id
    }

    /// Returns the snapshot a named ref (branch/tag) points at, if the ref
    /// exists.
    ///
    /// # Panics
    /// Panics if the ref points at a snapshot id missing from `snapshots`;
    /// `validate_refs` rejects such metadata at construction time.
    #[inline]
    pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
        self.refs.get(ref_name).map(|r| {
            self.snapshot_by_id(r.snapshot_id)
                .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
        })
    }

    /// Iterates over all sort orders (unordered).
    #[inline]
    pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
        self.sort_orders.values()
    }

    /// Looks up a sort order by id.
    #[inline]
    pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
        self.sort_orders.get(&sort_order_id)
    }

    /// Returns the default sort order.
    ///
    /// # Panics
    /// Panics if `default_sort_order_id` is not present in `sort_orders`;
    /// normalization guarantees it is.
    #[inline]
    pub fn default_sort_order(&self) -> &SortOrderRef {
        self.sort_orders
            .get(&self.default_sort_order_id)
            .expect("Default order id has been set, but not found in table metadata!")
    }

    /// Returns the default sort order id.
    #[inline]
    pub fn default_sort_order_id(&self) -> i64 {
        self.default_sort_order_id
    }

    /// Returns the raw table properties map.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }

    /// Returns the compression codec configured for metadata files via the
    /// table properties.
    ///
    /// # Errors
    /// Fails if the property value is not a recognized codec.
    pub fn metadata_compression_codec(&self) -> Result<CompressionCodec> {
        parse_metadata_file_compression(&self.properties)
    }

    /// Parses the raw property map into typed [`TableProperties`].
    ///
    /// # Errors
    /// Returns `DataInvalid` if any property fails to parse.
    pub fn table_properties(&self) -> Result<TableProperties> {
        TableProperties::try_from(&self.properties).map_err(|e| {
            Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
        })
    }

    /// Iterates over all table statistics files (unordered).
    #[inline]
    pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
        self.statistics.values()
    }

    /// Iterates over all partition statistics files (unordered).
    #[inline]
    pub fn partition_statistics_iter(
        &self,
    ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
        self.partition_statistics.values()
    }

    /// Looks up the statistics file for a snapshot.
    #[inline]
    pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
        self.statistics.get(&snapshot_id)
    }

    /// Looks up the partition statistics file for a snapshot.
    #[inline]
    pub fn partition_statistics_for_snapshot(
        &self,
        snapshot_id: i64,
    ) -> Option<&PartitionStatisticsFile> {
        self.partition_statistics.get(&snapshot_id)
    }

    /// Ensures a "main" branch ref exists whenever a current snapshot is set,
    /// pointing at that snapshot with default branch retention. Existing
    /// "main" refs are left untouched.
    fn construct_refs(&mut self) {
        if let Some(current_snapshot_id) = self.current_snapshot_id
            && !self.refs.contains_key(MAIN_BRANCH)
        {
            self.refs
                .insert(MAIN_BRANCH.to_string(), SnapshotReference {
                    snapshot_id: current_snapshot_id,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: None,
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                });
        }
    }

    /// Iterates over all encryption keys (unordered).
    #[inline]
    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
        self.encryption_keys.values()
    }

    /// Looks up an encryption key by key id.
    #[inline]
    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
        self.encryption_keys.get(key_id)
    }

    /// Returns the next row id to assign (row lineage).
    #[inline]
    pub fn next_row_id(&self) -> u64 {
        self.next_row_id
    }

    /// Reads table metadata from a file, transparently decompressing
    /// gzip-compressed metadata (detected by the 0x1F 0x8B magic bytes).
    ///
    /// # Errors
    /// Fails on I/O errors, on decompression failure, or if the content is
    /// not valid metadata JSON.
    pub async fn read_from(
        file_io: &FileIO,
        metadata_location: impl AsRef<str>,
    ) -> Result<TableMetadata> {
        let metadata_location = metadata_location.as_ref();
        let input_file = file_io.new_input(metadata_location)?;
        let metadata_content = input_file.read().await?;

        // Sniff the gzip magic bytes rather than trusting the file extension.
        let metadata = if metadata_content.len() > 2
            && metadata_content[0] == 0x1F
            && metadata_content[1] == 0x8B
        {
            let decompressed_data = CompressionCodec::Gzip
                .decompress(metadata_content.to_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Trying to read compressed metadata file",
                    )
                    .with_context("file_path", metadata_location)
                    .with_source(e)
                })?;
            serde_json::from_slice(&decompressed_data)?
        } else {
            serde_json::from_slice(&metadata_content)?
        };

        Ok(metadata)
    }

    /// Serializes this metadata to JSON and writes it to `metadata_location`,
    /// compressing according to the table's configured codec.
    ///
    /// # Errors
    /// Fails if the codec implied by the location disagrees with the table
    /// properties, if the codec is unsupported, or on serialization/I/O
    /// errors.
    pub async fn write_to(
        &self,
        file_io: &FileIO,
        metadata_location: &MetadataLocation,
    ) -> Result<()> {
        let json_data = serde_json::to_vec(self)?;

        let codec = parse_metadata_file_compression(&self.properties)?;

        // The location encodes a codec (e.g. via its extension); it must match
        // what the table properties request, or readers would mis-parse it.
        if codec != metadata_location.compression_codec() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Compression codec mismatch: metadata_location has {:?}, but table properties specify {:?}",
                    metadata_location.compression_codec(),
                    codec
                ),
            ));
        }

        let data_to_write = match codec {
            CompressionCodec::Gzip => codec.compress(json_data)?,
            CompressionCodec::None => json_data,
            _ => {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!("Unsupported metadata compression codec: {codec:?}"),
                ));
            }
        };

        file_io
            .new_output(metadata_location.to_string())?
            .write(data_to_write.into())
            .await
    }

    /// Validates invariants and normalizes derived state after
    /// deserialization. Order matters: the current schema/snapshot must be
    /// checked before refs are constructed and validated against them.
    ///
    /// # Errors
    /// Returns the first validation failure encountered.
    pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
        self.validate_current_schema()?;
        self.normalize_current_snapshot()?;
        self.construct_refs();
        self.validate_refs()?;
        self.validate_chronological_snapshot_logs()?;
        self.validate_chronological_metadata_logs()?;
        // Canonicalize the location so path joins don't produce '//'.
        self.location = self.location.trim_end_matches('/').to_string();
        self.validate_snapshot_sequence_number()?;
        self.try_normalize_partition_spec()?;
        self.try_normalize_sort_order()?;
        Ok(self)
    }

    /// Ensures the default partition spec is registered in `partition_specs`.
    fn try_normalize_partition_spec(&mut self) -> Result<()> {
        if self
            .partition_spec_by_id(self.default_spec.spec_id())
            .is_none()
        {
            self.partition_specs.insert(
                self.default_spec.spec_id(),
                Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
            );
        }

        Ok(())
    }

    /// Ensures the default sort order exists, inserting the canonical
    /// unsorted order when the default id is the reserved unsorted id.
    ///
    /// # Errors
    /// Fails if the reserved unsorted id is used by a non-empty order, or if
    /// a non-reserved default id has no matching order.
    fn try_normalize_sort_order(&mut self) -> Result<()> {
        if let Some(sort_order) = self.sort_order_by_id(SortOrder::UNSORTED_ORDER_ID)
            && !sort_order.fields.is_empty()
        {
            return Err(Error::new(
                ErrorKind::Unexpected,
                format!(
                    "Sort order ID {} is reserved for unsorted order",
                    SortOrder::UNSORTED_ORDER_ID
                ),
            ));
        }

        if self.sort_order_by_id(self.default_sort_order_id).is_some() {
            return Ok(());
        }

        if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No sort order exists with the default sort order id {}.",
                    self.default_sort_order_id
                ),
            ));
        }

        // Default id is the reserved unsorted id but no order is registered:
        // materialize the canonical unsorted order.
        let sort_order = SortOrder::unsorted_order();
        self.sort_orders
            .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
        Ok(())
    }

    /// Checks that `current_schema_id` refers to a registered schema.
    fn validate_current_schema(&self) -> Result<()> {
        if self.schema_by_id(self.current_schema_id).is_none() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No schema exists with the current schema id {}.",
                    self.current_schema_id
                ),
            ));
        }
        Ok(())
    }

    /// Maps the `-1` sentinel to `None` and checks that any real current
    /// snapshot id refers to a registered snapshot.
    fn normalize_current_snapshot(&mut self) -> Result<()> {
        if let Some(current_snapshot_id) = self.current_snapshot_id {
            if current_snapshot_id == EMPTY_SNAPSHOT_ID {
                self.current_snapshot_id = None;
            } else if self.snapshot_by_id(current_snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }
        Ok(())
    }

    /// Checks every ref resolves to a snapshot and that the "main" branch,
    /// when present, agrees with `current_snapshot_id`.
    fn validate_refs(&self) -> Result<()> {
        for (name, snapshot_ref) in self.refs.iter() {
            if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for reference {name} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }

        let main_ref = self.refs.get(MAIN_BRANCH);
        if self.current_snapshot_id.is_some() {
            if let Some(main_ref) = main_ref
                && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
            {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Current snapshot id does not match main branch ({:?} != {:?})",
                        self.current_snapshot_id.unwrap_or_default(),
                        main_ref.snapshot_id
                    ),
                ));
            }
        } else if main_ref.is_some() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Current snapshot is not set, but main branch exists",
            ));
        }

        Ok(())
    }

    /// Checks sequence-number invariants: v1 tables must stay at 0, and no
    /// snapshot may exceed `last_sequence_number` in v2+.
    fn validate_snapshot_sequence_number(&self) -> Result<()> {
        if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Last sequence number must be 0 in v1. Found {}",
                    self.last_sequence_number
                ),
            ));
        }

        if self.format_version >= FormatVersion::V2
            && let Some(snapshot) = self
                .snapshots
                .values()
                .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
                    snapshot.snapshot_id(),
                    snapshot.sequence_number(),
                    self.last_sequence_number
                ),
            ));
        }

        Ok(())
    }

    /// Checks the snapshot log is (approximately) chronological; entries may
    /// regress by at most `ONE_MINUTE_MS`, and `last_updated_ms` must not
    /// precede the final entry by more than that tolerance.
    fn validate_chronological_snapshot_logs(&self) -> Result<()> {
        for window in self.snapshot_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted snapshot log entries",
                ));
            }
        }

        if let Some(last) = self.snapshot_log.last() {
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last snapshot log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }
        Ok(())
    }

    /// Same chronological check as for the snapshot log, applied to the
    /// metadata log.
    fn validate_chronological_metadata_logs(&self) -> Result<()> {
        for window in self.metadata_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted metadata log entries",
                ));
            }
        }

        if let Some(last) = self.metadata_log.last() {
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last metadata log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }

        Ok(())
    }
}
750
751pub(super) mod _serde {
752 use std::borrow::BorrowMut;
753 use std::collections::HashMap;
758 use std::sync::Arc;
763
764 use serde::{Deserialize, Serialize};
765 use uuid::Uuid;
766
767 use super::{
768 DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
769 TableMetadata,
770 };
771 use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
772 use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
773 use crate::spec::{
774 EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
775 PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
776 SortOrder, StatisticsFile,
777 };
778 use crate::{Error, ErrorKind};
779
    /// Untagged union over the supported serialized metadata versions.
    ///
    /// serde tries the variants in declaration order (newest first), so a
    /// document is matched against the v3 shape before v2 and v1.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum TableMetadataEnum {
        V3(TableMetadataV3),
        V2(TableMetadataV2),
        V1(TableMetadataV1),
    }
787
    /// Wire format of version-3 table metadata: the v2/v3 shared fields plus
    /// the v3-only row-lineage, encryption, and snapshot fields.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV3 {
        // Only accepts the literal number 3.
        pub format_version: VersionNumber<3>,
        // Fields shared with v2, inlined into this object on the wire.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        // Next row id for row lineage; required in v3.
        pub next_row_id: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub encryption_keys: Option<Vec<EncryptedKey>>,
        // Snapshots in the v3 serialization shape.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV3>>,
    }
801
    /// Fields common to the v2 and v3 wire formats; flattened into the
    /// version-specific structs.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV2V3Shared {
        pub table_uuid: Uuid,
        pub location: String,
        pub last_sequence_number: i64,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        pub schemas: Vec<SchemaV2>,
        pub current_schema_id: i32,
        pub partition_specs: Vec<PartitionSpec>,
        pub default_spec_id: i32,
        pub last_partition_id: i32,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        // May carry the -1 "no snapshot" sentinel; normalized on conversion.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Vec<SortOrder>,
        pub default_sort_order_id: i64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub refs: Option<HashMap<String, SnapshotReference>>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
833
    /// Wire format of version-2 table metadata: the v2/v3 shared fields plus
    /// v2-shaped snapshots.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV2 {
        // Only accepts the literal number 2.
        pub format_version: VersionNumber<2>,
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
    }
844
    /// Wire format of version-1 table metadata. Many fields that later
    /// versions require are optional here (uuid, schema list, spec list,
    /// sort orders); the conversion into [`TableMetadata`] fills in defaults.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV1 {
        // Only accepts the literal number 1.
        pub format_version: VersionNumber<1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        pub location: String,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        // Legacy single-schema field, superseded by `schemas`.
        pub schema: Option<SchemaV1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<SchemaV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        // Legacy single-spec field (bare partition fields), superseded by
        // `partition_specs`.
        pub partition_spec: Option<Vec<PartitionField>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Option<Vec<SortOrder>>,
        pub default_sort_order_id: Option<i64>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
886
    /// Zero-sized marker that serializes as, and only deserializes from, the
    /// exact `u8` constant `V`; it is what lets the untagged version enum
    /// discriminate v1/v2/v3 documents.
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct VersionNumber<const V: u8>;
890
891 impl Serialize for TableMetadata {
892 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
893 where S: serde::Serializer {
894 let table_metadata_enum: TableMetadataEnum =
896 self.clone().try_into().map_err(serde::ser::Error::custom)?;
897
898 table_metadata_enum.serialize(serializer)
899 }
900 }
901
902 impl<const V: u8> Serialize for VersionNumber<V> {
903 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
904 where S: serde::Serializer {
905 serializer.serialize_u8(V)
906 }
907 }
908
909 impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
910 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
911 where D: serde::Deserializer<'de> {
912 let value = u8::deserialize(deserializer)?;
913 if value == V {
914 Ok(VersionNumber::<V>)
915 } else {
916 Err(serde::de::Error::custom("Invalid Version"))
917 }
918 }
919 }
920
921 impl TryFrom<TableMetadataEnum> for TableMetadata {
922 type Error = Error;
923 fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
924 match value {
925 TableMetadataEnum::V3(value) => value.try_into(),
926 TableMetadataEnum::V2(value) => value.try_into(),
927 TableMetadataEnum::V1(value) => value.try_into(),
928 }
929 }
930 }
931
932 impl TryFrom<TableMetadata> for TableMetadataEnum {
933 type Error = Error;
934 fn try_from(value: TableMetadata) -> Result<Self, Error> {
935 Ok(match value.format_version {
936 FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
937 FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
938 FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
939 })
940 }
941 }
942
    impl TryFrom<TableMetadataV3> for TableMetadata {
        type Error = Error;
        /// Convert the v3 wire form into the in-memory representation,
        /// validating ids and normalizing the result via `try_normalize`.
        fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
            // Peel off the v3-only fields; `shared` is rebound as `value` so
            // the remainder mirrors the v2 conversion.
            let TableMetadataV3 {
                format_version: _,
                shared: value,
                next_row_id,
                encryption_keys,
                snapshots,
            } = value;
            // -1 is the serialized sentinel for "no current snapshot".
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };
            // Index schemas by id; schema conversion itself is fallible.
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            // Resolve the default spec; fall back to the canonical
            // unpartitioned spec when the default id is the reserved
            // DEFAULT_PARTITION_SPEC_ID but no such spec was listed.
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V3,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                // Missing refs: synthesize a "main" branch pointing at the
                // current snapshot, if there is one.
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                encryption_keys: encryption_keys
                    .map(|keys| {
                        HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
                    })
                    .unwrap_or_default(),
                next_row_id,
            };

            // Validate invariants (refs, logs, sort orders, ...) before
            // handing the metadata out.
            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1060
    impl TryFrom<TableMetadataV2> for TableMetadata {
        type Error = Error;
        /// Convert the v2 wire form into the in-memory representation,
        /// validating ids and normalizing the result via `try_normalize`.
        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
            // Separate the v2-shaped snapshots and rebind the shared fields
            // as `value`, mirroring the v3 conversion body.
            let snapshots = value.snapshots;
            let value = value.shared;
            // -1 is the serialized sentinel for "no current snapshot".
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };
            // Index schemas by id; schema conversion itself is fallible.
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            // Resolve the default spec; fall back to the canonical
            // unpartitioned spec when the default id is the reserved
            // DEFAULT_PARTITION_SPEC_ID but no such spec was listed.
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V2,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                // Missing refs: synthesize a "main" branch pointing at the
                // current snapshot, if there is one.
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                // v2 has no encryption keys or row lineage; use empty/initial
                // values.
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            };

            // Validate invariants (refs, logs, sort orders, ...) before
            // handing the metadata out.
            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1169
1170 impl TryFrom<TableMetadataV1> for TableMetadata {
1171 type Error = Error;
1172 fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1173 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1174 None
1175 } else {
1176 value.current_snapshot_id
1177 };
1178
1179 let (schemas, current_schema_id, current_schema) =
1180 if let (Some(schemas_vec), Some(schema_id)) =
1181 (&value.schemas, value.current_schema_id)
1182 {
1183 let schema_map = HashMap::from_iter(
1185 schemas_vec
1186 .clone()
1187 .into_iter()
1188 .map(|schema| {
1189 let schema: Schema = schema.try_into()?;
1190 Ok((schema.schema_id(), Arc::new(schema)))
1191 })
1192 .collect::<Result<Vec<_>, Error>>()?,
1193 );
1194
1195 let schema = schema_map
1196 .get(&schema_id)
1197 .ok_or_else(|| {
1198 Error::new(
1199 ErrorKind::DataInvalid,
1200 format!("No schema exists with the current schema id {schema_id}."),
1201 )
1202 })?
1203 .clone();
1204 (schema_map, schema_id, schema)
1205 } else if let Some(schema) = value.schema {
1206 let schema: Schema = schema.try_into()?;
1208 let schema_id = schema.schema_id();
1209 let schema_arc = Arc::new(schema);
1210 let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1211 (schema_map, schema_id, schema_arc)
1212 } else {
1213 return Err(Error::new(
1215 ErrorKind::DataInvalid,
1216 "No valid schema configuration found in table metadata",
1217 ));
1218 };
1219
1220 let partition_specs = if let Some(specs_vec) = value.partition_specs {
1222 specs_vec
1224 .into_iter()
1225 .map(|x| (x.spec_id(), Arc::new(x)))
1226 .collect::<HashMap<_, _>>()
1227 } else if let Some(partition_spec) = value.partition_spec {
1228 let spec = PartitionSpec::builder(current_schema.clone())
1230 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1231 .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1232 .build()?;
1233
1234 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1235 } else {
1236 let spec = PartitionSpec::builder(current_schema.clone())
1238 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1239 .build()?;
1240
1241 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1242 };
1243
1244 let default_spec_id = value
1246 .default_spec_id
1247 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1248
1249 let default_spec: PartitionSpecRef = partition_specs
1251 .get(&default_spec_id)
1252 .map(|x| Arc::unwrap_or_clone(x.clone()))
1253 .ok_or_else(|| {
1254 Error::new(
1255 ErrorKind::DataInvalid,
1256 format!("Default partition spec {default_spec_id} not found"),
1257 )
1258 })?
1259 .into();
1260 let default_partition_type = default_spec.partition_type(¤t_schema)?;
1261
1262 let mut metadata = TableMetadata {
1263 format_version: FormatVersion::V1,
1264 table_uuid: value.table_uuid.unwrap_or_default(),
1265 location: value.location,
1266 last_sequence_number: 0,
1267 last_updated_ms: value.last_updated_ms,
1268 last_column_id: value.last_column_id,
1269 current_schema_id,
1270 default_spec,
1271 default_partition_type,
1272 last_partition_id: value
1273 .last_partition_id
1274 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1275 partition_specs,
1276 schemas,
1277 properties: value.properties.unwrap_or_default(),
1278 current_snapshot_id,
1279 snapshots: value
1280 .snapshots
1281 .map(|snapshots| {
1282 Ok::<_, Error>(HashMap::from_iter(
1283 snapshots
1284 .into_iter()
1285 .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1286 .collect::<Result<Vec<_>, Error>>()?,
1287 ))
1288 })
1289 .transpose()?
1290 .unwrap_or_default(),
1291 snapshot_log: value.snapshot_log.unwrap_or_default(),
1292 metadata_log: value.metadata_log.unwrap_or_default(),
1293 sort_orders: match value.sort_orders {
1294 Some(sort_orders) => HashMap::from_iter(
1295 sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1296 ),
1297 None => HashMap::new(),
1298 },
1299 default_sort_order_id: value
1300 .default_sort_order_id
1301 .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1302 refs: if let Some(snapshot_id) = current_snapshot_id {
1303 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1304 snapshot_id,
1305 retention: SnapshotRetention::Branch {
1306 min_snapshots_to_keep: None,
1307 max_snapshot_age_ms: None,
1308 max_ref_age_ms: None,
1309 },
1310 })])
1311 } else {
1312 HashMap::new()
1313 },
1314 statistics: index_statistics(value.statistics),
1315 partition_statistics: index_partition_statistics(value.partition_statistics),
1316 encryption_keys: HashMap::new(),
1317 next_row_id: INITIAL_ROW_ID, };
1319
1320 metadata.borrow_mut().try_normalize()?;
1321 Ok(metadata)
1322 }
1323 }
1324
1325 impl TryFrom<TableMetadata> for TableMetadataV3 {
1326 type Error = Error;
1327
1328 fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1329 let next_row_id = v.next_row_id;
1330 let encryption_keys = std::mem::take(&mut v.encryption_keys);
1331 let snapshots = std::mem::take(&mut v.snapshots);
1332 let shared = v.into();
1333
1334 Ok(TableMetadataV3 {
1335 format_version: VersionNumber::<3>,
1336 shared,
1337 next_row_id,
1338 encryption_keys: if encryption_keys.is_empty() {
1339 None
1340 } else {
1341 Some(encryption_keys.into_values().collect())
1342 },
1343 snapshots: if snapshots.is_empty() {
1344 None
1345 } else {
1346 Some(
1347 snapshots
1348 .into_values()
1349 .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1350 .collect::<Result<_, _>>()?,
1351 )
1352 },
1353 })
1354 }
1355 }
1356
1357 impl From<TableMetadata> for TableMetadataV2 {
1358 fn from(mut v: TableMetadata) -> Self {
1359 let snapshots = std::mem::take(&mut v.snapshots);
1360 let shared = v.into();
1361
1362 TableMetadataV2 {
1363 format_version: VersionNumber::<2>,
1364 shared,
1365 snapshots: if snapshots.is_empty() {
1366 None
1367 } else {
1368 Some(
1369 snapshots
1370 .into_values()
1371 .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1372 .collect(),
1373 )
1374 },
1375 }
1376 }
1377 }
1378
1379 impl From<TableMetadata> for TableMetadataV2V3Shared {
1380 fn from(v: TableMetadata) -> Self {
1381 TableMetadataV2V3Shared {
1382 table_uuid: v.table_uuid,
1383 location: v.location,
1384 last_sequence_number: v.last_sequence_number,
1385 last_updated_ms: v.last_updated_ms,
1386 last_column_id: v.last_column_id,
1387 schemas: v
1388 .schemas
1389 .into_values()
1390 .map(|x| {
1391 Arc::try_unwrap(x)
1392 .unwrap_or_else(|schema| schema.as_ref().clone())
1393 .into()
1394 })
1395 .collect(),
1396 current_schema_id: v.current_schema_id,
1397 partition_specs: v
1398 .partition_specs
1399 .into_values()
1400 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1401 .collect(),
1402 default_spec_id: v.default_spec.spec_id(),
1403 last_partition_id: v.last_partition_id,
1404 properties: if v.properties.is_empty() {
1405 None
1406 } else {
1407 Some(v.properties)
1408 },
1409 current_snapshot_id: v.current_snapshot_id,
1410 snapshot_log: if v.snapshot_log.is_empty() {
1411 None
1412 } else {
1413 Some(v.snapshot_log)
1414 },
1415 metadata_log: if v.metadata_log.is_empty() {
1416 None
1417 } else {
1418 Some(v.metadata_log)
1419 },
1420 sort_orders: v
1421 .sort_orders
1422 .into_values()
1423 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1424 .collect(),
1425 default_sort_order_id: v.default_sort_order_id,
1426 refs: Some(v.refs),
1427 statistics: v.statistics.into_values().collect(),
1428 partition_statistics: v.partition_statistics.into_values().collect(),
1429 }
1430 }
1431 }
1432
1433 impl TryFrom<TableMetadata> for TableMetadataV1 {
1434 type Error = Error;
1435 fn try_from(v: TableMetadata) -> Result<Self, Error> {
1436 Ok(TableMetadataV1 {
1437 format_version: VersionNumber::<1>,
1438 table_uuid: Some(v.table_uuid),
1439 location: v.location,
1440 last_updated_ms: v.last_updated_ms,
1441 last_column_id: v.last_column_id,
1442 schema: Some(
1443 v.schemas
1444 .get(&v.current_schema_id)
1445 .ok_or(Error::new(
1446 ErrorKind::Unexpected,
1447 "current_schema_id not found in schemas",
1448 ))?
1449 .as_ref()
1450 .clone()
1451 .into(),
1452 ),
1453 schemas: Some(
1454 v.schemas
1455 .into_values()
1456 .map(|x| {
1457 Arc::try_unwrap(x)
1458 .unwrap_or_else(|schema| schema.as_ref().clone())
1459 .into()
1460 })
1461 .collect(),
1462 ),
1463 current_schema_id: Some(v.current_schema_id),
1464 partition_spec: Some(v.default_spec.fields().to_vec()),
1465 partition_specs: Some(
1466 v.partition_specs
1467 .into_values()
1468 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1469 .collect(),
1470 ),
1471 default_spec_id: Some(v.default_spec.spec_id()),
1472 last_partition_id: Some(v.last_partition_id),
1473 properties: if v.properties.is_empty() {
1474 None
1475 } else {
1476 Some(v.properties)
1477 },
1478 current_snapshot_id: v.current_snapshot_id,
1479 snapshots: if v.snapshots.is_empty() {
1480 None
1481 } else {
1482 Some(
1483 v.snapshots
1484 .into_values()
1485 .map(|x| Snapshot::clone(&x).into())
1486 .collect(),
1487 )
1488 },
1489 snapshot_log: if v.snapshot_log.is_empty() {
1490 None
1491 } else {
1492 Some(v.snapshot_log)
1493 },
1494 metadata_log: if v.metadata_log.is_empty() {
1495 None
1496 } else {
1497 Some(v.metadata_log)
1498 },
1499 sort_orders: Some(
1500 v.sort_orders
1501 .into_values()
1502 .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1503 .collect(),
1504 ),
1505 default_sort_order_id: Some(v.default_sort_order_id),
1506 statistics: v.statistics.into_values().collect(),
1507 partition_statistics: v.partition_statistics.into_values().collect(),
1508 })
1509 }
1510 }
1511
1512 fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1513 statistics
1514 .into_iter()
1515 .rev()
1516 .map(|s| (s.snapshot_id, s))
1517 .collect()
1518 }
1519
1520 fn index_partition_statistics(
1521 statistics: Vec<PartitionStatisticsFile>,
1522 ) -> HashMap<i64, PartitionStatisticsFile> {
1523 statistics
1524 .into_iter()
1525 .rev()
1526 .map(|s| (s.snapshot_id, s))
1527 .collect()
1528 }
1529}
1530
/// Iceberg table format version.
///
/// Serialized as its numeric value (`1`, `2`, or `3`) via `serde_repr`.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
#[repr(u8)]
pub enum FormatVersion {
    /// Version 1 of the Iceberg table spec.
    V1 = 1u8,
    /// Version 2 of the Iceberg table spec.
    V2 = 2u8,
    /// Version 3 of the Iceberg table spec.
    V3 = 3u8,
}
1542
impl PartialOrd for FormatVersion {
    // Canonical delegation to `Ord::cmp`, keeping the two orderings
    // consistent (the form clippy expects when `Ord` is implemented).
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
1548
1549impl Ord for FormatVersion {
1550 fn cmp(&self, other: &Self) -> Ordering {
1551 (*self as u8).cmp(&(*other as u8))
1552 }
1553}
1554
1555impl Display for FormatVersion {
1556 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1557 match self {
1558 FormatVersion::V1 => write!(f, "v1"),
1559 FormatVersion::V2 => write!(f, "v2"),
1560 FormatVersion::V3 => write!(f, "v3"),
1561 }
1562 }
1563}
1564
/// Entry in the metadata change log, recording a previous metadata file.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct MetadataLog {
    /// Location of the previous table metadata file.
    pub metadata_file: String,
    /// Timestamp in milliseconds when the metadata file was written.
    pub timestamp_ms: i64,
}
1574
/// Entry in the snapshot log, recording when a snapshot became current.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotLog {
    /// Id of the snapshot.
    pub snapshot_id: i64,
    /// Timestamp in milliseconds when the snapshot became current.
    pub timestamp_ms: i64,
}
1584
impl SnapshotLog {
    /// Returns the log entry's timestamp as a UTC datetime.
    ///
    /// # Errors
    /// Returns an error if `timestamp_ms` cannot be converted to a valid
    /// `DateTime<Utc>` (propagated from `timestamp_ms_to_utc`).
    pub fn timestamp(self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.timestamp_ms)
    }

    /// Returns the raw timestamp in milliseconds.
    #[inline]
    pub fn timestamp_ms(&self) -> i64 {
        self.timestamp_ms
    }
}
1597
1598#[cfg(test)]
1599mod tests {
1600 use std::collections::HashMap;
1601 use std::fs;
1602 use std::sync::Arc;
1603
1604 use anyhow::Result;
1605 use base64::Engine as _;
1606 use pretty_assertions::assert_eq;
1607 use tempfile::TempDir;
1608 use uuid::Uuid;
1609
1610 use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1611 use crate::catalog::MetadataLocation;
1612 use crate::compression::CompressionCodec;
1613 use crate::io::FileIO;
1614 use crate::spec::table_metadata::TableMetadata;
1615 use crate::spec::{
1616 BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1617 PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1618 SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1619 Summary, TableProperties, Transform, Type, UnboundPartitionField,
1620 };
1621 use crate::{ErrorKind, TableCreation};
1622
1623 fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1624 let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1625 assert_eq!(desered_type, expected_type);
1626
1627 let sered_json = serde_json::to_string(&expected_type).unwrap();
1628 let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1629
1630 assert_eq!(parsed_json_value, desered_type);
1631 }
1632
1633 fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1634 let path = format!("testdata/table_metadata/{file_name}");
1635 let metadata: String = fs::read_to_string(path).unwrap();
1636
1637 serde_json::from_str(&metadata).unwrap()
1638 }
1639
    /// A V2 document with explicit schema/spec/sort-order lists deserializes
    /// into the expected in-memory value and round-trips losslessly.
    #[test]
    fn test_table_data_v2() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "refs": {},
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0
        }
        "#;

        // Build the expected in-memory representation by hand.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "struct_name",
                    Type::Primitive(PrimitiveType::Fixed(1)),
                )),
                Arc::new(NestedField::required(
                    4,
                    "ts",
                    Type::Primitive(PrimitiveType::Timestamp),
                )),
            ])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Serializing the expected value must produce JSON equivalent to the
        // input document, and the document must parse back to `expected`.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1775
    /// A V3 document — including `next-row-id`, snapshot row ranges, a
    /// snapshot `key-id`, and `encryption-keys` — deserializes into the
    /// expected in-memory value and round-trips losslessly.
    #[test]
    fn test_table_data_v3() {
        let data = r#"
        {
            "format-version" : 3,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "next-row-id": 5,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "refs": {},
            "snapshots" : [ {
                "snapshot-id" : 1,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "first-row-id" : 0,
                "added-rows" : 4,
                "key-id" : "key1",
                "summary" : {
                    "operation" : "append"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            }
            ],
            "encryption-keys": [
                {
                    "key-id": "key1",
                    "encrypted-by-id": "KMS",
                    "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
                    "properties": {
                        "p1": "v1"
                    }
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0
        }
        "#;

        // Build the expected in-memory representation by hand.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                4,
                "ts",
                Type::Primitive(PrimitiveType::Timestamp),
            ))])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Snapshot carries the V3-only row range and encryption key id.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_row_range(0, 4)
            .with_encryption_key_id(Some("key1".to_string()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
            .with_schema_id(0)
            .build();

        // Key metadata is base64 in JSON but raw bytes in memory.
        let encryption_key = EncryptedKey::builder()
            .key_id("key1".to_string())
            .encrypted_by_id("KMS".to_string())
            .encrypted_key_metadata(
                base64::prelude::BASE64_STANDARD
                    .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
                    .unwrap(),
            )
            .properties(HashMap::from_iter(vec![(
                "p1".to_string(),
                "v1".to_string(),
            )]))
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
            next_row_id: 5,
        };

        // Serializing the expected value must produce JSON equivalent to the
        // input document, and the document must parse back to `expected`.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1951
    /// A V1 document using the legacy singular `schema` / `partition-spec`
    /// fields deserializes into the expected in-memory value and round-trips.
    #[test]
    fn test_table_data_v1() {
        let data = r#"
        {
            "format-version" : 1,
            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
            "location" : "/home/iceberg/warehouse/nyc/taxis",
            "last-updated-ms" : 1662532818843,
            "last-column-id" : 5,
            "schema" : {
                "type" : "struct",
                "schema-id" : 0,
                "fields" : [ {
                    "id" : 1,
                    "name" : "vendor_id",
                    "required" : false,
                    "type" : "long"
                }, {
                    "id" : 2,
                    "name" : "trip_id",
                    "required" : false,
                    "type" : "long"
                }, {
                    "id" : 3,
                    "name" : "trip_distance",
                    "required" : false,
                    "type" : "float"
                }, {
                    "id" : 4,
                    "name" : "fare_amount",
                    "required" : false,
                    "type" : "double"
                }, {
                    "id" : 5,
                    "name" : "store_and_fwd_flag",
                    "required" : false,
                    "type" : "string"
                } ]
            },
            "partition-spec" : [ {
                "name" : "vendor_id",
                "transform" : "identity",
                "source-id" : 1,
                "field-id" : 1000
            } ],
            "last-partition-id" : 1000,
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
                "order-id" : 0,
                "fields" : [ ]
            } ],
            "properties" : {
                "owner" : "root"
            },
            "current-snapshot-id" : 638933773299822130,
            "refs" : {
                "main" : {
                    "snapshot-id" : 638933773299822130,
                    "type" : "branch"
                }
            },
            "snapshots" : [ {
                "snapshot-id" : 638933773299822130,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            } ],
            "snapshot-log" : [ {
                "timestamp-ms" : 1662532818843,
                "snapshot-id" : 638933773299822130
            } ],
            "metadata-log" : [ {
                "timestamp-ms" : 1662532805245,
                "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
            } ]
        }
        "#;

        // Build the expected in-memory representation by hand.
        let schema = Schema::builder()
            .with_fields(vec![
                Arc::new(NestedField::optional(
                    1,
                    "vendor_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    2,
                    "trip_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    3,
                    "trip_distance",
                    Type::Primitive(PrimitiveType::Float),
                )),
                Arc::new(NestedField::optional(
                    4,
                    "fare_amount",
                    Type::Primitive(PrimitiveType::Double),
                )),
                Arc::new(NestedField::optional(
                    5,
                    "store_and_fwd_flag",
                    Type::Primitive(PrimitiveType::String),
                )),
            ])
            .build()
            .unwrap();

        let schema = Arc::new(schema);
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(0)
            .build_unbound()
            .unwrap();

        let snapshot = Snapshot::builder()
            .with_snapshot_id(638933773299822130)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
            last_updated_ms: 1662532818843,
            last_column_id: 5,
            schemas: HashMap::from_iter(vec![(0, schema)]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
            current_snapshot_id: Some(638933773299822130),
            last_sequence_number: 0,
            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
            snapshot_log: vec![SnapshotLog {
                snapshot_id: 638933773299822130,
                timestamp_ms: 1662532818843,
            }],
            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2124
    /// A V2 document with no snapshots (and no `properties`) deserializes
    /// into the expected in-memory value and round-trips losslessly.
    #[test]
    fn test_table_data_v2_no_snapshots() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "refs": {},
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0
        }
        "#;

        // Build the expected in-memory representation by hand.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "struct_name",
                Type::Primitive(PrimitiveType::Fixed(1)),
            ))])
            .build()
            .unwrap();

        // Unpartitioned spec: id 0 with no fields.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Serializing the expected value must produce JSON equivalent to the
        // input document, and the document must parse back to `expected`.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
2227
    /// Deserialization must be rejected when `current-snapshot-id` (1) and the
    /// `main` branch ref (snapshot 2) disagree.
    #[test]
    fn test_current_snapshot_id_must_match_main_branch() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "current-snapshot-id" : 1,
            "refs" : {
                "main" : {
                    "snapshot-id" : 2,
                    "type" : "branch"
                }
            },
            "snapshots" : [ {
                "snapshot-id" : 1,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            },
            {
                "snapshot-id" : 2,
                "timestamp-ms" : 1662532818844,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            } ]
        }
        "#;

        // Parsing must fail with the mismatch error, not succeed silently.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot id does not match main branch")
        );
    }
2334
    /// Deserialization must be rejected when a `main` branch ref exists but
    /// `current-snapshot-id` is absent.
    #[test]
    fn test_main_without_current() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
                "main" : {
                    "snapshot-id" : 1,
                    "type" : "branch"
                }
            },
            "snapshots" : [ {
                "snapshot-id" : 1,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            } ]
        }
        "#;

        // Parsing must fail with the missing-current-snapshot error.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot is not set, but main branch exists")
        );
    }
2426
    // Every entry in `refs` must point at a snapshot that exists in the
    // `snapshots` list; a dangling branch reference must be rejected.
    #[test]
    fn test_branch_snapshot_missing() {
        // Branch "foo" references snapshot-id 2, but only snapshot-id 1 is
        // present in `snapshots`.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
                "main" : {
                    "snapshot-id" : 1,
                    "type" : "branch"
                },
                "foo" : {
                    "snapshot-id" : 2,
                    "type" : "branch"
                }
            },
            "snapshots" : [ {
                "snapshot-id" : 1,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            } ]
        }
        "#;

        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string().contains(
                "Snapshot for reference foo does not exist in the existing snapshots list"
            )
        );
    }
2523
    // A snapshot's sequence number may not exceed `last-sequence-number`.
    // Exercises three cases by patching the same JSON: snapshot seq 4 vs
    // last-sequence-number 1 (reject), 4 (accept), and 5 (accept).
    #[test]
    fn test_v2_wrong_max_snapshot_sequence_number() {
        // Snapshot carries sequence-number 4 while last-sequence-number is 1.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 1,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 4,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [],
            "snapshot-log": [],
            "metadata-log": []
        }
        "#;

        // Case 1: snapshot seq (4) > last-sequence-number (1) -> error.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(err.to_string().contains(
            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
        ));

        // Case 2: equal (4 == 4) is valid.
        let data = data.replace(
            r#""last-sequence-number": 1,"#,
            r#""last-sequence-number": 4,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 4);

        // Case 3: last-sequence-number ahead of every snapshot (5 > 4) is valid.
        let data = data.replace(
            r#""last-sequence-number": 4,"#,
            r#""last-sequence-number": 5,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 5);
    }
2605
    // Round-trip of the `statistics` field (Puffin statistics files keyed by
    // snapshot id), including nested blob metadata.
    #[test]
    fn test_statistic_files() {
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/stats.puffin",
                    "file-size-in-bytes": 413,
                    "file-footer-size-in-bytes": 42,
                    "blob-metadata": [
                        {
                            "type": "ndv",
                            "snapshot-id": 3055729675574597004,
                            "sequence-number": 1,
                            "fields": [
                                1
                            ]
                        }
                    ]
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
        "#;

        // Build the expected in-memory representation piece by piece.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            // Statistics are keyed by snapshot id; key-metadata is absent in
            // the JSON, so it must deserialize to None.
            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
                snapshot_id: 3055729675574597004,
                statistics_path: "s3://a/b/stats.puffin".to_string(),
                file_size_in_bytes: 413,
                file_footer_size_in_bytes: 42,
                key_metadata: None,
                blob_metadata: vec![BlobMetadata {
                    snapshot_id: 3055729675574597004,
                    sequence_number: 1,
                    fields: vec![1],
                    r#type: "ndv".to_string(),
                    properties: HashMap::new(),
                }],
            })]),
            partition_statistics: HashMap::new(),
            // The current snapshot implies a "main" branch ref.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Asserts both deserialization and re-serialization round-trip.
        check_table_metadata_serde(data, expected);
    }
2758
    // Round-trip of the `partition-statistics` field (partition statistics
    // files keyed by snapshot id).
    #[test]
    fn test_partition_statistics_file() {
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "partition-statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/partition-stats.parquet",
                    "file-size-in-bytes": 43
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
        "#;

        // Build the expected in-memory representation piece by piece.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::new(),
            // Partition statistics are keyed by snapshot id.
            partition_statistics: HashMap::from_iter(vec![(
                3055729675574597004,
                PartitionStatisticsFile {
                    snapshot_id: 3055729675574597004,
                    statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
                    file_size_in_bytes: 43,
                },
            )]),
            // The current snapshot implies a "main" branch ref.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Asserts both deserialization and re-serialization round-trip.
        check_table_metadata_serde(data, expected);
    }
2894
2895 #[test]
2896 fn test_invalid_table_uuid() -> Result<()> {
2897 let data = r#"
2898 {
2899 "format-version" : 2,
2900 "table-uuid": "xxxx"
2901 }
2902 "#;
2903 assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2904 Ok(())
2905 }
2906
2907 #[test]
2908 fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
2909 let data = r#"
2910 {
2911 "format-version" : 1
2912 }
2913 "#;
2914 assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2915 Ok(())
2916 }
2917
2918 #[test]
2919 fn test_table_metadata_v3_valid_minimal() {
2920 let metadata_str =
2921 fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();
2922
2923 let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
2924 assert_eq!(table_metadata.format_version, FormatVersion::V3);
2925
2926 let schema = Schema::builder()
2927 .with_schema_id(0)
2928 .with_fields(vec![
2929 Arc::new(
2930 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
2931 .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
2932 .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
2933 ),
2934 Arc::new(
2935 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2936 .with_doc("comment"),
2937 ),
2938 Arc::new(NestedField::required(
2939 3,
2940 "z",
2941 Type::Primitive(PrimitiveType::Long),
2942 )),
2943 ])
2944 .build()
2945 .unwrap();
2946
2947 let partition_spec = PartitionSpec::builder(schema.clone())
2948 .with_spec_id(0)
2949 .add_unbound_field(UnboundPartitionField {
2950 name: "x".to_string(),
2951 transform: Transform::Identity,
2952 source_id: 1,
2953 field_id: Some(1000),
2954 })
2955 .unwrap()
2956 .build()
2957 .unwrap();
2958
2959 let sort_order = SortOrder::builder()
2960 .with_order_id(3)
2961 .with_sort_field(SortField {
2962 source_id: 2,
2963 transform: Transform::Identity,
2964 direction: SortDirection::Ascending,
2965 null_order: NullOrder::First,
2966 })
2967 .with_sort_field(SortField {
2968 source_id: 3,
2969 transform: Transform::Bucket(4),
2970 direction: SortDirection::Descending,
2971 null_order: NullOrder::Last,
2972 })
2973 .build_unbound()
2974 .unwrap();
2975
2976 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2977 let expected = TableMetadata {
2978 format_version: FormatVersion::V3,
2979 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2980 location: "s3://bucket/test/location".to_string(),
2981 last_updated_ms: 1602638573590,
2982 last_column_id: 3,
2983 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2984 current_schema_id: 0,
2985 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2986 default_spec: Arc::new(partition_spec),
2987 default_partition_type,
2988 last_partition_id: 1000,
2989 default_sort_order_id: 3,
2990 sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
2991 snapshots: HashMap::default(),
2992 current_snapshot_id: None,
2993 last_sequence_number: 34,
2994 properties: HashMap::new(),
2995 snapshot_log: Vec::new(),
2996 metadata_log: Vec::new(),
2997 refs: HashMap::new(),
2998 statistics: HashMap::new(),
2999 partition_statistics: HashMap::new(),
3000 encryption_keys: HashMap::new(),
3001 next_row_id: 0, };
3003
3004 check_table_metadata_serde(&metadata_str, expected);
3005 }
3006
    // Full round-trip of TableMetadataV2Valid.json: two schemas, two
    // snapshots, a snapshot log, and an implied "main" branch ref.
    #[test]
    fn test_table_metadata_v2_file_valid() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

        // Schema 0: the original single-column schema.
        let schema1 = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();

        // Schema 1: the evolved, current schema with identifier fields.
        let schema2 = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .with_identifier_field_ids(vec![1, 2])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema2.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        // Snapshot lineage: snapshot2 is a child of snapshot1 and is current.
        let snapshot1 = Snapshot::builder()
            .with_snapshot_id(3051729675574597004)
            .with_timestamp_ms(1515100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let snapshot2 = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_parent_snapshot_id(Some(3051729675574597004))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_schema_id(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![
                (3051729675574597004, Arc::new(snapshot1)),
                (3055729675574597004, Arc::new(snapshot2)),
            ]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![
                SnapshotLog {
                    snapshot_id: 3051729675574597004,
                    timestamp_ms: 1515100955770,
                },
                SnapshotLog {
                    snapshot_id: 3055729675574597004,
                    timestamp_ms: 1555100955770,
                },
            ],
            metadata_log: Vec::new(),
            // "main" tracks the current snapshot.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3146
    // Round-trip of a minimal valid V2 metadata file: one schema, one spec,
    // one sort order, and no snapshots or refs.
    #[test]
    fn test_table_metadata_v2_file_valid_minimal() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3232
    // Round-trip of a valid V1 metadata file. V1 has no sequence numbers
    // (last_sequence_number stays 0) and gets an unsorted default sort order.
    #[test]
    fn test_table_metadata_v1_file_valid() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573874,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 0,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::new(),
            current_snapshot_id: None,
            last_sequence_number: 0,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3302
3303 #[test]
3304 fn test_table_metadata_v1_compat() {
3305 let metadata =
3306 fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3307
3308 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3310 .expect("Failed to deserialize TableMetadataV1Compat.json");
3311
3312 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3314 assert_eq!(
3315 desered_type.uuid(),
3316 Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3317 );
3318 assert_eq!(
3319 desered_type.location(),
3320 "s3://bucket/warehouse/iceberg/glue.db/table_name"
3321 );
3322 assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3323 assert_eq!(desered_type.current_schema_id(), 0);
3324 }
3325
3326 #[test]
3327 fn test_table_metadata_v1_schemas_without_current_id() {
3328 let metadata = fs::read_to_string(
3329 "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3330 )
3331 .unwrap();
3332
3333 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3335 .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3336
3337 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3339 assert_eq!(
3340 desered_type.uuid(),
3341 Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3342 );
3343
3344 let schema = desered_type.current_schema();
3346 assert_eq!(schema.as_struct().fields().len(), 3);
3347 assert_eq!(schema.as_struct().fields()[0].name, "x");
3348 assert_eq!(schema.as_struct().fields()[1].name, "y");
3349 assert_eq!(schema.as_struct().fields()[2].name, "z");
3350 }
3351
3352 #[test]
3353 fn test_table_metadata_v1_no_valid_schema() {
3354 let metadata =
3355 fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3356 .unwrap();
3357
3358 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3360
3361 assert!(desered.is_err());
3362 let error_message = desered.unwrap_err().to_string();
3363 assert!(
3364 error_message.contains("No valid schema configuration found"),
3365 "Expected error about no valid schema configuration, got: {error_message}"
3366 );
3367 }
3368
3369 #[test]
3370 fn test_table_metadata_v1_partition_specs_without_default_id() {
3371 let metadata = fs::read_to_string(
3372 "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3373 )
3374 .unwrap();
3375
3376 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3378 .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3379
3380 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3382 assert_eq!(
3383 desered_type.uuid(),
3384 Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3385 );
3386
3387 assert_eq!(desered_type.default_partition_spec_id(), 2); assert_eq!(desered_type.partition_specs.len(), 2);
3390
3391 let default_spec = &desered_type.default_spec;
3393 assert_eq!(default_spec.spec_id(), 2);
3394 assert_eq!(default_spec.fields().len(), 1);
3395 assert_eq!(default_spec.fields()[0].name, "y");
3396 assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3397 assert_eq!(default_spec.fields()[0].source_id, 2);
3398 }
3399
3400 #[test]
3401 fn test_table_metadata_v2_schema_not_found() {
3402 let metadata =
3403 fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3404 .unwrap();
3405
3406 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3407
3408 assert_eq!(
3409 desered.unwrap_err().to_string(),
3410 "DataInvalid => No schema exists with the current schema id 2."
3411 )
3412 }
3413
3414 #[test]
3415 fn test_table_metadata_v2_missing_sort_order() {
3416 let metadata =
3417 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3418 .unwrap();
3419
3420 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3421
3422 assert_eq!(
3423 desered.unwrap_err().to_string(),
3424 "data did not match any variant of untagged enum TableMetadataEnum"
3425 )
3426 }
3427
3428 #[test]
3429 fn test_table_metadata_v2_missing_partition_specs() {
3430 let metadata =
3431 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3432 .unwrap();
3433
3434 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3435
3436 assert_eq!(
3437 desered.unwrap_err().to_string(),
3438 "data did not match any variant of untagged enum TableMetadataEnum"
3439 )
3440 }
3441
3442 #[test]
3443 fn test_table_metadata_v2_missing_last_partition_id() {
3444 let metadata = fs::read_to_string(
3445 "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3446 )
3447 .unwrap();
3448
3449 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3450
3451 assert_eq!(
3452 desered.unwrap_err().to_string(),
3453 "data did not match any variant of untagged enum TableMetadataEnum"
3454 )
3455 }
3456
3457 #[test]
3458 fn test_table_metadata_v2_missing_schemas() {
3459 let metadata =
3460 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3461 .unwrap();
3462
3463 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3464
3465 assert_eq!(
3466 desered.unwrap_err().to_string(),
3467 "data did not match any variant of untagged enum TableMetadataEnum"
3468 )
3469 }
3470
3471 #[test]
3472 fn test_table_metadata_v2_unsupported_version() {
3473 let metadata =
3474 fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3475 .unwrap();
3476
3477 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3478
3479 assert_eq!(
3480 desered.unwrap_err().to_string(),
3481 "data did not match any variant of untagged enum TableMetadataEnum"
3482 )
3483 }
3484
3485 #[test]
3486 fn test_order_of_format_version() {
3487 assert!(FormatVersion::V1 < FormatVersion::V2);
3488 assert_eq!(FormatVersion::V1, FormatVersion::V1);
3489 assert_eq!(FormatVersion::V2, FormatVersion::V2);
3490 }
3491
3492 #[test]
3493 fn test_default_partition_spec() {
3494 let default_spec_id = 1234;
3495 let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3496 let partition_spec = PartitionSpec::unpartition_spec();
3497 table_meta_data.default_spec = partition_spec.clone().into();
3498 table_meta_data
3499 .partition_specs
3500 .insert(default_spec_id, Arc::new(partition_spec));
3501
3502 assert_eq!(
3503 (*table_meta_data.default_partition_spec().clone()).clone(),
3504 (*table_meta_data
3505 .partition_spec_by_id(default_spec_id)
3506 .unwrap()
3507 .clone())
3508 .clone()
3509 );
3510 }
3511 #[test]
3512 fn test_default_sort_order() {
3513 let default_sort_order_id = 1234;
3514 let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3515 table_meta_data.default_sort_order_id = default_sort_order_id;
3516 table_meta_data
3517 .sort_orders
3518 .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3519
3520 assert_eq!(
3521 table_meta_data.default_sort_order(),
3522 table_meta_data
3523 .sort_orders
3524 .get(&default_sort_order_id)
3525 .unwrap()
3526 )
3527 }
3528
3529 #[test]
3530 fn test_table_metadata_builder_from_table_creation() {
3531 let table_creation = TableCreation::builder()
3532 .location("s3://db/table".to_string())
3533 .name("table".to_string())
3534 .properties(HashMap::new())
3535 .schema(Schema::builder().build().unwrap())
3536 .build();
3537 let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
3538 .unwrap()
3539 .build()
3540 .unwrap()
3541 .metadata;
3542 assert_eq!(table_metadata.location, "s3://db/table");
3543 assert_eq!(table_metadata.schemas.len(), 1);
3544 assert_eq!(
3545 table_metadata
3546 .schemas
3547 .get(&0)
3548 .unwrap()
3549 .as_struct()
3550 .fields()
3551 .len(),
3552 0
3553 );
3554 assert_eq!(table_metadata.properties.len(), 0);
3555 assert_eq!(
3556 table_metadata.partition_specs,
3557 HashMap::from([(
3558 0,
3559 Arc::new(
3560 PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
3561 .with_spec_id(0)
3562 .build()
3563 .unwrap()
3564 )
3565 )])
3566 );
3567 assert_eq!(
3568 table_metadata.sort_orders,
3569 HashMap::from([(
3570 0,
3571 Arc::new(SortOrder {
3572 order_id: 0,
3573 fields: vec![]
3574 })
3575 )])
3576 );
3577 }
3578
3579 #[tokio::test]
3580 async fn test_table_metadata_read_write() {
3581 let temp_dir = TempDir::new().unwrap();
3583 let temp_path = temp_dir.path().to_str().unwrap();
3584
3585 let file_io = FileIO::new_with_fs();
3587
3588 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3590
3591 let metadata_location = MetadataLocation::new_with_metadata(temp_path, &original_metadata);
3593 let metadata_location_str = metadata_location.to_string();
3594
3595 original_metadata
3597 .write_to(&file_io, &metadata_location)
3598 .await
3599 .unwrap();
3600
3601 assert!(fs::metadata(&metadata_location_str).is_ok());
3603
3604 let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
3606 .await
3607 .unwrap();
3608
3609 assert_eq!(read_metadata, original_metadata);
3611 }
3612
3613 #[tokio::test]
3614 async fn test_table_metadata_read_compressed() {
3615 let temp_dir = TempDir::new().unwrap();
3616 let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3617
3618 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3619 let json = serde_json::to_string(&original_metadata).unwrap();
3620
3621 let compressed = CompressionCodec::Gzip
3622 .compress(json.into_bytes())
3623 .expect("failed to compress metadata");
3624 std::fs::write(&metadata_location, &compressed).expect("failed to write metadata");
3625
3626 let file_io = FileIO::new_with_fs();
3628 let metadata_location = metadata_location.to_str().unwrap();
3629 let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3630 .await
3631 .unwrap();
3632
3633 assert_eq!(read_metadata, original_metadata);
3635 }
3636
3637 #[tokio::test]
3638 async fn test_table_metadata_read_nonexistent_file() {
3639 let file_io = FileIO::new_with_fs();
3641
3642 let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3644
3645 assert!(result.is_err());
3647 }
3648
3649 #[tokio::test]
3650 async fn test_table_metadata_write_with_gzip_compression() {
3651 let temp_dir = TempDir::new().unwrap();
3652 let temp_path = temp_dir.path().to_str().unwrap();
3653 let file_io = FileIO::new_with_fs();
3654
3655 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3657
3658 let mut props = original_metadata.properties.clone();
3660 props.insert(
3661 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
3662 "GziP".to_string(),
3663 );
3664 let compressed_metadata =
3666 TableMetadataBuilder::new_from_metadata(original_metadata.clone(), None)
3667 .assign_uuid(original_metadata.table_uuid)
3668 .set_properties(props.clone())
3669 .unwrap()
3670 .build()
3671 .unwrap()
3672 .metadata;
3673
3674 let metadata_location =
3676 MetadataLocation::new_with_metadata(temp_path, &compressed_metadata);
3677 let metadata_location_str = metadata_location.to_string();
3678
3679 assert!(metadata_location_str.contains(".gz.metadata.json"));
3681
3682 compressed_metadata
3684 .write_to(&file_io, &metadata_location)
3685 .await
3686 .unwrap();
3687
3688 assert!(std::path::Path::new(&metadata_location_str).exists());
3690
3691 let raw_content = std::fs::read(&metadata_location_str).unwrap();
3693 assert!(raw_content.len() > 2);
3694 assert_eq!(raw_content[0], 0x1F); assert_eq!(raw_content[1], 0x8B); let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
3699 .await
3700 .unwrap();
3701
3702 assert_eq!(read_metadata, compressed_metadata);
3704 }
3705
3706 #[test]
3707 fn test_partition_name_exists() {
3708 let schema = Schema::builder()
3709 .with_fields(vec![
3710 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3711 NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3712 .into(),
3713 ])
3714 .build()
3715 .unwrap();
3716
3717 let spec1 = PartitionSpec::builder(schema.clone())
3718 .with_spec_id(1)
3719 .add_partition_field("data", "data_partition", Transform::Identity)
3720 .unwrap()
3721 .build()
3722 .unwrap();
3723
3724 let spec2 = PartitionSpec::builder(schema.clone())
3725 .with_spec_id(2)
3726 .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3727 .unwrap()
3728 .build()
3729 .unwrap();
3730
3731 let metadata = TableMetadataBuilder::new(
3733 schema,
3734 spec1.clone().into_unbound(),
3735 SortOrder::unsorted_order(),
3736 "s3://test/location".to_string(),
3737 FormatVersion::V2,
3738 HashMap::new(),
3739 )
3740 .unwrap()
3741 .add_partition_spec(spec2.into_unbound())
3742 .unwrap()
3743 .build()
3744 .unwrap()
3745 .metadata;
3746
3747 assert!(metadata.partition_name_exists("data_partition"));
3748 assert!(metadata.partition_name_exists("partition_bucket"));
3749
3750 assert!(!metadata.partition_name_exists("nonexistent_field"));
3751 assert!(!metadata.partition_name_exists("data")); assert!(!metadata.partition_name_exists(""));
3753 }
3754
3755 #[test]
3756 fn test_partition_name_exists_empty_specs() {
3757 let schema = Schema::builder()
3759 .with_fields(vec![
3760 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3761 ])
3762 .build()
3763 .unwrap();
3764
3765 let metadata = TableMetadataBuilder::new(
3766 schema,
3767 PartitionSpec::unpartition_spec().into_unbound(),
3768 SortOrder::unsorted_order(),
3769 "s3://test/location".to_string(),
3770 FormatVersion::V2,
3771 HashMap::new(),
3772 )
3773 .unwrap()
3774 .build()
3775 .unwrap()
3776 .metadata;
3777
3778 assert!(!metadata.partition_name_exists("any_field"));
3779 assert!(!metadata.partition_name_exists("data"));
3780 }
3781
3782 #[test]
3783 fn test_name_exists_in_any_schema() {
3784 let schema1 = Schema::builder()
3786 .with_schema_id(1)
3787 .with_fields(vec![
3788 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3789 NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3790 ])
3791 .build()
3792 .unwrap();
3793
3794 let schema2 = Schema::builder()
3795 .with_schema_id(2)
3796 .with_fields(vec![
3797 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3798 NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3799 ])
3800 .build()
3801 .unwrap();
3802
3803 let metadata = TableMetadataBuilder::new(
3804 schema1,
3805 PartitionSpec::unpartition_spec().into_unbound(),
3806 SortOrder::unsorted_order(),
3807 "s3://test/location".to_string(),
3808 FormatVersion::V2,
3809 HashMap::new(),
3810 )
3811 .unwrap()
3812 .add_current_schema(schema2)
3813 .unwrap()
3814 .build()
3815 .unwrap()
3816 .metadata;
3817
3818 assert!(metadata.name_exists_in_any_schema("field1")); assert!(metadata.name_exists_in_any_schema("field2")); assert!(metadata.name_exists_in_any_schema("field3")); assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3823 assert!(!metadata.name_exists_in_any_schema("field4"));
3824 assert!(!metadata.name_exists_in_any_schema(""));
3825 }
3826
3827 #[test]
3828 fn test_name_exists_in_any_schema_empty_schemas() {
3829 let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3830
3831 let metadata = TableMetadataBuilder::new(
3832 schema,
3833 PartitionSpec::unpartition_spec().into_unbound(),
3834 SortOrder::unsorted_order(),
3835 "s3://test/location".to_string(),
3836 FormatVersion::V2,
3837 HashMap::new(),
3838 )
3839 .unwrap()
3840 .build()
3841 .unwrap()
3842 .metadata;
3843
3844 assert!(!metadata.name_exists_in_any_schema("any_field"));
3845 }
3846
3847 #[test]
3848 fn test_helper_methods_multi_version_scenario() {
3849 let initial_schema = Schema::builder()
3851 .with_fields(vec![
3852 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3853 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3854 NestedField::required(
3855 3,
3856 "deprecated_field",
3857 Type::Primitive(PrimitiveType::String),
3858 )
3859 .into(),
3860 ])
3861 .build()
3862 .unwrap();
3863
3864 let metadata = TableMetadataBuilder::new(
3865 initial_schema,
3866 PartitionSpec::unpartition_spec().into_unbound(),
3867 SortOrder::unsorted_order(),
3868 "s3://test/location".to_string(),
3869 FormatVersion::V2,
3870 HashMap::new(),
3871 )
3872 .unwrap();
3873
3874 let evolved_schema = Schema::builder()
3875 .with_fields(vec![
3876 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3877 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3878 NestedField::required(
3879 3,
3880 "deprecated_field",
3881 Type::Primitive(PrimitiveType::String),
3882 )
3883 .into(),
3884 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3885 .into(),
3886 ])
3887 .build()
3888 .unwrap();
3889
3890 let _final_schema = Schema::builder()
3892 .with_fields(vec![
3893 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3894 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3895 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3896 .into(),
3897 NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3898 .into(),
3899 ])
3900 .build()
3901 .unwrap();
3902
3903 let final_metadata = metadata
3904 .add_current_schema(evolved_schema)
3905 .unwrap()
3906 .build()
3907 .unwrap()
3908 .metadata;
3909
3910 assert!(!final_metadata.partition_name_exists("nonexistent_partition")); assert!(final_metadata.name_exists_in_any_schema("id")); assert!(final_metadata.name_exists_in_any_schema("name")); assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); assert!(final_metadata.name_exists_in_any_schema("new_field")); assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3917 }
3918
3919 #[test]
3920 fn test_invalid_sort_order_id_zero_with_fields() {
3921 let metadata = r#"
3922 {
3923 "format-version": 2,
3924 "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
3925 "location": "s3://bucket/test/location",
3926 "last-sequence-number": 111,
3927 "last-updated-ms": 1600000000000,
3928 "last-column-id": 3,
3929 "current-schema-id": 1,
3930 "schemas": [
3931 {
3932 "type": "struct",
3933 "schema-id": 1,
3934 "fields": [
3935 {"id": 1, "name": "x", "required": true, "type": "long"},
3936 {"id": 2, "name": "y", "required": true, "type": "long"}
3937 ]
3938 }
3939 ],
3940 "default-spec-id": 0,
3941 "partition-specs": [{"spec-id": 0, "fields": []}],
3942 "last-partition-id": 999,
3943 "default-sort-order-id": 0,
3944 "sort-orders": [
3945 {
3946 "order-id": 0,
3947 "fields": [
3948 {
3949 "transform": "identity",
3950 "source-id": 1,
3951 "direction": "asc",
3952 "null-order": "nulls-first"
3953 }
3954 ]
3955 }
3956 ],
3957 "properties": {},
3958 "current-snapshot-id": -1,
3959 "snapshots": []
3960 }
3961 "#;
3962
3963 let result: Result<TableMetadata, serde_json::Error> = serde_json::from_str(metadata);
3964
3965 assert!(
3967 result.is_err(),
3968 "Parsing should fail for sort order ID 0 with fields"
3969 );
3970 }
3971
3972 #[test]
3973 fn test_table_properties_with_defaults() {
3974 use crate::spec::TableProperties;
3975
3976 let schema = Schema::builder()
3977 .with_fields(vec![
3978 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3979 ])
3980 .build()
3981 .unwrap();
3982
3983 let metadata = TableMetadataBuilder::new(
3984 schema,
3985 PartitionSpec::unpartition_spec().into_unbound(),
3986 SortOrder::unsorted_order(),
3987 "s3://test/location".to_string(),
3988 FormatVersion::V2,
3989 HashMap::new(),
3990 )
3991 .unwrap()
3992 .build()
3993 .unwrap()
3994 .metadata;
3995
3996 let props = metadata.table_properties().unwrap();
3997
3998 assert_eq!(
3999 props.commit_num_retries,
4000 TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
4001 );
4002 assert_eq!(
4003 props.write_target_file_size_bytes,
4004 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
4005 );
4006 }
4007
4008 #[test]
4009 fn test_table_properties_with_custom_values() {
4010 use crate::spec::TableProperties;
4011
4012 let schema = Schema::builder()
4013 .with_fields(vec![
4014 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4015 ])
4016 .build()
4017 .unwrap();
4018
4019 let properties = HashMap::from([
4020 (
4021 TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
4022 "10".to_string(),
4023 ),
4024 (
4025 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
4026 "1024".to_string(),
4027 ),
4028 ]);
4029
4030 let metadata = TableMetadataBuilder::new(
4031 schema,
4032 PartitionSpec::unpartition_spec().into_unbound(),
4033 SortOrder::unsorted_order(),
4034 "s3://test/location".to_string(),
4035 FormatVersion::V2,
4036 properties,
4037 )
4038 .unwrap()
4039 .build()
4040 .unwrap()
4041 .metadata;
4042
4043 let props = metadata.table_properties().unwrap();
4044
4045 assert_eq!(props.commit_num_retries, 10);
4046 assert_eq!(props.write_target_file_size_bytes, 1024);
4047 }
4048
4049 #[test]
4050 fn test_table_properties_with_invalid_value() {
4051 let schema = Schema::builder()
4052 .with_fields(vec![
4053 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4054 ])
4055 .build()
4056 .unwrap();
4057
4058 let properties = HashMap::from([(
4059 "commit.retry.num-retries".to_string(),
4060 "not_a_number".to_string(),
4061 )]);
4062
4063 let metadata = TableMetadataBuilder::new(
4064 schema,
4065 PartitionSpec::unpartition_spec().into_unbound(),
4066 SortOrder::unsorted_order(),
4067 "s3://test/location".to_string(),
4068 FormatVersion::V2,
4069 properties,
4070 )
4071 .unwrap()
4072 .build()
4073 .unwrap()
4074 .metadata;
4075
4076 let err = metadata.table_properties().unwrap_err();
4077 assert_eq!(err.kind(), ErrorKind::DataInvalid);
4078 assert!(err.message().contains("Invalid table properties"));
4079 }
4080
    /// Scenario: a v2 table with one committed snapshot is upgraded to v3.
    /// Pre-v3 snapshots carry no row-lineage data, so after the upgrade the
    /// snapshot must expose no `row_range`, and serialization must work in
    /// both directions.
    #[test]
    fn test_v2_to_v3_upgrade_preserves_existing_snapshots_without_row_lineage() {
        // Build a minimal v2 table with a single-column schema.
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let v2_metadata = TableMetadataBuilder::new(
            schema,
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            "s3://bucket/test/location".to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Commit one snapshot while still on v2 — note: no row range is set.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(v2_metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([(
                    "added-data-files".to_string(),
                    "1".to_string(),
                )]),
            })
            .build();

        // Attach the snapshot and point the "main" branch at it.
        let v2_with_snapshot = v2_metadata
            .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
            .add_snapshot(snapshot)
            .unwrap()
            .set_ref("main", SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Sanity check: the v2 table serializes before the upgrade.
        let v2_json = serde_json::to_string(&v2_with_snapshot);
        assert!(v2_json.is_ok(), "v2 serialization should work");

        // Upgrade the table to format v3.
        let v3_metadata = v2_with_snapshot
            .into_builder(Some("s3://bucket/test/metadata/v00002.json".to_string()))
            .upgrade_format_version(FormatVersion::V3)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Row-id tracking starts fresh; the old snapshot is carried over.
        assert_eq!(v3_metadata.format_version, FormatVersion::V3);
        assert_eq!(v3_metadata.next_row_id, INITIAL_ROW_ID);
        assert_eq!(v3_metadata.snapshots.len(), 1);

        // The pre-upgrade snapshot must not report a row range.
        let snapshot = v3_metadata.snapshots.values().next().unwrap();
        assert!(
            snapshot.row_range().is_none(),
            "Snapshot should have no row_range after upgrade"
        );

        // v3 serialization must also succeed for upgraded tables…
        let v3_json = serde_json::to_string(&v3_metadata);
        assert!(
            v3_json.is_ok(),
            "v3 serialization should work for upgraded tables"
        );

        // …and the deserialized form must keep the snapshot without a
        // row range.
        let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
        assert_eq!(deserialized.format_version, FormatVersion::V3);
        assert_eq!(deserialized.snapshots.len(), 1);

        let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
        assert!(
            deserialized_snapshot.row_range().is_none(),
            "Deserialized snapshot should have no row_range"
        );
    }
4180
    /// A v3-native snapshot created *with* a row range must keep that range
    /// through in-memory accessors and a JSON round-trip.
    #[test]
    fn test_v3_snapshot_with_row_lineage_serialization() {
        // Minimal v3 table with a single-column schema.
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let v3_metadata = TableMetadataBuilder::new(
            schema,
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            "s3://bucket/test/location".to_string(),
            FormatVersion::V3,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Snapshot with an explicit row range: first-row-id 100, 50 rows added.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(v3_metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([(
                    "added-data-files".to_string(),
                    "1".to_string(),
                )]),
            })
            .with_row_range(100, 50)
            .build();

        // Attach the snapshot and point the "main" branch at it.
        let v3_with_snapshot = v3_metadata
            .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
            .add_snapshot(snapshot)
            .unwrap()
            .set_ref("main", SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // The in-memory snapshot reports the row range it was built with.
        let snapshot = v3_with_snapshot.snapshots.values().next().unwrap();
        assert!(
            snapshot.row_range().is_some(),
            "Snapshot should have row_range"
        );
        let (first_row_id, added_rows) = snapshot.row_range().unwrap();
        assert_eq!(first_row_id, 100);
        assert_eq!(added_rows, 50);

        // Serialize and check the round trip preserves the range.
        let v3_json = serde_json::to_string(&v3_with_snapshot);
        assert!(
            v3_json.is_ok(),
            "v3 serialization should work for snapshots with row lineage"
        );

        let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
        assert_eq!(deserialized.format_version, FormatVersion::V3);
        assert_eq!(deserialized.snapshots.len(), 1);

        let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
        assert!(
            deserialized_snapshot.row_range().is_some(),
            "Deserialized snapshot should have row_range"
        );
        let (deserialized_first_row_id, deserialized_added_rows) =
            deserialized_snapshot.row_range().unwrap();
        assert_eq!(deserialized_first_row_id, 100);
        assert_eq!(deserialized_added_rows, 50);
    }
4271}