1use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::sync::Arc;
26
27use _serde::TableMetadataEnum;
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use serde_repr::{Deserialize_repr, Serialize_repr};
31use uuid::Uuid;
32
33use super::snapshot::SnapshotReference;
34pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
35use super::{
36 DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
37 SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
38 TableProperties,
39};
40use crate::compression::CompressionCodec;
41use crate::error::{Result, timestamp_ms_to_utc};
42use crate::io::FileIO;
43use crate::spec::EncryptedKey;
44use crate::{Error, ErrorKind};
45
/// Name of the branch that always tracks the table's current snapshot.
const MAIN_BRANCH: &str = "main";
/// One minute in milliseconds: the clock-skew tolerance used when checking that
/// snapshot-log and metadata-log entries are in chronological order.
pub(crate) const ONE_MINUTE_MS: i64 = 60_000;

/// Serialized sentinel snapshot id meaning "no current snapshot"; it is mapped
/// to `None` when metadata is normalized.
pub(crate) const EMPTY_SNAPSHOT_ID: i64 = -1;
/// Initial sequence number; also what `next_sequence_number` reports for
/// format-v1 tables, which have no sequence numbers.
pub(crate) const INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// Initial value of `next_row_id` for tables whose metadata does not carry one.
pub const INITIAL_ROW_ID: u64 = 0;
/// Minimum table format version with row lineage, i.e. the first version whose
/// metadata carries a `next-row-id`.
pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Shared, reference-counted handle to a [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
58
/// In-memory representation of an Iceberg table metadata file.
///
/// Deserialization goes through the version-specific wire representations
/// (`TableMetadataEnum`), after which the result is normalized and validated.
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
pub struct TableMetadata {
    /// Table format version.
    pub(crate) format_version: FormatVersion,
    /// Unique identifier of the table.
    pub(crate) table_uuid: Uuid,
    /// Base location of the table; trailing `/` characters are stripped during
    /// normalization.
    pub(crate) location: String,
    /// Highest sequence number assigned so far; must be 0 for format v1.
    pub(crate) last_sequence_number: i64,
    /// Time of the last metadata update, in milliseconds (converted to UTC by
    /// `last_updated_timestamp`).
    pub(crate) last_updated_ms: i64,
    /// Last assigned column id.
    pub(crate) last_column_id: i32,
    /// All known schemas, keyed by schema id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    /// Id of the current schema; normalization checks it is a key of `schemas`.
    pub(crate) current_schema_id: i32,
    /// All known partition specs, keyed by spec id.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    /// Default partition spec; normalization makes sure it is also registered
    /// in `partition_specs`.
    pub(crate) default_spec: PartitionSpecRef,
    /// Partition type of `default_spec`, resolved against the current schema.
    pub(crate) default_partition_type: StructType,
    /// Last assigned partition field id.
    pub(crate) last_partition_id: i32,
    /// Table properties as raw string key/value pairs.
    pub(crate) properties: HashMap<String, String>,
    /// Id of the current snapshot, or `None` when there is none (the serialized
    /// sentinel `-1` is mapped to `None`).
    pub(crate) current_snapshot_id: Option<i64>,
    /// All known snapshots, keyed by snapshot id.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// Snapshot log; entries must be chronological within a one-minute tolerance.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    /// Metadata log; entries must be chronological within a one-minute tolerance.
    pub(crate) metadata_log: Vec<MetadataLog>,

    /// All known sort orders, keyed by order id.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    /// Id of the default sort order.
    pub(crate) default_sort_order_id: i64,
    /// Named snapshot references keyed by ref name; a `main` branch is derived
    /// from `current_snapshot_id` when it is missing.
    pub(crate) refs: HashMap<String, SnapshotReference>,
    /// Table statistics files, keyed by snapshot id.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Partition statistics files, keyed by snapshot id.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    /// Encryption keys, keyed by key id.
    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
    /// Next row id to assign; read from v3 metadata, otherwise `INITIAL_ROW_ID`.
    pub(crate) next_row_id: u64,
}
138
139impl TableMetadata {
140 #[must_use]
147 pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
148 TableMetadataBuilder::new_from_metadata(self, current_file_location)
149 }
150
151 #[inline]
153 pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
154 self.partition_specs
155 .values()
156 .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
157 }
158
159 #[inline]
161 pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
162 self.schemas
163 .values()
164 .any(|schema| schema.field_by_name(name).is_some())
165 }
166
167 #[inline]
169 pub fn format_version(&self) -> FormatVersion {
170 self.format_version
171 }
172
173 #[inline]
175 pub fn uuid(&self) -> Uuid {
176 self.table_uuid
177 }
178
179 #[inline]
181 pub fn location(&self) -> &str {
182 self.location.as_str()
183 }
184
185 #[inline]
187 pub fn last_sequence_number(&self) -> i64 {
188 self.last_sequence_number
189 }
190
191 #[inline]
196 pub fn next_sequence_number(&self) -> i64 {
197 match self.format_version {
198 FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
199 _ => self.last_sequence_number + 1,
200 }
201 }
202
203 #[inline]
205 pub fn last_column_id(&self) -> i32 {
206 self.last_column_id
207 }
208
209 #[inline]
211 pub fn last_partition_id(&self) -> i32 {
212 self.last_partition_id
213 }
214
215 #[inline]
217 pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
218 timestamp_ms_to_utc(self.last_updated_ms)
219 }
220
221 #[inline]
223 pub fn last_updated_ms(&self) -> i64 {
224 self.last_updated_ms
225 }
226
227 #[inline]
229 pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
230 self.schemas.values()
231 }
232
233 #[inline]
235 pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
236 self.schemas.get(&schema_id)
237 }
238
239 #[inline]
241 pub fn current_schema(&self) -> &SchemaRef {
242 self.schema_by_id(self.current_schema_id)
243 .expect("Current schema id set, but not found in table metadata")
244 }
245
246 #[inline]
248 pub fn current_schema_id(&self) -> SchemaId {
249 self.current_schema_id
250 }
251
252 #[inline]
254 pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
255 self.partition_specs.values()
256 }
257
258 #[inline]
260 pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
261 self.partition_specs.get(&spec_id)
262 }
263
264 #[inline]
266 pub fn default_partition_spec(&self) -> &PartitionSpecRef {
267 &self.default_spec
268 }
269
270 #[inline]
272 pub fn default_partition_type(&self) -> &StructType {
273 &self.default_partition_type
274 }
275
276 #[inline]
277 pub fn default_partition_spec_id(&self) -> i32 {
279 self.default_spec.spec_id()
280 }
281
282 #[inline]
284 pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
285 self.snapshots.values()
286 }
287
288 #[inline]
290 pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
291 self.snapshots.get(&snapshot_id)
292 }
293
294 #[inline]
296 pub fn history(&self) -> &[SnapshotLog] {
297 &self.snapshot_log
298 }
299
300 #[inline]
302 pub fn metadata_log(&self) -> &[MetadataLog] {
303 &self.metadata_log
304 }
305
306 #[inline]
308 pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
309 self.current_snapshot_id.map(|s| {
310 self.snapshot_by_id(s)
311 .expect("Current snapshot id has been set, but doesn't exist in metadata")
312 })
313 }
314
315 #[inline]
317 pub fn current_snapshot_id(&self) -> Option<i64> {
318 self.current_snapshot_id
319 }
320
321 #[inline]
324 pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
325 self.refs.get(ref_name).map(|r| {
326 self.snapshot_by_id(r.snapshot_id)
327 .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
328 })
329 }
330
331 #[inline]
333 pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
334 self.sort_orders.values()
335 }
336
337 #[inline]
339 pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
340 self.sort_orders.get(&sort_order_id)
341 }
342
343 #[inline]
345 pub fn default_sort_order(&self) -> &SortOrderRef {
346 self.sort_orders
347 .get(&self.default_sort_order_id)
348 .expect("Default order id has been set, but not found in table metadata!")
349 }
350
351 #[inline]
353 pub fn default_sort_order_id(&self) -> i64 {
354 self.default_sort_order_id
355 }
356
357 #[inline]
359 pub fn properties(&self) -> &HashMap<String, String> {
360 &self.properties
361 }
362
363 pub fn table_properties(&self) -> Result<TableProperties> {
365 TableProperties::try_from(&self.properties).map_err(|e| {
366 Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
367 })
368 }
369
370 #[inline]
372 pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
373 self.statistics.values()
374 }
375
376 #[inline]
378 pub fn partition_statistics_iter(
379 &self,
380 ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
381 self.partition_statistics.values()
382 }
383
384 #[inline]
386 pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
387 self.statistics.get(&snapshot_id)
388 }
389
390 #[inline]
392 pub fn partition_statistics_for_snapshot(
393 &self,
394 snapshot_id: i64,
395 ) -> Option<&PartitionStatisticsFile> {
396 self.partition_statistics.get(&snapshot_id)
397 }
398
399 fn construct_refs(&mut self) {
400 if let Some(current_snapshot_id) = self.current_snapshot_id
401 && !self.refs.contains_key(MAIN_BRANCH)
402 {
403 self.refs
404 .insert(MAIN_BRANCH.to_string(), SnapshotReference {
405 snapshot_id: current_snapshot_id,
406 retention: SnapshotRetention::Branch {
407 min_snapshots_to_keep: None,
408 max_snapshot_age_ms: None,
409 max_ref_age_ms: None,
410 },
411 });
412 }
413 }
414
415 #[inline]
417 pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
418 self.encryption_keys.values()
419 }
420
421 #[inline]
423 pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
424 self.encryption_keys.get(key_id)
425 }
426
427 #[inline]
429 pub fn next_row_id(&self) -> u64 {
430 self.next_row_id
431 }
432
433 pub async fn read_from(
435 file_io: &FileIO,
436 metadata_location: impl AsRef<str>,
437 ) -> Result<TableMetadata> {
438 let metadata_location = metadata_location.as_ref();
439 let input_file = file_io.new_input(metadata_location)?;
440 let metadata_content = input_file.read().await?;
441
442 let metadata = if metadata_content.len() > 2
444 && metadata_content[0] == 0x1F
445 && metadata_content[1] == 0x8B
446 {
447 let decompressed_data = CompressionCodec::Gzip
448 .decompress(metadata_content.to_vec())
449 .map_err(|e| {
450 Error::new(
451 ErrorKind::DataInvalid,
452 "Trying to read compressed metadata file",
453 )
454 .with_context("file_path", metadata_location)
455 .with_source(e)
456 })?;
457 serde_json::from_slice(&decompressed_data)?
458 } else {
459 serde_json::from_slice(&metadata_content)?
460 };
461
462 Ok(metadata)
463 }
464
465 pub async fn write_to(
467 &self,
468 file_io: &FileIO,
469 metadata_location: impl AsRef<str>,
470 ) -> Result<()> {
471 file_io
472 .new_output(metadata_location)?
473 .write(serde_json::to_vec(self)?.into())
474 .await
475 }
476
477 pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
485 self.validate_current_schema()?;
486 self.normalize_current_snapshot()?;
487 self.construct_refs();
488 self.validate_refs()?;
489 self.validate_chronological_snapshot_logs()?;
490 self.validate_chronological_metadata_logs()?;
491 self.location = self.location.trim_end_matches('/').to_string();
493 self.validate_snapshot_sequence_number()?;
494 self.try_normalize_partition_spec()?;
495 self.try_normalize_sort_order()?;
496 Ok(self)
497 }
498
499 fn try_normalize_partition_spec(&mut self) -> Result<()> {
501 if self
502 .partition_spec_by_id(self.default_spec.spec_id())
503 .is_none()
504 {
505 self.partition_specs.insert(
506 self.default_spec.spec_id(),
507 Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
508 );
509 }
510
511 Ok(())
512 }
513
514 fn try_normalize_sort_order(&mut self) -> Result<()> {
516 if let Some(sort_order) = self.sort_order_by_id(SortOrder::UNSORTED_ORDER_ID)
518 && !sort_order.fields.is_empty()
519 {
520 return Err(Error::new(
521 ErrorKind::Unexpected,
522 format!(
523 "Sort order ID {} is reserved for unsorted order",
524 SortOrder::UNSORTED_ORDER_ID
525 ),
526 ));
527 }
528
529 if self.sort_order_by_id(self.default_sort_order_id).is_some() {
530 return Ok(());
531 }
532
533 if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
534 return Err(Error::new(
535 ErrorKind::DataInvalid,
536 format!(
537 "No sort order exists with the default sort order id {}.",
538 self.default_sort_order_id
539 ),
540 ));
541 }
542
543 let sort_order = SortOrder::unsorted_order();
544 self.sort_orders
545 .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
546 Ok(())
547 }
548
549 fn validate_current_schema(&self) -> Result<()> {
551 if self.schema_by_id(self.current_schema_id).is_none() {
552 return Err(Error::new(
553 ErrorKind::DataInvalid,
554 format!(
555 "No schema exists with the current schema id {}.",
556 self.current_schema_id
557 ),
558 ));
559 }
560 Ok(())
561 }
562
563 fn normalize_current_snapshot(&mut self) -> Result<()> {
565 if let Some(current_snapshot_id) = self.current_snapshot_id {
566 if current_snapshot_id == EMPTY_SNAPSHOT_ID {
567 self.current_snapshot_id = None;
568 } else if self.snapshot_by_id(current_snapshot_id).is_none() {
569 return Err(Error::new(
570 ErrorKind::DataInvalid,
571 format!(
572 "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
573 ),
574 ));
575 }
576 }
577 Ok(())
578 }
579
580 fn validate_refs(&self) -> Result<()> {
582 for (name, snapshot_ref) in self.refs.iter() {
583 if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
584 return Err(Error::new(
585 ErrorKind::DataInvalid,
586 format!(
587 "Snapshot for reference {name} does not exist in the existing snapshots list"
588 ),
589 ));
590 }
591 }
592
593 let main_ref = self.refs.get(MAIN_BRANCH);
594 if self.current_snapshot_id.is_some() {
595 if let Some(main_ref) = main_ref
596 && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
597 {
598 return Err(Error::new(
599 ErrorKind::DataInvalid,
600 format!(
601 "Current snapshot id does not match main branch ({:?} != {:?})",
602 self.current_snapshot_id.unwrap_or_default(),
603 main_ref.snapshot_id
604 ),
605 ));
606 }
607 } else if main_ref.is_some() {
608 return Err(Error::new(
609 ErrorKind::DataInvalid,
610 "Current snapshot is not set, but main branch exists",
611 ));
612 }
613
614 Ok(())
615 }
616
617 fn validate_snapshot_sequence_number(&self) -> Result<()> {
619 if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
620 return Err(Error::new(
621 ErrorKind::DataInvalid,
622 format!(
623 "Last sequence number must be 0 in v1. Found {}",
624 self.last_sequence_number
625 ),
626 ));
627 }
628
629 if self.format_version >= FormatVersion::V2
630 && let Some(snapshot) = self
631 .snapshots
632 .values()
633 .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
634 {
635 return Err(Error::new(
636 ErrorKind::DataInvalid,
637 format!(
638 "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
639 snapshot.snapshot_id(),
640 snapshot.sequence_number(),
641 self.last_sequence_number
642 ),
643 ));
644 }
645
646 Ok(())
647 }
648
649 fn validate_chronological_snapshot_logs(&self) -> Result<()> {
651 for window in self.snapshot_log.windows(2) {
652 let (prev, curr) = (&window[0], &window[1]);
653 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
656 return Err(Error::new(
657 ErrorKind::DataInvalid,
658 "Expected sorted snapshot log entries",
659 ));
660 }
661 }
662
663 if let Some(last) = self.snapshot_log.last() {
664 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
667 return Err(Error::new(
668 ErrorKind::DataInvalid,
669 format!(
670 "Invalid update timestamp {}: before last snapshot log entry at {}",
671 self.last_updated_ms, last.timestamp_ms
672 ),
673 ));
674 }
675 }
676 Ok(())
677 }
678
679 fn validate_chronological_metadata_logs(&self) -> Result<()> {
680 for window in self.metadata_log.windows(2) {
681 let (prev, curr) = (&window[0], &window[1]);
682 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
685 return Err(Error::new(
686 ErrorKind::DataInvalid,
687 "Expected sorted metadata log entries",
688 ));
689 }
690 }
691
692 if let Some(last) = self.metadata_log.last() {
693 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
696 return Err(Error::new(
697 ErrorKind::DataInvalid,
698 format!(
699 "Invalid update timestamp {}: before last metadata log entry at {}",
700 self.last_updated_ms, last.timestamp_ms
701 ),
702 ));
703 }
704 }
705
706 Ok(())
707 }
708}
709
710pub(super) mod _serde {
711 use std::borrow::BorrowMut;
712 use std::collections::HashMap;
717 use std::sync::Arc;
722
723 use serde::{Deserialize, Serialize};
724 use uuid::Uuid;
725
726 use super::{
727 DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
728 TableMetadata,
729 };
730 use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
731 use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
732 use crate::spec::{
733 EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
734 PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
735 SortOrder, StatisticsFile,
736 };
737 use crate::{Error, ErrorKind};
738
    /// Serialized table metadata in any supported format version.
    ///
    /// The enum is `untagged`, so serde tries the variants in declaration order
    /// and keeps the first that deserializes. Variant order is therefore
    /// significant. Each variant's `format_version` is a `VersionNumber<N>`
    /// that rejects any other number, and that is what tells the variants apart.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum TableMetadataEnum {
        /// Format version 3.
        V3(TableMetadataV3),
        /// Format version 2.
        V2(TableMetadataV2),
        /// Format version 1.
        V1(TableMetadataV1),
    }
