1use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::sync::Arc;
26
27use _serde::TableMetadataEnum;
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use serde_repr::{Deserialize_repr, Serialize_repr};
31use uuid::Uuid;
32
33use super::snapshot::SnapshotReference;
34pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
35use super::{
36 DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
37 SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
38 TableProperties, parse_metadata_file_compression,
39};
40use crate::catalog::MetadataLocation;
41use crate::compression::CompressionCodec;
42use crate::error::{Result, timestamp_ms_to_utc};
43use crate::io::FileIO;
44use crate::spec::EncryptedKey;
45use crate::{Error, ErrorKind};
46
/// Name of the branch reference that tracks the current table state.
static MAIN_BRANCH: &str = "main";
/// Tolerance, in milliseconds, allowed for out-of-order timestamps in the
/// snapshot/metadata logs (see the chronological-log validators below).
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

/// Sentinel snapshot id meaning "no snapshot"; normalized to `None`.
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
/// Sequence number used before any commit has assigned one (always used for v1).
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// First row id assigned when row lineage tracking begins.
pub const INITIAL_ROW_ID: u64 = 0;
/// Minimum format version that supports row lineage (`next_row_id`).
pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference-counted handle to shared, immutable table metadata.
pub type TableMetadataRef = Arc<TableMetadata>;
62
/// In-memory representation of an Iceberg table metadata file.
///
/// Deserialization goes through [`_serde::TableMetadataEnum`] (via the
/// `try_from` container attribute) so that all supported on-disk format
/// versions normalize into this single structure.
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
pub struct TableMetadata {
    /// Format version of this metadata.
    pub(crate) format_version: FormatVersion,
    /// Unique identifier of the table.
    pub(crate) table_uuid: Uuid,
    /// Base location of the table; trailing `/` is trimmed by `try_normalize`.
    pub(crate) location: String,
    /// Highest assigned sequence number (kept at 0 for v1 tables).
    pub(crate) last_sequence_number: i64,
    /// Timestamp (ms since epoch) of the last metadata update.
    pub(crate) last_updated_ms: i64,
    /// Highest column id assigned across all schemas.
    pub(crate) last_column_id: i32,
    /// All schemas, keyed by schema id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    /// Id of the current schema; must exist in `schemas`.
    pub(crate) current_schema_id: i32,
    /// All partition specs, keyed by spec id.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    /// The default partition spec for new writes.
    pub(crate) default_spec: PartitionSpecRef,
    /// Struct type obtained by binding `default_spec` to the current schema.
    pub(crate) default_partition_type: StructType,
    /// Highest partition field id assigned across all specs.
    pub(crate) last_partition_id: i32,
    /// Free-form table properties.
    pub(crate) properties: HashMap<String, String>,
    /// Id of the current snapshot, if any.
    pub(crate) current_snapshot_id: Option<i64>,
    /// All snapshots, keyed by snapshot id.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// Chronological log of current-snapshot changes.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    /// Chronological log of previous metadata file locations.
    pub(crate) metadata_log: Vec<MetadataLog>,

