1use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::io::Read as _;
26use std::sync::Arc;
27
28use _serde::TableMetadataEnum;
29use chrono::{DateTime, Utc};
30use flate2::read::GzDecoder;
31use serde::{Deserialize, Serialize};
32use serde_repr::{Deserialize_repr, Serialize_repr};
33use uuid::Uuid;
34
35use super::snapshot::SnapshotReference;
36pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
37use super::{
38 DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
39 SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
40 TableProperties,
41};
42use crate::error::{Result, timestamp_ms_to_utc};
43use crate::io::FileIO;
44use crate::spec::EncryptedKey;
45use crate::{Error, ErrorKind};
46
47static MAIN_BRANCH: &str = "main";
48pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
49
50pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
51pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
52
53pub const INITIAL_ROW_ID: u64 = 0;
55pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
57pub type TableMetadataRef = Arc<TableMetadata>;
59
60#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
61#[serde(try_from = "TableMetadataEnum")]
62pub struct TableMetadata {
67 pub(crate) format_version: FormatVersion,
69 pub(crate) table_uuid: Uuid,
71 pub(crate) location: String,
73 pub(crate) last_sequence_number: i64,
75 pub(crate) last_updated_ms: i64,
77 pub(crate) last_column_id: i32,
79 pub(crate) schemas: HashMap<i32, SchemaRef>,
81 pub(crate) current_schema_id: i32,
83 pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
85 pub(crate) default_spec: PartitionSpecRef,
87 pub(crate) default_partition_type: StructType,
89 pub(crate) last_partition_id: i32,
91 pub(crate) properties: HashMap<String, String>,
95 pub(crate) current_snapshot_id: Option<i64>,
98 pub(crate) snapshots: HashMap<i64, SnapshotRef>,
103 pub(crate) snapshot_log: Vec<SnapshotLog>,
110
111 pub(crate) metadata_log: Vec<MetadataLog>,
118
119 pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
121 pub(crate) default_sort_order_id: i64,
125 pub(crate) refs: HashMap<String, SnapshotReference>,
130 pub(crate) statistics: HashMap<i64, StatisticsFile>,
132 pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
134 pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
136 pub(crate) next_row_id: u64,
138}
139
140impl TableMetadata {
141 #[must_use]
148 pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
149 TableMetadataBuilder::new_from_metadata(self, current_file_location)
150 }
151
152 #[inline]
154 pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
155 self.partition_specs
156 .values()
157 .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
158 }
159
160 #[inline]
162 pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
163 self.schemas
164 .values()
165 .any(|schema| schema.field_by_name(name).is_some())
166 }
167
168 #[inline]
170 pub fn format_version(&self) -> FormatVersion {
171 self.format_version
172 }
173
174 #[inline]
176 pub fn uuid(&self) -> Uuid {
177 self.table_uuid
178 }
179
180 #[inline]
182 pub fn location(&self) -> &str {
183 self.location.as_str()
184 }
185
186 #[inline]
188 pub fn last_sequence_number(&self) -> i64 {
189 self.last_sequence_number
190 }
191
192 #[inline]
197 pub fn next_sequence_number(&self) -> i64 {
198 match self.format_version {
199 FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
200 _ => self.last_sequence_number + 1,
201 }
202 }
203
204 #[inline]
206 pub fn last_column_id(&self) -> i32 {
207 self.last_column_id
208 }
209
210 #[inline]
212 pub fn last_partition_id(&self) -> i32 {
213 self.last_partition_id
214 }
215
216 #[inline]
218 pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
219 timestamp_ms_to_utc(self.last_updated_ms)
220 }
221
222 #[inline]
224 pub fn last_updated_ms(&self) -> i64 {
225 self.last_updated_ms
226 }
227
228 #[inline]
230 pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
231 self.schemas.values()
232 }
233
234 #[inline]
236 pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
237 self.schemas.get(&schema_id)
238 }
239
240 #[inline]
242 pub fn current_schema(&self) -> &SchemaRef {
243 self.schema_by_id(self.current_schema_id)
244 .expect("Current schema id set, but not found in table metadata")
245 }
246
247 #[inline]
249 pub fn current_schema_id(&self) -> SchemaId {
250 self.current_schema_id
251 }
252
253 #[inline]
255 pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
256 self.partition_specs.values()
257 }
258
259 #[inline]
261 pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
262 self.partition_specs.get(&spec_id)
263 }
264
265 #[inline]
267 pub fn default_partition_spec(&self) -> &PartitionSpecRef {
268 &self.default_spec
269 }
270
271 #[inline]
273 pub fn default_partition_type(&self) -> &StructType {
274 &self.default_partition_type
275 }
276
277 #[inline]
278 pub fn default_partition_spec_id(&self) -> i32 {
280 self.default_spec.spec_id()
281 }
282
283 #[inline]
285 pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
286 self.snapshots.values()
287 }
288
289 #[inline]
291 pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
292 self.snapshots.get(&snapshot_id)
293 }
294
295 #[inline]
297 pub fn history(&self) -> &[SnapshotLog] {
298 &self.snapshot_log
299 }
300
301 #[inline]
303 pub fn metadata_log(&self) -> &[MetadataLog] {
304 &self.metadata_log
305 }
306
307 #[inline]
309 pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
310 self.current_snapshot_id.map(|s| {
311 self.snapshot_by_id(s)
312 .expect("Current snapshot id has been set, but doesn't exist in metadata")
313 })
314 }
315
316 #[inline]
318 pub fn current_snapshot_id(&self) -> Option<i64> {
319 self.current_snapshot_id
320 }
321
322 #[inline]
325 pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
326 self.refs.get(ref_name).map(|r| {
327 self.snapshot_by_id(r.snapshot_id)
328 .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
329 })
330 }
331
332 #[inline]
334 pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
335 self.sort_orders.values()
336 }
337
338 #[inline]
340 pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
341 self.sort_orders.get(&sort_order_id)
342 }
343
344 #[inline]
346 pub fn default_sort_order(&self) -> &SortOrderRef {
347 self.sort_orders
348 .get(&self.default_sort_order_id)
349 .expect("Default order id has been set, but not found in table metadata!")
350 }
351
352 #[inline]
354 pub fn default_sort_order_id(&self) -> i64 {
355 self.default_sort_order_id
356 }
357
358 #[inline]
360 pub fn properties(&self) -> &HashMap<String, String> {
361 &self.properties
362 }
363
364 pub fn table_properties(&self) -> Result<TableProperties> {
366 TableProperties::try_from(&self.properties).map_err(|e| {
367 Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
368 })
369 }
370
371 #[inline]
373 pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
374 self.statistics.values()
375 }
376
377 #[inline]
379 pub fn partition_statistics_iter(
380 &self,
381 ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
382 self.partition_statistics.values()
383 }
384
385 #[inline]
387 pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
388 self.statistics.get(&snapshot_id)
389 }
390
391 #[inline]
393 pub fn partition_statistics_for_snapshot(
394 &self,
395 snapshot_id: i64,
396 ) -> Option<&PartitionStatisticsFile> {
397 self.partition_statistics.get(&snapshot_id)
398 }
399
400 fn construct_refs(&mut self) {
401 if let Some(current_snapshot_id) = self.current_snapshot_id
402 && !self.refs.contains_key(MAIN_BRANCH)
403 {
404 self.refs
405 .insert(MAIN_BRANCH.to_string(), SnapshotReference {
406 snapshot_id: current_snapshot_id,
407 retention: SnapshotRetention::Branch {
408 min_snapshots_to_keep: None,
409 max_snapshot_age_ms: None,
410 max_ref_age_ms: None,
411 },
412 });
413 }
414 }
415
416 #[inline]
418 pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
419 self.encryption_keys.values()
420 }
421
422 #[inline]
424 pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
425 self.encryption_keys.get(key_id)
426 }
427
428 #[inline]
430 pub fn next_row_id(&self) -> u64 {
431 self.next_row_id
432 }
433
434 pub async fn read_from(
436 file_io: &FileIO,
437 metadata_location: impl AsRef<str>,
438 ) -> Result<TableMetadata> {
439 let metadata_location = metadata_location.as_ref();
440 let input_file = file_io.new_input(metadata_location)?;
441 let metadata_content = input_file.read().await?;
442
443 let metadata = if metadata_content.len() > 2
445 && metadata_content[0] == 0x1F
446 && metadata_content[1] == 0x8B
447 {
448 let mut decoder = GzDecoder::new(metadata_content.as_ref());
449 let mut decompressed_data = Vec::new();
450 decoder.read_to_end(&mut decompressed_data).map_err(|e| {
451 Error::new(
452 ErrorKind::DataInvalid,
453 "Trying to read compressed metadata file",
454 )
455 .with_context("file_path", metadata_location)
456 .with_source(e)
457 })?;
458 serde_json::from_slice(&decompressed_data)?
459 } else {
460 serde_json::from_slice(&metadata_content)?
461 };
462
463 Ok(metadata)
464 }
465
466 pub async fn write_to(
468 &self,
469 file_io: &FileIO,
470 metadata_location: impl AsRef<str>,
471 ) -> Result<()> {
472 file_io
473 .new_output(metadata_location)?
474 .write(serde_json::to_vec(self)?.into())
475 .await
476 }
477
478 pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
486 self.validate_current_schema()?;
487 self.normalize_current_snapshot()?;
488 self.construct_refs();
489 self.validate_refs()?;
490 self.validate_chronological_snapshot_logs()?;
491 self.validate_chronological_metadata_logs()?;
492 self.location = self.location.trim_end_matches('/').to_string();
494 self.validate_snapshot_sequence_number()?;
495 self.try_normalize_partition_spec()?;
496 self.try_normalize_sort_order()?;
497 Ok(self)
498 }
499
500 fn try_normalize_partition_spec(&mut self) -> Result<()> {
502 if self
503 .partition_spec_by_id(self.default_spec.spec_id())
504 .is_none()
505 {
506 self.partition_specs.insert(
507 self.default_spec.spec_id(),
508 Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
509 );
510 }
511
512 Ok(())
513 }
514
515 fn try_normalize_sort_order(&mut self) -> Result<()> {
517 if let Some(sort_order) = self.sort_order_by_id(SortOrder::UNSORTED_ORDER_ID)
519 && !sort_order.fields.is_empty()
520 {
521 return Err(Error::new(
522 ErrorKind::Unexpected,
523 format!(
524 "Sort order ID {} is reserved for unsorted order",
525 SortOrder::UNSORTED_ORDER_ID
526 ),
527 ));
528 }
529
530 if self.sort_order_by_id(self.default_sort_order_id).is_some() {
531 return Ok(());
532 }
533
534 if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
535 return Err(Error::new(
536 ErrorKind::DataInvalid,
537 format!(
538 "No sort order exists with the default sort order id {}.",
539 self.default_sort_order_id
540 ),
541 ));
542 }
543
544 let sort_order = SortOrder::unsorted_order();
545 self.sort_orders
546 .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
547 Ok(())
548 }
549
550 fn validate_current_schema(&self) -> Result<()> {
552 if self.schema_by_id(self.current_schema_id).is_none() {
553 return Err(Error::new(
554 ErrorKind::DataInvalid,
555 format!(
556 "No schema exists with the current schema id {}.",
557 self.current_schema_id
558 ),
559 ));
560 }
561 Ok(())
562 }
563
564 fn normalize_current_snapshot(&mut self) -> Result<()> {
566 if let Some(current_snapshot_id) = self.current_snapshot_id {
567 if current_snapshot_id == EMPTY_SNAPSHOT_ID {
568 self.current_snapshot_id = None;
569 } else if self.snapshot_by_id(current_snapshot_id).is_none() {
570 return Err(Error::new(
571 ErrorKind::DataInvalid,
572 format!(
573 "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
574 ),
575 ));
576 }
577 }
578 Ok(())
579 }
580
581 fn validate_refs(&self) -> Result<()> {
583 for (name, snapshot_ref) in self.refs.iter() {
584 if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
585 return Err(Error::new(
586 ErrorKind::DataInvalid,
587 format!(
588 "Snapshot for reference {name} does not exist in the existing snapshots list"
589 ),
590 ));
591 }
592 }
593
594 let main_ref = self.refs.get(MAIN_BRANCH);
595 if self.current_snapshot_id.is_some() {
596 if let Some(main_ref) = main_ref
597 && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
598 {
599 return Err(Error::new(
600 ErrorKind::DataInvalid,
601 format!(
602 "Current snapshot id does not match main branch ({:?} != {:?})",
603 self.current_snapshot_id.unwrap_or_default(),
604 main_ref.snapshot_id
605 ),
606 ));
607 }
608 } else if main_ref.is_some() {
609 return Err(Error::new(
610 ErrorKind::DataInvalid,
611 "Current snapshot is not set, but main branch exists",
612 ));
613 }
614
615 Ok(())
616 }
617
618 fn validate_snapshot_sequence_number(&self) -> Result<()> {
620 if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
621 return Err(Error::new(
622 ErrorKind::DataInvalid,
623 format!(
624 "Last sequence number must be 0 in v1. Found {}",
625 self.last_sequence_number
626 ),
627 ));
628 }
629
630 if self.format_version >= FormatVersion::V2
631 && let Some(snapshot) = self
632 .snapshots
633 .values()
634 .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
635 {
636 return Err(Error::new(
637 ErrorKind::DataInvalid,
638 format!(
639 "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
640 snapshot.snapshot_id(),
641 snapshot.sequence_number(),
642 self.last_sequence_number
643 ),
644 ));
645 }
646
647 Ok(())
648 }
649
650 fn validate_chronological_snapshot_logs(&self) -> Result<()> {
652 for window in self.snapshot_log.windows(2) {
653 let (prev, curr) = (&window[0], &window[1]);
654 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
657 return Err(Error::new(
658 ErrorKind::DataInvalid,
659 "Expected sorted snapshot log entries",
660 ));
661 }
662 }
663
664 if let Some(last) = self.snapshot_log.last() {
665 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
668 return Err(Error::new(
669 ErrorKind::DataInvalid,
670 format!(
671 "Invalid update timestamp {}: before last snapshot log entry at {}",
672 self.last_updated_ms, last.timestamp_ms
673 ),
674 ));
675 }
676 }
677 Ok(())
678 }
679
680 fn validate_chronological_metadata_logs(&self) -> Result<()> {
681 for window in self.metadata_log.windows(2) {
682 let (prev, curr) = (&window[0], &window[1]);
683 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
686 return Err(Error::new(
687 ErrorKind::DataInvalid,
688 "Expected sorted metadata log entries",
689 ));
690 }
691 }
692
693 if let Some(last) = self.metadata_log.last() {
694 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
697 return Err(Error::new(
698 ErrorKind::DataInvalid,
699 format!(
700 "Invalid update timestamp {}: before last metadata log entry at {}",
701 self.last_updated_ms, last.timestamp_ms
702 ),
703 ));
704 }
705 }
706
707 Ok(())
708 }
709}
710
711pub(super) mod _serde {
712 use std::borrow::BorrowMut;
713 use std::collections::HashMap;
718 use std::sync::Arc;
723
724 use serde::{Deserialize, Serialize};
725 use uuid::Uuid;
726
727 use super::{
728 DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
729 TableMetadata,
730 };
731 use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
732 use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
733 use crate::spec::{
734 EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
735 PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
736 SortOrder, StatisticsFile,
737 };
738 use crate::{Error, ErrorKind};
739
740 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
741 #[serde(untagged)]
742 pub(super) enum TableMetadataEnum {
743 V3(TableMetadataV3),
744 V2(TableMetadataV2),
745 V1(TableMetadataV1),
746 }
747
748 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
749 #[serde(rename_all = "kebab-case")]
750 pub(super) struct TableMetadataV3 {
752 pub format_version: VersionNumber<3>,
753 #[serde(flatten)]
754 pub shared: TableMetadataV2V3Shared,
755 pub next_row_id: u64,
756 #[serde(skip_serializing_if = "Option::is_none")]
757 pub encryption_keys: Option<Vec<EncryptedKey>>,
758 #[serde(skip_serializing_if = "Option::is_none")]
759 pub snapshots: Option<Vec<SnapshotV3>>,
760 }
761
762 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
763 #[serde(rename_all = "kebab-case")]
764 pub(super) struct TableMetadataV2V3Shared {
766 pub table_uuid: Uuid,
767 pub location: String,
768 pub last_sequence_number: i64,
769 pub last_updated_ms: i64,
770 pub last_column_id: i32,
771 pub schemas: Vec<SchemaV2>,
772 pub current_schema_id: i32,
773 pub partition_specs: Vec<PartitionSpec>,
774 pub default_spec_id: i32,
775 pub last_partition_id: i32,
776 #[serde(skip_serializing_if = "Option::is_none")]
777 pub properties: Option<HashMap<String, String>>,
778 #[serde(skip_serializing_if = "Option::is_none")]
779 pub current_snapshot_id: Option<i64>,
780 #[serde(skip_serializing_if = "Option::is_none")]
781 pub snapshot_log: Option<Vec<SnapshotLog>>,
782 #[serde(skip_serializing_if = "Option::is_none")]
783 pub metadata_log: Option<Vec<MetadataLog>>,
784 pub sort_orders: Vec<SortOrder>,
785 pub default_sort_order_id: i64,
786 #[serde(skip_serializing_if = "Option::is_none")]
787 pub refs: Option<HashMap<String, SnapshotReference>>,
788 #[serde(default, skip_serializing_if = "Vec::is_empty")]
789 pub statistics: Vec<StatisticsFile>,
790 #[serde(default, skip_serializing_if = "Vec::is_empty")]
791 pub partition_statistics: Vec<PartitionStatisticsFile>,
792 }
793
794 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
795 #[serde(rename_all = "kebab-case")]
796 pub(super) struct TableMetadataV2 {
798 pub format_version: VersionNumber<2>,
799 #[serde(flatten)]
800 pub shared: TableMetadataV2V3Shared,
801 #[serde(skip_serializing_if = "Option::is_none")]
802 pub snapshots: Option<Vec<SnapshotV2>>,
803 }
804
805 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
806 #[serde(rename_all = "kebab-case")]
807 pub(super) struct TableMetadataV1 {
809 pub format_version: VersionNumber<1>,
810 #[serde(skip_serializing_if = "Option::is_none")]
811 pub table_uuid: Option<Uuid>,
812 pub location: String,
813 pub last_updated_ms: i64,
814 pub last_column_id: i32,
815 pub schema: Option<SchemaV1>,
817 #[serde(skip_serializing_if = "Option::is_none")]
818 pub schemas: Option<Vec<SchemaV1>>,
819 #[serde(skip_serializing_if = "Option::is_none")]
820 pub current_schema_id: Option<i32>,
821 pub partition_spec: Option<Vec<PartitionField>>,
823 #[serde(skip_serializing_if = "Option::is_none")]
824 pub partition_specs: Option<Vec<PartitionSpec>>,
825 #[serde(skip_serializing_if = "Option::is_none")]
826 pub default_spec_id: Option<i32>,
827 #[serde(skip_serializing_if = "Option::is_none")]
828 pub last_partition_id: Option<i32>,
829 #[serde(skip_serializing_if = "Option::is_none")]
830 pub properties: Option<HashMap<String, String>>,
831 #[serde(skip_serializing_if = "Option::is_none")]
832 pub current_snapshot_id: Option<i64>,
833 #[serde(skip_serializing_if = "Option::is_none")]
834 pub snapshots: Option<Vec<SnapshotV1>>,
835 #[serde(skip_serializing_if = "Option::is_none")]
836 pub snapshot_log: Option<Vec<SnapshotLog>>,
837 #[serde(skip_serializing_if = "Option::is_none")]
838 pub metadata_log: Option<Vec<MetadataLog>>,
839 pub sort_orders: Option<Vec<SortOrder>>,
840 pub default_sort_order_id: Option<i64>,
841 #[serde(default, skip_serializing_if = "Vec::is_empty")]
842 pub statistics: Vec<StatisticsFile>,
843 #[serde(default, skip_serializing_if = "Vec::is_empty")]
844 pub partition_statistics: Vec<PartitionStatisticsFile>,
845 }
846
/// Zero-sized marker for a `format-version` field that must equal `V`.
///
/// It serializes as the integer `V`, and deserialization rejects any other value.
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct VersionNumber<const V: u8>;
850
851 impl Serialize for TableMetadata {
852 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
853 where S: serde::Serializer {
854 let table_metadata_enum: TableMetadataEnum =
856 self.clone().try_into().map_err(serde::ser::Error::custom)?;
857
858 table_metadata_enum.serialize(serializer)
859 }
860 }
861
862 impl<const V: u8> Serialize for VersionNumber<V> {
863 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
864 where S: serde::Serializer {
865 serializer.serialize_u8(V)
866 }
867 }
868
869 impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
870 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
871 where D: serde::Deserializer<'de> {
872 let value = u8::deserialize(deserializer)?;
873 if value == V {
874 Ok(VersionNumber::<V>)
875 } else {
876 Err(serde::de::Error::custom("Invalid Version"))
877 }
878 }
879 }
880
881 impl TryFrom<TableMetadataEnum> for TableMetadata {
882 type Error = Error;
883 fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
884 match value {
885 TableMetadataEnum::V3(value) => value.try_into(),
886 TableMetadataEnum::V2(value) => value.try_into(),
887 TableMetadataEnum::V1(value) => value.try_into(),
888 }
889 }
890 }
891
892 impl TryFrom<TableMetadata> for TableMetadataEnum {
893 type Error = Error;
894 fn try_from(value: TableMetadata) -> Result<Self, Error> {
895 Ok(match value.format_version {
896 FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
897 FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
898 FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
899 })
900 }
901 }
902
903 impl TryFrom<TableMetadataV3> for TableMetadata {
904 type Error = Error;
905 fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
906 let TableMetadataV3 {
907 format_version: _,
908 shared: value,
909 next_row_id,
910 encryption_keys,
911 snapshots,
912 } = value;
913 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
914 None
915 } else {
916 value.current_snapshot_id
917 };
918 let schemas = HashMap::from_iter(
919 value
920 .schemas
921 .into_iter()
922 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
923 .collect::<Result<Vec<_>, Error>>()?,
924 );
925
926 let current_schema: &SchemaRef =
927 schemas.get(&value.current_schema_id).ok_or_else(|| {
928 Error::new(
929 ErrorKind::DataInvalid,
930 format!(
931 "No schema exists with the current schema id {}.",
932 value.current_schema_id
933 ),
934 )
935 })?;
936 let partition_specs = HashMap::from_iter(
937 value
938 .partition_specs
939 .into_iter()
940 .map(|x| (x.spec_id(), Arc::new(x))),
941 );
942 let default_spec_id = value.default_spec_id;
943 let default_spec: PartitionSpecRef = partition_specs
944 .get(&value.default_spec_id)
945 .map(|spec| (**spec).clone())
946 .or_else(|| {
947 (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
948 .then(PartitionSpec::unpartition_spec)
949 })
950 .ok_or_else(|| {
951 Error::new(
952 ErrorKind::DataInvalid,
953 format!("Default partition spec {default_spec_id} not found"),
954 )
955 })?
956 .into();
957 let default_partition_type = default_spec.partition_type(current_schema)?;
958
959 let mut metadata = TableMetadata {
960 format_version: FormatVersion::V3,
961 table_uuid: value.table_uuid,
962 location: value.location,
963 last_sequence_number: value.last_sequence_number,
964 last_updated_ms: value.last_updated_ms,
965 last_column_id: value.last_column_id,
966 current_schema_id: value.current_schema_id,
967 schemas,
968 partition_specs,
969 default_partition_type,
970 default_spec,
971 last_partition_id: value.last_partition_id,
972 properties: value.properties.unwrap_or_default(),
973 current_snapshot_id,
974 snapshots: snapshots
975 .map(|snapshots| {
976 HashMap::from_iter(
977 snapshots
978 .into_iter()
979 .map(|x| (x.snapshot_id, Arc::new(x.into()))),
980 )
981 })
982 .unwrap_or_default(),
983 snapshot_log: value.snapshot_log.unwrap_or_default(),
984 metadata_log: value.metadata_log.unwrap_or_default(),
985 sort_orders: HashMap::from_iter(
986 value
987 .sort_orders
988 .into_iter()
989 .map(|x| (x.order_id, Arc::new(x))),
990 ),
991 default_sort_order_id: value.default_sort_order_id,
992 refs: value.refs.unwrap_or_else(|| {
993 if let Some(snapshot_id) = current_snapshot_id {
994 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
995 snapshot_id,
996 retention: SnapshotRetention::Branch {
997 min_snapshots_to_keep: None,
998 max_snapshot_age_ms: None,
999 max_ref_age_ms: None,
1000 },
1001 })])
1002 } else {
1003 HashMap::new()
1004 }
1005 }),
1006 statistics: index_statistics(value.statistics),
1007 partition_statistics: index_partition_statistics(value.partition_statistics),
1008 encryption_keys: encryption_keys
1009 .map(|keys| {
1010 HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
1011 })
1012 .unwrap_or_default(),
1013 next_row_id,
1014 };
1015
1016 metadata.borrow_mut().try_normalize()?;
1017 Ok(metadata)
1018 }
1019 }
1020
1021 impl TryFrom<TableMetadataV2> for TableMetadata {
1022 type Error = Error;
1023 fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
1024 let snapshots = value.snapshots;
1025 let value = value.shared;
1026 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1027 None
1028 } else {
1029 value.current_snapshot_id
1030 };
1031 let schemas = HashMap::from_iter(
1032 value
1033 .schemas
1034 .into_iter()
1035 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
1036 .collect::<Result<Vec<_>, Error>>()?,
1037 );
1038
1039 let current_schema: &SchemaRef =
1040 schemas.get(&value.current_schema_id).ok_or_else(|| {
1041 Error::new(
1042 ErrorKind::DataInvalid,
1043 format!(
1044 "No schema exists with the current schema id {}.",
1045 value.current_schema_id
1046 ),
1047 )
1048 })?;
1049 let partition_specs = HashMap::from_iter(
1050 value
1051 .partition_specs
1052 .into_iter()
1053 .map(|x| (x.spec_id(), Arc::new(x))),
1054 );
1055 let default_spec_id = value.default_spec_id;
1056 let default_spec: PartitionSpecRef = partition_specs
1057 .get(&value.default_spec_id)
1058 .map(|spec| (**spec).clone())
1059 .or_else(|| {
1060 (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
1061 .then(PartitionSpec::unpartition_spec)
1062 })
1063 .ok_or_else(|| {
1064 Error::new(
1065 ErrorKind::DataInvalid,
1066 format!("Default partition spec {default_spec_id} not found"),
1067 )
1068 })?
1069 .into();
1070 let default_partition_type = default_spec.partition_type(current_schema)?;
1071
1072 let mut metadata = TableMetadata {
1073 format_version: FormatVersion::V2,
1074 table_uuid: value.table_uuid,
1075 location: value.location,
1076 last_sequence_number: value.last_sequence_number,
1077 last_updated_ms: value.last_updated_ms,
1078 last_column_id: value.last_column_id,
1079 current_schema_id: value.current_schema_id,
1080 schemas,
1081 partition_specs,
1082 default_partition_type,
1083 default_spec,
1084 last_partition_id: value.last_partition_id,
1085 properties: value.properties.unwrap_or_default(),
1086 current_snapshot_id,
1087 snapshots: snapshots
1088 .map(|snapshots| {
1089 HashMap::from_iter(
1090 snapshots
1091 .into_iter()
1092 .map(|x| (x.snapshot_id, Arc::new(x.into()))),
1093 )
1094 })
1095 .unwrap_or_default(),
1096 snapshot_log: value.snapshot_log.unwrap_or_default(),
1097 metadata_log: value.metadata_log.unwrap_or_default(),
1098 sort_orders: HashMap::from_iter(
1099 value
1100 .sort_orders
1101 .into_iter()
1102 .map(|x| (x.order_id, Arc::new(x))),
1103 ),
1104 default_sort_order_id: value.default_sort_order_id,
1105 refs: value.refs.unwrap_or_else(|| {
1106 if let Some(snapshot_id) = current_snapshot_id {
1107 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1108 snapshot_id,
1109 retention: SnapshotRetention::Branch {
1110 min_snapshots_to_keep: None,
1111 max_snapshot_age_ms: None,
1112 max_ref_age_ms: None,
1113 },
1114 })])
1115 } else {
1116 HashMap::new()
1117 }
1118 }),
1119 statistics: index_statistics(value.statistics),
1120 partition_statistics: index_partition_statistics(value.partition_statistics),
1121 encryption_keys: HashMap::new(),
1122 next_row_id: INITIAL_ROW_ID,
1123 };
1124
1125 metadata.borrow_mut().try_normalize()?;
1126 Ok(metadata)
1127 }
1128 }
1129
1130 impl TryFrom<TableMetadataV1> for TableMetadata {
1131 type Error = Error;
1132 fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1133 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1134 None
1135 } else {
1136 value.current_snapshot_id
1137 };
1138
1139 let (schemas, current_schema_id, current_schema) =
1140 if let (Some(schemas_vec), Some(schema_id)) =
1141 (&value.schemas, value.current_schema_id)
1142 {
1143 let schema_map = HashMap::from_iter(
1145 schemas_vec
1146 .clone()
1147 .into_iter()
1148 .map(|schema| {
1149 let schema: Schema = schema.try_into()?;
1150 Ok((schema.schema_id(), Arc::new(schema)))
1151 })
1152 .collect::<Result<Vec<_>, Error>>()?,
1153 );
1154
1155 let schema = schema_map
1156 .get(&schema_id)
1157 .ok_or_else(|| {
1158 Error::new(
1159 ErrorKind::DataInvalid,
1160 format!("No schema exists with the current schema id {schema_id}."),
1161 )
1162 })?
1163 .clone();
1164 (schema_map, schema_id, schema)
1165 } else if let Some(schema) = value.schema {
1166 let schema: Schema = schema.try_into()?;
1168 let schema_id = schema.schema_id();
1169 let schema_arc = Arc::new(schema);
1170 let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1171 (schema_map, schema_id, schema_arc)
1172 } else {
1173 return Err(Error::new(
1175 ErrorKind::DataInvalid,
1176 "No valid schema configuration found in table metadata",
1177 ));
1178 };
1179
1180 let partition_specs = if let Some(specs_vec) = value.partition_specs {
1182 specs_vec
1184 .into_iter()
1185 .map(|x| (x.spec_id(), Arc::new(x)))
1186 .collect::<HashMap<_, _>>()
1187 } else if let Some(partition_spec) = value.partition_spec {
1188 let spec = PartitionSpec::builder(current_schema.clone())
1190 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1191 .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1192 .build()?;
1193
1194 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1195 } else {
1196 let spec = PartitionSpec::builder(current_schema.clone())
1198 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1199 .build()?;
1200
1201 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1202 };
1203
1204 let default_spec_id = value
1206 .default_spec_id
1207 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1208
1209 let default_spec: PartitionSpecRef = partition_specs
1211 .get(&default_spec_id)
1212 .map(|x| Arc::unwrap_or_clone(x.clone()))
1213 .ok_or_else(|| {
1214 Error::new(
1215 ErrorKind::DataInvalid,
1216 format!("Default partition spec {default_spec_id} not found"),
1217 )
1218 })?
1219 .into();
1220 let default_partition_type = default_spec.partition_type(¤t_schema)?;
1221
1222 let mut metadata = TableMetadata {
1223 format_version: FormatVersion::V1,
1224 table_uuid: value.table_uuid.unwrap_or_default(),
1225 location: value.location,
1226 last_sequence_number: 0,
1227 last_updated_ms: value.last_updated_ms,
1228 last_column_id: value.last_column_id,
1229 current_schema_id,
1230 default_spec,
1231 default_partition_type,
1232 last_partition_id: value
1233 .last_partition_id
1234 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1235 partition_specs,
1236 schemas,
1237 properties: value.properties.unwrap_or_default(),
1238 current_snapshot_id,
1239 snapshots: value
1240 .snapshots
1241 .map(|snapshots| {
1242 Ok::<_, Error>(HashMap::from_iter(
1243 snapshots
1244 .into_iter()
1245 .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1246 .collect::<Result<Vec<_>, Error>>()?,
1247 ))
1248 })
1249 .transpose()?
1250 .unwrap_or_default(),
1251 snapshot_log: value.snapshot_log.unwrap_or_default(),
1252 metadata_log: value.metadata_log.unwrap_or_default(),
1253 sort_orders: match value.sort_orders {
1254 Some(sort_orders) => HashMap::from_iter(
1255 sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1256 ),
1257 None => HashMap::new(),
1258 },
1259 default_sort_order_id: value
1260 .default_sort_order_id
1261 .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1262 refs: if let Some(snapshot_id) = current_snapshot_id {
1263 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1264 snapshot_id,
1265 retention: SnapshotRetention::Branch {
1266 min_snapshots_to_keep: None,
1267 max_snapshot_age_ms: None,
1268 max_ref_age_ms: None,
1269 },
1270 })])
1271 } else {
1272 HashMap::new()
1273 },
1274 statistics: index_statistics(value.statistics),
1275 partition_statistics: index_partition_statistics(value.partition_statistics),
1276 encryption_keys: HashMap::new(),
1277 next_row_id: INITIAL_ROW_ID, };
1279
1280 metadata.borrow_mut().try_normalize()?;
1281 Ok(metadata)
1282 }
1283 }
1284
1285 impl TryFrom<TableMetadata> for TableMetadataV3 {
1286 type Error = Error;
1287
1288 fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1289 let next_row_id = v.next_row_id;
1290 let encryption_keys = std::mem::take(&mut v.encryption_keys);
1291 let snapshots = std::mem::take(&mut v.snapshots);
1292 let shared = v.into();
1293
1294 Ok(TableMetadataV3 {
1295 format_version: VersionNumber::<3>,
1296 shared,
1297 next_row_id,
1298 encryption_keys: if encryption_keys.is_empty() {
1299 None
1300 } else {
1301 Some(encryption_keys.into_values().collect())
1302 },
1303 snapshots: if snapshots.is_empty() {
1304 None
1305 } else {
1306 Some(
1307 snapshots
1308 .into_values()
1309 .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1310 .collect::<Result<_, _>>()?,
1311 )
1312 },
1313 })
1314 }
1315 }
1316
1317 impl From<TableMetadata> for TableMetadataV2 {
1318 fn from(mut v: TableMetadata) -> Self {
1319 let snapshots = std::mem::take(&mut v.snapshots);
1320 let shared = v.into();
1321
1322 TableMetadataV2 {
1323 format_version: VersionNumber::<2>,
1324 shared,
1325 snapshots: if snapshots.is_empty() {
1326 None
1327 } else {
1328 Some(
1329 snapshots
1330 .into_values()
1331 .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1332 .collect(),
1333 )
1334 },
1335 }
1336 }
1337 }
1338
1339 impl From<TableMetadata> for TableMetadataV2V3Shared {
1340 fn from(v: TableMetadata) -> Self {
1341 TableMetadataV2V3Shared {
1342 table_uuid: v.table_uuid,
1343 location: v.location,
1344 last_sequence_number: v.last_sequence_number,
1345 last_updated_ms: v.last_updated_ms,
1346 last_column_id: v.last_column_id,
1347 schemas: v
1348 .schemas
1349 .into_values()
1350 .map(|x| {
1351 Arc::try_unwrap(x)
1352 .unwrap_or_else(|schema| schema.as_ref().clone())
1353 .into()
1354 })
1355 .collect(),
1356 current_schema_id: v.current_schema_id,
1357 partition_specs: v
1358 .partition_specs
1359 .into_values()
1360 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1361 .collect(),
1362 default_spec_id: v.default_spec.spec_id(),
1363 last_partition_id: v.last_partition_id,
1364 properties: if v.properties.is_empty() {
1365 None
1366 } else {
1367 Some(v.properties)
1368 },
1369 current_snapshot_id: v.current_snapshot_id,
1370 snapshot_log: if v.snapshot_log.is_empty() {
1371 None
1372 } else {
1373 Some(v.snapshot_log)
1374 },
1375 metadata_log: if v.metadata_log.is_empty() {
1376 None
1377 } else {
1378 Some(v.metadata_log)
1379 },
1380 sort_orders: v
1381 .sort_orders
1382 .into_values()
1383 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1384 .collect(),
1385 default_sort_order_id: v.default_sort_order_id,
1386 refs: Some(v.refs),
1387 statistics: v.statistics.into_values().collect(),
1388 partition_statistics: v.partition_statistics.into_values().collect(),
1389 }
1390 }
1391 }
1392
1393 impl TryFrom<TableMetadata> for TableMetadataV1 {
1394 type Error = Error;
1395 fn try_from(v: TableMetadata) -> Result<Self, Error> {
1396 Ok(TableMetadataV1 {
1397 format_version: VersionNumber::<1>,
1398 table_uuid: Some(v.table_uuid),
1399 location: v.location,
1400 last_updated_ms: v.last_updated_ms,
1401 last_column_id: v.last_column_id,
1402 schema: Some(
1403 v.schemas
1404 .get(&v.current_schema_id)
1405 .ok_or(Error::new(
1406 ErrorKind::Unexpected,
1407 "current_schema_id not found in schemas",
1408 ))?
1409 .as_ref()
1410 .clone()
1411 .into(),
1412 ),
1413 schemas: Some(
1414 v.schemas
1415 .into_values()
1416 .map(|x| {
1417 Arc::try_unwrap(x)
1418 .unwrap_or_else(|schema| schema.as_ref().clone())
1419 .into()
1420 })
1421 .collect(),
1422 ),
1423 current_schema_id: Some(v.current_schema_id),
1424 partition_spec: Some(v.default_spec.fields().to_vec()),
1425 partition_specs: Some(
1426 v.partition_specs
1427 .into_values()
1428 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1429 .collect(),
1430 ),
1431 default_spec_id: Some(v.default_spec.spec_id()),
1432 last_partition_id: Some(v.last_partition_id),
1433 properties: if v.properties.is_empty() {
1434 None
1435 } else {
1436 Some(v.properties)
1437 },
1438 current_snapshot_id: v.current_snapshot_id,
1439 snapshots: if v.snapshots.is_empty() {
1440 None
1441 } else {
1442 Some(
1443 v.snapshots
1444 .into_values()
1445 .map(|x| Snapshot::clone(&x).into())
1446 .collect(),
1447 )
1448 },
1449 snapshot_log: if v.snapshot_log.is_empty() {
1450 None
1451 } else {
1452 Some(v.snapshot_log)
1453 },
1454 metadata_log: if v.metadata_log.is_empty() {
1455 None
1456 } else {
1457 Some(v.metadata_log)
1458 },
1459 sort_orders: Some(
1460 v.sort_orders
1461 .into_values()
1462 .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1463 .collect(),
1464 ),
1465 default_sort_order_id: Some(v.default_sort_order_id),
1466 statistics: v.statistics.into_values().collect(),
1467 partition_statistics: v.partition_statistics.into_values().collect(),
1468 })
1469 }
1470 }
1471
1472 fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1473 statistics
1474 .into_iter()
1475 .rev()
1476 .map(|s| (s.snapshot_id, s))
1477 .collect()
1478 }
1479
1480 fn index_partition_statistics(
1481 statistics: Vec<PartitionStatisticsFile>,
1482 ) -> HashMap<i64, PartitionStatisticsFile> {
1483 statistics
1484 .into_iter()
1485 .rev()
1486 .map(|s| (s.snapshot_id, s))
1487 .collect()
1488 }
1489}
1490
1491#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
1492#[repr(u8)]
1493pub enum FormatVersion {
1495 V1 = 1u8,
1497 V2 = 2u8,
1499 V3 = 3u8,
1501}
1502
1503impl PartialOrd for FormatVersion {
1504 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1505 Some(self.cmp(other))
1506 }
1507}
1508
1509impl Ord for FormatVersion {
1510 fn cmp(&self, other: &Self) -> Ordering {
1511 (*self as u8).cmp(&(*other as u8))
1512 }
1513}
1514
1515impl Display for FormatVersion {
1516 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1517 match self {
1518 FormatVersion::V1 => write!(f, "v1"),
1519 FormatVersion::V2 => write!(f, "v2"),
1520 FormatVersion::V3 => write!(f, "v3"),
1521 }
1522 }
1523}
1524
1525#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1526#[serde(rename_all = "kebab-case")]
1527pub struct MetadataLog {
1529 pub metadata_file: String,
1531 pub timestamp_ms: i64,
1533}
1534
1535#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1536#[serde(rename_all = "kebab-case")]
1537pub struct SnapshotLog {
1539 pub snapshot_id: i64,
1541 pub timestamp_ms: i64,
1543}
1544
1545impl SnapshotLog {
1546 pub fn timestamp(self) -> Result<DateTime<Utc>> {
1548 timestamp_ms_to_utc(self.timestamp_ms)
1549 }
1550
1551 #[inline]
1553 pub fn timestamp_ms(&self) -> i64 {
1554 self.timestamp_ms
1555 }
1556}
1557
1558#[cfg(test)]
1559mod tests {
1560 use std::collections::HashMap;
1561 use std::fs;
1562 use std::io::Write as _;
1563 use std::sync::Arc;
1564
1565 use anyhow::Result;
1566 use base64::Engine as _;
1567 use pretty_assertions::assert_eq;
1568 use tempfile::TempDir;
1569 use uuid::Uuid;
1570
1571 use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1572 use crate::io::FileIOBuilder;
1573 use crate::spec::table_metadata::TableMetadata;
1574 use crate::spec::{
1575 BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1576 PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1577 SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1578 Summary, Transform, Type, UnboundPartitionField,
1579 };
1580 use crate::{ErrorKind, TableCreation};
1581
1582 fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1583 let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1584 assert_eq!(desered_type, expected_type);
1585
1586 let sered_json = serde_json::to_string(&expected_type).unwrap();
1587 let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1588
1589 assert_eq!(parsed_json_value, desered_type);
1590 }
1591
1592 fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1593 let path = format!("testdata/table_metadata/{file_name}");
1594 let metadata: String = fs::read_to_string(path).unwrap();
1595
1596 serde_json::from_str(&metadata).unwrap()
1597 }
1598
1599 #[test]
1600 fn test_table_data_v2() {
1601 let data = r#"
1602 {
1603 "format-version" : 2,
1604 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1605 "location": "s3://b/wh/data.db/table",
1606 "last-sequence-number" : 1,
1607 "last-updated-ms": 1515100955770,
1608 "last-column-id": 1,
1609 "schemas": [
1610 {
1611 "schema-id" : 1,
1612 "type" : "struct",
1613 "fields" :[
1614 {
1615 "id": 1,
1616 "name": "struct_name",
1617 "required": true,
1618 "type": "fixed[1]"
1619 },
1620 {
1621 "id": 4,
1622 "name": "ts",
1623 "required": true,
1624 "type": "timestamp"
1625 }
1626 ]
1627 }
1628 ],
1629 "current-schema-id" : 1,
1630 "partition-specs": [
1631 {
1632 "spec-id": 0,
1633 "fields": [
1634 {
1635 "source-id": 4,
1636 "field-id": 1000,
1637 "name": "ts_day",
1638 "transform": "day"
1639 }
1640 ]
1641 }
1642 ],
1643 "default-spec-id": 0,
1644 "last-partition-id": 1000,
1645 "properties": {
1646 "commit.retry.num-retries": "1"
1647 },
1648 "metadata-log": [
1649 {
1650 "metadata-file": "s3://bucket/.../v1.json",
1651 "timestamp-ms": 1515100
1652 }
1653 ],
1654 "refs": {},
1655 "sort-orders": [
1656 {
1657 "order-id": 0,
1658 "fields": []
1659 }
1660 ],
1661 "default-sort-order-id": 0
1662 }
1663 "#;
1664
1665 let schema = Schema::builder()
1666 .with_schema_id(1)
1667 .with_fields(vec![
1668 Arc::new(NestedField::required(
1669 1,
1670 "struct_name",
1671 Type::Primitive(PrimitiveType::Fixed(1)),
1672 )),
1673 Arc::new(NestedField::required(
1674 4,
1675 "ts",
1676 Type::Primitive(PrimitiveType::Timestamp),
1677 )),
1678 ])
1679 .build()
1680 .unwrap();
1681
1682 let partition_spec = PartitionSpec::builder(schema.clone())
1683 .with_spec_id(0)
1684 .add_unbound_field(UnboundPartitionField {
1685 name: "ts_day".to_string(),
1686 transform: Transform::Day,
1687 source_id: 4,
1688 field_id: Some(1000),
1689 })
1690 .unwrap()
1691 .build()
1692 .unwrap();
1693
1694 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1695 let expected = TableMetadata {
1696 format_version: FormatVersion::V2,
1697 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1698 location: "s3://b/wh/data.db/table".to_string(),
1699 last_updated_ms: 1515100955770,
1700 last_column_id: 1,
1701 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1702 current_schema_id: 1,
1703 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1704 default_partition_type,
1705 default_spec: partition_spec.into(),
1706 last_partition_id: 1000,
1707 default_sort_order_id: 0,
1708 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1709 snapshots: HashMap::default(),
1710 current_snapshot_id: None,
1711 last_sequence_number: 1,
1712 properties: HashMap::from_iter(vec![(
1713 "commit.retry.num-retries".to_string(),
1714 "1".to_string(),
1715 )]),
1716 snapshot_log: Vec::new(),
1717 metadata_log: vec![MetadataLog {
1718 metadata_file: "s3://bucket/.../v1.json".to_string(),
1719 timestamp_ms: 1515100,
1720 }],
1721 refs: HashMap::new(),
1722 statistics: HashMap::new(),
1723 partition_statistics: HashMap::new(),
1724 encryption_keys: HashMap::new(),
1725 next_row_id: INITIAL_ROW_ID,
1726 };
1727
1728 let expected_json_value = serde_json::to_value(&expected).unwrap();
1729 check_table_metadata_serde(data, expected);
1730
1731 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1732 assert_eq!(json_value, expected_json_value);
1733 }
1734
1735 #[test]
1736 fn test_table_data_v3() {
1737 let data = r#"
1738 {
1739 "format-version" : 3,
1740 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1741 "location": "s3://b/wh/data.db/table",
1742 "last-sequence-number" : 1,
1743 "last-updated-ms": 1515100955770,
1744 "last-column-id": 1,
1745 "next-row-id": 5,
1746 "schemas": [
1747 {
1748 "schema-id" : 1,
1749 "type" : "struct",
1750 "fields" :[
1751 {
1752 "id": 4,
1753 "name": "ts",
1754 "required": true,
1755 "type": "timestamp"
1756 }
1757 ]
1758 }
1759 ],
1760 "current-schema-id" : 1,
1761 "partition-specs": [
1762 {
1763 "spec-id": 0,
1764 "fields": [
1765 {
1766 "source-id": 4,
1767 "field-id": 1000,
1768 "name": "ts_day",
1769 "transform": "day"
1770 }
1771 ]
1772 }
1773 ],
1774 "default-spec-id": 0,
1775 "last-partition-id": 1000,
1776 "properties": {
1777 "commit.retry.num-retries": "1"
1778 },
1779 "metadata-log": [
1780 {
1781 "metadata-file": "s3://bucket/.../v1.json",
1782 "timestamp-ms": 1515100
1783 }
1784 ],
1785 "refs": {},
1786 "snapshots" : [ {
1787 "snapshot-id" : 1,
1788 "timestamp-ms" : 1662532818843,
1789 "sequence-number" : 0,
1790 "first-row-id" : 0,
1791 "added-rows" : 4,
1792 "key-id" : "key1",
1793 "summary" : {
1794 "operation" : "append"
1795 },
1796 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1797 "schema-id" : 0
1798 }
1799 ],
1800 "encryption-keys": [
1801 {
1802 "key-id": "key1",
1803 "encrypted-by-id": "KMS",
1804 "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
1805 "properties": {
1806 "p1": "v1"
1807 }
1808 }
1809 ],
1810 "sort-orders": [
1811 {
1812 "order-id": 0,
1813 "fields": []
1814 }
1815 ],
1816 "default-sort-order-id": 0
1817 }
1818 "#;
1819
1820 let schema = Schema::builder()
1821 .with_schema_id(1)
1822 .with_fields(vec![Arc::new(NestedField::required(
1823 4,
1824 "ts",
1825 Type::Primitive(PrimitiveType::Timestamp),
1826 ))])
1827 .build()
1828 .unwrap();
1829
1830 let partition_spec = PartitionSpec::builder(schema.clone())
1831 .with_spec_id(0)
1832 .add_unbound_field(UnboundPartitionField {
1833 name: "ts_day".to_string(),
1834 transform: Transform::Day,
1835 source_id: 4,
1836 field_id: Some(1000),
1837 })
1838 .unwrap()
1839 .build()
1840 .unwrap();
1841
1842 let snapshot = Snapshot::builder()
1843 .with_snapshot_id(1)
1844 .with_timestamp_ms(1662532818843)
1845 .with_sequence_number(0)
1846 .with_row_range(0, 4)
1847 .with_encryption_key_id(Some("key1".to_string()))
1848 .with_summary(Summary {
1849 operation: Operation::Append,
1850 additional_properties: HashMap::new(),
1851 })
1852 .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
1853 .with_schema_id(0)
1854 .build();
1855
1856 let encryption_key = EncryptedKey::builder()
1857 .key_id("key1".to_string())
1858 .encrypted_by_id("KMS".to_string())
1859 .encrypted_key_metadata(
1860 base64::prelude::BASE64_STANDARD
1861 .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
1862 .unwrap(),
1863 )
1864 .properties(HashMap::from_iter(vec![(
1865 "p1".to_string(),
1866 "v1".to_string(),
1867 )]))
1868 .build();
1869
1870 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1871 let expected = TableMetadata {
1872 format_version: FormatVersion::V3,
1873 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1874 location: "s3://b/wh/data.db/table".to_string(),
1875 last_updated_ms: 1515100955770,
1876 last_column_id: 1,
1877 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1878 current_schema_id: 1,
1879 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1880 default_partition_type,
1881 default_spec: partition_spec.into(),
1882 last_partition_id: 1000,
1883 default_sort_order_id: 0,
1884 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1885 snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
1886 current_snapshot_id: None,
1887 last_sequence_number: 1,
1888 properties: HashMap::from_iter(vec![(
1889 "commit.retry.num-retries".to_string(),
1890 "1".to_string(),
1891 )]),
1892 snapshot_log: Vec::new(),
1893 metadata_log: vec![MetadataLog {
1894 metadata_file: "s3://bucket/.../v1.json".to_string(),
1895 timestamp_ms: 1515100,
1896 }],
1897 refs: HashMap::new(),
1898 statistics: HashMap::new(),
1899 partition_statistics: HashMap::new(),
1900 encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
1901 next_row_id: 5,
1902 };
1903
1904 let expected_json_value = serde_json::to_value(&expected).unwrap();
1905 check_table_metadata_serde(data, expected);
1906
1907 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1908 assert_eq!(json_value, expected_json_value);
1909 }
1910
1911 #[test]
1912 fn test_table_data_v1() {
1913 let data = r#"
1914 {
1915 "format-version" : 1,
1916 "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
1917 "location" : "/home/iceberg/warehouse/nyc/taxis",
1918 "last-updated-ms" : 1662532818843,
1919 "last-column-id" : 5,
1920 "schema" : {
1921 "type" : "struct",
1922 "schema-id" : 0,
1923 "fields" : [ {
1924 "id" : 1,
1925 "name" : "vendor_id",
1926 "required" : false,
1927 "type" : "long"
1928 }, {
1929 "id" : 2,
1930 "name" : "trip_id",
1931 "required" : false,
1932 "type" : "long"
1933 }, {
1934 "id" : 3,
1935 "name" : "trip_distance",
1936 "required" : false,
1937 "type" : "float"
1938 }, {
1939 "id" : 4,
1940 "name" : "fare_amount",
1941 "required" : false,
1942 "type" : "double"
1943 }, {
1944 "id" : 5,
1945 "name" : "store_and_fwd_flag",
1946 "required" : false,
1947 "type" : "string"
1948 } ]
1949 },
1950 "partition-spec" : [ {
1951 "name" : "vendor_id",
1952 "transform" : "identity",
1953 "source-id" : 1,
1954 "field-id" : 1000
1955 } ],
1956 "last-partition-id" : 1000,
1957 "default-sort-order-id" : 0,
1958 "sort-orders" : [ {
1959 "order-id" : 0,
1960 "fields" : [ ]
1961 } ],
1962 "properties" : {
1963 "owner" : "root"
1964 },
1965 "current-snapshot-id" : 638933773299822130,
1966 "refs" : {
1967 "main" : {
1968 "snapshot-id" : 638933773299822130,
1969 "type" : "branch"
1970 }
1971 },
1972 "snapshots" : [ {
1973 "snapshot-id" : 638933773299822130,
1974 "timestamp-ms" : 1662532818843,
1975 "sequence-number" : 0,
1976 "summary" : {
1977 "operation" : "append",
1978 "spark.app.id" : "local-1662532784305",
1979 "added-data-files" : "4",
1980 "added-records" : "4",
1981 "added-files-size" : "6001"
1982 },
1983 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1984 "schema-id" : 0
1985 } ],
1986 "snapshot-log" : [ {
1987 "timestamp-ms" : 1662532818843,
1988 "snapshot-id" : 638933773299822130
1989 } ],
1990 "metadata-log" : [ {
1991 "timestamp-ms" : 1662532805245,
1992 "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
1993 } ]
1994 }
1995 "#;
1996
1997 let schema = Schema::builder()
1998 .with_fields(vec![
1999 Arc::new(NestedField::optional(
2000 1,
2001 "vendor_id",
2002 Type::Primitive(PrimitiveType::Long),
2003 )),
2004 Arc::new(NestedField::optional(
2005 2,
2006 "trip_id",
2007 Type::Primitive(PrimitiveType::Long),
2008 )),
2009 Arc::new(NestedField::optional(
2010 3,
2011 "trip_distance",
2012 Type::Primitive(PrimitiveType::Float),
2013 )),
2014 Arc::new(NestedField::optional(
2015 4,
2016 "fare_amount",
2017 Type::Primitive(PrimitiveType::Double),
2018 )),
2019 Arc::new(NestedField::optional(
2020 5,
2021 "store_and_fwd_flag",
2022 Type::Primitive(PrimitiveType::String),
2023 )),
2024 ])
2025 .build()
2026 .unwrap();
2027
2028 let schema = Arc::new(schema);
2029 let partition_spec = PartitionSpec::builder(schema.clone())
2030 .with_spec_id(0)
2031 .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
2032 .unwrap()
2033 .build()
2034 .unwrap();
2035
2036 let sort_order = SortOrder::builder()
2037 .with_order_id(0)
2038 .build_unbound()
2039 .unwrap();
2040
2041 let snapshot = Snapshot::builder()
2042 .with_snapshot_id(638933773299822130)
2043 .with_timestamp_ms(1662532818843)
2044 .with_sequence_number(0)
2045 .with_schema_id(0)
2046 .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
2047 .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
2048 .build();
2049
2050 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2051 let expected = TableMetadata {
2052 format_version: FormatVersion::V1,
2053 table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
2054 location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
2055 last_updated_ms: 1662532818843,
2056 last_column_id: 5,
2057 schemas: HashMap::from_iter(vec![(0, schema)]),
2058 current_schema_id: 0,
2059 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2060 default_partition_type,
2061 default_spec: Arc::new(partition_spec),
2062 last_partition_id: 1000,
2063 default_sort_order_id: 0,
2064 sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
2065 snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
2066 current_snapshot_id: Some(638933773299822130),
2067 last_sequence_number: 0,
2068 properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
2069 snapshot_log: vec![SnapshotLog {
2070 snapshot_id: 638933773299822130,
2071 timestamp_ms: 1662532818843,
2072 }],
2073 metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
2074 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
2075 statistics: HashMap::new(),
2076 partition_statistics: HashMap::new(),
2077 encryption_keys: HashMap::new(),
2078 next_row_id: INITIAL_ROW_ID,
2079 };
2080
2081 check_table_metadata_serde(data, expected);
2082 }
2083
2084 #[test]
2085 fn test_table_data_v2_no_snapshots() {
2086 let data = r#"
2087 {
2088 "format-version" : 2,
2089 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2090 "location": "s3://b/wh/data.db/table",
2091 "last-sequence-number" : 1,
2092 "last-updated-ms": 1515100955770,
2093 "last-column-id": 1,
2094 "schemas": [
2095 {
2096 "schema-id" : 1,
2097 "type" : "struct",
2098 "fields" :[
2099 {
2100 "id": 1,
2101 "name": "struct_name",
2102 "required": true,
2103 "type": "fixed[1]"
2104 }
2105 ]
2106 }
2107 ],
2108 "current-schema-id" : 1,
2109 "partition-specs": [
2110 {
2111 "spec-id": 0,
2112 "fields": []
2113 }
2114 ],
2115 "refs": {},
2116 "default-spec-id": 0,
2117 "last-partition-id": 1000,
2118 "metadata-log": [
2119 {
2120 "metadata-file": "s3://bucket/.../v1.json",
2121 "timestamp-ms": 1515100
2122 }
2123 ],
2124 "sort-orders": [
2125 {
2126 "order-id": 0,
2127 "fields": []
2128 }
2129 ],
2130 "default-sort-order-id": 0
2131 }
2132 "#;
2133
2134 let schema = Schema::builder()
2135 .with_schema_id(1)
2136 .with_fields(vec![Arc::new(NestedField::required(
2137 1,
2138 "struct_name",
2139 Type::Primitive(PrimitiveType::Fixed(1)),
2140 ))])
2141 .build()
2142 .unwrap();
2143
2144 let partition_spec = PartitionSpec::builder(schema.clone())
2145 .with_spec_id(0)
2146 .build()
2147 .unwrap();
2148
2149 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2150 let expected = TableMetadata {
2151 format_version: FormatVersion::V2,
2152 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
2153 location: "s3://b/wh/data.db/table".to_string(),
2154 last_updated_ms: 1515100955770,
2155 last_column_id: 1,
2156 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
2157 current_schema_id: 1,
2158 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2159 default_partition_type,
2160 default_spec: partition_spec.into(),
2161 last_partition_id: 1000,
2162 default_sort_order_id: 0,
2163 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2164 snapshots: HashMap::default(),
2165 current_snapshot_id: None,
2166 last_sequence_number: 1,
2167 properties: HashMap::new(),
2168 snapshot_log: Vec::new(),
2169 metadata_log: vec![MetadataLog {
2170 metadata_file: "s3://bucket/.../v1.json".to_string(),
2171 timestamp_ms: 1515100,
2172 }],
2173 refs: HashMap::new(),
2174 statistics: HashMap::new(),
2175 partition_statistics: HashMap::new(),
2176 encryption_keys: HashMap::new(),
2177 next_row_id: INITIAL_ROW_ID,
2178 };
2179
2180 let expected_json_value = serde_json::to_value(&expected).unwrap();
2181 check_table_metadata_serde(data, expected);
2182
2183 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
2184 assert_eq!(json_value, expected_json_value);
2185 }
2186
2187 #[test]
2188 fn test_current_snapshot_id_must_match_main_branch() {
2189 let data = r#"
2190 {
2191 "format-version" : 2,
2192 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2193 "location": "s3://b/wh/data.db/table",
2194 "last-sequence-number" : 1,
2195 "last-updated-ms": 1515100955770,
2196 "last-column-id": 1,
2197 "schemas": [
2198 {
2199 "schema-id" : 1,
2200 "type" : "struct",
2201 "fields" :[
2202 {
2203 "id": 1,
2204 "name": "struct_name",
2205 "required": true,
2206 "type": "fixed[1]"
2207 },
2208 {
2209 "id": 4,
2210 "name": "ts",
2211 "required": true,
2212 "type": "timestamp"
2213 }
2214 ]
2215 }
2216 ],
2217 "current-schema-id" : 1,
2218 "partition-specs": [
2219 {
2220 "spec-id": 0,
2221 "fields": [
2222 {
2223 "source-id": 4,
2224 "field-id": 1000,
2225 "name": "ts_day",
2226 "transform": "day"
2227 }
2228 ]
2229 }
2230 ],
2231 "default-spec-id": 0,
2232 "last-partition-id": 1000,
2233 "properties": {
2234 "commit.retry.num-retries": "1"
2235 },
2236 "metadata-log": [
2237 {
2238 "metadata-file": "s3://bucket/.../v1.json",
2239 "timestamp-ms": 1515100
2240 }
2241 ],
2242 "sort-orders": [
2243 {
2244 "order-id": 0,
2245 "fields": []
2246 }
2247 ],
2248 "default-sort-order-id": 0,
2249 "current-snapshot-id" : 1,
2250 "refs" : {
2251 "main" : {
2252 "snapshot-id" : 2,
2253 "type" : "branch"
2254 }
2255 },
2256 "snapshots" : [ {
2257 "snapshot-id" : 1,
2258 "timestamp-ms" : 1662532818843,
2259 "sequence-number" : 0,
2260 "summary" : {
2261 "operation" : "append",
2262 "spark.app.id" : "local-1662532784305",
2263 "added-data-files" : "4",
2264 "added-records" : "4",
2265 "added-files-size" : "6001"
2266 },
2267 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2268 "schema-id" : 0
2269 },
2270 {
2271 "snapshot-id" : 2,
2272 "timestamp-ms" : 1662532818844,
2273 "sequence-number" : 0,
2274 "summary" : {
2275 "operation" : "append",
2276 "spark.app.id" : "local-1662532784305",
2277 "added-data-files" : "4",
2278 "added-records" : "4",
2279 "added-files-size" : "6001"
2280 },
2281 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2282 "schema-id" : 0
2283 } ]
2284 }
2285 "#;
2286
2287 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2288 assert!(
2289 err.to_string()
2290 .contains("Current snapshot id does not match main branch")
2291 );
2292 }
2293
2294 #[test]
2295 fn test_main_without_current() {
2296 let data = r#"
2297 {
2298 "format-version" : 2,
2299 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2300 "location": "s3://b/wh/data.db/table",
2301 "last-sequence-number" : 1,
2302 "last-updated-ms": 1515100955770,
2303 "last-column-id": 1,
2304 "schemas": [
2305 {
2306 "schema-id" : 1,
2307 "type" : "struct",
2308 "fields" :[
2309 {
2310 "id": 1,
2311 "name": "struct_name",
2312 "required": true,
2313 "type": "fixed[1]"
2314 },
2315 {
2316 "id": 4,
2317 "name": "ts",
2318 "required": true,
2319 "type": "timestamp"
2320 }
2321 ]
2322 }
2323 ],
2324 "current-schema-id" : 1,
2325 "partition-specs": [
2326 {
2327 "spec-id": 0,
2328 "fields": [
2329 {
2330 "source-id": 4,
2331 "field-id": 1000,
2332 "name": "ts_day",
2333 "transform": "day"
2334 }
2335 ]
2336 }
2337 ],
2338 "default-spec-id": 0,
2339 "last-partition-id": 1000,
2340 "properties": {
2341 "commit.retry.num-retries": "1"
2342 },
2343 "metadata-log": [
2344 {
2345 "metadata-file": "s3://bucket/.../v1.json",
2346 "timestamp-ms": 1515100
2347 }
2348 ],
2349 "sort-orders": [
2350 {
2351 "order-id": 0,
2352 "fields": []
2353 }
2354 ],
2355 "default-sort-order-id": 0,
2356 "refs" : {
2357 "main" : {
2358 "snapshot-id" : 1,
2359 "type" : "branch"
2360 }
2361 },
2362 "snapshots" : [ {
2363 "snapshot-id" : 1,
2364 "timestamp-ms" : 1662532818843,
2365 "sequence-number" : 0,
2366 "summary" : {
2367 "operation" : "append",
2368 "spark.app.id" : "local-1662532784305",
2369 "added-data-files" : "4",
2370 "added-records" : "4",
2371 "added-files-size" : "6001"
2372 },
2373 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2374 "schema-id" : 0
2375 } ]
2376 }
2377 "#;
2378
2379 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2380 assert!(
2381 err.to_string()
2382 .contains("Current snapshot is not set, but main branch exists")
2383 );
2384 }
2385
#[test]
fn test_branch_snapshot_missing() {
    // Branch `foo` points at snapshot 2, which is not present in `snapshots`;
    // deserialization has to reject the document.
    let json = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
                "main" : {
                    "snapshot-id" : 1,
                    "type" : "branch"
                },
                "foo" : {
                    "snapshot-id" : 2,
                    "type" : "branch"
                }
            },
            "snapshots" : [ {
                "snapshot-id" : 1,
                "timestamp-ms" : 1662532818843,
                "sequence-number" : 0,
                "summary" : {
                    "operation" : "append",
                    "spark.app.id" : "local-1662532784305",
                    "added-data-files" : "4",
                    "added-records" : "4",
                    "added-files-size" : "6001"
                },
                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                "schema-id" : 0
            } ]
        }
    "#;

    let message = serde_json::from_str::<TableMetadata>(json)
        .unwrap_err()
        .to_string();
    let expected_fragment =
        "Snapshot for reference foo does not exist in the existing snapshots list";
    assert!(message.contains(expected_fragment));
}
2482
#[test]
fn test_v2_wrong_max_snapshot_sequence_number() {
    // The only snapshot has sequence number 4 while `last-sequence-number` is 1.
    let original = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 1,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 4,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

    // A snapshot sequence number above `last-sequence-number` is rejected.
    let err = serde_json::from_str::<TableMetadata>(original).unwrap_err();
    assert!(err.to_string().contains(
        "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
    ));

    // Raising `last-sequence-number` to exactly the snapshot's value is accepted.
    let equal = original.replace(
        r#""last-sequence-number": 1,"#,
        r#""last-sequence-number": 4,"#,
    );
    let parsed_equal = serde_json::from_str::<TableMetadata>(equal.as_str()).unwrap();
    assert_eq!(parsed_equal.last_sequence_number, 4);

    // Any value strictly above the snapshot's sequence number is accepted too.
    let greater = equal.replace(
        r#""last-sequence-number": 4,"#,
        r#""last-sequence-number": 5,"#,
    );
    let parsed_greater = serde_json::from_str::<TableMetadata>(greater.as_str()).unwrap();
    assert_eq!(parsed_greater.last_sequence_number, 5);
}
2564
#[test]
fn test_statistic_files() {
    let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/stats.puffin",
                    "file-size-in-bytes": 413,
                    "file-footer-size-in-bytes": 42,
                    "blob-metadata": [
                        {
                            "type": "ndv",
                            "snapshot-id": 3055729675574597004,
                            "sequence-number": 1,
                            "fields": [
                                1
                            ]
                        }
                    ]
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

    // The single snapshot referenced by the statistics file, `main` and the
    // current-snapshot pointer.
    const SNAPSHOT_ID: i64 = 3055729675574597004;

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(NestedField::required(
            1,
            "x",
            Type::Primitive(PrimitiveType::Long),
        ))])
        .build()
        .unwrap();
    let spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(0)
        .build()
        .unwrap();
    let partition_type = spec.partition_type(&schema).unwrap();

    let summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::new(),
    };
    let snapshot = Snapshot::builder()
        .with_snapshot_id(SNAPSHOT_ID)
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_schema_id(0)
        .with_summary(summary)
        .build();

    let stats_file = StatisticsFile {
        snapshot_id: SNAPSHOT_ID,
        statistics_path: "s3://a/b/stats.puffin".to_string(),
        file_size_in_bytes: 413,
        file_footer_size_in_bytes: 42,
        key_metadata: None,
        blob_metadata: vec![BlobMetadata {
            snapshot_id: SNAPSHOT_ID,
            sequence_number: 1,
            fields: vec![1],
            r#type: "ndv".to_string(),
            properties: HashMap::new(),
        }],
    };
    let main_branch = SnapshotReference {
        snapshot_id: SNAPSHOT_ID,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::new(spec.clone()))]),
        default_partition_type: partition_type,
        default_spec: Arc::new(spec),
        last_partition_id: 1000,
        default_sort_order_id: 0,
        sort_orders: HashMap::from([(0, Arc::new(SortOrder::unsorted_order()))]),
        snapshots: HashMap::from([(SNAPSHOT_ID, Arc::new(snapshot))]),
        current_snapshot_id: Some(SNAPSHOT_ID),
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        statistics: HashMap::from([(SNAPSHOT_ID, stats_file)]),
        partition_statistics: HashMap::new(),
        refs: HashMap::from([("main".to_string(), main_branch)]),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(data, expected);
}
2717
#[test]
fn test_partition_statistics_file() {
    let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "partition-statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/partition-stats.parquet",
                    "file-size-in-bytes": 43
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

    // Shared by the snapshot, the partition-statistics entry and `main`.
    const SNAPSHOT_ID: i64 = 3055729675574597004;

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(NestedField::required(
            1,
            "x",
            Type::Primitive(PrimitiveType::Long),
        ))])
        .build()
        .unwrap();
    let spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(0)
        .build()
        .unwrap();
    let partition_type = spec.partition_type(&schema).unwrap();

    let summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::new(),
    };
    let snapshot = Snapshot::builder()
        .with_snapshot_id(SNAPSHOT_ID)
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_schema_id(0)
        .with_summary(summary)
        .build();

    let partition_stats = PartitionStatisticsFile {
        snapshot_id: SNAPSHOT_ID,
        statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
        file_size_in_bytes: 43,
    };
    let main_branch = SnapshotReference {
        snapshot_id: SNAPSHOT_ID,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::new(spec.clone()))]),
        default_spec: Arc::new(spec),
        default_partition_type: partition_type,
        last_partition_id: 1000,
        default_sort_order_id: 0,
        sort_orders: HashMap::from([(0, Arc::new(SortOrder::unsorted_order()))]),
        snapshots: HashMap::from([(SNAPSHOT_ID, Arc::new(snapshot))]),
        current_snapshot_id: Some(SNAPSHOT_ID),
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::from([(SNAPSHOT_ID, partition_stats)]),
        refs: HashMap::from([("main".to_string(), main_branch)]),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(data, expected);
}
2853
#[test]
fn test_invalid_table_uuid() -> Result<()> {
    // Start from a fixture that is known to deserialize. A document holding only
    // `format-version` and `table-uuid` would also fail for the missing required
    // fields, so it could never detect a regression in UUID validation.
    let valid =
        fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();
    assert!(serde_json::from_str::<TableMetadata>(&valid).is_ok());

    // Corrupt only the table UUID; everything else stays valid.
    let invalid = valid.replace("9c12d441-03fe-4693-9a96-a0705ddf69c1", "xxxx");
    assert_ne!(
        invalid, valid,
        "fixture no longer contains the expected table uuid"
    );
    assert!(serde_json::from_str::<TableMetadata>(&invalid).is_err());
    Ok(())
}
2865
#[test]
fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
    // Only `format-version` (set to 1) is present, so deserialization must fail.
    let json = r#"
        {
            "format-version" : 1
        }
    "#;
    let parsed = serde_json::from_str::<TableMetadata>(json);
    assert!(parsed.is_err());
    Ok(())
}
2876
#[test]
fn test_table_metadata_v3_valid_minimal() {
    let metadata_str =
        fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();

    // Check the version up front so a version mix-up fails with a focused message
    // instead of a full-structure diff.
    let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
    assert_eq!(table_metadata.format_version, FormatVersion::V3);

    // Field `x` carries V3 initial/write defaults; `y` has a doc string.
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
                    .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
                    .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
            ),
            Arc::new(
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                    .with_doc("comment"),
            ),
            Arc::new(NestedField::required(
                3,
                "z",
                Type::Primitive(PrimitiveType::Long),
            )),
        ])
        .build()
        .unwrap();

    let partition_spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(0)
        .add_unbound_field(UnboundPartitionField {
            name: "x".to_string(),
            transform: Transform::Identity,
            source_id: 1,
            field_id: Some(1000),
        })
        .unwrap()
        .build()
        .unwrap();

    let sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(SortField {
            source_id: 2,
            transform: Transform::Identity,
            direction: SortDirection::Ascending,
            null_order: NullOrder::First,
        })
        .with_sort_field(SortField {
            source_id: 3,
            transform: Transform::Bucket(4),
            direction: SortDirection::Descending,
            null_order: NullOrder::Last,
        })
        .build_unbound()
        .unwrap();

    let default_partition_type = partition_spec.partition_type(&schema).unwrap();
    let expected = TableMetadata {
        format_version: FormatVersion::V3,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
        default_spec: Arc::new(partition_spec),
        default_partition_type,
        last_partition_id: 1000,
        default_sort_order_id: 3,
        sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
        snapshots: HashMap::default(),
        current_snapshot_id: None,
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        refs: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
        // Use the named constant like the other fixtures rather than a bare `0`.
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(&metadata_str, expected);
}
2965
#[test]
fn test_table_metadata_v2_file_valid() {
    let json = fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

    const PARENT_ID: i64 = 3051729675574597004;
    const PARENT_TS: i64 = 1515100955770;
    const CHILD_ID: i64 = 3055729675574597004;
    const CHILD_TS: i64 = 1555100955770;

    let long = || Type::Primitive(PrimitiveType::Long);

    // Schema 0 only has `x`; schema 1 (current) adds `y`/`z` and identifier fields.
    let initial_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(NestedField::required(1, "x", long()))])
        .build()
        .unwrap();
    let current_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            Arc::new(NestedField::required(1, "x", long())),
            Arc::new(NestedField::required(2, "y", long()).with_doc("comment")),
            Arc::new(NestedField::required(3, "z", long())),
        ])
        .with_identifier_field_ids(vec![1, 2])
        .build()
        .unwrap();

    let spec = PartitionSpec::builder(current_schema.clone())
        .with_spec_id(0)
        .add_unbound_field(UnboundPartitionField {
            name: "x".to_string(),
            transform: Transform::Identity,
            source_id: 1,
            field_id: Some(1000),
        })
        .unwrap()
        .build()
        .unwrap();

    let ascending_y = SortField {
        source_id: 2,
        transform: Transform::Identity,
        direction: SortDirection::Ascending,
        null_order: NullOrder::First,
    };
    let descending_bucketed_z = SortField {
        source_id: 3,
        transform: Transform::Bucket(4),
        direction: SortDirection::Descending,
        null_order: NullOrder::Last,
    };
    let sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(ascending_y)
        .with_sort_field(descending_bucketed_z)
        .build_unbound()
        .unwrap();

    let parent = Snapshot::builder()
        .with_snapshot_id(PARENT_ID)
        .with_timestamp_ms(PARENT_TS)
        .with_sequence_number(0)
        .with_manifest_list("s3://a/b/1.avro")
        .with_summary(Summary {
            operation: Operation::Append,
            additional_properties: HashMap::new(),
        })
        .build();
    let child = Snapshot::builder()
        .with_snapshot_id(CHILD_ID)
        .with_parent_snapshot_id(Some(PARENT_ID))
        .with_timestamp_ms(CHILD_TS)
        .with_sequence_number(1)
        .with_schema_id(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_summary(Summary {
            operation: Operation::Append,
            additional_properties: HashMap::new(),
        })
        .build();

    let partition_type = spec.partition_type(&current_schema).unwrap();
    let main_branch = SnapshotReference {
        snapshot_id: CHILD_ID,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(initial_schema)), (1, Arc::new(current_schema))]),
        current_schema_id: 1,
        partition_specs: HashMap::from([(0, Arc::new(spec.clone()))]),
        default_spec: Arc::new(spec),
        default_partition_type: partition_type,
        last_partition_id: 1000,
        default_sort_order_id: 3,
        sort_orders: HashMap::from([(3, Arc::new(sort_order))]),
        snapshots: HashMap::from([(PARENT_ID, Arc::new(parent)), (CHILD_ID, Arc::new(child))]),
        current_snapshot_id: Some(CHILD_ID),
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: vec![
            SnapshotLog {
                snapshot_id: PARENT_ID,
                timestamp_ms: PARENT_TS,
            },
            SnapshotLog {
                snapshot_id: CHILD_ID,
                timestamp_ms: CHILD_TS,
            },
        ],
        metadata_log: Vec::new(),
        refs: HashMap::from([("main".to_string(), main_branch)]),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(&json, expected);
}
3105
#[test]
fn test_table_metadata_v2_file_valid_minimal() {
    let json =
        fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

    let long = || Type::Primitive(PrimitiveType::Long);
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(NestedField::required(1, "x", long())),
            Arc::new(NestedField::required(2, "y", long()).with_doc("comment")),
            Arc::new(NestedField::required(3, "z", long())),
        ])
        .build()
        .unwrap();

    // Identity partition on `x` with an explicit field id.
    let identity_x = UnboundPartitionField {
        name: "x".to_string(),
        transform: Transform::Identity,
        source_id: 1,
        field_id: Some(1000),
    };
    let spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(0)
        .add_unbound_field(identity_x)
        .unwrap()
        .build()
        .unwrap();

    let sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(SortField {
            source_id: 2,
            transform: Transform::Identity,
            direction: SortDirection::Ascending,
            null_order: NullOrder::First,
        })
        .with_sort_field(SortField {
            source_id: 3,
            transform: Transform::Bucket(4),
            direction: SortDirection::Descending,
            null_order: NullOrder::Last,
        })
        .build_unbound()
        .unwrap();

    let partition_type = spec.partition_type(&schema).unwrap();

    // No snapshots, refs or logs: the minimal document only describes the table shape.
    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::new(spec.clone()))]),
        default_partition_type: partition_type,
        default_spec: Arc::new(spec),
        last_partition_id: 1000,
        default_sort_order_id: 3,
        sort_orders: HashMap::from([(3, Arc::new(sort_order))]),
        snapshots: HashMap::new(),
        current_snapshot_id: None,
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        refs: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(&json, expected);
}
3191
#[test]
fn test_table_metadata_v1_file_valid() {
    let json = fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

    let long = || Type::Primitive(PrimitiveType::Long);
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(NestedField::required(1, "x", long())),
            Arc::new(NestedField::required(2, "y", long()).with_doc("comment")),
            Arc::new(NestedField::required(3, "z", long())),
        ])
        .build()
        .unwrap();

    let spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(0)
        .add_unbound_field(UnboundPartitionField {
            name: "x".to_string(),
            transform: Transform::Identity,
            source_id: 1,
            field_id: Some(1000),
        })
        .unwrap()
        .build()
        .unwrap();
    let partition_type = spec.partition_type(&schema).unwrap();

    let expected = TableMetadata {
        format_version: FormatVersion::V1,
        table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573874,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::new(spec.clone()))]),
        default_spec: Arc::new(spec),
        default_partition_type: partition_type,
        last_partition_id: 0,
        default_sort_order_id: 0,
        sort_orders: HashMap::from([(0, Arc::new(SortOrder::unsorted_order()))]),
        snapshots: HashMap::new(),
        current_snapshot_id: None,
        last_sequence_number: 0,
        properties: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        refs: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(&json, expected);
}
3261
#[test]
fn test_table_metadata_v1_compat() {
    let json = fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
    let table: TableMetadata =
        serde_json::from_str(&json).expect("Failed to deserialize TableMetadataV1Compat.json");

    let expected_uuid = Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap();
    let expected_location = "s3://bucket/warehouse/iceberg/glue.db/table_name";

    assert_eq!(table.format_version(), FormatVersion::V1);
    assert_eq!(table.uuid(), expected_uuid);
    assert_eq!(table.location(), expected_location);
    assert_eq!(table.last_updated_ms(), 1727773114005);
    assert_eq!(table.current_schema_id(), 0);
}
3284
#[test]
fn test_table_metadata_v1_schemas_without_current_id() {
    let path = "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json";
    let json = fs::read_to_string(path).unwrap();
    let table: TableMetadata = serde_json::from_str(&json)
        .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");

    assert_eq!(table.format_version(), FormatVersion::V1);
    assert_eq!(
        table.uuid(),
        Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
    );

    // The current schema must still be resolvable and contain x, y, z in order.
    let field_names: Vec<&str> = table
        .current_schema()
        .as_struct()
        .fields()
        .iter()
        .map(|field| field.name.as_str())
        .collect();
    assert_eq!(field_names, ["x", "y", "z"]);
}
3310
#[test]
fn test_table_metadata_v1_no_valid_schema() {
    let json =
        fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json").unwrap();

    let outcome = serde_json::from_str::<TableMetadata>(&json);
    assert!(outcome.is_err());

    let error_message = outcome.unwrap_err().to_string();
    let has_expected_reason = error_message.contains("No valid schema configuration found");
    assert!(
        has_expected_reason,
        "Expected error about no valid schema configuration, got: {error_message}"
    );
}
3327
#[test]
fn test_table_metadata_v1_partition_specs_without_default_id() {
    let path = "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json";
    let json = fs::read_to_string(path).unwrap();
    let table: TableMetadata = serde_json::from_str(&json)
        .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");

    assert_eq!(table.format_version(), FormatVersion::V1);
    assert_eq!(
        table.uuid(),
        Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
    );

    // With no explicit default spec id, the fixture expects spec 2 to be the default.
    assert_eq!(table.default_partition_spec_id(), 2);
    assert_eq!(table.partition_specs.len(), 2);

    let spec = &table.default_spec;
    assert_eq!(spec.spec_id(), 2);

    let fields = spec.fields();
    assert_eq!(fields.len(), 1);

    let only_field = &fields[0];
    assert_eq!(only_field.name, "y");
    assert_eq!(only_field.transform, Transform::Identity);
    assert_eq!(only_field.source_id, 2);
}
3358
#[test]
fn test_table_metadata_v2_schema_not_found() {
    let path = "testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json";
    let json = fs::read_to_string(path).unwrap();

    let message = serde_json::from_str::<TableMetadata>(&json)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "DataInvalid => No schema exists with the current schema id 2."
    )
}
3372
#[test]
fn test_table_metadata_v2_missing_sort_order() {
    let path = "testdata/table_metadata/TableMetadataV2MissingSortOrder.json";
    let json = fs::read_to_string(path).unwrap();

    // No variant of the untagged enum accepts a document without sort orders.
    let message = serde_json::from_str::<TableMetadata>(&json)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "data did not match any variant of untagged enum TableMetadataEnum"
    )
}
3386
#[test]
fn test_table_metadata_v2_missing_partition_specs() {
    let path = "testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json";
    let json = fs::read_to_string(path).unwrap();

    // No variant of the untagged enum accepts a document without partition specs.
    let message = serde_json::from_str::<TableMetadata>(&json)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "data did not match any variant of untagged enum TableMetadataEnum"
    )
}
3400
#[test]
fn test_table_metadata_v2_missing_last_partition_id() {
    let path = "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json";
    let json = fs::read_to_string(path).unwrap();

    // No variant of the untagged enum accepts a document without `last-partition-id`.
    let message = serde_json::from_str::<TableMetadata>(&json)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "data did not match any variant of untagged enum TableMetadataEnum"
    )
}
3415
#[test]
fn test_table_metadata_v2_missing_schemas() {
    let path = "testdata/table_metadata/TableMetadataV2MissingSchemas.json";
    let json = fs::read_to_string(path).unwrap();

    // No variant of the untagged enum accepts a document without schemas.
    let message = serde_json::from_str::<TableMetadata>(&json)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "data did not match any variant of untagged enum TableMetadataEnum"
    )
}
3429
#[test]
fn test_table_metadata_v2_unsupported_version() {
    let path = "testdata/table_metadata/TableMetadataUnsupportedVersion.json";
    let json = fs::read_to_string(path).unwrap();

    // An unknown format version matches none of the untagged enum variants.
    let message = serde_json::from_str::<TableMetadata>(&json)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "data did not match any variant of untagged enum TableMetadataEnum"
    )
}
3443
#[test]
fn test_order_of_format_version() {
    // Format versions must order strictly by version number: V1 < V2 < V3.
    // V3 matters because it gates row lineage (`MIN_FORMAT_VERSION_ROW_LINEAGE`).
    assert!(FormatVersion::V1 < FormatVersion::V2);
    assert!(FormatVersion::V2 < FormatVersion::V3);
    assert!(FormatVersion::V1 < FormatVersion::V3);
    assert_eq!(FormatVersion::V1, FormatVersion::V1);
    assert_eq!(FormatVersion::V2, FormatVersion::V2);
    assert_eq!(FormatVersion::V3, FormatVersion::V3);
}
3450
#[test]
fn test_default_partition_spec() {
    let spec_id = 1234;
    let mut metadata = get_test_table_metadata("TableMetadataV2Valid.json");

    // Make the unpartitioned spec both the default and the one registered under `spec_id`.
    let unpartitioned = Arc::new(PartitionSpec::unpartition_spec());
    metadata.default_spec = Arc::clone(&unpartitioned);
    metadata.partition_specs.insert(spec_id, unpartitioned);

    // Compare the specs by value.
    assert_eq!(
        **metadata.default_partition_spec(),
        **metadata.partition_spec_by_id(spec_id).unwrap()
    );
}
#[test]
fn test_default_sort_order() {
    let order_id = 1234;
    let mut metadata = get_test_table_metadata("TableMetadataV2Valid.json");

    // Register a new sort order and point the default id at it.
    metadata.default_sort_order_id = order_id;
    metadata
        .sort_orders
        .insert(order_id, Arc::new(SortOrder::default()));

    let registered = metadata.sort_orders.get(&order_id).unwrap();
    assert_eq!(metadata.default_sort_order(), registered)
}
3487
#[test]
fn test_table_metadata_builder_from_table_creation() {
    let creation = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(Schema::builder().build().unwrap())
        .build();
    let metadata = TableMetadataBuilder::from_table_creation(creation)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    assert_eq!(metadata.location, "s3://db/table");
    assert_eq!(metadata.schemas.len(), 1);

    // The empty input schema is stored under id 0.
    let schema = metadata.schemas.get(&0).unwrap();
    assert_eq!(schema.as_struct().fields().len(), 0);
    assert_eq!(metadata.properties.len(), 0);

    // A single unpartitioned spec with id 0 built against that schema.
    let expected_spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(0)
        .build()
        .unwrap();
    assert_eq!(
        metadata.partition_specs,
        HashMap::from([(0, Arc::new(expected_spec))])
    );

    // A single empty sort order with id 0.
    let expected_order = SortOrder {
        order_id: 0,
        fields: vec![],
    };
    assert_eq!(
        metadata.sort_orders,
        HashMap::from([(0, Arc::new(expected_order))])
    );
}
3537
3538 #[tokio::test]
3539 async fn test_table_metadata_read_write() {
3540 let temp_dir = TempDir::new().unwrap();
3542 let temp_path = temp_dir.path().to_str().unwrap();
3543
3544 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3546
3547 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3549
3550 let metadata_location = format!("{temp_path}/metadata.json");
3552
3553 original_metadata
3555 .write_to(&file_io, &metadata_location)
3556 .await
3557 .unwrap();
3558
3559 assert!(fs::metadata(&metadata_location).is_ok());
3561
3562 let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3564 .await
3565 .unwrap();
3566
3567 assert_eq!(read_metadata, original_metadata);
3569 }
3570
3571 #[tokio::test]
3572 async fn test_table_metadata_read_compressed() {
3573 let temp_dir = TempDir::new().unwrap();
3574 let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3575
3576 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3577 let json = serde_json::to_string(&original_metadata).unwrap();
3578
3579 let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
3580 encoder.write_all(json.as_bytes()).unwrap();
3581 std::fs::write(&metadata_location, encoder.finish().unwrap())
3582 .expect("failed to write metadata");
3583
3584 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3586 let metadata_location = metadata_location.to_str().unwrap();
3587 let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3588 .await
3589 .unwrap();
3590
3591 assert_eq!(read_metadata, original_metadata);
3593 }
3594
3595 #[tokio::test]
3596 async fn test_table_metadata_read_nonexistent_file() {
3597 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3599
3600 let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3602
3603 assert!(result.is_err());
3605 }
3606
#[test]
fn test_partition_name_exists() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
                .into(),
        ])
        .build()
        .unwrap();

    // spec1: identity partition on `data`, named `data_partition`.
    let spec1 = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_partition_field("data", "data_partition", Transform::Identity)
        .unwrap()
        .build()
        .unwrap();

    // spec2: bucket partition on `partition_col`, named `partition_bucket`.
    let spec2 = PartitionSpec::builder(schema.clone())
        .with_spec_id(2)
        .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
        .unwrap()
        .build()
        .unwrap();

    // spec1 is the initial spec; spec2 is only added, not made the default.
    let metadata = TableMetadataBuilder::new(
        schema,
        spec1.into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap()
    .add_partition_spec(spec2.into_unbound())
    .unwrap()
    .build()
    .unwrap()
    .metadata;

    // Partition field names from both specs are found.
    assert!(metadata.partition_name_exists("data_partition"));
    assert!(metadata.partition_name_exists("partition_bucket"));

    // Unknown names and source column names are not partition field names.
    assert!(!metadata.partition_name_exists("nonexistent_field"));
    assert!(!metadata.partition_name_exists("data"));
    assert!(!metadata.partition_name_exists("partition_col"));
    assert!(!metadata.partition_name_exists(""));
}
3655
#[test]
fn test_partition_name_exists_empty_specs() {
    let data_field = NestedField::required(1, "data", Type::Primitive(PrimitiveType::String));
    let schema = Schema::builder()
        .with_fields(vec![data_field.into()])
        .build()
        .unwrap();

    let metadata = TableMetadataBuilder::new(
        schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .and_then(|builder| builder.build())
    .unwrap()
    .metadata;

    // An unpartitioned table exposes no partition field names, including names
    // that happen to match a schema column.
    for candidate in ["any_field", "data"] {
        assert!(!metadata.partition_name_exists(candidate));
    }
}
3682
#[test]
fn test_name_exists_in_any_schema() {
    // schema1: field1, field2
    let schema1 = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
        ])
        .build()
        .unwrap();

    // schema2: field1, field3 (field2 is absent)
    let schema2 = Schema::builder()
        .with_schema_id(2)
        .with_fields(vec![
            NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
        ])
        .build()
        .unwrap();

    let metadata = TableMetadataBuilder::new(
        schema1,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap()
    .add_current_schema(schema2)
    .unwrap()
    .build()
    .unwrap()
    .metadata;

    // Present in both schemas, only in schema1, and only in schema2.
    assert!(metadata.name_exists_in_any_schema("field1"));
    assert!(metadata.name_exists_in_any_schema("field2"));
    assert!(metadata.name_exists_in_any_schema("field3"));

    // Names that appear in no schema.
    assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
    assert!(!metadata.name_exists_in_any_schema("field4"));
    assert!(!metadata.name_exists_in_any_schema(""));
}
3727
#[test]
fn test_name_exists_in_any_schema_empty_schemas() {
    // The only schema has no fields, so no name can match.
    let empty_schema = Schema::builder().with_fields(vec![]).build().unwrap();

    let metadata = TableMetadataBuilder::new(
        empty_schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .and_then(|builder| builder.build())
    .map(|result| result.metadata)
    .unwrap();

    let found = metadata.name_exists_in_any_schema("any_field");
    assert!(!found);
}
3747
#[test]
fn test_helper_methods_multi_version_scenario() {
    // Version 1: id, name, deprecated_field.
    let initial_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(
                3,
                "deprecated_field",
                Type::Primitive(PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    let metadata = TableMetadataBuilder::new(
        initial_schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap();

    // Version 2: adds new_field.
    let evolved_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(
                3,
                "deprecated_field",
                Type::Primitive(PrimitiveType::String),
            )
            .into(),
            NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
                .into(),
        ])
        .build()
        .unwrap();

    // Version 3: drops deprecated_field and adds latest_field.
    let final_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
                .into(),
            NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
                .into(),
        ])
        .build()
        .unwrap();

    // Previously `final_schema` was built and discarded, so the scenario never
    // covered a column dropped from the current schema. Add both versions.
    let final_metadata = metadata
        .add_current_schema(evolved_schema)
        .unwrap()
        .add_current_schema(final_schema)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // The table is unpartitioned.
    assert!(!final_metadata.partition_name_exists("nonexistent_partition"));

    // Columns present in every version.
    assert!(final_metadata.name_exists_in_any_schema("id"));
    assert!(final_metadata.name_exists_in_any_schema("name"));

    // Dropped from the current schema, but still present in earlier schemas.
    assert!(final_metadata.name_exists_in_any_schema("deprecated_field"));

    // Added in later versions.
    assert!(final_metadata.name_exists_in_any_schema("new_field"));
    assert!(final_metadata.name_exists_in_any_schema("latest_field"));

    // Never part of any schema.
    assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
}
3819
#[test]
fn test_invalid_sort_order_id_zero_with_fields() {
    // Sort order id 0 is reserved for the unsorted order, so metadata that gives
    // order 0 a non-empty field list must be rejected during deserialization.
    let json = r#"
    {
        "format-version": 2,
        "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
        "location": "s3://bucket/test/location",
        "last-sequence-number": 111,
        "last-updated-ms": 1600000000000,
        "last-column-id": 3,
        "current-schema-id": 1,
        "schemas": [
            {
                "type": "struct",
                "schema-id": 1,
                "fields": [
                    {"id": 1, "name": "x", "required": true, "type": "long"},
                    {"id": 2, "name": "y", "required": true, "type": "long"}
                ]
            }
        ],
        "default-spec-id": 0,
        "partition-specs": [{"spec-id": 0, "fields": []}],
        "last-partition-id": 999,
        "default-sort-order-id": 0,
        "sort-orders": [
            {
                "order-id": 0,
                "fields": [
                    {
                        "transform": "identity",
                        "source-id": 1,
                        "direction": "asc",
                        "null-order": "nulls-first"
                    }
                ]
            }
        ],
        "properties": {},
        "current-snapshot-id": -1,
        "snapshots": []
    }
    "#;

    let parsed = serde_json::from_str::<TableMetadata>(json);
    assert!(
        parsed.is_err(),
        "Parsing should fail for sort order ID 0 with fields"
    );
}
3872
#[test]
fn test_table_properties_with_defaults() {
    use crate::spec::TableProperties;

    let id_field = NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long));
    let schema = Schema::builder()
        .with_fields(vec![id_field.into()])
        .build()
        .unwrap();

    // No properties are supplied, so each parsed setting should be its default.
    let metadata = TableMetadataBuilder::new(
        schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .and_then(|builder| builder.build())
    .unwrap()
    .metadata;

    let parsed = metadata.table_properties().unwrap();

    let expected_retries = TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT;
    let expected_target_size = TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
    assert_eq!(parsed.commit_num_retries, expected_retries);
    assert_eq!(parsed.write_target_file_size_bytes, expected_target_size);
}
3908
#[test]
fn test_table_properties_with_custom_values() {
    use crate::spec::TableProperties;

    let id_field = NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long));
    let schema = Schema::builder()
        .with_fields(vec![id_field.into()])
        .build()
        .unwrap();

    // Override two settings with explicit string values.
    let overrides: HashMap<String, String> = [
        (TableProperties::PROPERTY_COMMIT_NUM_RETRIES, "10"),
        (TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, "1024"),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();

    let metadata = TableMetadataBuilder::new(
        schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        overrides,
    )
    .and_then(|builder| builder.build())
    .unwrap()
    .metadata;

    let parsed = metadata.table_properties().unwrap();
    assert_eq!(parsed.commit_num_retries, 10);
    assert_eq!(parsed.write_target_file_size_bytes, 1024);
}
3949
#[test]
fn test_table_properties_with_invalid_value() {
    use crate::spec::TableProperties;

    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
        ])
        .build()
        .unwrap();

    // Use the shared property-key constant, as the sibling tests do, instead of
    // a hard-coded string that could drift from the key the parser reads.
    let properties = HashMap::from([(
        TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
        "not_a_number".to_string(),
    )]);

    // Building metadata stores properties verbatim; the bad value is only
    // detected when `table_properties()` parses them.
    let metadata = TableMetadataBuilder::new(
        schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        properties,
    )
    .unwrap()
    .build()
    .unwrap()
    .metadata;

    let err = metadata.table_properties().unwrap_err();
    assert_eq!(err.kind(), ErrorKind::DataInvalid);
    assert!(err.message().contains("Invalid table properties"));
}
3981}