746
    /// Wire representation of format-v3 table metadata.
    ///
    /// It adds `next-row-id` and optional `encryption-keys` to the fields shared
    /// with v2, and uses the v3 snapshot representation.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV3 {
        /// Must be `3`.
        pub format_version: VersionNumber<3>,
        /// Fields shared with v2, flattened into the same JSON object.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        /// Next row id to assign.
        pub next_row_id: u64,
        /// Encryption keys; omitted from the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub encryption_keys: Option<Vec<EncryptedKey>>,
        /// Snapshots; omitted from the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV3>>,
    }
760
    /// Fields common to the v2 and v3 wire formats (flattened into both).
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV2V3Shared {
        /// Unique identifier of the table.
        pub table_uuid: Uuid,
        /// Base location of the table.
        pub location: String,
        /// Highest assigned sequence number.
        pub last_sequence_number: i64,
        /// Time of the last update, in milliseconds.
        pub last_updated_ms: i64,
        /// Last assigned column id.
        pub last_column_id: i32,
        /// All schemas.
        pub schemas: Vec<SchemaV2>,
        /// Id of the current schema.
        pub current_schema_id: i32,
        /// All partition specs.
        pub partition_specs: Vec<PartitionSpec>,
        /// Id of the default partition spec.
        pub default_spec_id: i32,
        /// Last assigned partition field id.
        pub last_partition_id: i32,
        /// Table properties; omitted from the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        /// Current snapshot id; `-1` is read as "no current snapshot".
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        /// Snapshot log; omitted from the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        /// Metadata log; omitted from the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        /// All sort orders.
        pub sort_orders: Vec<SortOrder>,
        /// Id of the default sort order.
        pub default_sort_order_id: i64,
        /// Named snapshot refs; when absent, a `main` ref is derived on load.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub refs: Option<HashMap<String, SnapshotReference>>,
        /// Statistics files; defaults to empty and is omitted when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        /// Partition statistics files; defaults to empty and is omitted when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
792
    /// Wire representation of format-v2 table metadata.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV2 {
        /// Must be `2`.
        pub format_version: VersionNumber<2>,
        /// Fields shared with v3, flattened into the same JSON object.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        /// Snapshots; omitted from the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
    }
803
    /// Wire representation of format-v1 table metadata.
    ///
    /// v1 files may carry either the legacy single `schema` / `partition-spec`
    /// fields or the newer `schemas` / `partition-specs` lists. `schema`,
    /// `partition_spec`, `sort_orders` and `default_sort_order_id` have no
    /// `skip_serializing_if`, so they are written as `null` when `None`.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV1 {
        /// Must be `1`.
        pub format_version: VersionNumber<1>,
        /// Table UUID (optional in v1).
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        /// Base location of the table.
        pub location: String,
        /// Time of the last update, in milliseconds.
        pub last_updated_ms: i64,
        /// Last assigned column id.
        pub last_column_id: i32,
        /// Legacy single schema.
        pub schema: Option<SchemaV1>,
        /// All schemas (newer layout).
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<SchemaV1>>,
        /// Id of the current schema (newer layout).
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        /// Legacy single partition spec, as a list of fields.
        pub partition_spec: Option<Vec<PartitionField>>,
        /// All partition specs (newer layout).
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        /// Id of the default partition spec.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        /// Last assigned partition field id.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        /// Table properties.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        /// Current snapshot id; `-1` is read as "no current snapshot".
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        /// Snapshots.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        /// Snapshot log.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        /// Metadata log.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        /// All sort orders.
        pub sort_orders: Option<Vec<SortOrder>>,
        /// Id of the default sort order.
        pub default_sort_order_id: Option<i64>,
        /// Statistics files; defaults to empty and is omitted when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        /// Partition statistics files; defaults to empty and is omitted when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