    /// All sort orders, keyed by order id.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    /// Id of the default sort order.
    pub(crate) default_sort_order_id: i64,
    /// Named snapshot references (branches/tags), including "main".
    pub(crate) refs: HashMap<String, SnapshotReference>,
    /// Table statistics files, keyed by snapshot id.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Partition statistics files, keyed by snapshot id.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    /// Encryption keys, keyed by key id (populated for v3 metadata only).
    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
    /// Next row id to assign for row lineage.
    pub(crate) next_row_id: u64,
}
142
impl TableMetadata {
    /// Converts this metadata into a [`TableMetadataBuilder`] for applying
    /// further changes. `current_file_location` is the location this metadata
    /// was loaded from, if known.
    #[must_use]
    pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
        TableMetadataBuilder::new_from_metadata(self, current_file_location)
    }

    /// Returns `true` if any partition spec contains a field named `name`.
    #[inline]
    pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
        self.partition_specs
            .values()
            .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
    }

    /// Returns `true` if any schema contains a field named `name`.
    #[inline]
    pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
        self.schemas
            .values()
            .any(|schema| schema.field_by_name(name).is_some())
    }

    /// Returns the table's format version.
    #[inline]
    pub fn format_version(&self) -> FormatVersion {
        self.format_version
    }

    /// Returns the table's UUID.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        self.table_uuid
    }

    /// Returns the table's base location.
    #[inline]
    pub fn location(&self) -> &str {
        self.location.as_str()
    }

    /// Returns the last assigned sequence number.
    #[inline]
    pub fn last_sequence_number(&self) -> i64 {
        self.last_sequence_number
    }

    /// Returns the sequence number to use for the next commit.
    ///
    /// V1 tables do not track sequence numbers, so this always returns
    /// `INITIAL_SEQUENCE_NUMBER`; later versions return the successor of the
    /// last sequence number.
    #[inline]
    pub fn next_sequence_number(&self) -> i64 {
        match self.format_version {
            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
            _ => self.last_sequence_number + 1,
        }
    }

    /// Returns the last assigned column id.
    #[inline]
    pub fn last_column_id(&self) -> i32 {
        self.last_column_id
    }

    /// Returns the last assigned partition field id.
    #[inline]
    pub fn last_partition_id(&self) -> i32 {
        self.last_partition_id
    }

    /// Returns the last-updated timestamp as a UTC `DateTime`.
    ///
    /// # Errors
    /// Fails if `last_updated_ms` cannot be converted to a valid timestamp.
    #[inline]
    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.last_updated_ms)
    }

    /// Returns the last-updated timestamp in milliseconds since the epoch.
    #[inline]
    pub fn last_updated_ms(&self) -> i64 {
        self.last_updated_ms
    }

    /// Iterates over all schemas (unordered).
    #[inline]
    pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
        self.schemas.values()
    }

    /// Looks up a schema by id.
    #[inline]
    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
        self.schemas.get(&schema_id)
    }

    /// Returns the current schema.
    ///
    /// # Panics
    /// Panics if `current_schema_id` does not exist in `schemas`; this
    /// invariant is established by `try_normalize`.
    #[inline]
    pub fn current_schema(&self) -> &SchemaRef {
        self.schema_by_id(self.current_schema_id)
            .expect("Current schema id set, but not found in table metadata")
    }

    /// Returns the current schema id.
    #[inline]
    pub fn current_schema_id(&self) -> SchemaId {
        self.current_schema_id
    }

    /// Iterates over all partition specs (unordered).
    #[inline]
    pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
        self.partition_specs.values()
    }

    /// Looks up a partition spec by id.
    #[inline]
    pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
        self.partition_specs.get(&spec_id)
    }

    /// Returns the default partition spec.
    #[inline]
    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
        &self.default_spec
    }

    /// Returns the struct type of the default partition spec bound to the
    /// current schema.
    #[inline]
    pub fn default_partition_type(&self) -> &StructType {
        &self.default_partition_type
    }

    /// Returns the id of the default partition spec.
    #[inline]
    pub fn default_partition_spec_id(&self) -> i32 {
        self.default_spec.spec_id()
    }

    /// Iterates over all snapshots (unordered).
    #[inline]
    pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
        self.snapshots.values()
    }

    /// Looks up a snapshot by id.
    #[inline]
    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
        self.snapshots.get(&snapshot_id)
    }

    /// Returns the snapshot history log.
    #[inline]
    pub fn history(&self) -> &[SnapshotLog] {
        &self.snapshot_log
    }

    /// Returns the metadata file history log.
    #[inline]
    pub fn metadata_log(&self) -> &[MetadataLog] {
        &self.metadata_log
    }

    /// Returns the current snapshot, if one is set.
    ///
    /// # Panics
    /// Panics if the current snapshot id refers to a missing snapshot; this
    /// invariant is established by `try_normalize`.
    #[inline]
    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
        self.current_snapshot_id.map(|s| {
            self.snapshot_by_id(s)
                .expect("Current snapshot id has been set, but doesn't exist in metadata")
        })
    }

    /// Returns the current snapshot id, if any.
    #[inline]
    pub fn current_snapshot_id(&self) -> Option<i64> {
        self.current_snapshot_id
    }

    /// Returns the snapshot a named reference (branch or tag) points at.
    ///
    /// # Panics
    /// Panics if the reference's snapshot id refers to a missing snapshot;
    /// this invariant is established by `validate_refs`.
    #[inline]
    pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
        self.refs.get(ref_name).map(|r| {
            self.snapshot_by_id(r.snapshot_id)
                .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
        })
    }

    /// Iterates over all sort orders (unordered).
    #[inline]
    pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
        self.sort_orders.values()
    }

    /// Looks up a sort order by id.
    #[inline]
    pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
        self.sort_orders.get(&sort_order_id)
    }

    /// Returns the default sort order.
    ///
    /// # Panics
    /// Panics if `default_sort_order_id` does not exist in `sort_orders`;
    /// this invariant is established by `try_normalize_sort_order`.
    #[inline]
    pub fn default_sort_order(&self) -> &SortOrderRef {
        self.sort_orders
            .get(&self.default_sort_order_id)
            .expect("Default order id has been set, but not found in table metadata!")
    }

    /// Returns the id of the default sort order.
    #[inline]
    pub fn default_sort_order_id(&self) -> i64 {
        self.default_sort_order_id
    }

    /// Returns the table's properties map.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }

    /// Determines the compression codec for metadata files from the table
    /// properties.
    ///
    /// # Errors
    /// Fails if the configured codec value cannot be parsed.
    pub fn metadata_compression_codec(&self) -> Result<CompressionCodec> {
        parse_metadata_file_compression(&self.properties)
    }

    /// Parses the raw properties map into a typed [`TableProperties`].
    ///
    /// # Errors
    /// Returns `DataInvalid` if any property value is malformed.
    pub fn table_properties(&self) -> Result<TableProperties> {
        TableProperties::try_from(&self.properties).map_err(|e| {
            Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
        })
    }

    /// Iterates over all table statistics files (unordered).
    #[inline]
    pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
        self.statistics.values()
    }

    /// Iterates over all partition statistics files (unordered).
    #[inline]
    pub fn partition_statistics_iter(
        &self,
    ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
        self.partition_statistics.values()
    }

    /// Looks up the table statistics file for a snapshot.
    #[inline]
    pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
        self.statistics.get(&snapshot_id)
    }

    /// Looks up the partition statistics file for a snapshot.
    #[inline]
    pub fn partition_statistics_for_snapshot(
        &self,
        snapshot_id: i64,
    ) -> Option<&PartitionStatisticsFile> {
        self.partition_statistics.get(&snapshot_id)
    }

    /// Ensures a "main" branch ref exists whenever a current snapshot is set,
    /// creating one with unset (default) branch retention if missing.
    fn construct_refs(&mut self) {
        if let Some(current_snapshot_id) = self.current_snapshot_id
            && !self.refs.contains_key(MAIN_BRANCH)
        {
            self.refs
                .insert(MAIN_BRANCH.to_string(), SnapshotReference {
                    snapshot_id: current_snapshot_id,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: None,
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                });
        }
    }

    /// Iterates over all encryption keys (unordered).
    #[inline]
    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
        self.encryption_keys.values()
    }

    /// Looks up an encryption key by its key id.
    #[inline]
    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
        self.encryption_keys.get(key_id)
    }

    /// Returns the next row id to assign for row lineage.
    #[inline]
    pub fn next_row_id(&self) -> u64 {
        self.next_row_id
    }

    /// Reads and deserializes a table metadata file, transparently handling
    /// gzip-compressed content.
    ///
    /// # Errors
    /// Fails on I/O errors, on gzip decompression failure, or if the content
    /// is not valid metadata JSON.
    pub async fn read_from(
        file_io: &FileIO,
        metadata_location: impl AsRef<str>,
    ) -> Result<TableMetadata> {
        let metadata_location = metadata_location.as_ref();
        let input_file = file_io.new_input(metadata_location)?;
        let metadata_content = input_file.read().await?;

        // 0x1F 0x8B is the gzip magic number (RFC 1952).
        // NOTE(review): `len() >= 2` would suffice to index bytes 0 and 1;
        // `> 2` is safe in practice since a real gzip stream is >= 10 bytes.
        let metadata = if metadata_content.len() > 2
            && metadata_content[0] == 0x1F
            && metadata_content[1] == 0x8B
        {
            let decompressed_data = CompressionCodec::gzip_default()
                .decompress(metadata_content.to_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Trying to read compressed metadata file",
                    )
                    .with_context("file_path", metadata_location)
                    .with_source(e)
                })?;
            serde_json::from_slice(&decompressed_data)?
        } else {
            serde_json::from_slice(&metadata_content)?
        };

        Ok(metadata)
    }

    /// Serializes this metadata to JSON and writes it to `metadata_location`,
    /// compressing according to the table's configured codec.
    ///
    /// # Errors
    /// Fails if the codec configured in the table properties disagrees with
    /// the codec implied by the target location, if the codec is unsupported,
    /// or on serialization/compression/I/O errors.
    pub async fn write_to(
        &self,
        file_io: &FileIO,
        metadata_location: &MetadataLocation,
    ) -> Result<()> {
        let json_data = serde_json::to_vec(self)?;

        let codec = parse_metadata_file_compression(&self.properties)?;

        // The file name encodes the codec; refuse to write a file whose name
        // would misrepresent its contents.
        if codec != metadata_location.compression_codec() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Compression codec mismatch: metadata_location has {:?}, but table properties specify {:?}",
                    metadata_location.compression_codec(),
                    codec
                ),
            ));
        }

        let data_to_write = match codec {
            CompressionCodec::Gzip(_) => codec.compress(json_data)?,
            CompressionCodec::None => json_data,
            _ => {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!("Unsupported metadata compression codec: {codec:?}"),
                ));
            }
        };

        file_io
            .new_output(metadata_location.to_string())?
            .write(data_to_write.into())
            .await
    }

    /// Validates cross-references and repairs fixable inconsistencies after
    /// deserialization (missing main ref, sentinel snapshot id, trailing
    /// slash in location, missing default spec/sort order entries).
    ///
    /// # Errors
    /// Returns `DataInvalid`/`Unexpected` errors for inconsistencies that
    /// cannot be repaired.
    pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
        self.validate_current_schema()?;
        self.normalize_current_snapshot()?;
        self.construct_refs();
        self.validate_refs()?;
        self.validate_chronological_snapshot_logs()?;
        self.validate_chronological_metadata_logs()?;
        self.location = self.location.trim_end_matches('/').to_string();
        self.validate_snapshot_sequence_number()?;
        self.try_normalize_partition_spec()?;
        self.try_normalize_sort_order()?;
        Ok(self)
    }

    /// Ensures the default partition spec is present in `partition_specs`,
    /// inserting it if it was only carried in `default_spec`.
    fn try_normalize_partition_spec(&mut self) -> Result<()> {
        if self
            .partition_spec_by_id(self.default_spec.spec_id())
            .is_none()
        {
            self.partition_specs.insert(
                self.default_spec.spec_id(),
                Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
            );
        }

        Ok(())
    }

    /// Validates the default sort order, materializing the reserved unsorted
    /// order if it is referenced but absent.
    fn try_normalize_sort_order(&mut self) -> Result<()> {
        // The reserved unsorted order id must not carry sort fields.
        if let Some(sort_order) = self.sort_order_by_id(SortOrder::UNSORTED_ORDER_ID)
            && !sort_order.fields.is_empty()
        {
            return Err(Error::new(
                ErrorKind::Unexpected,
                format!(
                    "Sort order ID {} is reserved for unsorted order",
                    SortOrder::UNSORTED_ORDER_ID
                ),
            ));
        }

        if self.sort_order_by_id(self.default_sort_order_id).is_some() {
            return Ok(());
        }

        // Only the unsorted order may be referenced without being present.
        if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No sort order exists with the default sort order id {}.",
                    self.default_sort_order_id
                ),
            ));
        }

        let sort_order = SortOrder::unsorted_order();
        self.sort_orders
            .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
        Ok(())
    }

    /// Checks that `current_schema_id` refers to an existing schema.
    fn validate_current_schema(&self) -> Result<()> {
        if self.schema_by_id(self.current_schema_id).is_none() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No schema exists with the current schema id {}.",
                    self.current_schema_id
                ),
            ));
        }
        Ok(())
    }

    /// Maps the `-1` sentinel snapshot id to `None` and checks that a real
    /// current snapshot id refers to an existing snapshot.
    fn normalize_current_snapshot(&mut self) -> Result<()> {
        if let Some(current_snapshot_id) = self.current_snapshot_id {
            if current_snapshot_id == EMPTY_SNAPSHOT_ID {
                self.current_snapshot_id = None;
            } else if self.snapshot_by_id(current_snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }
        Ok(())
    }

    /// Checks that every ref points at an existing snapshot and that the
    /// "main" branch agrees with `current_snapshot_id`.
    fn validate_refs(&self) -> Result<()> {
        for (name, snapshot_ref) in self.refs.iter() {
            if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for reference {name} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }

        let main_ref = self.refs.get(MAIN_BRANCH);
        if self.current_snapshot_id.is_some() {
            if let Some(main_ref) = main_ref
                && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
            {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Current snapshot id does not match main branch ({:?} != {:?})",
                        self.current_snapshot_id.unwrap_or_default(),
                        main_ref.snapshot_id
                    ),
                ));
            }
        } else if main_ref.is_some() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Current snapshot is not set, but main branch exists",
            ));
        }

        Ok(())
    }

    /// Checks that snapshot sequence numbers are consistent with the format
    /// version: v1 must stay at 0; v2+ snapshots must not exceed
    /// `last_sequence_number`.
    fn validate_snapshot_sequence_number(&self) -> Result<()> {
        if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Last sequence number must be 0 in v1. Found {}",
                    self.last_sequence_number
                ),
            ));
        }

        if self.format_version >= FormatVersion::V2
            && let Some(snapshot) = self
                .snapshots
                .values()
                .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
                    snapshot.snapshot_id(),
                    snapshot.sequence_number(),
                    self.last_sequence_number
                ),
            ));
        }

        Ok(())
    }

    /// Checks that snapshot log entries are (roughly) chronological; entries
    /// may go backwards by at most `ONE_MINUTE_MS`, and `last_updated_ms`
    /// must not precede the final entry by more than that.
    fn validate_chronological_snapshot_logs(&self) -> Result<()> {
        for window in self.snapshot_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted snapshot log entries",
                ));
            }
        }

        if let Some(last) = self.snapshot_log.last() {
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last snapshot log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }
        Ok(())
    }

    /// Same chronological check as for the snapshot log, applied to the
    /// metadata log.
    fn validate_chronological_metadata_logs(&self) -> Result<()> {
        for window in self.metadata_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted metadata log entries",
                ));
            }
        }

        if let Some(last) = self.metadata_log.last() {
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last metadata log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }

        Ok(())
    }
}
753
754pub(super) mod _serde {
755 use std::borrow::BorrowMut;
756 use std::collections::HashMap;
761 use std::sync::Arc;
766
767 use serde::{Deserialize, Serialize};
768 use uuid::Uuid;
769
770 use super::{
771 DEFAULT_PARTITION_SPEC_ID, EMPTY_SNAPSHOT_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
772 SnapshotLog, TableMetadata,
773 };
774 use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
775 use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
776 use crate::spec::{
777 EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
778 PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
779 SortOrder, StatisticsFile,
780 };
781 use crate::{Error, ErrorKind};
782
    /// Untagged union of the supported metadata file layouts; serde tries
    /// each variant in declaration order (V3 first), so variant order matters.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum TableMetadataEnum {
        V3(TableMetadataV3),
        V2(TableMetadataV2),
        V1(TableMetadataV1),
    }
790
    /// On-disk layout of v3 table metadata: the v2/v3 shared fields plus
    /// row lineage, encryption keys, and v3-format snapshots.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV3 {
        /// Must deserialize as the literal `3`.
        pub format_version: VersionNumber<3>,
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        pub next_row_id: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub encryption_keys: Option<Vec<EncryptedKey>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV3>>,
    }
804
    /// Fields common to the v2 and v3 metadata file layouts, serialized in
    /// kebab-case. Optional fields are omitted from output when `None`/empty.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV2V3Shared {
        pub table_uuid: Uuid,
        pub location: String,
        pub last_sequence_number: i64,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        pub schemas: Vec<SchemaV2>,
        pub current_schema_id: i32,
        pub partition_specs: Vec<PartitionSpec>,
        pub default_spec_id: i32,
        pub last_partition_id: i32,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Vec<SortOrder>,
        pub default_sort_order_id: i64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub refs: Option<HashMap<String, SnapshotReference>>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
836
    /// On-disk layout of v2 table metadata: the shared fields plus
    /// v2-format snapshots.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV2 {
        /// Must deserialize as the literal `2`.
        pub format_version: VersionNumber<2>,
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
    }
847
    /// On-disk layout of v1 table metadata. Many fields that are required in
    /// v2+ are optional here: old writers emitted the singular `schema` /
    /// `partition-spec` fields instead of the plural lists.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV1 {
        /// Must deserialize as the literal `1`.
        pub format_version: VersionNumber<1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        pub location: String,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        /// Legacy single-schema field; superseded by `schemas`.
        pub schema: Option<SchemaV1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<SchemaV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        /// Legacy single-spec field; superseded by `partition_specs`.
        pub partition_spec: Option<Vec<PartitionField>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Option<Vec<SortOrder>>,
        pub default_sort_order_id: Option<i64>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
889
    /// Zero-sized marker that (de)serializes as the literal integer `V`;
    /// used as the `format-version` field to discriminate the untagged
    /// `TableMetadataEnum` variants.
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct VersionNumber<const V: u8>;
893
894 impl Serialize for TableMetadata {
895 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
896 where S: serde::Serializer {
897 let table_metadata_enum: TableMetadataEnum =
899 self.clone().try_into().map_err(serde::ser::Error::custom)?;
900
901 table_metadata_enum.serialize(serializer)
902 }
903 }
904
    /// Serializes the marker as its const version number.
    impl<const V: u8> Serialize for VersionNumber<V> {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: serde::Serializer {
            serializer.serialize_u8(V)
        }
    }
911
912 impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
913 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
914 where D: serde::Deserializer<'de> {
915 let value = u8::deserialize(deserializer)?;
916 if value == V {
917 Ok(VersionNumber::<V>)
918 } else {
919 Err(serde::de::Error::custom("Invalid Version"))
920 }
921 }
922 }
923
924 impl TryFrom<TableMetadataEnum> for TableMetadata {
925 type Error = Error;
926 fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
927 match value {
928 TableMetadataEnum::V3(value) => value.try_into(),
929 TableMetadataEnum::V2(value) => value.try_into(),
930 TableMetadataEnum::V1(value) => value.try_into(),
931 }
932 }
933 }
934
    /// Wraps internal metadata into the serialization variant matching its
    /// declared format version. Note the v2 conversion is infallible
    /// (`into`), while v1 and v3 conversions can fail.
    impl TryFrom<TableMetadata> for TableMetadataEnum {
        type Error = Error;
        fn try_from(value: TableMetadata) -> Result<Self, Error> {
            Ok(match value.format_version {
                FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
                FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
            })
        }
    }
945
    /// Converts a deserialized v3 metadata file into the internal
    /// representation, resolving cross-references (current schema, default
    /// spec) and running `try_normalize` before returning.
    impl TryFrom<TableMetadataV3> for TableMetadata {
        type Error = Error;
        fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
            let TableMetadataV3 {
                format_version: _,
                shared: value,
                next_row_id,
                encryption_keys,
                snapshots,
            } = value;
            // The -1 sentinel means "no current snapshot".
            let current_snapshot_id = if value.current_snapshot_id == Some(EMPTY_SNAPSHOT_ID) {
                None
            } else {
                value.current_snapshot_id
            };
            // Index schemas by id; any schema conversion error aborts.
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            // Fall back to the unpartitioned spec only when the default id is
            // the reserved DEFAULT_PARTITION_SPEC_ID; otherwise a missing
            // default spec is an error.
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V3,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                // When the file carries no refs, synthesize a "main" branch
                // pointing at the current snapshot (if any).
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                encryption_keys: encryption_keys
                    .map(|keys| {
                        HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
                    })
                    .unwrap_or_default(),
                next_row_id,
            };

            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1063
    /// Converts a deserialized v2 metadata file into the internal
    /// representation. Mirrors the v3 conversion, but v2 files carry no
    /// encryption keys and no row lineage, so those fields get defaults.
    impl TryFrom<TableMetadataV2> for TableMetadata {
        type Error = Error;
        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
            let snapshots = value.snapshots;
            let value = value.shared;
            // The -1 sentinel means "no current snapshot".
            let current_snapshot_id = if value.current_snapshot_id == Some(EMPTY_SNAPSHOT_ID) {
                None
            } else {
                value.current_snapshot_id
            };
            // Index schemas by id; any schema conversion error aborts.
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            // Fall back to the unpartitioned spec only for the reserved
            // DEFAULT_PARTITION_SPEC_ID; otherwise error out.
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V2,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                // When the file carries no refs, synthesize a "main" branch
                // pointing at the current snapshot (if any).
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                // v2 has no encryption keys or row lineage.
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            };

            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1172