845
    /// Zero-sized marker for a fixed `format-version` value `V`.
    ///
    /// It serializes as the integer `V`; deserializing any other integer fails.
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct VersionNumber<const V: u8>;
849
850 impl Serialize for TableMetadata {
851 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
852 where S: serde::Serializer {
853 let table_metadata_enum: TableMetadataEnum =
855 self.clone().try_into().map_err(serde::ser::Error::custom)?;
856
857 table_metadata_enum.serialize(serializer)
858 }
859 }
860
861 impl<const V: u8> Serialize for VersionNumber<V> {
862 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
863 where S: serde::Serializer {
864 serializer.serialize_u8(V)
865 }
866 }
867
868 impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
869 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
870 where D: serde::Deserializer<'de> {
871 let value = u8::deserialize(deserializer)?;
872 if value == V {
873 Ok(VersionNumber::<V>)
874 } else {
875 Err(serde::de::Error::custom("Invalid Version"))
876 }
877 }
878 }
879
880 impl TryFrom<TableMetadataEnum> for TableMetadata {
881 type Error = Error;
882 fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
883 match value {
884 TableMetadataEnum::V3(value) => value.try_into(),
885 TableMetadataEnum::V2(value) => value.try_into(),
886 TableMetadataEnum::V1(value) => value.try_into(),
887 }
888 }
889 }
890
891 impl TryFrom<TableMetadata> for TableMetadataEnum {
892 type Error = Error;
893 fn try_from(value: TableMetadata) -> Result<Self, Error> {
894 Ok(match value.format_version {
895 FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
896 FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
897 FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
898 })
899 }
900 }
901
902 impl TryFrom<TableMetadataV3> for TableMetadata {
903 type Error = Error;
904 fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
905 let TableMetadataV3 {
906 format_version: _,
907 shared: value,
908 next_row_id,
909 encryption_keys,
910 snapshots,
911 } = value;
912 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
913 None
914 } else {
915 value.current_snapshot_id
916 };
917 let schemas = HashMap::from_iter(
918 value
919 .schemas
920 .into_iter()
921 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
922 .collect::<Result<Vec<_>, Error>>()?,
923 );
924
925 let current_schema: &SchemaRef =
926 schemas.get(&value.current_schema_id).ok_or_else(|| {
927 Error::new(
928 ErrorKind::DataInvalid,
929 format!(
930 "No schema exists with the current schema id {}.",
931 value.current_schema_id
932 ),
933 )
934 })?;
935 let partition_specs = HashMap::from_iter(
936 value
937 .partition_specs
938 .into_iter()
939 .map(|x| (x.spec_id(), Arc::new(x))),
940 );
941 let default_spec_id = value.default_spec_id;
942 let default_spec: PartitionSpecRef = partition_specs
943 .get(&value.default_spec_id)
944 .map(|spec| (**spec).clone())
945 .or_else(|| {
946 (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
947 .then(PartitionSpec::unpartition_spec)
948 })
949 .ok_or_else(|| {
950 Error::new(
951 ErrorKind::DataInvalid,
952 format!("Default partition spec {default_spec_id} not found"),
953 )
954 })?
955 .into();
956 let default_partition_type = default_spec.partition_type(current_schema)?;
957
958 let mut metadata = TableMetadata {
959 format_version: FormatVersion::V3,
960 table_uuid: value.table_uuid,
961 location: value.location,
962 last_sequence_number: value.last_sequence_number,
963 last_updated_ms: value.last_updated_ms,
964 last_column_id: value.last_column_id,
965 current_schema_id: value.current_schema_id,
966 schemas,
967 partition_specs,
968 default_partition_type,
969 default_spec,
970 last_partition_id: value.last_partition_id,
971 properties: value.properties.unwrap_or_default(),
972 current_snapshot_id,
973 snapshots: snapshots
974 .map(|snapshots| {
975 HashMap::from_iter(
976 snapshots
977 .into_iter()
978 .map(|x| (x.snapshot_id, Arc::new(x.into()))),
979 )
980 })
981 .unwrap_or_default(),
982 snapshot_log: value.snapshot_log.unwrap_or_default(),
983 metadata_log: value.metadata_log.unwrap_or_default(),
984 sort_orders: HashMap::from_iter(
985 value
986 .sort_orders
987 .into_iter()
988 .map(|x| (x.order_id, Arc::new(x))),
989 ),
990 default_sort_order_id: value.default_sort_order_id,
991 refs: value.refs.unwrap_or_else(|| {
992 if let Some(snapshot_id) = current_snapshot_id {
993 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
994 snapshot_id,
995 retention: SnapshotRetention::Branch {
996 min_snapshots_to_keep: None,
997 max_snapshot_age_ms: None,
998 max_ref_age_ms: None,
999 },
1000 })])
1001 } else {
1002 HashMap::new()
1003 }
1004 }),
1005 statistics: index_statistics(value.statistics),
1006 partition_statistics: index_partition_statistics(value.partition_statistics),
1007 encryption_keys: encryption_keys
1008 .map(|keys| {
1009 HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
1010 })
1011 .unwrap_or_default(),
1012 next_row_id,
1013 };
1014
1015 metadata.borrow_mut().try_normalize()?;
1016 Ok(metadata)
1017 }
1018 }
1019
1020 impl TryFrom<TableMetadataV2> for TableMetadata {
1021 type Error = Error;
1022 fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
1023 let snapshots = value.snapshots;
1024 let value = value.shared;
1025 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1026 None
1027 } else {
1028 value.current_snapshot_id
1029 };
1030 let schemas = HashMap::from_iter(
1031 value
1032 .schemas
1033 .into_iter()
1034 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
1035 .collect::<Result<Vec<_>, Error>>()?,
1036 );
1037
1038 let current_schema: &SchemaRef =
1039 schemas.get(&value.current_schema_id).ok_or_else(|| {
1040 Error::new(
1041 ErrorKind::DataInvalid,
1042 format!(
1043 "No schema exists with the current schema id {}.",
1044 value.current_schema_id
1045 ),
1046 )
1047 })?;
1048 let partition_specs = HashMap::from_iter(
1049 value
1050 .partition_specs
1051 .into_iter()
1052 .map(|x| (x.spec_id(), Arc::new(x))),
1053 );
1054 let default_spec_id = value.default_spec_id;
1055 let default_spec: PartitionSpecRef = partition_specs
1056 .get(&value.default_spec_id)
1057 .map(|spec| (**spec).clone())
1058 .or_else(|| {
1059 (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
1060 .then(PartitionSpec::unpartition_spec)
1061 })
1062 .ok_or_else(|| {
1063 Error::new(
1064 ErrorKind::DataInvalid,
1065 format!("Default partition spec {default_spec_id} not found"),
1066 )
1067 })?
1068 .into();
1069 let default_partition_type = default_spec.partition_type(current_schema)?;
1070
1071 let mut metadata = TableMetadata {
1072 format_version: FormatVersion::V2,
1073 table_uuid: value.table_uuid,
1074 location: value.location,
1075 last_sequence_number: value.last_sequence_number,
1076 last_updated_ms: value.last_updated_ms,
1077 last_column_id: value.last_column_id,
1078 current_schema_id: value.current_schema_id,
1079 schemas,
1080 partition_specs,
1081 default_partition_type,
1082 default_spec,
1083 last_partition_id: value.last_partition_id,
1084 properties: value.properties.unwrap_or_default(),
1085 current_snapshot_id,
1086 snapshots: snapshots
1087 .map(|snapshots| {
1088 HashMap::from_iter(
1089 snapshots
1090 .into_iter()
1091 .map(|x| (x.snapshot_id, Arc::new(x.into()))),
1092 )
1093 })
1094 .unwrap_or_default(),
1095 snapshot_log: value.snapshot_log.unwrap_or_default(),
1096 metadata_log: value.metadata_log.unwrap_or_default(),
1097 sort_orders: HashMap::from_iter(
1098 value
1099 .sort_orders
1100 .into_iter()
1101 .map(|x| (x.order_id, Arc::new(x))),
1102 ),
1103 default_sort_order_id: value.default_sort_order_id,
1104 refs: value.refs.unwrap_or_else(|| {
1105 if let Some(snapshot_id) = current_snapshot_id {
1106 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1107 snapshot_id,
1108 retention: SnapshotRetention::Branch {
1109 min_snapshots_to_keep: None,
1110 max_snapshot_age_ms: None,
1111 max_ref_age_ms: None,
1112 },
1113 })])
1114 } else {
1115 HashMap::new()
1116 }
1117 }),
1118 statistics: index_statistics(value.statistics),
1119 partition_statistics: index_partition_statistics(value.partition_statistics),
1120 encryption_keys: HashMap::new(),
1121 next_row_id: INITIAL_ROW_ID,
1122 };
1123
1124 metadata.borrow_mut().try_normalize()?;
1125 Ok(metadata)
1126 }
1127 }
1128
1129 impl TryFrom<TableMetadataV1> for TableMetadata {
1130 type Error = Error;
1131 fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1132 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1133 None
1134 } else {
1135 value.current_snapshot_id
1136 };
1137
1138 let (schemas, current_schema_id, current_schema) =
1139 if let (Some(schemas_vec), Some(schema_id)) =
1140 (&value.schemas, value.current_schema_id)
1141 {
1142 let schema_map = HashMap::from_iter(
1144 schemas_vec
1145 .clone()
1146 .into_iter()
1147 .map(|schema| {
1148 let schema: Schema = schema.try_into()?;
1149 Ok((schema.schema_id(), Arc::new(schema)))
1150 })
1151 .collect::<Result<Vec<_>, Error>>()?,
1152 );
1153
1154 let schema = schema_map
1155 .get(&schema_id)
1156 .ok_or_else(|| {
1157 Error::new(
1158 ErrorKind::DataInvalid,
1159 format!("No schema exists with the current schema id {schema_id}."),
1160 )
1161 })?
1162 .clone();
1163 (schema_map, schema_id, schema)
1164 } else if let Some(schema) = value.schema {
1165 let schema: Schema = schema.try_into()?;
1167 let schema_id = schema.schema_id();
1168 let schema_arc = Arc::new(schema);
1169 let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1170 (schema_map, schema_id, schema_arc)
1171 } else {
1172 return Err(Error::new(
1174 ErrorKind::DataInvalid,
1175 "No valid schema configuration found in table metadata",
1176 ));
1177 };
1178
1179 let partition_specs = if let Some(specs_vec) = value.partition_specs {
1181 specs_vec
1183 .into_iter()
1184 .map(|x| (x.spec_id(), Arc::new(x)))
1185 .collect::<HashMap<_, _>>()
1186 } else if let Some(partition_spec) = value.partition_spec {
1187 let spec = PartitionSpec::builder(current_schema.clone())
1189 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1190 .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1191 .build()?;
1192
1193 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1194 } else {
1195 let spec = PartitionSpec::builder(current_schema.clone())
1197 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1198 .build()?;
1199
1200 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1201 };
1202
1203 let default_spec_id = value
1205 .default_spec_id
1206 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1207
1208 let default_spec: PartitionSpecRef = partition_specs
1210 .get(&default_spec_id)
1211 .map(|x| Arc::unwrap_or_clone(x.clone()))
1212 .ok_or_else(|| {
1213 Error::new(
1214 ErrorKind::DataInvalid,
1215 format!("Default partition spec {default_spec_id} not found"),
1216 )
1217 })?
1218 .into();
1219 let default_partition_type = default_spec.partition_type(¤t_schema)?;
1220
1221 let mut metadata = TableMetadata {
1222 format_version: FormatVersion::V1,
1223 table_uuid: value.table_uuid.unwrap_or_default(),
1224 location: value.location,
1225 last_sequence_number: 0,
1226 last_updated_ms: value.last_updated_ms,
1227 last_column_id: value.last_column_id,
1228 current_schema_id,
1229 default_spec,
1230 default_partition_type,
1231 last_partition_id: value
1232 .last_partition_id
1233 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1234 partition_specs,
1235 schemas,
1236 properties: value.properties.unwrap_or_default(),
1237 current_snapshot_id,
1238 snapshots: value
1239 .snapshots
1240 .map(|snapshots| {
1241 Ok::<_, Error>(HashMap::from_iter(
1242 snapshots
1243 .into_iter()
1244 .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1245 .collect::<Result<Vec<_>, Error>>()?,
1246 ))
1247 })
1248 .transpose()?
1249 .unwrap_or_default(),
1250 snapshot_log: value.snapshot_log.unwrap_or_default(),
1251 metadata_log: value.metadata_log.unwrap_or_default(),
1252 sort_orders: match value.sort_orders {
1253 Some(sort_orders) => HashMap::from_iter(
1254 sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1255 ),
1256 None => HashMap::new(),
1257 },
1258 default_sort_order_id: value
1259 .default_sort_order_id
1260 .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1261 refs: if let Some(snapshot_id) = current_snapshot_id {
1262 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1263 snapshot_id,
1264 retention: SnapshotRetention::Branch {
1265 min_snapshots_to_keep: None,
1266 max_snapshot_age_ms: None,
1267 max_ref_age_ms: None,
1268 },
1269 })])
1270 } else {
1271 HashMap::new()
1272 },
1273 statistics: index_statistics(value.statistics),
1274 partition_statistics: index_partition_statistics(value.partition_statistics),
1275 encryption_keys: HashMap::new(),
1276 next_row_id: INITIAL_ROW_ID, };
1278
1279 metadata.borrow_mut().try_normalize()?;
1280 Ok(metadata)
1281 }
1282 }
1283
1284 impl TryFrom<TableMetadata> for TableMetadataV3 {
1285 type Error = Error;
1286
1287 fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1288 let next_row_id = v.next_row_id;
1289 let encryption_keys = std::mem::take(&mut v.encryption_keys);
1290 let snapshots = std::mem::take(&mut v.snapshots);
1291 let shared = v.into();
1292
1293 Ok(TableMetadataV3 {
1294 format_version: VersionNumber::<3>,
1295 shared,
1296 next_row_id,
1297 encryption_keys: if encryption_keys.is_empty() {
1298 None
1299 } else {
1300 Some(encryption_keys.into_values().collect())
1301 },
1302 snapshots: if snapshots.is_empty() {
1303 None
1304 } else {
1305 Some(
1306 snapshots
1307 .into_values()
1308 .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1309 .collect::<Result<_, _>>()?,
1310 )
1311 },
1312 })
1313 }
1314 }
1315
1316 impl From<TableMetadata> for TableMetadataV2 {
1317 fn from(mut v: TableMetadata) -> Self {
1318 let snapshots = std::mem::take(&mut v.snapshots);
1319 let shared = v.into();
1320
1321 TableMetadataV2 {
1322 format_version: VersionNumber::<2>,
1323 shared,
1324 snapshots: if snapshots.is_empty() {
1325 None
1326 } else {
1327 Some(
1328 snapshots
1329 .into_values()
1330 .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1331 .collect(),
1332 )
1333 },
1334 }
1335 }
1336 }
1337
1338 impl From<TableMetadata> for TableMetadataV2V3Shared {
1339 fn from(v: TableMetadata) -> Self {
1340 TableMetadataV2V3Shared {
1341 table_uuid: v.table_uuid,
1342 location: v.location,
1343 last_sequence_number: v.last_sequence_number,
1344 last_updated_ms: v.last_updated_ms,
1345 last_column_id: v.last_column_id,
1346 schemas: v
1347 .schemas
1348 .into_values()
1349 .map(|x| {
1350 Arc::try_unwrap(x)
1351 .unwrap_or_else(|schema| schema.as_ref().clone())
1352 .into()
1353 })
1354 .collect(),
1355 current_schema_id: v.current_schema_id,
1356 partition_specs: v
1357 .partition_specs
1358 .into_values()
1359 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1360 .collect(),
1361 default_spec_id: v.default_spec.spec_id(),
1362 last_partition_id: v.last_partition_id,
1363 properties: if v.properties.is_empty() {
1364 None
1365 } else {
1366 Some(v.properties)
1367 },
1368 current_snapshot_id: v.current_snapshot_id,
1369 snapshot_log: if v.snapshot_log.is_empty() {
1370 None
1371 } else {
1372 Some(v.snapshot_log)
1373 },
1374 metadata_log: if v.metadata_log.is_empty() {
1375 None
1376 } else {
1377 Some(v.metadata_log)
1378 },
1379 sort_orders: v
1380 .sort_orders
1381 .into_values()
1382 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1383 .collect(),
1384 default_sort_order_id: v.default_sort_order_id,
1385 refs: Some(v.refs),
1386 statistics: v.statistics.into_values().collect(),
1387 partition_statistics: v.partition_statistics.into_values().collect(),
1388 }
1389 }
1390 }
1391
1392 impl TryFrom<TableMetadata> for TableMetadataV1 {
1393 type Error = Error;
1394 fn try_from(v: TableMetadata) -> Result<Self, Error> {
1395 Ok(TableMetadataV1 {
1396 format_version: VersionNumber::<1>,
1397 table_uuid: Some(v.table_uuid),
1398 location: v.location,
1399 last_updated_ms: v.last_updated_ms,
1400 last_column_id: v.last_column_id,
1401 schema: Some(
1402 v.schemas
1403 .get(&v.current_schema_id)
1404 .ok_or(Error::new(
1405 ErrorKind::Unexpected,
1406 "current_schema_id not found in schemas",
1407 ))?
1408 .as_ref()
1409 .clone()
1410 .into(),
1411 ),
1412 schemas: Some(
1413 v.schemas
1414 .into_values()
1415 .map(|x| {
1416 Arc::try_unwrap(x)
1417 .unwrap_or_else(|schema| schema.as_ref().clone())
1418 .into()
1419 })
1420 .collect(),
1421 ),
1422 current_schema_id: Some(v.current_schema_id),
1423 partition_spec: Some(v.default_spec.fields().to_vec()),
1424 partition_specs: Some(
1425 v.partition_specs
1426 .into_values()
1427 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1428 .collect(),
1429 ),
1430 default_spec_id: Some(v.default_spec.spec_id()),
1431 last_partition_id: Some(v.last_partition_id),
1432 properties: if v.properties.is_empty() {
1433 None
1434 } else {
1435 Some(v.properties)
1436 },
1437 current_snapshot_id: v.current_snapshot_id,
1438 snapshots: if v.snapshots.is_empty() {
1439 None
1440 } else {
1441 Some(
1442 v.snapshots
1443 .into_values()
1444 .map(|x| Snapshot::clone(&x).into())
1445 .collect(),
1446 )
1447 },
1448 snapshot_log: if v.snapshot_log.is_empty() {
1449 None
1450 } else {
1451 Some(v.snapshot_log)
1452 },
1453 metadata_log: if v.metadata_log.is_empty() {
1454 None
1455 } else {
1456 Some(v.metadata_log)
1457 },
1458 sort_orders: Some(
1459 v.sort_orders
1460 .into_values()
1461 .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1462 .collect(),
1463 ),
1464 default_sort_order_id: Some(v.default_sort_order_id),
1465 statistics: v.statistics.into_values().collect(),
1466 partition_statistics: v.partition_statistics.into_values().collect(),
1467 })
1468 }
1469 }
1470
1471 fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1472 statistics
1473 .into_iter()
1474 .rev()
1475 .map(|s| (s.snapshot_id, s))
1476 .collect()
1477 }
1478
1479 fn index_partition_statistics(
1480 statistics: Vec<PartitionStatisticsFile>,
1481 ) -> HashMap<i64, PartitionStatisticsFile> {
1482 statistics
1483 .into_iter()
1484 .rev()
1485 .map(|s| (s.snapshot_id, s))
1486 .collect()
1487 }
1488}
1489
1490#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
1491#[repr(u8)]
1492pub enum FormatVersion {
1494 V1 = 1u8,
1496 V2 = 2u8,
1498 V3 = 3u8,
1500}
1501
1502impl PartialOrd for FormatVersion {
1503 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1504 Some(self.cmp(other))
1505 }
1506}
1507
1508impl Ord for FormatVersion {
1509 fn cmp(&self, other: &Self) -> Ordering {
1510 (*self as u8).cmp(&(*other as u8))
1511 }
1512}
1513
1514impl Display for FormatVersion {
1515 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1516 match self {
1517 FormatVersion::V1 => write!(f, "v1"),
1518 FormatVersion::V2 => write!(f, "v2"),
1519 FormatVersion::V3 => write!(f, "v3"),
1520 }
1521 }
1522}
1523
1524#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1525#[serde(rename_all = "kebab-case")]
1526pub struct MetadataLog {
1528 pub metadata_file: String,
1530 pub timestamp_ms: i64,
1532}
1533
1534#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1535#[serde(rename_all = "kebab-case")]
1536pub struct SnapshotLog {
1538 pub snapshot_id: i64,
1540 pub timestamp_ms: i64,
1542}
1543
1544impl SnapshotLog {
1545 pub fn timestamp(self) -> Result<DateTime<Utc>> {
1547 timestamp_ms_to_utc(self.timestamp_ms)
1548 }
1549
1550 #[inline]
1552 pub fn timestamp_ms(&self) -> i64 {
1553 self.timestamp_ms
1554 }
1555}
1556
1557#[cfg(test)]
1558mod tests {
1559 use std::collections::HashMap;
1560 use std::fs;
1561 use std::sync::Arc;
1562
1563 use anyhow::Result;
1564 use base64::Engine as _;
1565 use pretty_assertions::assert_eq;
1566 use tempfile::TempDir;
1567 use uuid::Uuid;
1568
1569 use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1570 use crate::compression::CompressionCodec;
1571 use crate::io::FileIOBuilder;
1572 use crate::spec::table_metadata::TableMetadata;
1573 use crate::spec::{
1574 BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1575 PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1576 SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1577 Summary, Transform, Type, UnboundPartitionField,
1578 };
1579 use crate::{ErrorKind, TableCreation};
1580
1581 fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1582 let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1583 assert_eq!(desered_type, expected_type);
1584
1585 let sered_json = serde_json::to_string(&expected_type).unwrap();
1586 let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1587
1588 assert_eq!(parsed_json_value, desered_type);
1589 }
1590
1591 fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1592 let path = format!("testdata/table_metadata/{file_name}");
1593 let metadata: String = fs::read_to_string(path).unwrap();
1594
1595 serde_json::from_str(&metadata).unwrap()
1596 }
1597
1598 #[test]
1599 fn test_table_data_v2() {
1600 let data = r#"
1601 {
1602 "format-version" : 2,
1603 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1604 "location": "s3://b/wh/data.db/table",
1605 "last-sequence-number" : 1,
1606 "last-updated-ms": 1515100955770,
1607 "last-column-id": 1,
1608 "schemas": [
1609 {
1610 "schema-id" : 1,
1611 "type" : "struct",
1612 "fields" :[
1613 {
1614 "id": 1,
1615 "name": "struct_name",
1616 "required": true,
1617 "type": "fixed[1]"
1618 },
1619 {
1620 "id": 4,
1621 "name": "ts",
1622 "required": true,
1623 "type": "timestamp"
1624 }
1625 ]
1626 }
1627 ],
1628 "current-schema-id" : 1,
1629 "partition-specs": [
1630 {
1631 "spec-id": 0,
1632 "fields": [
1633 {
1634 "source-id": 4,
1635 "field-id": 1000,
1636 "name": "ts_day",
1637 "transform": "day"
1638 }
1639 ]
1640 }
1641 ],
1642 "default-spec-id": 0,
1643 "last-partition-id": 1000,
1644 "properties": {
1645 "commit.retry.num-retries": "1"
1646 },
1647 "metadata-log": [
1648 {
1649 "metadata-file": "s3://bucket/.../v1.json",
1650 "timestamp-ms": 1515100
1651 }
1652 ],
1653 "refs": {},
1654 "sort-orders": [
1655 {
1656 "order-id": 0,
1657 "fields": []
1658 }
1659 ],
1660 "default-sort-order-id": 0
1661 }
1662 "#;
1663
1664 let schema = Schema::builder()
1665 .with_schema_id(1)
1666 .with_fields(vec![
1667 Arc::new(NestedField::required(
1668 1,
1669 "struct_name",
1670 Type::Primitive(PrimitiveType::Fixed(1)),
1671 )),
1672 Arc::new(NestedField::required(
1673 4,
1674 "ts",
1675 Type::Primitive(PrimitiveType::Timestamp),
1676 )),
1677 ])
1678 .build()
1679 .unwrap();
1680
1681 let partition_spec = PartitionSpec::builder(schema.clone())
1682 .with_spec_id(0)
1683 .add_unbound_field(UnboundPartitionField {
1684 name: "ts_day".to_string(),
1685 transform: Transform::Day,
1686 source_id: 4,
1687 field_id: Some(1000),
1688 })
1689 .unwrap()
1690 .build()
1691 .unwrap();
1692
1693 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1694 let expected = TableMetadata {
1695 format_version: FormatVersion::V2,
1696 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1697 location: "s3://b/wh/data.db/table".to_string(),
1698 last_updated_ms: 1515100955770,
1699 last_column_id: 1,
1700 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1701 current_schema_id: 1,
1702 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1703 default_partition_type,
1704 default_spec: partition_spec.into(),
1705 last_partition_id: 1000,
1706 default_sort_order_id: 0,
1707 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1708 snapshots: HashMap::default(),
1709 current_snapshot_id: None,
1710 last_sequence_number: 1,
1711 properties: HashMap::from_iter(vec![(
1712 "commit.retry.num-retries".to_string(),
1713 "1".to_string(),
1714 )]),
1715 snapshot_log: Vec::new(),
1716 metadata_log: vec![MetadataLog {
1717 metadata_file: "s3://bucket/.../v1.json".to_string(),
1718 timestamp_ms: 1515100,
1719 }],
1720 refs: HashMap::new(),
1721 statistics: HashMap::new(),
1722 partition_statistics: HashMap::new(),
1723 encryption_keys: HashMap::new(),
1724 next_row_id: INITIAL_ROW_ID,
1725 };
1726
1727 let expected_json_value = serde_json::to_value(&expected).unwrap();
1728 check_table_metadata_serde(data, expected);
1729
1730 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1731 assert_eq!(json_value, expected_json_value);
1732 }
1733
1734 #[test]
1735 fn test_table_data_v3() {
1736 let data = r#"
1737 {
1738 "format-version" : 3,
1739 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1740 "location": "s3://b/wh/data.db/table",
1741 "last-sequence-number" : 1,
1742 "last-updated-ms": 1515100955770,
1743 "last-column-id": 1,
1744 "next-row-id": 5,
1745 "schemas": [
1746 {
1747 "schema-id" : 1,
1748 "type" : "struct",
1749 "fields" :[
1750 {
1751 "id": 4,
1752 "name": "ts",
1753 "required": true,
1754 "type": "timestamp"
1755 }
1756 ]
1757 }
1758 ],
1759 "current-schema-id" : 1,
1760 "partition-specs": [
1761 {
1762 "spec-id": 0,
1763 "fields": [
1764 {
1765 "source-id": 4,
1766 "field-id": 1000,
1767 "name": "ts_day",
1768 "transform": "day"
1769 }
1770 ]
1771 }
1772 ],
1773 "default-spec-id": 0,
1774 "last-partition-id": 1000,
1775 "properties": {
1776 "commit.retry.num-retries": "1"
1777 },
1778 "metadata-log": [
1779 {
1780 "metadata-file": "s3://bucket/.../v1.json",
1781 "timestamp-ms": 1515100
1782 }
1783 ],
1784 "refs": {},
1785 "snapshots" : [ {
1786 "snapshot-id" : 1,
1787 "timestamp-ms" : 1662532818843,
1788 "sequence-number" : 0,
1789 "first-row-id" : 0,
1790 "added-rows" : 4,
1791 "key-id" : "key1",
1792 "summary" : {
1793 "operation" : "append"
1794 },
1795 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1796 "schema-id" : 0
1797 }
1798 ],
1799 "encryption-keys": [
1800 {
1801 "key-id": "key1",
1802 "encrypted-by-id": "KMS",
1803 "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
1804 "properties": {
1805 "p1": "v1"
1806 }
1807 }
1808 ],
1809 "sort-orders": [
1810 {
1811 "order-id": 0,
1812 "fields": []
1813 }
1814 ],
1815 "default-sort-order-id": 0
1816 }
1817 "#;
1818
1819 let schema = Schema::builder()
1820 .with_schema_id(1)
1821 .with_fields(vec![Arc::new(NestedField::required(
1822 4,
1823 "ts",
1824 Type::Primitive(PrimitiveType::Timestamp),
1825 ))])
1826 .build()
1827 .unwrap();
1828
1829 let partition_spec = PartitionSpec::builder(schema.clone())
1830 .with_spec_id(0)
1831 .add_unbound_field(UnboundPartitionField {
1832 name: "ts_day".to_string(),
1833 transform: Transform::Day,
1834 source_id: 4,
1835 field_id: Some(1000),
1836 })
1837 .unwrap()
1838 .build()
1839 .unwrap();
1840
1841 let snapshot = Snapshot::builder()
1842 .with_snapshot_id(1)
1843 .with_timestamp_ms(1662532818843)
1844 .with_sequence_number(0)
1845 .with_row_range(0, 4)
1846 .with_encryption_key_id(Some("key1".to_string()))
1847 .with_summary(Summary {
1848 operation: Operation::Append,
1849 additional_properties: HashMap::new(),
1850 })
1851 .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
1852 .with_schema_id(0)
1853 .build();
1854
1855 let encryption_key = EncryptedKey::builder()
1856 .key_id("key1".to_string())
1857 .encrypted_by_id("KMS".to_string())
1858 .encrypted_key_metadata(
1859 base64::prelude::BASE64_STANDARD
1860 .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
1861 .unwrap(),
1862 )
1863 .properties(HashMap::from_iter(vec![(
1864 "p1".to_string(),
1865 "v1".to_string(),
1866 )]))
1867 .build();
1868
1869 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1870 let expected = TableMetadata {
1871 format_version: FormatVersion::V3,
1872 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1873 location: "s3://b/wh/data.db/table".to_string(),
1874 last_updated_ms: 1515100955770,
1875 last_column_id: 1,
1876 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1877 current_schema_id: 1,
1878 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1879 default_partition_type,
1880 default_spec: partition_spec.into(),
1881 last_partition_id: 1000,
1882 default_sort_order_id: 0,
1883 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1884 snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
1885 current_snapshot_id: None,
1886 last_sequence_number: 1,
1887 properties: HashMap::from_iter(vec![(
1888 "commit.retry.num-retries".to_string(),
1889 "1".to_string(),
1890 )]),
1891 snapshot_log: Vec::new(),
1892 metadata_log: vec![MetadataLog {
1893 metadata_file: "s3://bucket/.../v1.json".to_string(),
1894 timestamp_ms: 1515100,
1895 }],
1896 refs: HashMap::new(),
1897 statistics: HashMap::new(),
1898 partition_statistics: HashMap::new(),
1899 encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
1900 next_row_id: 5,
1901 };
1902
1903 let expected_json_value = serde_json::to_value(&expected).unwrap();
1904 check_table_metadata_serde(data, expected);
1905
1906 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1907 assert_eq!(json_value, expected_json_value);
1908 }
1909
1910 #[test]
1911 fn test_table_data_v1() {
1912 let data = r#"
1913 {
1914 "format-version" : 1,
1915 "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
1916 "location" : "/home/iceberg/warehouse/nyc/taxis",
1917 "last-updated-ms" : 1662532818843,
1918 "last-column-id" : 5,
1919 "schema" : {
1920 "type" : "struct",
1921 "schema-id" : 0,
1922 "fields" : [ {
1923 "id" : 1,
1924 "name" : "vendor_id",
1925 "required" : false,
1926 "type" : "long"
1927 }, {
1928 "id" : 2,
1929 "name" : "trip_id",
1930 "required" : false,
1931 "type" : "long"
1932 }, {
1933 "id" : 3,
1934 "name" : "trip_distance",
1935 "required" : false,
1936 "type" : "float"
1937 }, {
1938 "id" : 4,
1939 "name" : "fare_amount",
1940 "required" : false,
1941 "type" : "double"
1942 }, {
1943 "id" : 5,
1944 "name" : "store_and_fwd_flag",
1945 "required" : false,
1946 "type" : "string"
1947 } ]
1948 },
1949 "partition-spec" : [ {
1950 "name" : "vendor_id",
1951 "transform" : "identity",
1952 "source-id" : 1,
1953 "field-id" : 1000
1954 } ],
1955 "last-partition-id" : 1000,
1956 "default-sort-order-id" : 0,
1957 "sort-orders" : [ {
1958 "order-id" : 0,
1959 "fields" : [ ]
1960 } ],
1961 "properties" : {
1962 "owner" : "root"
1963 },
1964 "current-snapshot-id" : 638933773299822130,
1965 "refs" : {
1966 "main" : {
1967 "snapshot-id" : 638933773299822130,
1968 "type" : "branch"
1969 }
1970 },
1971 "snapshots" : [ {
1972 "snapshot-id" : 638933773299822130,
1973 "timestamp-ms" : 1662532818843,
1974 "sequence-number" : 0,
1975 "summary" : {
1976 "operation" : "append",
1977 "spark.app.id" : "local-1662532784305",
1978 "added-data-files" : "4",
1979 "added-records" : "4",
1980 "added-files-size" : "6001"
1981 },
1982 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1983 "schema-id" : 0
1984 } ],
1985 "snapshot-log" : [ {
1986 "timestamp-ms" : 1662532818843,
1987 "snapshot-id" : 638933773299822130
1988 } ],
1989 "metadata-log" : [ {
1990 "timestamp-ms" : 1662532805245,
1991 "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
1992 } ]
1993 }
1994 "#;
1995
1996 let schema = Schema::builder()
1997 .with_fields(vec![
1998 Arc::new(NestedField::optional(
1999 1,
2000 "vendor_id",
2001 Type::Primitive(PrimitiveType::Long),
2002 )),
2003 Arc::new(NestedField::optional(
2004 2,
2005 "trip_id",
2006 Type::Primitive(PrimitiveType::Long),
2007 )),
2008 Arc::new(NestedField::optional(
2009 3,
2010 "trip_distance",
2011 Type::Primitive(PrimitiveType::Float),
2012 )),
2013 Arc::new(NestedField::optional(
2014 4,
2015 "fare_amount",
2016 Type::Primitive(PrimitiveType::Double),
2017 )),
2018 Arc::new(NestedField::optional(
2019 5,
2020 "store_and_fwd_flag",
2021 Type::Primitive(PrimitiveType::String),
2022 )),
2023 ])
2024 .build()
2025 .unwrap();
2026
2027 let schema = Arc::new(schema);
2028 let partition_spec = PartitionSpec::builder(schema.clone())
2029 .with_spec_id(0)
2030 .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
2031 .unwrap()
2032 .build()
2033 .unwrap();
2034
2035 let sort_order = SortOrder::builder()
2036 .with_order_id(0)
2037 .build_unbound()
2038 .unwrap();
2039
2040 let snapshot = Snapshot::builder()
2041 .with_snapshot_id(638933773299822130)
2042 .with_timestamp_ms(1662532818843)
2043 .with_sequence_number(0)
2044 .with_schema_id(0)
2045 .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
2046 .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
2047 .build();
2048
2049 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2050 let expected = TableMetadata {
2051 format_version: FormatVersion::V1,
2052 table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
2053 location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
2054 last_updated_ms: 1662532818843,
2055 last_column_id: 5,
2056 schemas: HashMap::from_iter(vec![(0, schema)]),
2057 current_schema_id: 0,
2058 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2059 default_partition_type,
2060 default_spec: Arc::new(partition_spec),
2061 last_partition_id: 1000,
2062 default_sort_order_id: 0,
2063 sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
2064 snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
2065 current_snapshot_id: Some(638933773299822130),
2066 last_sequence_number: 0,
2067 properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
2068 snapshot_log: vec![SnapshotLog {
2069 snapshot_id: 638933773299822130,
2070 timestamp_ms: 1662532818843,
2071 }],
2072 metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
2073 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
2074 statistics: HashMap::new(),
2075 partition_statistics: HashMap::new(),
2076 encryption_keys: HashMap::new(),
2077 next_row_id: INITIAL_ROW_ID,
2078 };
2079
2080 check_table_metadata_serde(data, expected);
2081 }
2082
2083 #[test]
2084 fn test_table_data_v2_no_snapshots() {
2085 let data = r#"
2086 {
2087 "format-version" : 2,
2088 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2089 "location": "s3://b/wh/data.db/table",
2090 "last-sequence-number" : 1,
2091 "last-updated-ms": 1515100955770,
2092 "last-column-id": 1,
2093 "schemas": [
2094 {
2095 "schema-id" : 1,
2096 "type" : "struct",
2097 "fields" :[
2098 {
2099 "id": 1,
2100 "name": "struct_name",
2101 "required": true,
2102 "type": "fixed[1]"
2103 }
2104 ]
2105 }
2106 ],
2107 "current-schema-id" : 1,
2108 "partition-specs": [
2109 {
2110 "spec-id": 0,
2111 "fields": []
2112 }
2113 ],
2114 "refs": {},
2115 "default-spec-id": 0,
2116 "last-partition-id": 1000,
2117 "metadata-log": [
2118 {
2119 "metadata-file": "s3://bucket/.../v1.json",
2120 "timestamp-ms": 1515100
2121 }
2122 ],
2123 "sort-orders": [
2124 {
2125 "order-id": 0,
2126 "fields": []
2127 }
2128 ],
2129 "default-sort-order-id": 0
2130 }
2131 "#;
2132
2133 let schema = Schema::builder()
2134 .with_schema_id(1)
2135 .with_fields(vec![Arc::new(NestedField::required(
2136 1,
2137 "struct_name",
2138 Type::Primitive(PrimitiveType::Fixed(1)),
2139 ))])
2140 .build()
2141 .unwrap();
2142
2143 let partition_spec = PartitionSpec::builder(schema.clone())
2144 .with_spec_id(0)
2145 .build()
2146 .unwrap();
2147
2148 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2149 let expected = TableMetadata {
2150 format_version: FormatVersion::V2,
2151 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
2152 location: "s3://b/wh/data.db/table".to_string(),
2153 last_updated_ms: 1515100955770,
2154 last_column_id: 1,
2155 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
2156 current_schema_id: 1,
2157 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2158 default_partition_type,
2159 default_spec: partition_spec.into(),
2160 last_partition_id: 1000,
2161 default_sort_order_id: 0,
2162 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2163 snapshots: HashMap::default(),
2164 current_snapshot_id: None,
2165 last_sequence_number: 1,
2166 properties: HashMap::new(),
2167 snapshot_log: Vec::new(),
2168 metadata_log: vec![MetadataLog {
2169 metadata_file: "s3://bucket/.../v1.json".to_string(),
2170 timestamp_ms: 1515100,
2171 }],
2172 refs: HashMap::new(),
2173 statistics: HashMap::new(),
2174 partition_statistics: HashMap::new(),
2175 encryption_keys: HashMap::new(),
2176 next_row_id: INITIAL_ROW_ID,
2177 };
2178
2179 let expected_json_value = serde_json::to_value(&expected).unwrap();
2180 check_table_metadata_serde(data, expected);
2181
2182 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
2183 assert_eq!(json_value, expected_json_value);
2184 }
2185
2186 #[test]
2187 fn test_current_snapshot_id_must_match_main_branch() {
2188 let data = r#"
2189 {
2190 "format-version" : 2,
2191 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2192 "location": "s3://b/wh/data.db/table",
2193 "last-sequence-number" : 1,
2194 "last-updated-ms": 1515100955770,
2195 "last-column-id": 1,
2196 "schemas": [
2197 {
2198 "schema-id" : 1,
2199 "type" : "struct",
2200 "fields" :[
2201 {
2202 "id": 1,
2203 "name": "struct_name",
2204 "required": true,
2205 "type": "fixed[1]"
2206 },
2207 {
2208 "id": 4,
2209 "name": "ts",
2210 "required": true,
2211 "type": "timestamp"
2212 }
2213 ]
2214 }
2215 ],
2216 "current-schema-id" : 1,
2217 "partition-specs": [
2218 {
2219 "spec-id": 0,
2220 "fields": [
2221 {
2222 "source-id": 4,
2223 "field-id": 1000,
2224 "name": "ts_day",
2225 "transform": "day"
2226 }
2227 ]
2228 }
2229 ],
2230 "default-spec-id": 0,
2231 "last-partition-id": 1000,
2232 "properties": {
2233 "commit.retry.num-retries": "1"
2234 },
2235 "metadata-log": [
2236 {
2237 "metadata-file": "s3://bucket/.../v1.json",
2238 "timestamp-ms": 1515100
2239 }
2240 ],
2241 "sort-orders": [
2242 {
2243 "order-id": 0,
2244 "fields": []
2245 }
2246 ],
2247 "default-sort-order-id": 0,
2248 "current-snapshot-id" : 1,
2249 "refs" : {
2250 "main" : {
2251 "snapshot-id" : 2,
2252 "type" : "branch"
2253 }
2254 },
2255 "snapshots" : [ {
2256 "snapshot-id" : 1,
2257 "timestamp-ms" : 1662532818843,
2258 "sequence-number" : 0,
2259 "summary" : {
2260 "operation" : "append",
2261 "spark.app.id" : "local-1662532784305",
2262 "added-data-files" : "4",
2263 "added-records" : "4",
2264 "added-files-size" : "6001"
2265 },
2266 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2267 "schema-id" : 0
2268 },
2269 {
2270 "snapshot-id" : 2,
2271 "timestamp-ms" : 1662532818844,
2272 "sequence-number" : 0,
2273 "summary" : {
2274 "operation" : "append",
2275 "spark.app.id" : "local-1662532784305",
2276 "added-data-files" : "4",
2277 "added-records" : "4",
2278 "added-files-size" : "6001"
2279 },
2280 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2281 "schema-id" : 0
2282 } ]
2283 }
2284 "#;
2285
2286 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2287 assert!(
2288 err.to_string()
2289 .contains("Current snapshot id does not match main branch")
2290 );
2291 }
2292
2293 #[test]
2294 fn test_main_without_current() {
2295 let data = r#"
2296 {
2297 "format-version" : 2,
2298 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2299 "location": "s3://b/wh/data.db/table",
2300 "last-sequence-number" : 1,
2301 "last-updated-ms": 1515100955770,
2302 "last-column-id": 1,
2303 "schemas": [
2304 {
2305 "schema-id" : 1,
2306 "type" : "struct",
2307 "fields" :[
2308 {
2309 "id": 1,
2310 "name": "struct_name",
2311 "required": true,
2312 "type": "fixed[1]"
2313 },
2314 {
2315 "id": 4,
2316 "name": "ts",
2317 "required": true,
2318 "type": "timestamp"
2319 }
2320 ]
2321 }
2322 ],
2323 "current-schema-id" : 1,
2324 "partition-specs": [
2325 {
2326 "spec-id": 0,
2327 "fields": [
2328 {
2329 "source-id": 4,
2330 "field-id": 1000,
2331 "name": "ts_day",
2332 "transform": "day"
2333 }
2334 ]
2335 }
2336 ],
2337 "default-spec-id": 0,
2338 "last-partition-id": 1000,
2339 "properties": {
2340 "commit.retry.num-retries": "1"
2341 },
2342 "metadata-log": [
2343 {
2344 "metadata-file": "s3://bucket/.../v1.json",
2345 "timestamp-ms": 1515100
2346 }
2347 ],
2348 "sort-orders": [
2349 {
2350 "order-id": 0,
2351 "fields": []
2352 }
2353 ],
2354 "default-sort-order-id": 0,
2355 "refs" : {
2356 "main" : {
2357 "snapshot-id" : 1,
2358 "type" : "branch"
2359 }
2360 },
2361 "snapshots" : [ {
2362 "snapshot-id" : 1,
2363 "timestamp-ms" : 1662532818843,
2364 "sequence-number" : 0,
2365 "summary" : {
2366 "operation" : "append",
2367 "spark.app.id" : "local-1662532784305",
2368 "added-data-files" : "4",
2369 "added-records" : "4",
2370 "added-files-size" : "6001"
2371 },
2372 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2373 "schema-id" : 0
2374 } ]
2375 }
2376 "#;
2377
2378 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2379 assert!(
2380 err.to_string()
2381 .contains("Current snapshot is not set, but main branch exists")
2382 );
2383 }
2384
2385 #[test]
2386 fn test_branch_snapshot_missing() {
2387 let data = r#"
2388 {
2389 "format-version" : 2,
2390 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2391 "location": "s3://b/wh/data.db/table",
2392 "last-sequence-number" : 1,
2393 "last-updated-ms": 1515100955770,
2394 "last-column-id": 1,
2395 "schemas": [
2396 {
2397 "schema-id" : 1,
2398 "type" : "struct",
2399 "fields" :[
2400 {
2401 "id": 1,
2402 "name": "struct_name",
2403 "required": true,
2404 "type": "fixed[1]"
2405 },
2406 {
2407 "id": 4,
2408 "name": "ts",
2409 "required": true,
2410 "type": "timestamp"
2411 }
2412 ]
2413 }
2414 ],
2415 "current-schema-id" : 1,
2416 "partition-specs": [
2417 {
2418 "spec-id": 0,
2419 "fields": [
2420 {
2421 "source-id": 4,
2422 "field-id": 1000,
2423 "name": "ts_day",
2424 "transform": "day"
2425 }
2426 ]
2427 }
2428 ],
2429 "default-spec-id": 0,
2430 "last-partition-id": 1000,
2431 "properties": {
2432 "commit.retry.num-retries": "1"
2433 },
2434 "metadata-log": [
2435 {
2436 "metadata-file": "s3://bucket/.../v1.json",
2437 "timestamp-ms": 1515100
2438 }
2439 ],
2440 "sort-orders": [
2441 {
2442 "order-id": 0,
2443 "fields": []
2444 }
2445 ],
2446 "default-sort-order-id": 0,
2447 "refs" : {
2448 "main" : {
2449 "snapshot-id" : 1,
2450 "type" : "branch"
2451 },
2452 "foo" : {
2453 "snapshot-id" : 2,
2454 "type" : "branch"
2455 }
2456 },
2457 "snapshots" : [ {
2458 "snapshot-id" : 1,
2459 "timestamp-ms" : 1662532818843,
2460 "sequence-number" : 0,
2461 "summary" : {
2462 "operation" : "append",
2463 "spark.app.id" : "local-1662532784305",
2464 "added-data-files" : "4",
2465 "added-records" : "4",
2466 "added-files-size" : "6001"
2467 },
2468 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2469 "schema-id" : 0
2470 } ]
2471 }
2472 "#;
2473
2474 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2475 assert!(
2476 err.to_string().contains(
2477 "Snapshot for reference foo does not exist in the existing snapshots list"
2478 )
2479 );
2480 }
2481
2482 #[test]
2483 fn test_v2_wrong_max_snapshot_sequence_number() {
2484 let data = r#"
2485 {
2486 "format-version": 2,
2487 "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2488 "location": "s3://bucket/test/location",
2489 "last-sequence-number": 1,
2490 "last-updated-ms": 1602638573590,
2491 "last-column-id": 3,
2492 "current-schema-id": 0,
2493 "schemas": [
2494 {
2495 "type": "struct",
2496 "schema-id": 0,
2497 "fields": [
2498 {
2499 "id": 1,
2500 "name": "x",
2501 "required": true,
2502 "type": "long"
2503 }
2504 ]
2505 }
2506 ],
2507 "default-spec-id": 0,
2508 "partition-specs": [
2509 {
2510 "spec-id": 0,
2511 "fields": []
2512 }
2513 ],
2514 "last-partition-id": 1000,
2515 "default-sort-order-id": 0,
2516 "sort-orders": [
2517 {
2518 "order-id": 0,
2519 "fields": []
2520 }
2521 ],
2522 "properties": {},
2523 "current-snapshot-id": 3055729675574597004,
2524 "snapshots": [
2525 {
2526 "snapshot-id": 3055729675574597004,
2527 "timestamp-ms": 1555100955770,
2528 "sequence-number": 4,
2529 "summary": {
2530 "operation": "append"
2531 },
2532 "manifest-list": "s3://a/b/2.avro",
2533 "schema-id": 0
2534 }
2535 ],
2536 "statistics": [],
2537 "snapshot-log": [],
2538 "metadata-log": []
2539 }
2540 "#;
2541
2542 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2543 assert!(err.to_string().contains(
2544 "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
2545 ));
2546
2547 let data = data.replace(
2549 r#""last-sequence-number": 1,"#,
2550 r#""last-sequence-number": 4,"#,
2551 );
2552 let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2553 assert_eq!(metadata.last_sequence_number, 4);
2554
2555 let data = data.replace(
2557 r#""last-sequence-number": 4,"#,
2558 r#""last-sequence-number": 5,"#,
2559 );
2560 let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2561 assert_eq!(metadata.last_sequence_number, 5);
2562 }
2563
2564 #[test]
2565 fn test_statistic_files() {
2566 let data = r#"
2567 {
2568 "format-version": 2,
2569 "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2570 "location": "s3://bucket/test/location",
2571 "last-sequence-number": 34,
2572 "last-updated-ms": 1602638573590,
2573 "last-column-id": 3,
2574 "current-schema-id": 0,
2575 "schemas": [
2576 {
2577 "type": "struct",
2578 "schema-id": 0,
2579 "fields": [
2580 {
2581 "id": 1,
2582 "name": "x",
2583 "required": true,
2584 "type": "long"
2585 }
2586 ]
2587 }
2588 ],
2589 "default-spec-id": 0,
2590 "partition-specs": [
2591 {
2592 "spec-id": 0,
2593 "fields": []
2594 }
2595 ],
2596 "last-partition-id": 1000,
2597 "default-sort-order-id": 0,
2598 "sort-orders": [
2599 {
2600 "order-id": 0,
2601 "fields": []
2602 }
2603 ],
2604 "properties": {},
2605 "current-snapshot-id": 3055729675574597004,
2606 "snapshots": [
2607 {
2608 "snapshot-id": 3055729675574597004,
2609 "timestamp-ms": 1555100955770,
2610 "sequence-number": 1,
2611 "summary": {
2612 "operation": "append"
2613 },
2614 "manifest-list": "s3://a/b/2.avro",
2615 "schema-id": 0
2616 }
2617 ],
2618 "statistics": [
2619 {
2620 "snapshot-id": 3055729675574597004,
2621 "statistics-path": "s3://a/b/stats.puffin",
2622 "file-size-in-bytes": 413,
2623 "file-footer-size-in-bytes": 42,
2624 "blob-metadata": [
2625 {
2626 "type": "ndv",
2627 "snapshot-id": 3055729675574597004,
2628 "sequence-number": 1,
2629 "fields": [
2630 1
2631 ]
2632 }
2633 ]
2634 }
2635 ],
2636 "snapshot-log": [],
2637 "metadata-log": []
2638 }
2639 "#;
2640
2641 let schema = Schema::builder()
2642 .with_schema_id(0)
2643 .with_fields(vec![Arc::new(NestedField::required(
2644 1,
2645 "x",
2646 Type::Primitive(PrimitiveType::Long),
2647 ))])
2648 .build()
2649 .unwrap();
2650 let partition_spec = PartitionSpec::builder(schema.clone())
2651 .with_spec_id(0)
2652 .build()
2653 .unwrap();
2654 let snapshot = Snapshot::builder()
2655 .with_snapshot_id(3055729675574597004)
2656 .with_timestamp_ms(1555100955770)
2657 .with_sequence_number(1)
2658 .with_manifest_list("s3://a/b/2.avro")
2659 .with_schema_id(0)
2660 .with_summary(Summary {
2661 operation: Operation::Append,
2662 additional_properties: HashMap::new(),
2663 })
2664 .build();
2665
2666 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2667 let expected = TableMetadata {
2668 format_version: FormatVersion::V2,
2669 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2670 location: "s3://bucket/test/location".to_string(),
2671 last_updated_ms: 1602638573590,
2672 last_column_id: 3,
2673 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2674 current_schema_id: 0,
2675 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2676 default_partition_type,
2677 default_spec: Arc::new(partition_spec),
2678 last_partition_id: 1000,
2679 default_sort_order_id: 0,
2680 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2681 snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
2682 current_snapshot_id: Some(3055729675574597004),
2683 last_sequence_number: 34,
2684 properties: HashMap::new(),
2685 snapshot_log: Vec::new(),
2686 metadata_log: Vec::new(),
2687 statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
2688 snapshot_id: 3055729675574597004,
2689 statistics_path: "s3://a/b/stats.puffin".to_string(),
2690 file_size_in_bytes: 413,
2691 file_footer_size_in_bytes: 42,
2692 key_metadata: None,
2693 blob_metadata: vec![BlobMetadata {
2694 snapshot_id: 3055729675574597004,
2695 sequence_number: 1,
2696 fields: vec![1],
2697 r#type: "ndv".to_string(),
2698 properties: HashMap::new(),
2699 }],
2700 })]),
2701 partition_statistics: HashMap::new(),
2702 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
2703 snapshot_id: 3055729675574597004,
2704 retention: SnapshotRetention::Branch {
2705 min_snapshots_to_keep: None,
2706 max_snapshot_age_ms: None,
2707 max_ref_age_ms: None,
2708 },
2709 })]),
2710 encryption_keys: HashMap::new(),
2711 next_row_id: INITIAL_ROW_ID,
2712 };
2713
2714 check_table_metadata_serde(data, expected);
2715 }
2716
2717 #[test]
2718 fn test_partition_statistics_file() {
2719 let data = r#"
2720 {
2721 "format-version": 2,
2722 "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2723 "location": "s3://bucket/test/location",
2724 "last-sequence-number": 34,
2725 "last-updated-ms": 1602638573590,
2726 "last-column-id": 3,
2727 "current-schema-id": 0,
2728 "schemas": [
2729 {
2730 "type": "struct",
2731 "schema-id": 0,
2732 "fields": [
2733 {
2734 "id": 1,
2735 "name": "x",
2736 "required": true,
2737 "type": "long"
2738 }
2739 ]
2740 }
2741 ],
2742 "default-spec-id": 0,
2743 "partition-specs": [
2744 {
2745 "spec-id": 0,
2746 "fields": []
2747 }
2748 ],
2749 "last-partition-id": 1000,
2750 "default-sort-order-id": 0,
2751 "sort-orders": [
2752 {
2753 "order-id": 0,
2754 "fields": []
2755 }
2756 ],
2757 "properties": {},
2758 "current-snapshot-id": 3055729675574597004,
2759 "snapshots": [
2760 {
2761 "snapshot-id": 3055729675574597004,
2762 "timestamp-ms": 1555100955770,
2763 "sequence-number": 1,
2764 "summary": {
2765 "operation": "append"
2766 },
2767 "manifest-list": "s3://a/b/2.avro",
2768 "schema-id": 0
2769 }
2770 ],
2771 "partition-statistics": [
2772 {
2773 "snapshot-id": 3055729675574597004,
2774 "statistics-path": "s3://a/b/partition-stats.parquet",
2775 "file-size-in-bytes": 43
2776 }
2777 ],
2778 "snapshot-log": [],
2779 "metadata-log": []
2780 }
2781 "#;
2782
2783 let schema = Schema::builder()
2784 .with_schema_id(0)
2785 .with_fields(vec![Arc::new(NestedField::required(
2786 1,
2787 "x",
2788 Type::Primitive(PrimitiveType::Long),
2789 ))])
2790 .build()
2791 .unwrap();
2792 let partition_spec = PartitionSpec::builder(schema.clone())
2793 .with_spec_id(0)
2794 .build()
2795 .unwrap();
2796 let snapshot = Snapshot::builder()
2797 .with_snapshot_id(3055729675574597004)
2798 .with_timestamp_ms(1555100955770)
2799 .with_sequence_number(1)
2800 .with_manifest_list("s3://a/b/2.avro")
2801 .with_schema_id(0)
2802 .with_summary(Summary {
2803 operation: Operation::Append,
2804 additional_properties: HashMap::new(),
2805 })
2806 .build();
2807
2808 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2809 let expected = TableMetadata {
2810 format_version: FormatVersion::V2,
2811 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2812 location: "s3://bucket/test/location".to_string(),
2813 last_updated_ms: 1602638573590,
2814 last_column_id: 3,
2815 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2816 current_schema_id: 0,
2817 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2818 default_spec: Arc::new(partition_spec),
2819 default_partition_type,
2820 last_partition_id: 1000,
2821 default_sort_order_id: 0,
2822 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2823 snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
2824 current_snapshot_id: Some(3055729675574597004),
2825 last_sequence_number: 34,
2826 properties: HashMap::new(),
2827 snapshot_log: Vec::new(),
2828 metadata_log: Vec::new(),
2829 statistics: HashMap::new(),
2830 partition_statistics: HashMap::from_iter(vec![(
2831 3055729675574597004,
2832 PartitionStatisticsFile {
2833 snapshot_id: 3055729675574597004,
2834 statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2835 file_size_in_bytes: 43,
2836 },
2837 )]),
2838 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
2839 snapshot_id: 3055729675574597004,
2840 retention: SnapshotRetention::Branch {
2841 min_snapshots_to_keep: None,
2842 max_snapshot_age_ms: None,
2843 max_ref_age_ms: None,
2844 },
2845 })]),
2846 encryption_keys: HashMap::new(),
2847 next_row_id: INITIAL_ROW_ID,
2848 };
2849
2850 check_table_metadata_serde(data, expected);
2851 }
2852
2853 #[test]
2854 fn test_invalid_table_uuid() -> Result<()> {
2855 let data = r#"
2856 {
2857 "format-version" : 2,
2858 "table-uuid": "xxxx"
2859 }
2860 "#;
2861 assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2862 Ok(())
2863 }
2864
2865 #[test]
2866 fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
2867 let data = r#"
2868 {
2869 "format-version" : 1
2870 }
2871 "#;
2872 assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2873 Ok(())
2874 }
2875
2876 #[test]
2877 fn test_table_metadata_v3_valid_minimal() {
2878 let metadata_str =
2879 fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();
2880
2881 let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
2882 assert_eq!(table_metadata.format_version, FormatVersion::V3);
2883
2884 let schema = Schema::builder()
2885 .with_schema_id(0)
2886 .with_fields(vec![
2887 Arc::new(
2888 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
2889 .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
2890 .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
2891 ),
2892 Arc::new(
2893 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2894 .with_doc("comment"),
2895 ),
2896 Arc::new(NestedField::required(
2897 3,
2898 "z",
2899 Type::Primitive(PrimitiveType::Long),
2900 )),
2901 ])
2902 .build()
2903 .unwrap();
2904
2905 let partition_spec = PartitionSpec::builder(schema.clone())
2906 .with_spec_id(0)
2907 .add_unbound_field(UnboundPartitionField {
2908 name: "x".to_string(),
2909 transform: Transform::Identity,
2910 source_id: 1,
2911 field_id: Some(1000),
2912 })
2913 .unwrap()
2914 .build()
2915 .unwrap();
2916
2917 let sort_order = SortOrder::builder()
2918 .with_order_id(3)
2919 .with_sort_field(SortField {
2920 source_id: 2,
2921 transform: Transform::Identity,
2922 direction: SortDirection::Ascending,
2923 null_order: NullOrder::First,
2924 })
2925 .with_sort_field(SortField {
2926 source_id: 3,
2927 transform: Transform::Bucket(4),
2928 direction: SortDirection::Descending,
2929 null_order: NullOrder::Last,
2930 })
2931 .build_unbound()
2932 .unwrap();
2933
2934 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2935 let expected = TableMetadata {
2936 format_version: FormatVersion::V3,
2937 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2938 location: "s3://bucket/test/location".to_string(),
2939 last_updated_ms: 1602638573590,
2940 last_column_id: 3,
2941 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2942 current_schema_id: 0,
2943 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2944 default_spec: Arc::new(partition_spec),
2945 default_partition_type,
2946 last_partition_id: 1000,
2947 default_sort_order_id: 3,
2948 sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
2949 snapshots: HashMap::default(),
2950 current_snapshot_id: None,
2951 last_sequence_number: 34,
2952 properties: HashMap::new(),
2953 snapshot_log: Vec::new(),
2954 metadata_log: Vec::new(),
2955 refs: HashMap::new(),
2956 statistics: HashMap::new(),
2957 partition_statistics: HashMap::new(),
2958 encryption_keys: HashMap::new(),
2959 next_row_id: 0, };
2961
2962 check_table_metadata_serde(&metadata_str, expected);
2963 }
2964
2965 #[test]
2966 fn test_table_metadata_v2_file_valid() {
2967 let metadata =
2968 fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();
2969
2970 let schema1 = Schema::builder()
2971 .with_schema_id(0)
2972 .with_fields(vec![Arc::new(NestedField::required(
2973 1,
2974 "x",
2975 Type::Primitive(PrimitiveType::Long),
2976 ))])
2977 .build()
2978 .unwrap();
2979
2980 let schema2 = Schema::builder()
2981 .with_schema_id(1)
2982 .with_fields(vec![
2983 Arc::new(NestedField::required(
2984 1,
2985 "x",
2986 Type::Primitive(PrimitiveType::Long),
2987 )),
2988 Arc::new(
2989 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2990 .with_doc("comment"),
2991 ),
2992 Arc::new(NestedField::required(
2993 3,
2994 "z",
2995 Type::Primitive(PrimitiveType::Long),
2996 )),
2997 ])
2998 .with_identifier_field_ids(vec![1, 2])
2999 .build()
3000 .unwrap();
3001
3002 let partition_spec = PartitionSpec::builder(schema2.clone())
3003 .with_spec_id(0)
3004 .add_unbound_field(UnboundPartitionField {
3005 name: "x".to_string(),
3006 transform: Transform::Identity,
3007 source_id: 1,
3008 field_id: Some(1000),
3009 })
3010 .unwrap()
3011 .build()
3012 .unwrap();
3013
3014 let sort_order = SortOrder::builder()
3015 .with_order_id(3)
3016 .with_sort_field(SortField {
3017 source_id: 2,
3018 transform: Transform::Identity,
3019 direction: SortDirection::Ascending,
3020 null_order: NullOrder::First,
3021 })
3022 .with_sort_field(SortField {
3023 source_id: 3,
3024 transform: Transform::Bucket(4),
3025 direction: SortDirection::Descending,
3026 null_order: NullOrder::Last,
3027 })
3028 .build_unbound()
3029 .unwrap();
3030
3031 let snapshot1 = Snapshot::builder()
3032 .with_snapshot_id(3051729675574597004)
3033 .with_timestamp_ms(1515100955770)
3034 .with_sequence_number(0)
3035 .with_manifest_list("s3://a/b/1.avro")
3036 .with_summary(Summary {
3037 operation: Operation::Append,
3038 additional_properties: HashMap::new(),
3039 })
3040 .build();
3041
3042 let snapshot2 = Snapshot::builder()
3043 .with_snapshot_id(3055729675574597004)
3044 .with_parent_snapshot_id(Some(3051729675574597004))
3045 .with_timestamp_ms(1555100955770)
3046 .with_sequence_number(1)
3047 .with_schema_id(1)
3048 .with_manifest_list("s3://a/b/2.avro")
3049 .with_summary(Summary {
3050 operation: Operation::Append,
3051 additional_properties: HashMap::new(),
3052 })
3053 .build();
3054
3055 let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
3056 let expected = TableMetadata {
3057 format_version: FormatVersion::V2,
3058 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3059 location: "s3://bucket/test/location".to_string(),
3060 last_updated_ms: 1602638573590,
3061 last_column_id: 3,
3062 schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
3063 current_schema_id: 1,
3064 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3065 default_spec: Arc::new(partition_spec),
3066 default_partition_type,
3067 last_partition_id: 1000,
3068 default_sort_order_id: 3,
3069 sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3070 snapshots: HashMap::from_iter(vec![
3071 (3051729675574597004, Arc::new(snapshot1)),
3072 (3055729675574597004, Arc::new(snapshot2)),
3073 ]),
3074 current_snapshot_id: Some(3055729675574597004),
3075 last_sequence_number: 34,
3076 properties: HashMap::new(),
3077 snapshot_log: vec![
3078 SnapshotLog {
3079 snapshot_id: 3051729675574597004,
3080 timestamp_ms: 1515100955770,
3081 },
3082 SnapshotLog {
3083 snapshot_id: 3055729675574597004,
3084 timestamp_ms: 1555100955770,
3085 },
3086 ],
3087 metadata_log: Vec::new(),
3088 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
3089 snapshot_id: 3055729675574597004,
3090 retention: SnapshotRetention::Branch {
3091 min_snapshots_to_keep: None,
3092 max_snapshot_age_ms: None,
3093 max_ref_age_ms: None,
3094 },
3095 })]),
3096 statistics: HashMap::new(),
3097 partition_statistics: HashMap::new(),
3098 encryption_keys: HashMap::new(),
3099 next_row_id: INITIAL_ROW_ID,
3100 };
3101
3102 check_table_metadata_serde(&metadata, expected);
3103 }
3104
3105 #[test]
3106 fn test_table_metadata_v2_file_valid_minimal() {
3107 let metadata =
3108 fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();
3109
3110 let schema = Schema::builder()
3111 .with_schema_id(0)
3112 .with_fields(vec![
3113 Arc::new(NestedField::required(
3114 1,
3115 "x",
3116 Type::Primitive(PrimitiveType::Long),
3117 )),
3118 Arc::new(
3119 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3120 .with_doc("comment"),
3121 ),
3122 Arc::new(NestedField::required(
3123 3,
3124 "z",
3125 Type::Primitive(PrimitiveType::Long),
3126 )),
3127 ])
3128 .build()
3129 .unwrap();
3130
3131 let partition_spec = PartitionSpec::builder(schema.clone())
3132 .with_spec_id(0)
3133 .add_unbound_field(UnboundPartitionField {
3134 name: "x".to_string(),
3135 transform: Transform::Identity,
3136 source_id: 1,
3137 field_id: Some(1000),
3138 })
3139 .unwrap()
3140 .build()
3141 .unwrap();
3142
3143 let sort_order = SortOrder::builder()
3144 .with_order_id(3)
3145 .with_sort_field(SortField {
3146 source_id: 2,
3147 transform: Transform::Identity,
3148 direction: SortDirection::Ascending,
3149 null_order: NullOrder::First,
3150 })
3151 .with_sort_field(SortField {
3152 source_id: 3,
3153 transform: Transform::Bucket(4),
3154 direction: SortDirection::Descending,
3155 null_order: NullOrder::Last,
3156 })
3157 .build_unbound()
3158 .unwrap();
3159
3160 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
3161 let expected = TableMetadata {
3162 format_version: FormatVersion::V2,
3163 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3164 location: "s3://bucket/test/location".to_string(),
3165 last_updated_ms: 1602638573590,
3166 last_column_id: 3,
3167 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
3168 current_schema_id: 0,
3169 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3170 default_partition_type,
3171 default_spec: Arc::new(partition_spec),
3172 last_partition_id: 1000,
3173 default_sort_order_id: 3,
3174 sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3175 snapshots: HashMap::default(),
3176 current_snapshot_id: None,
3177 last_sequence_number: 34,
3178 properties: HashMap::new(),
3179 snapshot_log: vec![],
3180 metadata_log: Vec::new(),
3181 refs: HashMap::new(),
3182 statistics: HashMap::new(),
3183 partition_statistics: HashMap::new(),
3184 encryption_keys: HashMap::new(),
3185 next_row_id: INITIAL_ROW_ID,
3186 };
3187
3188 check_table_metadata_serde(&metadata, expected);
3189 }
3190
3191 #[test]
3192 fn test_table_metadata_v1_file_valid() {
3193 let metadata =
3194 fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();
3195
3196 let schema = Schema::builder()
3197 .with_schema_id(0)
3198 .with_fields(vec![
3199 Arc::new(NestedField::required(
3200 1,
3201 "x",
3202 Type::Primitive(PrimitiveType::Long),
3203 )),
3204 Arc::new(
3205 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3206 .with_doc("comment"),
3207 ),
3208 Arc::new(NestedField::required(
3209 3,
3210 "z",
3211 Type::Primitive(PrimitiveType::Long),
3212 )),
3213 ])
3214 .build()
3215 .unwrap();
3216
3217 let partition_spec = PartitionSpec::builder(schema.clone())
3218 .with_spec_id(0)
3219 .add_unbound_field(UnboundPartitionField {
3220 name: "x".to_string(),
3221 transform: Transform::Identity,
3222 source_id: 1,
3223 field_id: Some(1000),
3224 })
3225 .unwrap()
3226 .build()
3227 .unwrap();
3228
3229 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
3230 let expected = TableMetadata {
3231 format_version: FormatVersion::V1,
3232 table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
3233 location: "s3://bucket/test/location".to_string(),
3234 last_updated_ms: 1602638573874,
3235 last_column_id: 3,
3236 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
3237 current_schema_id: 0,
3238 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3239 default_spec: Arc::new(partition_spec),
3240 default_partition_type,
3241 last_partition_id: 0,
3242 default_sort_order_id: 0,
3243 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
3245 snapshots: HashMap::new(),
3246 current_snapshot_id: None,
3247 last_sequence_number: 0,
3248 properties: HashMap::new(),
3249 snapshot_log: vec![],
3250 metadata_log: Vec::new(),
3251 refs: HashMap::new(),
3252 statistics: HashMap::new(),
3253 partition_statistics: HashMap::new(),
3254 encryption_keys: HashMap::new(),
3255 next_row_id: INITIAL_ROW_ID,
3256 };
3257
3258 check_table_metadata_serde(&metadata, expected);
3259 }
3260
3261 #[test]
3262 fn test_table_metadata_v1_compat() {
3263 let metadata =
3264 fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3265
3266 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3268 .expect("Failed to deserialize TableMetadataV1Compat.json");
3269
3270 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3272 assert_eq!(
3273 desered_type.uuid(),
3274 Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3275 );
3276 assert_eq!(
3277 desered_type.location(),
3278 "s3://bucket/warehouse/iceberg/glue.db/table_name"
3279 );
3280 assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3281 assert_eq!(desered_type.current_schema_id(), 0);
3282 }
3283
3284 #[test]
3285 fn test_table_metadata_v1_schemas_without_current_id() {
3286 let metadata = fs::read_to_string(
3287 "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3288 )
3289 .unwrap();
3290
3291 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3293 .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3294
3295 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3297 assert_eq!(
3298 desered_type.uuid(),
3299 Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3300 );
3301
3302 let schema = desered_type.current_schema();
3304 assert_eq!(schema.as_struct().fields().len(), 3);
3305 assert_eq!(schema.as_struct().fields()[0].name, "x");
3306 assert_eq!(schema.as_struct().fields()[1].name, "y");
3307 assert_eq!(schema.as_struct().fields()[2].name, "z");
3308 }
3309
3310 #[test]
3311 fn test_table_metadata_v1_no_valid_schema() {
3312 let metadata =
3313 fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3314 .unwrap();
3315
3316 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3318
3319 assert!(desered.is_err());
3320 let error_message = desered.unwrap_err().to_string();
3321 assert!(
3322 error_message.contains("No valid schema configuration found"),
3323 "Expected error about no valid schema configuration, got: {error_message}"
3324 );
3325 }
3326
3327 #[test]
3328 fn test_table_metadata_v1_partition_specs_without_default_id() {
3329 let metadata = fs::read_to_string(
3330 "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3331 )
3332 .unwrap();
3333
3334 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3336 .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3337
3338 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3340 assert_eq!(
3341 desered_type.uuid(),
3342 Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3343 );
3344
3345 assert_eq!(desered_type.default_partition_spec_id(), 2); assert_eq!(desered_type.partition_specs.len(), 2);
3348
3349 let default_spec = &desered_type.default_spec;
3351 assert_eq!(default_spec.spec_id(), 2);
3352 assert_eq!(default_spec.fields().len(), 1);
3353 assert_eq!(default_spec.fields()[0].name, "y");
3354 assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3355 assert_eq!(default_spec.fields()[0].source_id, 2);
3356 }
3357
3358 #[test]
3359 fn test_table_metadata_v2_schema_not_found() {
3360 let metadata =
3361 fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3362 .unwrap();
3363
3364 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3365
3366 assert_eq!(
3367 desered.unwrap_err().to_string(),
3368 "DataInvalid => No schema exists with the current schema id 2."
3369 )
3370 }
3371
3372 #[test]
3373 fn test_table_metadata_v2_missing_sort_order() {
3374 let metadata =
3375 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3376 .unwrap();
3377
3378 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3379
3380 assert_eq!(
3381 desered.unwrap_err().to_string(),
3382 "data did not match any variant of untagged enum TableMetadataEnum"
3383 )
3384 }
3385
3386 #[test]
3387 fn test_table_metadata_v2_missing_partition_specs() {
3388 let metadata =
3389 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3390 .unwrap();
3391
3392 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3393
3394 assert_eq!(
3395 desered.unwrap_err().to_string(),
3396 "data did not match any variant of untagged enum TableMetadataEnum"
3397 )
3398 }
3399
3400 #[test]
3401 fn test_table_metadata_v2_missing_last_partition_id() {
3402 let metadata = fs::read_to_string(
3403 "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3404 )
3405 .unwrap();
3406
3407 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3408
3409 assert_eq!(
3410 desered.unwrap_err().to_string(),
3411 "data did not match any variant of untagged enum TableMetadataEnum"
3412 )
3413 }
3414
3415 #[test]
3416 fn test_table_metadata_v2_missing_schemas() {
3417 let metadata =
3418 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3419 .unwrap();
3420
3421 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3422
3423 assert_eq!(
3424 desered.unwrap_err().to_string(),
3425 "data did not match any variant of untagged enum TableMetadataEnum"
3426 )
3427 }
3428
3429 #[test]
3430 fn test_table_metadata_v2_unsupported_version() {
3431 let metadata =
3432 fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3433 .unwrap();
3434
3435 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3436
3437 assert_eq!(
3438 desered.unwrap_err().to_string(),
3439 "data did not match any variant of untagged enum TableMetadataEnum"
3440 )
3441 }
3442
3443 #[test]
3444 fn test_order_of_format_version() {
3445 assert!(FormatVersion::V1 < FormatVersion::V2);
3446 assert_eq!(FormatVersion::V1, FormatVersion::V1);
3447 assert_eq!(FormatVersion::V2, FormatVersion::V2);
3448 }
3449
3450 #[test]
3451 fn test_default_partition_spec() {
3452 let default_spec_id = 1234;
3453 let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3454 let partition_spec = PartitionSpec::unpartition_spec();
3455 table_meta_data.default_spec = partition_spec.clone().into();
3456 table_meta_data
3457 .partition_specs
3458 .insert(default_spec_id, Arc::new(partition_spec));
3459
3460 assert_eq!(
3461 (*table_meta_data.default_partition_spec().clone()).clone(),
3462 (*table_meta_data
3463 .partition_spec_by_id(default_spec_id)
3464 .unwrap()
3465 .clone())
3466 .clone()
3467 );
3468 }
3469 #[test]
3470 fn test_default_sort_order() {
3471 let default_sort_order_id = 1234;
3472 let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3473 table_meta_data.default_sort_order_id = default_sort_order_id;
3474 table_meta_data
3475 .sort_orders
3476 .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3477
3478 assert_eq!(
3479 table_meta_data.default_sort_order(),
3480 table_meta_data
3481 .sort_orders
3482 .get(&default_sort_order_id)
3483 .unwrap()
3484 )
3485 }
3486
3487 #[test]
3488 fn test_table_metadata_builder_from_table_creation() {
3489 let table_creation = TableCreation::builder()
3490 .location("s3://db/table".to_string())
3491 .name("table".to_string())
3492 .properties(HashMap::new())
3493 .schema(Schema::builder().build().unwrap())
3494 .build();
3495 let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
3496 .unwrap()
3497 .build()
3498 .unwrap()
3499 .metadata;
3500 assert_eq!(table_metadata.location, "s3://db/table");
3501 assert_eq!(table_metadata.schemas.len(), 1);
3502 assert_eq!(
3503 table_metadata
3504 .schemas
3505 .get(&0)
3506 .unwrap()
3507 .as_struct()
3508 .fields()
3509 .len(),
3510 0
3511 );
3512 assert_eq!(table_metadata.properties.len(), 0);
3513 assert_eq!(
3514 table_metadata.partition_specs,
3515 HashMap::from([(
3516 0,
3517 Arc::new(
3518 PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
3519 .with_spec_id(0)
3520 .build()
3521 .unwrap()
3522 )
3523 )])
3524 );
3525 assert_eq!(
3526 table_metadata.sort_orders,
3527 HashMap::from([(
3528 0,
3529 Arc::new(SortOrder {
3530 order_id: 0,
3531 fields: vec![]
3532 })
3533 )])
3534 );
3535 }
3536
3537 #[tokio::test]
3538 async fn test_table_metadata_read_write() {
3539 let temp_dir = TempDir::new().unwrap();
3541 let temp_path = temp_dir.path().to_str().unwrap();
3542
3543 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3545
3546 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3548
3549 let metadata_location = format!("{temp_path}/metadata.json");
3551
3552 original_metadata
3554 .write_to(&file_io, &metadata_location)
3555 .await
3556 .unwrap();
3557
3558 assert!(fs::metadata(&metadata_location).is_ok());
3560
3561 let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3563 .await
3564 .unwrap();
3565
3566 assert_eq!(read_metadata, original_metadata);
3568 }
3569
3570 #[tokio::test]
3571 async fn test_table_metadata_read_compressed() {
3572 let temp_dir = TempDir::new().unwrap();
3573 let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3574
3575 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3576 let json = serde_json::to_string(&original_metadata).unwrap();
3577
3578 let compressed = CompressionCodec::Gzip
3579 .compress(json.into_bytes())
3580 .expect("failed to compress metadata");
3581 std::fs::write(&metadata_location, &compressed).expect("failed to write metadata");
3582
3583 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3585 let metadata_location = metadata_location.to_str().unwrap();
3586 let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3587 .await
3588 .unwrap();
3589
3590 assert_eq!(read_metadata, original_metadata);
3592 }
3593
3594 #[tokio::test]
3595 async fn test_table_metadata_read_nonexistent_file() {
3596 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3598
3599 let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3601
3602 assert!(result.is_err());
3604 }
3605
3606 #[test]
3607 fn test_partition_name_exists() {
3608 let schema = Schema::builder()
3609 .with_fields(vec![
3610 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3611 NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3612 .into(),
3613 ])
3614 .build()
3615 .unwrap();
3616
3617 let spec1 = PartitionSpec::builder(schema.clone())
3618 .with_spec_id(1)
3619 .add_partition_field("data", "data_partition", Transform::Identity)
3620 .unwrap()
3621 .build()
3622 .unwrap();
3623
3624 let spec2 = PartitionSpec::builder(schema.clone())
3625 .with_spec_id(2)
3626 .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3627 .unwrap()
3628 .build()
3629 .unwrap();
3630
3631 let metadata = TableMetadataBuilder::new(
3633 schema,
3634 spec1.clone().into_unbound(),
3635 SortOrder::unsorted_order(),
3636 "s3://test/location".to_string(),
3637 FormatVersion::V2,
3638 HashMap::new(),
3639 )
3640 .unwrap()
3641 .add_partition_spec(spec2.into_unbound())
3642 .unwrap()
3643 .build()
3644 .unwrap()
3645 .metadata;
3646
3647 assert!(metadata.partition_name_exists("data_partition"));
3648 assert!(metadata.partition_name_exists("partition_bucket"));
3649
3650 assert!(!metadata.partition_name_exists("nonexistent_field"));
3651 assert!(!metadata.partition_name_exists("data")); assert!(!metadata.partition_name_exists(""));
3653 }
3654
3655 #[test]
3656 fn test_partition_name_exists_empty_specs() {
3657 let schema = Schema::builder()
3659 .with_fields(vec![
3660 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3661 ])
3662 .build()
3663 .unwrap();
3664
3665 let metadata = TableMetadataBuilder::new(
3666 schema,
3667 PartitionSpec::unpartition_spec().into_unbound(),
3668 SortOrder::unsorted_order(),
3669 "s3://test/location".to_string(),
3670 FormatVersion::V2,
3671 HashMap::new(),
3672 )
3673 .unwrap()
3674 .build()
3675 .unwrap()
3676 .metadata;
3677
3678 assert!(!metadata.partition_name_exists("any_field"));
3679 assert!(!metadata.partition_name_exists("data"));
3680 }
3681
3682 #[test]
3683 fn test_name_exists_in_any_schema() {
3684 let schema1 = Schema::builder()
3686 .with_schema_id(1)
3687 .with_fields(vec![
3688 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3689 NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3690 ])
3691 .build()
3692 .unwrap();
3693
3694 let schema2 = Schema::builder()
3695 .with_schema_id(2)
3696 .with_fields(vec![
3697 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3698 NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3699 ])
3700 .build()
3701 .unwrap();
3702
3703 let metadata = TableMetadataBuilder::new(
3704 schema1,
3705 PartitionSpec::unpartition_spec().into_unbound(),
3706 SortOrder::unsorted_order(),
3707 "s3://test/location".to_string(),
3708 FormatVersion::V2,
3709 HashMap::new(),
3710 )
3711 .unwrap()
3712 .add_current_schema(schema2)
3713 .unwrap()
3714 .build()
3715 .unwrap()
3716 .metadata;
3717
3718 assert!(metadata.name_exists_in_any_schema("field1")); assert!(metadata.name_exists_in_any_schema("field2")); assert!(metadata.name_exists_in_any_schema("field3")); assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3723 assert!(!metadata.name_exists_in_any_schema("field4"));
3724 assert!(!metadata.name_exists_in_any_schema(""));
3725 }
3726
3727 #[test]
3728 fn test_name_exists_in_any_schema_empty_schemas() {
3729 let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3730
3731 let metadata = TableMetadataBuilder::new(
3732 schema,
3733 PartitionSpec::unpartition_spec().into_unbound(),
3734 SortOrder::unsorted_order(),
3735 "s3://test/location".to_string(),
3736 FormatVersion::V2,
3737 HashMap::new(),
3738 )
3739 .unwrap()
3740 .build()
3741 .unwrap()
3742 .metadata;
3743
3744 assert!(!metadata.name_exists_in_any_schema("any_field"));
3745 }
3746
3747 #[test]
3748 fn test_helper_methods_multi_version_scenario() {
3749 let initial_schema = Schema::builder()
3751 .with_fields(vec![
3752 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3753 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3754 NestedField::required(
3755 3,
3756 "deprecated_field",
3757 Type::Primitive(PrimitiveType::String),
3758 )
3759 .into(),
3760 ])
3761 .build()
3762 .unwrap();
3763
3764 let metadata = TableMetadataBuilder::new(
3765 initial_schema,
3766 PartitionSpec::unpartition_spec().into_unbound(),
3767 SortOrder::unsorted_order(),
3768 "s3://test/location".to_string(),
3769 FormatVersion::V2,
3770 HashMap::new(),
3771 )
3772 .unwrap();
3773
3774 let evolved_schema = Schema::builder()
3775 .with_fields(vec![
3776 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3777 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3778 NestedField::required(
3779 3,
3780 "deprecated_field",
3781 Type::Primitive(PrimitiveType::String),
3782 )
3783 .into(),
3784 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3785 .into(),
3786 ])
3787 .build()
3788 .unwrap();
3789
3790 let _final_schema = Schema::builder()
3792 .with_fields(vec![
3793 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3794 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3795 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3796 .into(),
3797 NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3798 .into(),
3799 ])
3800 .build()
3801 .unwrap();
3802
3803 let final_metadata = metadata
3804 .add_current_schema(evolved_schema)
3805 .unwrap()
3806 .build()
3807 .unwrap()
3808 .metadata;
3809
3810 assert!(!final_metadata.partition_name_exists("nonexistent_partition")); assert!(final_metadata.name_exists_in_any_schema("id")); assert!(final_metadata.name_exists_in_any_schema("name")); assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); assert!(final_metadata.name_exists_in_any_schema("new_field")); assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3817 }
3818
3819 #[test]
3820 fn test_invalid_sort_order_id_zero_with_fields() {
3821 let metadata = r#"
3822 {
3823 "format-version": 2,
3824 "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
3825 "location": "s3://bucket/test/location",
3826 "last-sequence-number": 111,
3827 "last-updated-ms": 1600000000000,
3828 "last-column-id": 3,
3829 "current-schema-id": 1,
3830 "schemas": [
3831 {
3832 "type": "struct",
3833 "schema-id": 1,
3834 "fields": [
3835 {"id": 1, "name": "x", "required": true, "type": "long"},
3836 {"id": 2, "name": "y", "required": true, "type": "long"}
3837 ]
3838 }
3839 ],
3840 "default-spec-id": 0,
3841 "partition-specs": [{"spec-id": 0, "fields": []}],
3842 "last-partition-id": 999,
3843 "default-sort-order-id": 0,
3844 "sort-orders": [
3845 {
3846 "order-id": 0,
3847 "fields": [
3848 {
3849 "transform": "identity",
3850 "source-id": 1,
3851 "direction": "asc",
3852 "null-order": "nulls-first"
3853 }
3854 ]
3855 }
3856 ],
3857 "properties": {},
3858 "current-snapshot-id": -1,
3859 "snapshots": []
3860 }
3861 "#;
3862
3863 let result: Result<TableMetadata, serde_json::Error> = serde_json::from_str(metadata);
3864
3865 assert!(
3867 result.is_err(),
3868 "Parsing should fail for sort order ID 0 with fields"
3869 );
3870 }
3871
3872 #[test]
3873 fn test_table_properties_with_defaults() {
3874 use crate::spec::TableProperties;
3875
3876 let schema = Schema::builder()
3877 .with_fields(vec![
3878 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3879 ])
3880 .build()
3881 .unwrap();
3882
3883 let metadata = TableMetadataBuilder::new(
3884 schema,
3885 PartitionSpec::unpartition_spec().into_unbound(),
3886 SortOrder::unsorted_order(),
3887 "s3://test/location".to_string(),
3888 FormatVersion::V2,
3889 HashMap::new(),
3890 )
3891 .unwrap()
3892 .build()
3893 .unwrap()
3894 .metadata;
3895
3896 let props = metadata.table_properties().unwrap();
3897
3898 assert_eq!(
3899 props.commit_num_retries,
3900 TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
3901 );
3902 assert_eq!(
3903 props.write_target_file_size_bytes,
3904 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
3905 );
3906 }
3907
3908 #[test]
3909 fn test_table_properties_with_custom_values() {
3910 use crate::spec::TableProperties;
3911
3912 let schema = Schema::builder()
3913 .with_fields(vec![
3914 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3915 ])
3916 .build()
3917 .unwrap();
3918
3919 let properties = HashMap::from([
3920 (
3921 TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
3922 "10".to_string(),
3923 ),
3924 (
3925 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
3926 "1024".to_string(),
3927 ),
3928 ]);
3929
3930 let metadata = TableMetadataBuilder::new(
3931 schema,
3932 PartitionSpec::unpartition_spec().into_unbound(),
3933 SortOrder::unsorted_order(),
3934 "s3://test/location".to_string(),
3935 FormatVersion::V2,
3936 properties,
3937 )
3938 .unwrap()
3939 .build()
3940 .unwrap()
3941 .metadata;
3942
3943 let props = metadata.table_properties().unwrap();
3944
3945 assert_eq!(props.commit_num_retries, 10);
3946 assert_eq!(props.write_target_file_size_bytes, 1024);
3947 }
3948
3949 #[test]
3950 fn test_table_properties_with_invalid_value() {
3951 let schema = Schema::builder()
3952 .with_fields(vec![
3953 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3954 ])
3955 .build()
3956 .unwrap();
3957
3958 let properties = HashMap::from([(
3959 "commit.retry.num-retries".to_string(),
3960 "not_a_number".to_string(),
3961 )]);
3962
3963 let metadata = TableMetadataBuilder::new(
3964 schema,
3965 PartitionSpec::unpartition_spec().into_unbound(),
3966 SortOrder::unsorted_order(),
3967 "s3://test/location".to_string(),
3968 FormatVersion::V2,
3969 properties,
3970 )
3971 .unwrap()
3972 .build()
3973 .unwrap()
3974 .metadata;
3975
3976 let err = metadata.table_properties().unwrap_err();
3977 assert_eq!(err.kind(), ErrorKind::DataInvalid);
3978 assert!(err.message().contains("Invalid table properties"));
3979 }
3980
3981 #[test]
3982 fn test_v2_to_v3_upgrade_preserves_existing_snapshots_without_row_lineage() {
3983 let schema = Schema::builder()
3985 .with_fields(vec![
3986 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3987 ])
3988 .build()
3989 .unwrap();
3990
3991 let v2_metadata = TableMetadataBuilder::new(
3992 schema,
3993 PartitionSpec::unpartition_spec().into_unbound(),
3994 SortOrder::unsorted_order(),
3995 "s3://bucket/test/location".to_string(),
3996 FormatVersion::V2,
3997 HashMap::new(),
3998 )
3999 .unwrap()
4000 .build()
4001 .unwrap()
4002 .metadata;
4003
4004 let snapshot = Snapshot::builder()
4006 .with_snapshot_id(1)
4007 .with_timestamp_ms(v2_metadata.last_updated_ms + 1)
4008 .with_sequence_number(1)
4009 .with_schema_id(0)
4010 .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
4011 .with_summary(Summary {
4012 operation: Operation::Append,
4013 additional_properties: HashMap::from([(
4014 "added-data-files".to_string(),
4015 "1".to_string(),
4016 )]),
4017 })
4018 .build();
4019
4020 let v2_with_snapshot = v2_metadata
4021 .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
4022 .add_snapshot(snapshot)
4023 .unwrap()
4024 .set_ref("main", SnapshotReference {
4025 snapshot_id: 1,
4026 retention: SnapshotRetention::Branch {
4027 min_snapshots_to_keep: None,
4028 max_snapshot_age_ms: None,
4029 max_ref_age_ms: None,
4030 },
4031 })
4032 .unwrap()
4033 .build()
4034 .unwrap()
4035 .metadata;
4036
4037 let v2_json = serde_json::to_string(&v2_with_snapshot);
4039 assert!(v2_json.is_ok(), "v2 serialization should work");
4040
4041 let v3_metadata = v2_with_snapshot
4043 .into_builder(Some("s3://bucket/test/metadata/v00002.json".to_string()))
4044 .upgrade_format_version(FormatVersion::V3)
4045 .unwrap()
4046 .build()
4047 .unwrap()
4048 .metadata;
4049
4050 assert_eq!(v3_metadata.format_version, FormatVersion::V3);
4051 assert_eq!(v3_metadata.next_row_id, INITIAL_ROW_ID);
4052 assert_eq!(v3_metadata.snapshots.len(), 1);
4053
4054 let snapshot = v3_metadata.snapshots.values().next().unwrap();
4056 assert!(
4057 snapshot.row_range().is_none(),
4058 "Snapshot should have no row_range after upgrade"
4059 );
4060
4061 let v3_json = serde_json::to_string(&v3_metadata);
4063 assert!(
4064 v3_json.is_ok(),
4065 "v3 serialization should work for upgraded tables"
4066 );
4067
4068 let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
4070 assert_eq!(deserialized.format_version, FormatVersion::V3);
4071 assert_eq!(deserialized.snapshots.len(), 1);
4072
4073 let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
4075 assert!(
4076 deserialized_snapshot.row_range().is_none(),
4077 "Deserialized snapshot should have no row_range"
4078 );
4079 }
4080
4081 #[test]
4082 fn test_v3_snapshot_with_row_lineage_serialization() {
4083 let schema = Schema::builder()
4085 .with_fields(vec![
4086 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4087 ])
4088 .build()
4089 .unwrap();
4090
4091 let v3_metadata = TableMetadataBuilder::new(
4092 schema,
4093 PartitionSpec::unpartition_spec().into_unbound(),
4094 SortOrder::unsorted_order(),
4095 "s3://bucket/test/location".to_string(),
4096 FormatVersion::V3,
4097 HashMap::new(),
4098 )
4099 .unwrap()
4100 .build()
4101 .unwrap()
4102 .metadata;
4103
4104 let snapshot = Snapshot::builder()
4106 .with_snapshot_id(1)
4107 .with_timestamp_ms(v3_metadata.last_updated_ms + 1)
4108 .with_sequence_number(1)
4109 .with_schema_id(0)
4110 .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
4111 .with_summary(Summary {
4112 operation: Operation::Append,
4113 additional_properties: HashMap::from([(
4114 "added-data-files".to_string(),
4115 "1".to_string(),
4116 )]),
4117 })
4118 .with_row_range(100, 50) .build();
4120
4121 let v3_with_snapshot = v3_metadata
4122 .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
4123 .add_snapshot(snapshot)
4124 .unwrap()
4125 .set_ref("main", SnapshotReference {
4126 snapshot_id: 1,
4127 retention: SnapshotRetention::Branch {
4128 min_snapshots_to_keep: None,
4129 max_snapshot_age_ms: None,
4130 max_ref_age_ms: None,
4131 },
4132 })
4133 .unwrap()
4134 .build()
4135 .unwrap()
4136 .metadata;
4137
4138 let snapshot = v3_with_snapshot.snapshots.values().next().unwrap();
4140 assert!(
4141 snapshot.row_range().is_some(),
4142 "Snapshot should have row_range"
4143 );
4144 let (first_row_id, added_rows) = snapshot.row_range().unwrap();
4145 assert_eq!(first_row_id, 100);
4146 assert_eq!(added_rows, 50);
4147
4148 let v3_json = serde_json::to_string(&v3_with_snapshot);
4150 assert!(
4151 v3_json.is_ok(),
4152 "v3 serialization should work for snapshots with row lineage"
4153 );
4154
4155 let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
4157 assert_eq!(deserialized.format_version, FormatVersion::V3);
4158 assert_eq!(deserialized.snapshots.len(), 1);
4159
4160 let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
4162 assert!(
4163 deserialized_snapshot.row_range().is_some(),
4164 "Deserialized snapshot should have row_range"
4165 );
4166 let (deserialized_first_row_id, deserialized_added_rows) =
4167 deserialized_snapshot.row_range().unwrap();
4168 assert_eq!(deserialized_first_row_id, 100);
4169 assert_eq!(deserialized_added_rows, 50);
4170 }
4171}