1173 impl TryFrom<TableMetadataV1> for TableMetadata {
1174 type Error = Error;
1175 fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1176 let current_snapshot_id = if value.current_snapshot_id == Some(EMPTY_SNAPSHOT_ID) {
1177 None
1178 } else {
1179 value.current_snapshot_id
1180 };
1181
1182 let (schemas, current_schema_id, current_schema) =
1183 if let (Some(schemas_vec), Some(schema_id)) =
1184 (&value.schemas, value.current_schema_id)
1185 {
1186 let schema_map = HashMap::from_iter(
1188 schemas_vec
1189 .clone()
1190 .into_iter()
1191 .map(|schema| {
1192 let schema: Schema = schema.try_into()?;
1193 Ok((schema.schema_id(), Arc::new(schema)))
1194 })
1195 .collect::<Result<Vec<_>, Error>>()?,
1196 );
1197
1198 let schema = schema_map
1199 .get(&schema_id)
1200 .ok_or_else(|| {
1201 Error::new(
1202 ErrorKind::DataInvalid,
1203 format!("No schema exists with the current schema id {schema_id}."),
1204 )
1205 })?
1206 .clone();
1207 (schema_map, schema_id, schema)
1208 } else if let Some(schema) = value.schema {
1209 let schema: Schema = schema.try_into()?;
1211 let schema_id = schema.schema_id();
1212 let schema_arc = Arc::new(schema);
1213 let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1214 (schema_map, schema_id, schema_arc)
1215 } else {
1216 return Err(Error::new(
1218 ErrorKind::DataInvalid,
1219 "No valid schema configuration found in table metadata",
1220 ));
1221 };
1222
1223 let partition_specs = if let Some(specs_vec) = value.partition_specs {
1225 specs_vec
1227 .into_iter()
1228 .map(|x| (x.spec_id(), Arc::new(x)))
1229 .collect::<HashMap<_, _>>()
1230 } else if let Some(partition_spec) = value.partition_spec {
1231 let spec = PartitionSpec::builder(current_schema.clone())
1233 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1234 .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1235 .build()?;
1236
1237 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1238 } else {
1239 let spec = PartitionSpec::builder(current_schema.clone())
1241 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1242 .build()?;
1243
1244 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1245 };
1246
1247 let default_spec_id = value
1249 .default_spec_id
1250 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1251
1252 let default_spec: PartitionSpecRef = partition_specs
1254 .get(&default_spec_id)
1255 .map(|x| Arc::unwrap_or_clone(x.clone()))
1256 .ok_or_else(|| {
1257 Error::new(
1258 ErrorKind::DataInvalid,
1259 format!("Default partition spec {default_spec_id} not found"),
1260 )
1261 })?
1262 .into();
        let default_partition_type = default_spec.partition_type(&current_schema)?;
1264
1265 let mut metadata = TableMetadata {
1266 format_version: FormatVersion::V1,
1267 table_uuid: value.table_uuid.unwrap_or_default(),
1268 location: value.location,
1269 last_sequence_number: 0,
1270 last_updated_ms: value.last_updated_ms,
1271 last_column_id: value.last_column_id,
1272 current_schema_id,
1273 default_spec,
1274 default_partition_type,
1275 last_partition_id: value
1276 .last_partition_id
1277 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1278 partition_specs,
1279 schemas,
1280 properties: value.properties.unwrap_or_default(),
1281 current_snapshot_id,
1282 snapshots: value
1283 .snapshots
1284 .map(|snapshots| {
1285 Ok::<_, Error>(HashMap::from_iter(
1286 snapshots
1287 .into_iter()
1288 .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1289 .collect::<Result<Vec<_>, Error>>()?,
1290 ))
1291 })
1292 .transpose()?
1293 .unwrap_or_default(),
1294 snapshot_log: value.snapshot_log.unwrap_or_default(),
1295 metadata_log: value.metadata_log.unwrap_or_default(),
1296 sort_orders: match value.sort_orders {
1297 Some(sort_orders) => HashMap::from_iter(
1298 sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1299 ),
1300 None => HashMap::new(),
1301 },
1302 default_sort_order_id: value
1303 .default_sort_order_id
1304 .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1305 refs: if let Some(snapshot_id) = current_snapshot_id {
1306 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1307 snapshot_id,
1308 retention: SnapshotRetention::Branch {
1309 min_snapshots_to_keep: None,
1310 max_snapshot_age_ms: None,
1311 max_ref_age_ms: None,
1312 },
1313 })])
1314 } else {
1315 HashMap::new()
1316 },
1317 statistics: index_statistics(value.statistics),
1318 partition_statistics: index_partition_statistics(value.partition_statistics),
1319 encryption_keys: HashMap::new(),
1320 next_row_id: INITIAL_ROW_ID, };
1322
1323 metadata.borrow_mut().try_normalize()?;
1324 Ok(metadata)
1325 }
1326 }
1327
1328 impl TryFrom<TableMetadata> for TableMetadataV3 {
1329 type Error = Error;
1330
1331 fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1332 let next_row_id = v.next_row_id;
1333 let encryption_keys = std::mem::take(&mut v.encryption_keys);
1334 let snapshots = std::mem::take(&mut v.snapshots);
1335 let shared = v.into();
1336
1337 Ok(TableMetadataV3 {
1338 format_version: VersionNumber::<3>,
1339 shared,
1340 next_row_id,
1341 encryption_keys: if encryption_keys.is_empty() {
1342 None
1343 } else {
1344 Some(encryption_keys.into_values().collect())
1345 },
1346 snapshots: if snapshots.is_empty() {
1347 None
1348 } else {
1349 Some(
1350 snapshots
1351 .into_values()
1352 .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1353 .collect::<Result<_, _>>()?,
1354 )
1355 },
1356 })
1357 }
1358 }
1359
1360 impl From<TableMetadata> for TableMetadataV2 {
1361 fn from(mut v: TableMetadata) -> Self {
1362 let snapshots = std::mem::take(&mut v.snapshots);
1363 let shared = v.into();
1364
1365 TableMetadataV2 {
1366 format_version: VersionNumber::<2>,
1367 shared,
1368 snapshots: if snapshots.is_empty() {
1369 None
1370 } else {
1371 Some(
1372 snapshots
1373 .into_values()
1374 .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1375 .collect(),
1376 )
1377 },
1378 }
1379 }
1380 }
1381
1382 impl From<TableMetadata> for TableMetadataV2V3Shared {
1383 fn from(v: TableMetadata) -> Self {
1384 TableMetadataV2V3Shared {
1385 table_uuid: v.table_uuid,
1386 location: v.location,
1387 last_sequence_number: v.last_sequence_number,
1388 last_updated_ms: v.last_updated_ms,
1389 last_column_id: v.last_column_id,
1390 schemas: v
1391 .schemas
1392 .into_values()
1393 .map(|x| {
1394 Arc::try_unwrap(x)
1395 .unwrap_or_else(|schema| schema.as_ref().clone())
1396 .into()
1397 })
1398 .collect(),
1399 current_schema_id: v.current_schema_id,
1400 partition_specs: v
1401 .partition_specs
1402 .into_values()
1403 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1404 .collect(),
1405 default_spec_id: v.default_spec.spec_id(),
1406 last_partition_id: v.last_partition_id,
1407 properties: if v.properties.is_empty() {
1408 None
1409 } else {
1410 Some(v.properties)
1411 },
1412 current_snapshot_id: v.current_snapshot_id,
1413 snapshot_log: if v.snapshot_log.is_empty() {
1414 None
1415 } else {
1416 Some(v.snapshot_log)
1417 },
1418 metadata_log: if v.metadata_log.is_empty() {
1419 None
1420 } else {
1421 Some(v.metadata_log)
1422 },
1423 sort_orders: v
1424 .sort_orders
1425 .into_values()
1426 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1427 .collect(),
1428 default_sort_order_id: v.default_sort_order_id,
1429 refs: Some(v.refs),
1430 statistics: v.statistics.into_values().collect(),
1431 partition_statistics: v.partition_statistics.into_values().collect(),
1432 }
1433 }
1434 }
1435
1436 impl TryFrom<TableMetadata> for TableMetadataV1 {
1437 type Error = Error;
1438 fn try_from(v: TableMetadata) -> Result<Self, Error> {
1439 Ok(TableMetadataV1 {
1440 format_version: VersionNumber::<1>,
1441 table_uuid: Some(v.table_uuid),
1442 location: v.location,
1443 last_updated_ms: v.last_updated_ms,
1444 last_column_id: v.last_column_id,
1445 schema: Some(
1446 v.schemas
1447 .get(&v.current_schema_id)
1448 .ok_or(Error::new(
1449 ErrorKind::Unexpected,
1450 "current_schema_id not found in schemas",
1451 ))?
1452 .as_ref()
1453 .clone()
1454 .into(),
1455 ),
1456 schemas: Some(
1457 v.schemas
1458 .into_values()
1459 .map(|x| {
1460 Arc::try_unwrap(x)
1461 .unwrap_or_else(|schema| schema.as_ref().clone())
1462 .into()
1463 })
1464 .collect(),
1465 ),
1466 current_schema_id: Some(v.current_schema_id),
1467 partition_spec: Some(v.default_spec.fields().to_vec()),
1468 partition_specs: Some(
1469 v.partition_specs
1470 .into_values()
1471 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1472 .collect(),
1473 ),
1474 default_spec_id: Some(v.default_spec.spec_id()),
1475 last_partition_id: Some(v.last_partition_id),
1476 properties: if v.properties.is_empty() {
1477 None
1478 } else {
1479 Some(v.properties)
1480 },
1481 current_snapshot_id: v.current_snapshot_id,
1482 snapshots: if v.snapshots.is_empty() {
1483 None
1484 } else {
1485 Some(
1486 v.snapshots
1487 .into_values()
1488 .map(|x| Snapshot::clone(&x).into())
1489 .collect(),
1490 )
1491 },
1492 snapshot_log: if v.snapshot_log.is_empty() {
1493 None
1494 } else {
1495 Some(v.snapshot_log)
1496 },
1497 metadata_log: if v.metadata_log.is_empty() {
1498 None
1499 } else {
1500 Some(v.metadata_log)
1501 },
1502 sort_orders: Some(
1503 v.sort_orders
1504 .into_values()
1505 .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1506 .collect(),
1507 ),
1508 default_sort_order_id: Some(v.default_sort_order_id),
1509 statistics: v.statistics.into_values().collect(),
1510 partition_statistics: v.partition_statistics.into_values().collect(),
1511 })
1512 }
1513 }
1514
1515 fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1516 statistics
1517 .into_iter()
1518 .rev()
1519 .map(|s| (s.snapshot_id, s))
1520 .collect()
1521 }
1522
1523 fn index_partition_statistics(
1524 statistics: Vec<PartitionStatisticsFile>,
1525 ) -> HashMap<i64, PartitionStatisticsFile> {
1526 statistics
1527 .into_iter()
1528 .rev()
1529 .map(|s| (s.snapshot_id, s))
1530 .collect()
1531 }
1532}
1533
/// Iceberg format version of the table metadata.
///
/// Serialized as its numeric discriminant via `serde_repr`.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
#[repr(u8)]
pub enum FormatVersion {
    /// Version 1 of the table format.
    V1 = 1u8,
    /// Version 2 of the table format.
    V2 = 2u8,
    /// Version 3 of the table format.
    V3 = 3u8,
}
1545
impl PartialOrd for FormatVersion {
    // Canonical delegation to `Ord::cmp`; versions are totally ordered.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
1551
1552impl Ord for FormatVersion {
1553 fn cmp(&self, other: &Self) -> Ordering {
1554 (*self as u8).cmp(&(*other as u8))
1555 }
1556}
1557
1558impl Display for FormatVersion {
1559 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1560 match self {
1561 FormatVersion::V1 => write!(f, "v1"),
1562 FormatVersion::V2 => write!(f, "v2"),
1563 FormatVersion::V3 => write!(f, "v3"),
1564 }
1565 }
1566}
1567
/// Log entry pointing at a previous table-metadata file.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct MetadataLog {
    /// Location of the previous table-metadata file.
    pub metadata_file: String,
    /// Millisecond timestamp associated with this metadata file.
    pub timestamp_ms: i64,
}
1577
/// Log entry recording a change of the table's current snapshot.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotLog {
    /// Id of the snapshot referenced by this log entry.
    pub snapshot_id: i64,
    /// Millisecond timestamp of the log entry.
    pub timestamp_ms: i64,
}
1587
impl SnapshotLog {
    /// Returns the log entry's timestamp as a UTC [`DateTime`].
    ///
    /// # Errors
    /// Fails if `timestamp_ms` cannot be represented as a valid datetime.
    pub fn timestamp(self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.timestamp_ms)
    }

    /// Returns the raw timestamp in milliseconds.
    #[inline]
    pub fn timestamp_ms(&self) -> i64 {
        self.timestamp_ms
    }
}
1600
1601#[cfg(test)]
1602mod tests {
1603 use std::collections::HashMap;
1604 use std::fs;
1605 use std::sync::Arc;
1606
1607 use anyhow::Result;
1608 use base64::Engine as _;
1609 use pretty_assertions::assert_eq;
1610 use tempfile::TempDir;
1611 use uuid::Uuid;
1612
1613 use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1614 use crate::catalog::MetadataLocation;
1615 use crate::compression::CompressionCodec;
1616 use crate::io::FileIO;
1617 use crate::spec::table_metadata::TableMetadata;
1618 use crate::spec::{
1619 BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1620 PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1621 SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1622 Summary, TableProperties, Transform, Type, UnboundPartitionField,
1623 };
1624 use crate::{ErrorKind, TableCreation};
1625
1626 fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1627 let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1628 assert_eq!(desered_type, expected_type);
1629
1630 let sered_json = serde_json::to_string(&expected_type).unwrap();
1631 let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1632
1633 assert_eq!(parsed_json_value, desered_type);
1634 }
1635
1636 fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1637 let path = format!("testdata/table_metadata/{file_name}");
1638 let metadata: String = fs::read_to_string(path).unwrap();
1639
1640 serde_json::from_str(&metadata).unwrap()
1641 }
1642
    /// v2 metadata: a handwritten JSON document must deserialize to a
    /// hand-built `TableMetadata`, round-trip losslessly, and serialize back
    /// to an equivalent JSON value.
    #[test]
    fn test_table_data_v2() {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            },
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Build the expected in-memory representation by hand.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "struct_name",
                    Type::Primitive(PrimitiveType::Fixed(1)),
                )),
                Arc::new(NestedField::required(
                    4,
                    "ts",
                    Type::Primitive(PrimitiveType::Timestamp),
                )),
            ])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Round-trip through serde and also compare raw JSON values.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1778
    /// v3 metadata: like the v2 round-trip test, plus the v3-only fields —
    /// `next-row-id`, snapshot row ranges / key ids, and encryption keys.
    #[test]
    fn test_table_data_v3() {
        let data = r#"
            {
                "format-version" : 3,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "next-row-id": 5,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "snapshots" : [ {
                    "snapshot-id" : 1,
                    "timestamp-ms" : 1662532818843,
                    "sequence-number" : 0,
                    "first-row-id" : 0,
                    "added-rows" : 4,
                    "key-id" : "key1",
                    "summary" : {
                        "operation" : "append"
                    },
                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                    "schema-id" : 0
                }
                ],
                "encryption-keys": [
                    {
                        "key-id": "key1",
                        "encrypted-by-id": "KMS",
                        "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
                        "properties": {
                            "p1": "v1"
                        }
                    }
                ],
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Build the expected in-memory representation by hand.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                4,
                "ts",
                Type::Primitive(PrimitiveType::Timestamp),
            ))])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Snapshot carrying the v3 row-range and encryption-key-id fields.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_row_range(0, 4)
            .with_encryption_key_id(Some("key1".to_string()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
            .with_schema_id(0)
            .build();

        let encryption_key = EncryptedKey::builder()
            .key_id("key1".to_string())
            .encrypted_by_id("KMS".to_string())
            .encrypted_key_metadata(
                base64::prelude::BASE64_STANDARD
                    .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
                    .unwrap(),
            )
            .properties(HashMap::from_iter(vec![(
                "p1".to_string(),
                "v1".to_string(),
            )]))
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
            next_row_id: 5,
        };

        // Round-trip through serde and also compare raw JSON values.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1954
    /// v1 metadata: the legacy single-`schema` / flattened-`partition-spec`
    /// layout must deserialize into the generic model (with the `main`
    /// branch ref synthesized from `current-snapshot-id`) and round-trip.
    #[test]
    fn test_table_data_v1() {
        let data = r#"
        {
            "format-version" : 1,
            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
            "location" : "/home/iceberg/warehouse/nyc/taxis",
            "last-updated-ms" : 1662532818843,
            "last-column-id" : 5,
            "schema" : {
                "type" : "struct",
                "schema-id" : 0,
                "fields" : [ {
                    "id" : 1,
                    "name" : "vendor_id",
                    "required" : false,
                    "type" : "long"
                }, {
                    "id" : 2,
                    "name" : "trip_id",
                    "required" : false,
                    "type" : "long"
                }, {
                    "id" : 3,
                    "name" : "trip_distance",
                    "required" : false,
                    "type" : "float"
                }, {
                    "id" : 4,
                    "name" : "fare_amount",
                    "required" : false,
                    "type" : "double"
                }, {
                    "id" : 5,
                    "name" : "store_and_fwd_flag",
                    "required" : false,
                    "type" : "string"
                } ]
            },
            "partition-spec" : [ {
                "name" : "vendor_id",
                "transform" : "identity",
                "source-id" : 1,
                "field-id" : 1000
            } ],
            "last-partition-id" : 1000,
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
                "order-id" : 0,
                "fields" : [ ]
            } ],
            "properties" : {
                "owner" : "root"
            },
            "current-snapshot-id" : 638933773299822130,
            "refs" : {
                "main" : {
                    "snapshot-id" : 638933773299822130,
                    "type" : "branch"
                }
            },
            "snapshots" : [ {
                "snapshot-id" : 638933773299822130,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            } ],
            "snapshot-log" : [ {
                "timestamp-ms" : 1662532818843,
                "snapshot-id" : 638933773299822130
            } ],
            "metadata-log" : [ {
                "timestamp-ms" : 1662532805245,
                "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
            } ]
        }
        "#;

        // Build the expected in-memory representation by hand.
        let schema = Schema::builder()
            .with_fields(vec![
                Arc::new(NestedField::optional(
                    1,
                    "vendor_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    2,
                    "trip_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    3,
                    "trip_distance",
                    Type::Primitive(PrimitiveType::Float),
                )),
                Arc::new(NestedField::optional(
                    4,
                    "fare_amount",
                    Type::Primitive(PrimitiveType::Double),
                )),
                Arc::new(NestedField::optional(
                    5,
                    "store_and_fwd_flag",
                    Type::Primitive(PrimitiveType::String),
                )),
            ])
            .build()
            .unwrap();

        let schema = Arc::new(schema);
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(0)
            .build_unbound()
            .unwrap();

        let snapshot = Snapshot::builder()
            .with_snapshot_id(638933773299822130)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
            last_updated_ms: 1662532818843,
            last_column_id: 5,
            schemas: HashMap::from_iter(vec![(0, schema)]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
            current_snapshot_id: Some(638933773299822130),
            last_sequence_number: 0,
            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
            snapshot_log: vec![SnapshotLog {
                snapshot_id: 638933773299822130,
                timestamp_ms: 1662532818843,
            }],
            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2127
    /// v2 metadata without any snapshots, properties or snapshot log: all of
    /// those must default to empty, and the document must still round-trip.
    #[test]
    fn test_table_data_v2_no_snapshots() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "refs": {},
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0
        }
        "#;

        // Build the expected in-memory representation by hand.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "struct_name",
                Type::Primitive(PrimitiveType::Fixed(1)),
            ))])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Round-trip through serde and also compare raw JSON values.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
2230
    /// Deserialization must reject metadata whose `current-snapshot-id` (1)
    /// disagrees with the snapshot id of the `main` branch ref (2).
    #[test]
    fn test_current_snapshot_id_must_match_main_branch() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "current-snapshot-id" : 1,
            "refs" : {
                "main" : {
                    "snapshot-id" : 2,
                    "type" : "branch"
                }
            },
            "snapshots" : [ {
                "snapshot-id" : 1,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            },
            {
                "snapshot-id" : 2,
                "timestamp-ms" : 1662532818844,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            } ]
        }
        "#;

        // The mismatch must surface as a descriptive deserialization error.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot id does not match main branch")
        );
    }
2337
    /// Deserialization must reject metadata that declares a `main` branch
    /// ref while leaving `current-snapshot-id` unset.
    #[test]
    fn test_main_without_current() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
                "main" : {
                    "snapshot-id" : 1,
                    "type" : "branch"
                }
            },
            "snapshots" : [ {
                "snapshot-id" : 1,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            } ]
        }
        "#;

        // The inconsistency must surface as a descriptive deserialization error.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot is not set, but main branch exists")
        );
    }
2429
    #[test]
    fn test_branch_snapshot_missing() {
        // The "foo" ref points at snapshot id 2, which is not present in the
        // "snapshots" list (only id 1 exists). Every ref must resolve to a
        // known snapshot, so deserialization has to fail.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
                "main" : {
                    "snapshot-id" : 1,
                    "type" : "branch"
                },
                "foo" : {
                    "snapshot-id" : 2,
                    "type" : "branch"
                }
            },
            "snapshots" : [ {
                "snapshot-id" : 1,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            } ]
        }
        "#;

        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string().contains(
                "Snapshot for reference foo does not exist in the existing snapshots list"
            )
        );
    }
2526
    #[test]
    fn test_v2_wrong_max_snapshot_sequence_number() {
        // The snapshot carries sequence-number 4, but the document claims
        // "last-sequence-number": 1. A snapshot can never be newer than the
        // table's last sequence number, so parsing must fail.
        let data = r#"
            {
              "format-version": 2,
              "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
              "location": "s3://bucket/test/location",
              "last-sequence-number": 1,
              "last-updated-ms": 1602638573590,
              "last-column-id": 3,
              "current-schema-id": 0,
              "schemas": [
                {
                  "type": "struct",
                  "schema-id": 0,
                  "fields": [
                    {
                      "id": 1,
                      "name": "x",
                      "required": true,
                      "type": "long"
                    }
                  ]
                }
              ],
              "default-spec-id": 0,
              "partition-specs": [
                {
                  "spec-id": 0,
                  "fields": []
                }
              ],
              "last-partition-id": 1000,
              "default-sort-order-id": 0,
              "sort-orders": [
                {
                  "order-id": 0,
                  "fields": []
                }
              ],
              "properties": {},
              "current-snapshot-id": 3055729675574597004,
              "snapshots": [
                {
                  "snapshot-id": 3055729675574597004,
                  "timestamp-ms": 1555100955770,
                  "sequence-number": 4,
                  "summary": {
                    "operation": "append"
                  },
                  "manifest-list": "s3://a/b/2.avro",
                  "schema-id": 0
                }
              ],
              "statistics": [],
              "snapshot-log": [],
              "metadata-log": []
            }
        "#;

        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(err.to_string().contains(
            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
        ));

        // Raising last-sequence-number to exactly the snapshot's sequence
        // number (4) makes the document valid again.
        let data = data.replace(
            r#""last-sequence-number": 1,"#,
            r#""last-sequence-number": 4,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 4);

        // ...and any value strictly greater than the snapshot's sequence
        // number is accepted as well.
        let data = data.replace(
            r#""last-sequence-number": 4,"#,
            r#""last-sequence-number": 5,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 5);
    }
2608
    #[test]
    fn test_statistic_files() {
        // Round-trips a V2 document whose "statistics" array carries one
        // Puffin statistics file (with nested blob metadata), and checks it
        // lands in `TableMetadata::statistics` keyed by snapshot id.
        let data = r#"
            {
              "format-version": 2,
              "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
              "location": "s3://bucket/test/location",
              "last-sequence-number": 34,
              "last-updated-ms": 1602638573590,
              "last-column-id": 3,
              "current-schema-id": 0,
              "schemas": [
                {
                  "type": "struct",
                  "schema-id": 0,
                  "fields": [
                    {
                      "id": 1,
                      "name": "x",
                      "required": true,
                      "type": "long"
                    }
                  ]
                }
              ],
              "default-spec-id": 0,
              "partition-specs": [
                {
                  "spec-id": 0,
                  "fields": []
                }
              ],
              "last-partition-id": 1000,
              "default-sort-order-id": 0,
              "sort-orders": [
                {
                  "order-id": 0,
                  "fields": []
                }
              ],
              "properties": {},
              "current-snapshot-id": 3055729675574597004,
              "snapshots": [
                {
                  "snapshot-id": 3055729675574597004,
                  "timestamp-ms": 1555100955770,
                  "sequence-number": 1,
                  "summary": {
                    "operation": "append"
                  },
                  "manifest-list": "s3://a/b/2.avro",
                  "schema-id": 0
                }
              ],
              "statistics": [
                {
                  "snapshot-id": 3055729675574597004,
                  "statistics-path": "s3://a/b/stats.puffin",
                  "file-size-in-bytes": 413,
                  "file-footer-size-in-bytes": 42,
                  "blob-metadata": [
                    {
                      "type": "ndv",
                      "snapshot-id": 3055729675574597004,
                      "sequence-number": 1,
                      "fields": [
                        1
                      ]
                    }
                  ]
                }
              ],
              "snapshot-log": [],
              "metadata-log": []
            }
        "#;

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        // Expected in-memory form of the JSON fixture above.
        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
                snapshot_id: 3055729675574597004,
                statistics_path: "s3://a/b/stats.puffin".to_string(),
                file_size_in_bytes: 413,
                file_footer_size_in_bytes: 42,
                key_metadata: None,
                blob_metadata: vec![BlobMetadata {
                    snapshot_id: 3055729675574597004,
                    sequence_number: 1,
                    fields: vec![1],
                    r#type: "ndv".to_string(),
                    properties: HashMap::new(),
                }],
            })]),
            partition_statistics: HashMap::new(),
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2761
    #[test]
    fn test_partition_statistics_file() {
        // Round-trips a V2 document whose "partition-statistics" array carries
        // one partition-statistics file, and checks it lands in
        // `TableMetadata::partition_statistics` keyed by snapshot id.
        let data = r#"
            {
              "format-version": 2,
              "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
              "location": "s3://bucket/test/location",
              "last-sequence-number": 34,
              "last-updated-ms": 1602638573590,
              "last-column-id": 3,
              "current-schema-id": 0,
              "schemas": [
                {
                  "type": "struct",
                  "schema-id": 0,
                  "fields": [
                    {
                      "id": 1,
                      "name": "x",
                      "required": true,
                      "type": "long"
                    }
                  ]
                }
              ],
              "default-spec-id": 0,
              "partition-specs": [
                {
                  "spec-id": 0,
                  "fields": []
                }
              ],
              "last-partition-id": 1000,
              "default-sort-order-id": 0,
              "sort-orders": [
                {
                  "order-id": 0,
                  "fields": []
                }
              ],
              "properties": {},
              "current-snapshot-id": 3055729675574597004,
              "snapshots": [
                {
                  "snapshot-id": 3055729675574597004,
                  "timestamp-ms": 1555100955770,
                  "sequence-number": 1,
                  "summary": {
                    "operation": "append"
                  },
                  "manifest-list": "s3://a/b/2.avro",
                  "schema-id": 0
                }
              ],
              "partition-statistics": [
                {
                  "snapshot-id": 3055729675574597004,
                  "statistics-path": "s3://a/b/partition-stats.parquet",
                  "file-size-in-bytes": 43
                }
              ],
              "snapshot-log": [],
              "metadata-log": []
            }
        "#;

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        // Expected in-memory form of the JSON fixture above.
        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::from_iter(vec![(
                3055729675574597004,
                PartitionStatisticsFile {
                    snapshot_id: 3055729675574597004,
                    statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
                    file_size_in_bytes: 43,
                },
            )]),
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2897
2898 #[test]
2899 fn test_invalid_table_uuid() -> Result<()> {
2900 let data = r#"
2901 {
2902 "format-version" : 2,
2903 "table-uuid": "xxxx"
2904 }
2905 "#;
2906 assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2907 Ok(())
2908 }
2909
2910 #[test]
2911 fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
2912 let data = r#"
2913 {
2914 "format-version" : 1
2915 }
2916 "#;
2917 assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2918 Ok(())
2919 }
2920
2921 #[test]
2922 fn test_table_metadata_v3_valid_minimal() {
2923 let metadata_str =
2924 fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();
2925
2926 let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
2927 assert_eq!(table_metadata.format_version, FormatVersion::V3);
2928
2929 let schema = Schema::builder()
2930 .with_schema_id(0)
2931 .with_fields(vec![
2932 Arc::new(
2933 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
2934 .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
2935 .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
2936 ),
2937 Arc::new(
2938 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2939 .with_doc("comment"),
2940 ),
2941 Arc::new(NestedField::required(
2942 3,
2943 "z",
2944 Type::Primitive(PrimitiveType::Long),
2945 )),
2946 ])
2947 .build()
2948 .unwrap();
2949
2950 let partition_spec = PartitionSpec::builder(schema.clone())
2951 .with_spec_id(0)
2952 .add_unbound_field(UnboundPartitionField {
2953 name: "x".to_string(),
2954 transform: Transform::Identity,
2955 source_id: 1,
2956 field_id: Some(1000),
2957 })
2958 .unwrap()
2959 .build()
2960 .unwrap();
2961
2962 let sort_order = SortOrder::builder()
2963 .with_order_id(3)
2964 .with_sort_field(SortField {
2965 source_id: 2,
2966 transform: Transform::Identity,
2967 direction: SortDirection::Ascending,
2968 null_order: NullOrder::First,
2969 })
2970 .with_sort_field(SortField {
2971 source_id: 3,
2972 transform: Transform::Bucket(4),
2973 direction: SortDirection::Descending,
2974 null_order: NullOrder::Last,
2975 })
2976 .build_unbound()
2977 .unwrap();
2978
2979 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2980 let expected = TableMetadata {
2981 format_version: FormatVersion::V3,
2982 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2983 location: "s3://bucket/test/location".to_string(),
2984 last_updated_ms: 1602638573590,
2985 last_column_id: 3,
2986 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2987 current_schema_id: 0,
2988 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2989 default_spec: Arc::new(partition_spec),
2990 default_partition_type,
2991 last_partition_id: 1000,
2992 default_sort_order_id: 3,
2993 sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
2994 snapshots: HashMap::default(),
2995 current_snapshot_id: None,
2996 last_sequence_number: 34,
2997 properties: HashMap::new(),
2998 snapshot_log: Vec::new(),
2999 metadata_log: Vec::new(),
3000 refs: HashMap::new(),
3001 statistics: HashMap::new(),
3002 partition_statistics: HashMap::new(),
3003 encryption_keys: HashMap::new(),
3004 next_row_id: 0, };
3006
3007 check_table_metadata_serde(&metadata_str, expected);
3008 }
3009
    #[test]
    fn test_table_metadata_v2_file_valid() {
        // Round-trips the full V2 fixture (two schemas, two snapshots, a
        // snapshot log, and a main branch ref) against a hand-built value.
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

        // Original schema: a single required "x" column.
        let schema1 = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();

        // Evolved schema: adds "y" and "z" and identifier fields [1, 2];
        // this is the current schema in the fixture.
        let schema2 = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .with_identifier_field_ids(vec![1, 2])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema2.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let snapshot1 = Snapshot::builder()
            .with_snapshot_id(3051729675574597004)
            .with_timestamp_ms(1515100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        // Second snapshot chains off the first and was written with schema 1.
        let snapshot2 = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_parent_snapshot_id(Some(3051729675574597004))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_schema_id(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![
                (3051729675574597004, Arc::new(snapshot1)),
                (3055729675574597004, Arc::new(snapshot2)),
            ]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![
                SnapshotLog {
                    snapshot_id: 3051729675574597004,
                    timestamp_ms: 1515100955770,
                },
                SnapshotLog {
                    snapshot_id: 3055729675574597004,
                    timestamp_ms: 1555100955770,
                },
            ],
            metadata_log: Vec::new(),
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3149
    #[test]
    fn test_table_metadata_v2_file_valid_minimal() {
        // Round-trips the minimal V2 fixture (single schema, no snapshots,
        // no refs) against a hand-built `TableMetadata`.
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3235
    #[test]
    fn test_table_metadata_v1_file_valid() {
        // Round-trips the valid V1 fixture. V1 has no sequence numbers, so
        // last_sequence_number is 0 and the sort order defaults to unsorted.
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573874,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 0,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::new(),
            current_snapshot_id: None,
            last_sequence_number: 0,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3305
3306 #[test]
3307 fn test_empty_snapshot_id_is_normalized_to_none() {
3308 let metadata =
3309 fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();
3310 let deserialized: TableMetadata = serde_json::from_str(&metadata).unwrap();
3311 assert_eq!(
3312 deserialized.current_snapshot_id(),
3313 None,
3314 "current_snapshot_id of -1 should be deserialized as None"
3315 );
3316 }
3317
3318 #[test]
3319 fn test_table_metadata_v1_compat() {
3320 let metadata =
3321 fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3322
3323 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3325 .expect("Failed to deserialize TableMetadataV1Compat.json");
3326
3327 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3329 assert_eq!(
3330 desered_type.uuid(),
3331 Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3332 );
3333 assert_eq!(
3334 desered_type.location(),
3335 "s3://bucket/warehouse/iceberg/glue.db/table_name"
3336 );
3337 assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3338 assert_eq!(desered_type.current_schema_id(), 0);
3339 }
3340
3341 #[test]
3342 fn test_table_metadata_v1_schemas_without_current_id() {
3343 let metadata = fs::read_to_string(
3344 "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3345 )
3346 .unwrap();
3347
3348 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3350 .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3351
3352 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3354 assert_eq!(
3355 desered_type.uuid(),
3356 Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3357 );
3358
3359 let schema = desered_type.current_schema();
3361 assert_eq!(schema.as_struct().fields().len(), 3);
3362 assert_eq!(schema.as_struct().fields()[0].name, "x");
3363 assert_eq!(schema.as_struct().fields()[1].name, "y");
3364 assert_eq!(schema.as_struct().fields()[2].name, "z");
3365 }
3366
3367 #[test]
3368 fn test_table_metadata_v1_no_valid_schema() {
3369 let metadata =
3370 fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3371 .unwrap();
3372
3373 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3375
3376 assert!(desered.is_err());
3377 let error_message = desered.unwrap_err().to_string();
3378 assert!(
3379 error_message.contains("No valid schema configuration found"),
3380 "Expected error about no valid schema configuration, got: {error_message}"
3381 );
3382 }
3383
3384 #[test]
3385 fn test_table_metadata_v1_partition_specs_without_default_id() {
3386 let metadata = fs::read_to_string(
3387 "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3388 )
3389 .unwrap();
3390
3391 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3393 .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3394
3395 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3397 assert_eq!(
3398 desered_type.uuid(),
3399 Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3400 );
3401
3402 assert_eq!(desered_type.default_partition_spec_id(), 2); assert_eq!(desered_type.partition_specs.len(), 2);
3405
3406 let default_spec = &desered_type.default_spec;
3408 assert_eq!(default_spec.spec_id(), 2);
3409 assert_eq!(default_spec.fields().len(), 1);
3410 assert_eq!(default_spec.fields()[0].name, "y");
3411 assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3412 assert_eq!(default_spec.fields()[0].source_id, 2);
3413 }
3414
3415 #[test]
3416 fn test_table_metadata_v2_schema_not_found() {
3417 let metadata =
3418 fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3419 .unwrap();
3420
3421 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3422
3423 assert_eq!(
3424 desered.unwrap_err().to_string(),
3425 "DataInvalid => No schema exists with the current schema id 2."
3426 )
3427 }
3428
3429 #[test]
3430 fn test_table_metadata_v2_missing_sort_order() {
3431 let metadata =
3432 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3433 .unwrap();
3434
3435 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3436
3437 assert_eq!(
3438 desered.unwrap_err().to_string(),
3439 "data did not match any variant of untagged enum TableMetadataEnum"
3440 )
3441 }
3442
3443 #[test]
3444 fn test_table_metadata_v2_missing_partition_specs() {
3445 let metadata =
3446 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3447 .unwrap();
3448
3449 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3450
3451 assert_eq!(
3452 desered.unwrap_err().to_string(),
3453 "data did not match any variant of untagged enum TableMetadataEnum"
3454 )
3455 }
3456
3457 #[test]
3458 fn test_table_metadata_v2_missing_last_partition_id() {
3459 let metadata = fs::read_to_string(
3460 "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3461 )
3462 .unwrap();
3463
3464 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3465
3466 assert_eq!(
3467 desered.unwrap_err().to_string(),
3468 "data did not match any variant of untagged enum TableMetadataEnum"
3469 )
3470 }
3471
3472 #[test]
3473 fn test_table_metadata_v2_missing_schemas() {
3474 let metadata =
3475 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3476 .unwrap();
3477
3478 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3479
3480 assert_eq!(
3481 desered.unwrap_err().to_string(),
3482 "data did not match any variant of untagged enum TableMetadataEnum"
3483 )
3484 }
3485
3486 #[test]
3487 fn test_table_metadata_v2_unsupported_version() {
3488 let metadata =
3489 fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3490 .unwrap();
3491
3492 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3493
3494 assert_eq!(
3495 desered.unwrap_err().to_string(),
3496 "data did not match any variant of untagged enum TableMetadataEnum"
3497 )
3498 }
3499
3500 #[test]
3501 fn test_order_of_format_version() {
3502 assert!(FormatVersion::V1 < FormatVersion::V2);
3503 assert_eq!(FormatVersion::V1, FormatVersion::V1);
3504 assert_eq!(FormatVersion::V2, FormatVersion::V2);
3505 }
3506
3507 #[test]
3508 fn test_default_partition_spec() {
3509 let default_spec_id = 1234;
3510 let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3511 let partition_spec = PartitionSpec::unpartition_spec();
3512 table_meta_data.default_spec = partition_spec.clone().into();
3513 table_meta_data
3514 .partition_specs
3515 .insert(default_spec_id, Arc::new(partition_spec));
3516
3517 assert_eq!(
3518 (*table_meta_data.default_partition_spec().clone()).clone(),
3519 (*table_meta_data
3520 .partition_spec_by_id(default_spec_id)
3521 .unwrap()
3522 .clone())
3523 .clone()
3524 );
3525 }
3526 #[test]
3527 fn test_default_sort_order() {
3528 let default_sort_order_id = 1234;
3529 let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3530 table_meta_data.default_sort_order_id = default_sort_order_id;
3531 table_meta_data
3532 .sort_orders
3533 .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3534
3535 assert_eq!(
3536 table_meta_data.default_sort_order(),
3537 table_meta_data
3538 .sort_orders
3539 .get(&default_sort_order_id)
3540 .unwrap()
3541 )
3542 }
3543
3544 #[test]
3545 fn test_table_metadata_builder_from_table_creation() {
3546 let table_creation = TableCreation::builder()
3547 .location("s3://db/table".to_string())
3548 .name("table".to_string())
3549 .properties(HashMap::new())
3550 .schema(Schema::builder().build().unwrap())
3551 .build();
3552 let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
3553 .unwrap()
3554 .build()
3555 .unwrap()
3556 .metadata;
3557 assert_eq!(table_metadata.location, "s3://db/table");
3558 assert_eq!(table_metadata.schemas.len(), 1);
3559 assert_eq!(
3560 table_metadata
3561 .schemas
3562 .get(&0)
3563 .unwrap()
3564 .as_struct()
3565 .fields()
3566 .len(),
3567 0
3568 );
3569 assert_eq!(table_metadata.properties.len(), 0);
3570 assert_eq!(
3571 table_metadata.partition_specs,
3572 HashMap::from([(
3573 0,
3574 Arc::new(
3575 PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
3576 .with_spec_id(0)
3577 .build()
3578 .unwrap()
3579 )
3580 )])
3581 );
3582 assert_eq!(
3583 table_metadata.sort_orders,
3584 HashMap::from([(
3585 0,
3586 Arc::new(SortOrder {
3587 order_id: 0,
3588 fields: vec![]
3589 })
3590 )])
3591 );
3592 }
3593
3594 #[tokio::test]
3595 async fn test_table_metadata_read_write() {
3596 let temp_dir = TempDir::new().unwrap();
3598 let temp_path = temp_dir.path().to_str().unwrap();
3599
3600 let file_io = FileIO::new_with_fs();
3602
3603 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3605
3606 let metadata_location = MetadataLocation::new_with_metadata(temp_path, &original_metadata);
3608 let metadata_location_str = metadata_location.to_string();
3609
3610 original_metadata
3612 .write_to(&file_io, &metadata_location)
3613 .await
3614 .unwrap();
3615
3616 assert!(fs::metadata(&metadata_location_str).is_ok());
3618
3619 let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
3621 .await
3622 .unwrap();
3623
3624 assert_eq!(read_metadata, original_metadata);
3626 }
3627
3628 #[tokio::test]
3629 async fn test_table_metadata_read_compressed() {
3630 let temp_dir = TempDir::new().unwrap();
3631 let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3632
3633 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3634 let json = serde_json::to_string(&original_metadata).unwrap();
3635
3636 let compressed = CompressionCodec::gzip_default()
3637 .compress(json.into_bytes())
3638 .expect("failed to compress metadata");
3639 std::fs::write(&metadata_location, &compressed).expect("failed to write metadata");
3640
3641 let file_io = FileIO::new_with_fs();
3643 let metadata_location = metadata_location.to_str().unwrap();
3644 let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3645 .await
3646 .unwrap();
3647
3648 assert_eq!(read_metadata, original_metadata);
3650 }
3651
3652 #[tokio::test]
3653 async fn test_table_metadata_read_nonexistent_file() {
3654 let file_io = FileIO::new_with_fs();
3656
3657 let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3659
3660 assert!(result.is_err());
3662 }
3663
3664 #[tokio::test]
3665 async fn test_table_metadata_write_with_gzip_compression() {
3666 let temp_dir = TempDir::new().unwrap();
3667 let temp_path = temp_dir.path().to_str().unwrap();
3668 let file_io = FileIO::new_with_fs();
3669
3670 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3672
3673 let mut props = original_metadata.properties.clone();
3675 props.insert(
3676 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
3677 "GziP".to_string(),
3678 );
3679 let compressed_metadata =
3681 TableMetadataBuilder::new_from_metadata(original_metadata.clone(), None)
3682 .assign_uuid(original_metadata.table_uuid)
3683 .set_properties(props.clone())
3684 .unwrap()
3685 .build()
3686 .unwrap()
3687 .metadata;
3688
3689 let metadata_location =
3691 MetadataLocation::new_with_metadata(temp_path, &compressed_metadata);
3692 let metadata_location_str = metadata_location.to_string();
3693
3694 assert!(metadata_location_str.contains(".gz.metadata.json"));
3696
3697 compressed_metadata
3699 .write_to(&file_io, &metadata_location)
3700 .await
3701 .unwrap();
3702
3703 assert!(std::path::Path::new(&metadata_location_str).exists());
3705
3706 let raw_content = std::fs::read(&metadata_location_str).unwrap();
3708 assert!(raw_content.len() > 2);
3709 assert_eq!(raw_content[0], 0x1F); assert_eq!(raw_content[1], 0x8B); let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
3714 .await
3715 .unwrap();
3716
3717 assert_eq!(read_metadata, compressed_metadata);
3719 }
3720
3721 #[test]
3722 fn test_partition_name_exists() {
3723 let schema = Schema::builder()
3724 .with_fields(vec![
3725 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3726 NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3727 .into(),
3728 ])
3729 .build()
3730 .unwrap();
3731
3732 let spec1 = PartitionSpec::builder(schema.clone())
3733 .with_spec_id(1)
3734 .add_partition_field("data", "data_partition", Transform::Identity)
3735 .unwrap()
3736 .build()
3737 .unwrap();
3738
3739 let spec2 = PartitionSpec::builder(schema.clone())
3740 .with_spec_id(2)
3741 .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3742 .unwrap()
3743 .build()
3744 .unwrap();
3745
3746 let metadata = TableMetadataBuilder::new(
3748 schema,
3749 spec1.clone().into_unbound(),
3750 SortOrder::unsorted_order(),
3751 "s3://test/location".to_string(),
3752 FormatVersion::V2,
3753 HashMap::new(),
3754 )
3755 .unwrap()
3756 .add_partition_spec(spec2.into_unbound())
3757 .unwrap()
3758 .build()
3759 .unwrap()
3760 .metadata;
3761
3762 assert!(metadata.partition_name_exists("data_partition"));
3763 assert!(metadata.partition_name_exists("partition_bucket"));
3764
3765 assert!(!metadata.partition_name_exists("nonexistent_field"));
3766 assert!(!metadata.partition_name_exists("data")); assert!(!metadata.partition_name_exists(""));
3768 }
3769
3770 #[test]
3771 fn test_partition_name_exists_empty_specs() {
3772 let schema = Schema::builder()
3774 .with_fields(vec![
3775 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3776 ])
3777 .build()
3778 .unwrap();
3779
3780 let metadata = TableMetadataBuilder::new(
3781 schema,
3782 PartitionSpec::unpartition_spec().into_unbound(),
3783 SortOrder::unsorted_order(),
3784 "s3://test/location".to_string(),
3785 FormatVersion::V2,
3786 HashMap::new(),
3787 )
3788 .unwrap()
3789 .build()
3790 .unwrap()
3791 .metadata;
3792
3793 assert!(!metadata.partition_name_exists("any_field"));
3794 assert!(!metadata.partition_name_exists("data"));
3795 }
3796
3797 #[test]
3798 fn test_name_exists_in_any_schema() {
3799 let schema1 = Schema::builder()
3801 .with_schema_id(1)
3802 .with_fields(vec![
3803 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3804 NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3805 ])
3806 .build()
3807 .unwrap();
3808
3809 let schema2 = Schema::builder()
3810 .with_schema_id(2)
3811 .with_fields(vec![
3812 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3813 NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3814 ])
3815 .build()
3816 .unwrap();
3817
3818 let metadata = TableMetadataBuilder::new(
3819 schema1,
3820 PartitionSpec::unpartition_spec().into_unbound(),
3821 SortOrder::unsorted_order(),
3822 "s3://test/location".to_string(),
3823 FormatVersion::V2,
3824 HashMap::new(),
3825 )
3826 .unwrap()
3827 .add_current_schema(schema2)
3828 .unwrap()
3829 .build()
3830 .unwrap()
3831 .metadata;
3832
3833 assert!(metadata.name_exists_in_any_schema("field1")); assert!(metadata.name_exists_in_any_schema("field2")); assert!(metadata.name_exists_in_any_schema("field3")); assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3838 assert!(!metadata.name_exists_in_any_schema("field4"));
3839 assert!(!metadata.name_exists_in_any_schema(""));
3840 }
3841
3842 #[test]
3843 fn test_name_exists_in_any_schema_empty_schemas() {
3844 let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3845
3846 let metadata = TableMetadataBuilder::new(
3847 schema,
3848 PartitionSpec::unpartition_spec().into_unbound(),
3849 SortOrder::unsorted_order(),
3850 "s3://test/location".to_string(),
3851 FormatVersion::V2,
3852 HashMap::new(),
3853 )
3854 .unwrap()
3855 .build()
3856 .unwrap()
3857 .metadata;
3858
3859 assert!(!metadata.name_exists_in_any_schema("any_field"));
3860 }
3861
3862 #[test]
3863 fn test_helper_methods_multi_version_scenario() {
3864 let initial_schema = Schema::builder()
3866 .with_fields(vec![
3867 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3868 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3869 NestedField::required(
3870 3,
3871 "deprecated_field",
3872 Type::Primitive(PrimitiveType::String),
3873 )
3874 .into(),
3875 ])
3876 .build()
3877 .unwrap();
3878
3879 let metadata = TableMetadataBuilder::new(
3880 initial_schema,
3881 PartitionSpec::unpartition_spec().into_unbound(),
3882 SortOrder::unsorted_order(),
3883 "s3://test/location".to_string(),
3884 FormatVersion::V2,
3885 HashMap::new(),
3886 )
3887 .unwrap();
3888
3889 let evolved_schema = Schema::builder()
3890 .with_fields(vec![
3891 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3892 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3893 NestedField::required(
3894 3,
3895 "deprecated_field",
3896 Type::Primitive(PrimitiveType::String),
3897 )
3898 .into(),
3899 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3900 .into(),
3901 ])
3902 .build()
3903 .unwrap();
3904
3905 let _final_schema = Schema::builder()
3907 .with_fields(vec![
3908 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3909 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3910 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3911 .into(),
3912 NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3913 .into(),
3914 ])
3915 .build()
3916 .unwrap();
3917
3918 let final_metadata = metadata
3919 .add_current_schema(evolved_schema)
3920 .unwrap()
3921 .build()
3922 .unwrap()
3923 .metadata;
3924
3925 assert!(!final_metadata.partition_name_exists("nonexistent_partition")); assert!(final_metadata.name_exists_in_any_schema("id")); assert!(final_metadata.name_exists_in_any_schema("name")); assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); assert!(final_metadata.name_exists_in_any_schema("new_field")); assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3932 }
3933
    #[test]
    fn test_invalid_sort_order_id_zero_with_fields() {
        // Sort order id 0 is reserved for the unsorted order, which must have
        // no fields. A document that declares order-id 0 *with* fields is
        // therefore invalid and must fail deserialization.
        let metadata = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 111,
            "last-updated-ms": 1600000000000,
            "last-column-id": 3,
            "current-schema-id": 1,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 1,
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"},
                        {"id": 2, "name": "y", "required": true, "type": "long"}
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [{"spec-id": 0, "fields": []}],
            "last-partition-id": 999,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": [
                        {
                            "transform": "identity",
                            "source-id": 1,
                            "direction": "asc",
                            "null-order": "nulls-first"
                        }
                    ]
                }
            ],
            "properties": {},
            "current-snapshot-id": -1,
            "snapshots": []
        }
        "#;

        let result: Result<TableMetadata, serde_json::Error> = serde_json::from_str(metadata);

        assert!(
            result.is_err(),
            "Parsing should fail for sort order ID 0 with fields"
        );
    }
3986
3987 #[test]
3988 fn test_table_properties_with_defaults() {
3989 use crate::spec::TableProperties;
3990
3991 let schema = Schema::builder()
3992 .with_fields(vec![
3993 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3994 ])
3995 .build()
3996 .unwrap();
3997
3998 let metadata = TableMetadataBuilder::new(
3999 schema,
4000 PartitionSpec::unpartition_spec().into_unbound(),
4001 SortOrder::unsorted_order(),
4002 "s3://test/location".to_string(),
4003 FormatVersion::V2,
4004 HashMap::new(),
4005 )
4006 .unwrap()
4007 .build()
4008 .unwrap()
4009 .metadata;
4010
4011 let props = metadata.table_properties().unwrap();
4012
4013 assert_eq!(
4014 props.commit_num_retries,
4015 TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
4016 );
4017 assert_eq!(
4018 props.write_target_file_size_bytes,
4019 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
4020 );
4021 }
4022
4023 #[test]
4024 fn test_table_properties_with_custom_values() {
4025 use crate::spec::TableProperties;
4026
4027 let schema = Schema::builder()
4028 .with_fields(vec![
4029 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4030 ])
4031 .build()
4032 .unwrap();
4033
4034 let properties = HashMap::from([
4035 (
4036 TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
4037 "10".to_string(),
4038 ),
4039 (
4040 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
4041 "1024".to_string(),
4042 ),
4043 ]);
4044
4045 let metadata = TableMetadataBuilder::new(
4046 schema,
4047 PartitionSpec::unpartition_spec().into_unbound(),
4048 SortOrder::unsorted_order(),
4049 "s3://test/location".to_string(),
4050 FormatVersion::V2,
4051 properties,
4052 )
4053 .unwrap()
4054 .build()
4055 .unwrap()
4056 .metadata;
4057
4058 let props = metadata.table_properties().unwrap();
4059
4060 assert_eq!(props.commit_num_retries, 10);
4061 assert_eq!(props.write_target_file_size_bytes, 1024);
4062 }
4063
4064 #[test]
4065 fn test_table_properties_with_invalid_value() {
4066 let schema = Schema::builder()
4067 .with_fields(vec![
4068 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4069 ])
4070 .build()
4071 .unwrap();
4072
4073 let properties = HashMap::from([(
4074 "commit.retry.num-retries".to_string(),
4075 "not_a_number".to_string(),
4076 )]);
4077
4078 let metadata = TableMetadataBuilder::new(
4079 schema,
4080 PartitionSpec::unpartition_spec().into_unbound(),
4081 SortOrder::unsorted_order(),
4082 "s3://test/location".to_string(),
4083 FormatVersion::V2,
4084 properties,
4085 )
4086 .unwrap()
4087 .build()
4088 .unwrap()
4089 .metadata;
4090
4091 let err = metadata.table_properties().unwrap_err();
4092 assert_eq!(err.kind(), ErrorKind::DataInvalid);
4093 assert!(err.message().contains("Invalid table properties"));
4094 }
4095
    #[test]
    fn test_v2_to_v3_upgrade_preserves_existing_snapshots_without_row_lineage() {
        // Upgrading an existing v2 table to v3 must keep its snapshots intact
        // and must NOT retroactively invent row-lineage (row_range) data for
        // snapshots committed before the upgrade.
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        // Start from a freshly-created v2 table.
        let v2_metadata = TableMetadataBuilder::new(
            schema,
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            "s3://bucket/test/location".to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Commit one append snapshot while still on v2 — no row_range is set.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(v2_metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([(
                    "added-data-files".to_string(),
                    "1".to_string(),
                )]),
            })
            .build();

        let v2_with_snapshot = v2_metadata
            .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
            .add_snapshot(snapshot)
            .unwrap()
            .set_ref("main", SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        let v2_json = serde_json::to_string(&v2_with_snapshot);
        assert!(v2_json.is_ok(), "v2 serialization should work");

        // Perform the format upgrade v2 -> v3.
        let v3_metadata = v2_with_snapshot
            .into_builder(Some("s3://bucket/test/metadata/v00002.json".to_string()))
            .upgrade_format_version(FormatVersion::V3)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert_eq!(v3_metadata.format_version, FormatVersion::V3);
        // Row-id assignment starts fresh; nothing has been assigned yet.
        assert_eq!(v3_metadata.next_row_id, INITIAL_ROW_ID);
        assert_eq!(v3_metadata.snapshots.len(), 1);

        // The pre-upgrade snapshot must not have gained a row_range.
        let snapshot = v3_metadata.snapshots.values().next().unwrap();
        assert!(
            snapshot.row_range().is_none(),
            "Snapshot should have no row_range after upgrade"
        );

        let v3_json = serde_json::to_string(&v3_metadata);
        assert!(
            v3_json.is_ok(),
            "v3 serialization should work for upgraded tables"
        );

        // The absence of row lineage must also survive a JSON round-trip.
        let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
        assert_eq!(deserialized.format_version, FormatVersion::V3);
        assert_eq!(deserialized.snapshots.len(), 1);

        let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
        assert!(
            deserialized_snapshot.row_range().is_none(),
            "Deserialized snapshot should have no row_range"
        );
    }
4195
4196 #[test]
4197 fn test_v3_snapshot_with_row_lineage_serialization() {
4198 let schema = Schema::builder()
4200 .with_fields(vec![
4201 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4202 ])
4203 .build()
4204 .unwrap();
4205
4206 let v3_metadata = TableMetadataBuilder::new(
4207 schema,
4208 PartitionSpec::unpartition_spec().into_unbound(),
4209 SortOrder::unsorted_order(),
4210 "s3://bucket/test/location".to_string(),
4211 FormatVersion::V3,
4212 HashMap::new(),
4213 )
4214 .unwrap()
4215 .build()
4216 .unwrap()
4217 .metadata;
4218
4219 let snapshot = Snapshot::builder()
4221 .with_snapshot_id(1)
4222 .with_timestamp_ms(v3_metadata.last_updated_ms + 1)
4223 .with_sequence_number(1)
4224 .with_schema_id(0)
4225 .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
4226 .with_summary(Summary {
4227 operation: Operation::Append,
4228 additional_properties: HashMap::from([(
4229 "added-data-files".to_string(),
4230 "1".to_string(),
4231 )]),
4232 })
4233 .with_row_range(100, 50) .build();
4235
4236 let v3_with_snapshot = v3_metadata
4237 .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
4238 .add_snapshot(snapshot)
4239 .unwrap()
4240 .set_ref("main", SnapshotReference {
4241 snapshot_id: 1,
4242 retention: SnapshotRetention::Branch {
4243 min_snapshots_to_keep: None,
4244 max_snapshot_age_ms: None,
4245 max_ref_age_ms: None,
4246 },
4247 })
4248 .unwrap()
4249 .build()
4250 .unwrap()
4251 .metadata;
4252
4253 let snapshot = v3_with_snapshot.snapshots.values().next().unwrap();
4255 assert!(
4256 snapshot.row_range().is_some(),
4257 "Snapshot should have row_range"
4258 );
4259 let (first_row_id, added_rows) = snapshot.row_range().unwrap();
4260 assert_eq!(first_row_id, 100);
4261 assert_eq!(added_rows, 50);
4262
4263 let v3_json = serde_json::to_string(&v3_with_snapshot);
4265 assert!(
4266 v3_json.is_ok(),
4267 "v3 serialization should work for snapshots with row lineage"
4268 );
4269
4270 let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
4272 assert_eq!(deserialized.format_version, FormatVersion::V3);
4273 assert_eq!(deserialized.snapshots.len(), 1);
4274
4275 let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
4277 assert!(
4278 deserialized_snapshot.row_range().is_some(),
4279 "Deserialized snapshot should have row_range"
4280 );
4281 let (deserialized_first_row_id, deserialized_added_rows) =
4282 deserialized_snapshot.row_range().unwrap();
4283 assert_eq!(deserialized_first_row_id, 100);
4284 assert_eq!(deserialized_added_rows, 50);
4285 }
4286}