1use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::io::Read as _;
26use std::sync::Arc;
27
28use _serde::TableMetadataEnum;
29use chrono::{DateTime, Utc};
30use flate2::read::GzDecoder;
31use serde::{Deserialize, Serialize};
32use serde_repr::{Deserialize_repr, Serialize_repr};
33use uuid::Uuid;
34
35use super::snapshot::SnapshotReference;
36pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
37use super::{
38 DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
39 SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
40};
41use crate::error::{Result, timestamp_ms_to_utc};
42use crate::io::FileIO;
43use crate::spec::EncryptedKey;
44use crate::{Error, ErrorKind};
45
/// Name of the branch reference that must point at the table's current snapshot.
static MAIN_BRANCH: &str = "main";
/// One minute in milliseconds: how far a log entry's timestamp may go backwards
/// relative to its predecessor (or `last_updated_ms`) before validation rejects it.
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

/// Snapshot id used in serialized metadata to mean "no current snapshot";
/// normalization turns it into `None`.
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
/// Sequence number returned for V1 tables by `next_sequence_number`.
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// Value of `next_row_id` for metadata whose serialized form has no `next-row-id`
/// (V1 and V2 here).
pub const INITIAL_ROW_ID: u64 = 0;
/// First format version associated with row lineage; presumably gates row-id
/// tracking elsewhere (usage not visible in this file chunk — confirm).
pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Shared, reference-counted handle to a [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
58
/// In-memory, version-independent representation of Iceberg table metadata.
///
/// Deserialization goes through [`TableMetadataEnum`], which selects the V1/V2/V3
/// serialized layout and then converts and normalizes it into this struct.
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
pub struct TableMetadata {
    /// Format version; decides which serialized layout is written back out.
    pub(crate) format_version: FormatVersion,
    /// Table UUID (defaults to the nil UUID when V1 metadata omits it).
    pub(crate) table_uuid: Uuid,
    /// Table base location; a trailing `/` is trimmed during normalization.
    pub(crate) location: String,
    /// Last assigned sequence number; must be 0 for V1, and no snapshot may exceed it for V2+.
    pub(crate) last_sequence_number: i64,
    /// Timestamp of the last metadata update, in milliseconds since the epoch.
    pub(crate) last_updated_ms: i64,
    /// Last column id recorded in the metadata.
    pub(crate) last_column_id: i32,
    /// Schemas keyed by schema id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    /// Id of the current schema; normalization requires it to exist in `schemas`.
    pub(crate) current_schema_id: i32,
    /// Partition specs keyed by spec id.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    /// Default partition spec; normalization ensures it is also present in `partition_specs`.
    pub(crate) default_spec: PartitionSpecRef,
    /// Partition type of `default_spec`, computed against the current schema.
    pub(crate) default_partition_type: StructType,
    /// Last partition id recorded in the metadata.
    pub(crate) last_partition_id: i32,
    /// Table properties.
    pub(crate) properties: HashMap<String, String>,
    /// Id of the current snapshot, or `None` when the table has no current snapshot
    /// (the serialized `-1` sentinel is mapped to `None`).
    pub(crate) current_snapshot_id: Option<i64>,
    /// Snapshots keyed by snapshot id.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// Snapshot log; validated to be chronological within a one-minute tolerance.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    /// Metadata log; validated to be chronological within a one-minute tolerance.
    pub(crate) metadata_log: Vec<MetadataLog>,

    /// Sort orders keyed by order id.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    /// Id of the default sort order; the unsorted order is inserted on demand.
    pub(crate) default_sort_order_id: i64,
    /// Named snapshot references; `main` must match `current_snapshot_id`.
    pub(crate) refs: HashMap<String, SnapshotReference>,
    /// Statistics files keyed by snapshot id.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Partition statistics files keyed by snapshot id.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    /// Encryption keys keyed by key id.
    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
    /// Next row id; read from V3 metadata, `INITIAL_ROW_ID` otherwise.
    pub(crate) next_row_id: u64,
}
138
139impl TableMetadata {
140 #[must_use]
147 pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
148 TableMetadataBuilder::new_from_metadata(self, current_file_location)
149 }
150
151 #[inline]
153 pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
154 self.partition_specs
155 .values()
156 .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
157 }
158
159 #[inline]
161 pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
162 self.schemas
163 .values()
164 .any(|schema| schema.field_by_name(name).is_some())
165 }
166
167 #[inline]
169 pub fn format_version(&self) -> FormatVersion {
170 self.format_version
171 }
172
173 #[inline]
175 pub fn uuid(&self) -> Uuid {
176 self.table_uuid
177 }
178
179 #[inline]
181 pub fn location(&self) -> &str {
182 self.location.as_str()
183 }
184
185 #[inline]
187 pub fn last_sequence_number(&self) -> i64 {
188 self.last_sequence_number
189 }
190
191 #[inline]
196 pub fn next_sequence_number(&self) -> i64 {
197 match self.format_version {
198 FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
199 _ => self.last_sequence_number + 1,
200 }
201 }
202
203 #[inline]
205 pub fn last_column_id(&self) -> i32 {
206 self.last_column_id
207 }
208
209 #[inline]
211 pub fn last_partition_id(&self) -> i32 {
212 self.last_partition_id
213 }
214
215 #[inline]
217 pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
218 timestamp_ms_to_utc(self.last_updated_ms)
219 }
220
221 #[inline]
223 pub fn last_updated_ms(&self) -> i64 {
224 self.last_updated_ms
225 }
226
227 #[inline]
229 pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
230 self.schemas.values()
231 }
232
233 #[inline]
235 pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
236 self.schemas.get(&schema_id)
237 }
238
239 #[inline]
241 pub fn current_schema(&self) -> &SchemaRef {
242 self.schema_by_id(self.current_schema_id)
243 .expect("Current schema id set, but not found in table metadata")
244 }
245
246 #[inline]
248 pub fn current_schema_id(&self) -> SchemaId {
249 self.current_schema_id
250 }
251
252 #[inline]
254 pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
255 self.partition_specs.values()
256 }
257
258 #[inline]
260 pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
261 self.partition_specs.get(&spec_id)
262 }
263
264 #[inline]
266 pub fn default_partition_spec(&self) -> &PartitionSpecRef {
267 &self.default_spec
268 }
269
270 #[inline]
272 pub fn default_partition_type(&self) -> &StructType {
273 &self.default_partition_type
274 }
275
276 #[inline]
277 pub fn default_partition_spec_id(&self) -> i32 {
279 self.default_spec.spec_id()
280 }
281
282 #[inline]
284 pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
285 self.snapshots.values()
286 }
287
288 #[inline]
290 pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
291 self.snapshots.get(&snapshot_id)
292 }
293
294 #[inline]
296 pub fn history(&self) -> &[SnapshotLog] {
297 &self.snapshot_log
298 }
299
300 #[inline]
302 pub fn metadata_log(&self) -> &[MetadataLog] {
303 &self.metadata_log
304 }
305
306 #[inline]
308 pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
309 self.current_snapshot_id.map(|s| {
310 self.snapshot_by_id(s)
311 .expect("Current snapshot id has been set, but doesn't exist in metadata")
312 })
313 }
314
315 #[inline]
317 pub fn current_snapshot_id(&self) -> Option<i64> {
318 self.current_snapshot_id
319 }
320
321 #[inline]
324 pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
325 self.refs.get(ref_name).map(|r| {
326 self.snapshot_by_id(r.snapshot_id)
327 .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
328 })
329 }
330
331 #[inline]
333 pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
334 self.sort_orders.values()
335 }
336
337 #[inline]
339 pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
340 self.sort_orders.get(&sort_order_id)
341 }
342
343 #[inline]
345 pub fn default_sort_order(&self) -> &SortOrderRef {
346 self.sort_orders
347 .get(&self.default_sort_order_id)
348 .expect("Default order id has been set, but not found in table metadata!")
349 }
350
351 #[inline]
353 pub fn default_sort_order_id(&self) -> i64 {
354 self.default_sort_order_id
355 }
356
357 #[inline]
359 pub fn properties(&self) -> &HashMap<String, String> {
360 &self.properties
361 }
362
363 #[inline]
365 pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
366 self.statistics.values()
367 }
368
369 #[inline]
371 pub fn partition_statistics_iter(
372 &self,
373 ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
374 self.partition_statistics.values()
375 }
376
377 #[inline]
379 pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
380 self.statistics.get(&snapshot_id)
381 }
382
383 #[inline]
385 pub fn partition_statistics_for_snapshot(
386 &self,
387 snapshot_id: i64,
388 ) -> Option<&PartitionStatisticsFile> {
389 self.partition_statistics.get(&snapshot_id)
390 }
391
392 fn construct_refs(&mut self) {
393 if let Some(current_snapshot_id) = self.current_snapshot_id {
394 if !self.refs.contains_key(MAIN_BRANCH) {
395 self.refs
396 .insert(MAIN_BRANCH.to_string(), SnapshotReference {
397 snapshot_id: current_snapshot_id,
398 retention: SnapshotRetention::Branch {
399 min_snapshots_to_keep: None,
400 max_snapshot_age_ms: None,
401 max_ref_age_ms: None,
402 },
403 });
404 }
405 }
406 }
407
408 #[inline]
410 pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
411 self.encryption_keys.values()
412 }
413
414 #[inline]
416 pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
417 self.encryption_keys.get(key_id)
418 }
419
420 #[inline]
422 pub fn next_row_id(&self) -> u64 {
423 self.next_row_id
424 }
425
426 pub async fn read_from(
428 file_io: &FileIO,
429 metadata_location: impl AsRef<str>,
430 ) -> Result<TableMetadata> {
431 let metadata_location = metadata_location.as_ref();
432 let input_file = file_io.new_input(metadata_location)?;
433 let metadata_content = input_file.read().await?;
434
435 let metadata = if metadata_content.len() > 2
437 && metadata_content[0] == 0x1F
438 && metadata_content[1] == 0x8B
439 {
440 let mut decoder = GzDecoder::new(metadata_content.as_ref());
441 let mut decompressed_data = Vec::new();
442 decoder.read_to_end(&mut decompressed_data).map_err(|e| {
443 Error::new(
444 ErrorKind::DataInvalid,
445 "Trying to read compressed metadata file",
446 )
447 .with_context("file_path", metadata_location)
448 .with_source(e)
449 })?;
450 serde_json::from_slice(&decompressed_data)?
451 } else {
452 serde_json::from_slice(&metadata_content)?
453 };
454
455 Ok(metadata)
456 }
457
458 pub async fn write_to(
460 &self,
461 file_io: &FileIO,
462 metadata_location: impl AsRef<str>,
463 ) -> Result<()> {
464 file_io
465 .new_output(metadata_location)?
466 .write(serde_json::to_vec(self)?.into())
467 .await
468 }
469
470 pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
478 self.validate_current_schema()?;
479 self.normalize_current_snapshot()?;
480 self.construct_refs();
481 self.validate_refs()?;
482 self.validate_chronological_snapshot_logs()?;
483 self.validate_chronological_metadata_logs()?;
484 self.location = self.location.trim_end_matches('/').to_string();
486 self.validate_snapshot_sequence_number()?;
487 self.try_normalize_partition_spec()?;
488 self.try_normalize_sort_order()?;
489 Ok(self)
490 }
491
492 fn try_normalize_partition_spec(&mut self) -> Result<()> {
494 if self
495 .partition_spec_by_id(self.default_spec.spec_id())
496 .is_none()
497 {
498 self.partition_specs.insert(
499 self.default_spec.spec_id(),
500 Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
501 );
502 }
503
504 Ok(())
505 }
506
507 fn try_normalize_sort_order(&mut self) -> Result<()> {
509 if self.sort_order_by_id(self.default_sort_order_id).is_some() {
510 return Ok(());
511 }
512
513 if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
514 return Err(Error::new(
515 ErrorKind::DataInvalid,
516 format!(
517 "No sort order exists with the default sort order id {}.",
518 self.default_sort_order_id
519 ),
520 ));
521 }
522
523 let sort_order = SortOrder::unsorted_order();
524 self.sort_orders
525 .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
526 Ok(())
527 }
528
529 fn validate_current_schema(&self) -> Result<()> {
531 if self.schema_by_id(self.current_schema_id).is_none() {
532 return Err(Error::new(
533 ErrorKind::DataInvalid,
534 format!(
535 "No schema exists with the current schema id {}.",
536 self.current_schema_id
537 ),
538 ));
539 }
540 Ok(())
541 }
542
543 fn normalize_current_snapshot(&mut self) -> Result<()> {
545 if let Some(current_snapshot_id) = self.current_snapshot_id {
546 if current_snapshot_id == EMPTY_SNAPSHOT_ID {
547 self.current_snapshot_id = None;
548 } else if self.snapshot_by_id(current_snapshot_id).is_none() {
549 return Err(Error::new(
550 ErrorKind::DataInvalid,
551 format!(
552 "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
553 ),
554 ));
555 }
556 }
557 Ok(())
558 }
559
560 fn validate_refs(&self) -> Result<()> {
562 for (name, snapshot_ref) in self.refs.iter() {
563 if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
564 return Err(Error::new(
565 ErrorKind::DataInvalid,
566 format!(
567 "Snapshot for reference {name} does not exist in the existing snapshots list"
568 ),
569 ));
570 }
571 }
572
573 let main_ref = self.refs.get(MAIN_BRANCH);
574 if self.current_snapshot_id.is_some() {
575 if let Some(main_ref) = main_ref {
576 if main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default() {
577 return Err(Error::new(
578 ErrorKind::DataInvalid,
579 format!(
580 "Current snapshot id does not match main branch ({:?} != {:?})",
581 self.current_snapshot_id.unwrap_or_default(),
582 main_ref.snapshot_id
583 ),
584 ));
585 }
586 }
587 } else if main_ref.is_some() {
588 return Err(Error::new(
589 ErrorKind::DataInvalid,
590 "Current snapshot is not set, but main branch exists",
591 ));
592 }
593
594 Ok(())
595 }
596
597 fn validate_snapshot_sequence_number(&self) -> Result<()> {
599 if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
600 return Err(Error::new(
601 ErrorKind::DataInvalid,
602 format!(
603 "Last sequence number must be 0 in v1. Found {}",
604 self.last_sequence_number
605 ),
606 ));
607 }
608
609 if self.format_version >= FormatVersion::V2 {
610 if let Some(snapshot) = self
611 .snapshots
612 .values()
613 .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
614 {
615 return Err(Error::new(
616 ErrorKind::DataInvalid,
617 format!(
618 "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
619 snapshot.snapshot_id(),
620 snapshot.sequence_number(),
621 self.last_sequence_number
622 ),
623 ));
624 }
625 }
626
627 Ok(())
628 }
629
630 fn validate_chronological_snapshot_logs(&self) -> Result<()> {
632 for window in self.snapshot_log.windows(2) {
633 let (prev, curr) = (&window[0], &window[1]);
634 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
637 return Err(Error::new(
638 ErrorKind::DataInvalid,
639 "Expected sorted snapshot log entries",
640 ));
641 }
642 }
643
644 if let Some(last) = self.snapshot_log.last() {
645 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
648 return Err(Error::new(
649 ErrorKind::DataInvalid,
650 format!(
651 "Invalid update timestamp {}: before last snapshot log entry at {}",
652 self.last_updated_ms, last.timestamp_ms
653 ),
654 ));
655 }
656 }
657 Ok(())
658 }
659
660 fn validate_chronological_metadata_logs(&self) -> Result<()> {
661 for window in self.metadata_log.windows(2) {
662 let (prev, curr) = (&window[0], &window[1]);
663 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
666 return Err(Error::new(
667 ErrorKind::DataInvalid,
668 "Expected sorted metadata log entries",
669 ));
670 }
671 }
672
673 if let Some(last) = self.metadata_log.last() {
674 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
677 return Err(Error::new(
678 ErrorKind::DataInvalid,
679 format!(
680 "Invalid update timestamp {}: before last metadata log entry at {}",
681 self.last_updated_ms, last.timestamp_ms
682 ),
683 ));
684 }
685 }
686
687 Ok(())
688 }
689}
690
691pub(super) mod _serde {
692 use std::borrow::BorrowMut;
693 use std::collections::HashMap;
698 use std::sync::Arc;
703
704 use serde::{Deserialize, Serialize};
705 use uuid::Uuid;
706
707 use super::{
708 DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
709 TableMetadata,
710 };
711 use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
712 use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
713 use crate::spec::{
714 EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
715 PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
716 SortOrder, StatisticsFile,
717 };
718 use crate::{Error, ErrorKind};
719
    /// Version-specific serialized form of table metadata.
    ///
    /// The enum is `untagged`, so on deserialization serde tries the variants in
    /// declaration order (V3, V2, V1). Do not reorder them. Each variant's
    /// `format_version` is a [`VersionNumber`] that rejects any other number, so the
    /// JSON `format-version` value decides which variant matches.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum TableMetadataEnum {
        V3(TableMetadataV3),
        V2(TableMetadataV2),
        V1(TableMetadataV1),
    }
727
    /// Serialized layout of format-version 3 table metadata (kebab-case JSON keys).
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV3 {
        /// Must be `3`; any other value fails this variant.
        pub format_version: VersionNumber<3>,
        /// Fields common to V2 and V3, flattened into the same JSON object.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        /// `next-row-id`; copied into `TableMetadata::next_row_id`.
        pub next_row_id: u64,
        /// `encryption-keys`; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub encryption_keys: Option<Vec<EncryptedKey>>,
        /// `snapshots` in the V3 snapshot layout; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV3>>,
    }
741
    /// Fields shared by the V2 and V3 serialized layouts (kebab-case JSON keys).
    ///
    /// `Option` fields marked `skip_serializing_if` are omitted from output when
    /// `None`. `statistics` and `partition-statistics` default to empty when absent
    /// and are omitted when empty.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV2V3Shared {
        pub table_uuid: Uuid,
        pub location: String,
        pub last_sequence_number: i64,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        pub schemas: Vec<SchemaV2>,
        pub current_schema_id: i32,
        pub partition_specs: Vec<PartitionSpec>,
        pub default_spec_id: i32,
        pub last_partition_id: i32,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        // `-1` here is treated as "no current snapshot" when converting.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Vec<SortOrder>,
        pub default_sort_order_id: i64,
        // When absent, a `main` branch is synthesized from the current snapshot.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub refs: Option<HashMap<String, SnapshotReference>>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
773
    /// Serialized layout of format-version 2 table metadata (kebab-case JSON keys).
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV2 {
        /// Must be `2`; any other value fails this variant.
        pub format_version: VersionNumber<2>,
        /// Fields common to V2 and V3, flattened into the same JSON object.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        /// `snapshots` in the V2 snapshot layout; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
    }
784
    /// Serialized layout of format-version 1 table metadata (kebab-case JSON keys).
    ///
    /// V1 allows both legacy single-value fields (`schema`, `partition-spec`) and the
    /// list-based fields (`schemas` + `current-schema-id`, `partition-specs`). When
    /// converting, the list-based fields take precedence if present.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(super) struct TableMetadataV1 {
        /// Must be `1`; any other value fails this variant.
        pub format_version: VersionNumber<1>,
        /// Optional in V1; the nil UUID is used when missing.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        pub location: String,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        /// Legacy single schema; used only when `schemas`/`current_schema_id` are not both set.
        pub schema: Option<SchemaV1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<SchemaV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        /// Legacy partition fields; become spec `DEFAULT_PARTITION_SPEC_ID` when
        /// `partition_specs` is absent.
        pub partition_spec: Option<Vec<PartitionField>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Option<Vec<SortOrder>>,
        pub default_sort_order_id: Option<i64>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
826
    /// Zero-sized marker for a fixed format version `V`.
    ///
    /// It serializes as the bare number `V`, and deserialization fails for any other
    /// number. This lets the untagged `TableMetadataEnum` pick the variant from
    /// `format-version`.
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct VersionNumber<const V: u8>;
830
831 impl Serialize for TableMetadata {
832 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
833 where S: serde::Serializer {
834 let table_metadata_enum: TableMetadataEnum =
836 self.clone().try_into().map_err(serde::ser::Error::custom)?;
837
838 table_metadata_enum.serialize(serializer)
839 }
840 }
841
842 impl<const V: u8> Serialize for VersionNumber<V> {
843 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
844 where S: serde::Serializer {
845 serializer.serialize_u8(V)
846 }
847 }
848
849 impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
850 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
851 where D: serde::Deserializer<'de> {
852 let value = u8::deserialize(deserializer)?;
853 if value == V {
854 Ok(VersionNumber::<V>)
855 } else {
856 Err(serde::de::Error::custom("Invalid Version"))
857 }
858 }
859 }
860
861 impl TryFrom<TableMetadataEnum> for TableMetadata {
862 type Error = Error;
863 fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
864 match value {
865 TableMetadataEnum::V3(value) => value.try_into(),
866 TableMetadataEnum::V2(value) => value.try_into(),
867 TableMetadataEnum::V1(value) => value.try_into(),
868 }
869 }
870 }
871
872 impl TryFrom<TableMetadata> for TableMetadataEnum {
873 type Error = Error;
874 fn try_from(value: TableMetadata) -> Result<Self, Error> {
875 Ok(match value.format_version {
876 FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
877 FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
878 FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
879 })
880 }
881 }
882
883 impl TryFrom<TableMetadataV3> for TableMetadata {
884 type Error = Error;
885 fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
886 let TableMetadataV3 {
887 format_version: _,
888 shared: value,
889 next_row_id,
890 encryption_keys,
891 snapshots,
892 } = value;
893 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
894 None
895 } else {
896 value.current_snapshot_id
897 };
898 let schemas = HashMap::from_iter(
899 value
900 .schemas
901 .into_iter()
902 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
903 .collect::<Result<Vec<_>, Error>>()?,
904 );
905
906 let current_schema: &SchemaRef =
907 schemas.get(&value.current_schema_id).ok_or_else(|| {
908 Error::new(
909 ErrorKind::DataInvalid,
910 format!(
911 "No schema exists with the current schema id {}.",
912 value.current_schema_id
913 ),
914 )
915 })?;
916 let partition_specs = HashMap::from_iter(
917 value
918 .partition_specs
919 .into_iter()
920 .map(|x| (x.spec_id(), Arc::new(x))),
921 );
922 let default_spec_id = value.default_spec_id;
923 let default_spec: PartitionSpecRef = partition_specs
924 .get(&value.default_spec_id)
925 .map(|spec| (**spec).clone())
926 .or_else(|| {
927 (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
928 .then(PartitionSpec::unpartition_spec)
929 })
930 .ok_or_else(|| {
931 Error::new(
932 ErrorKind::DataInvalid,
933 format!("Default partition spec {default_spec_id} not found"),
934 )
935 })?
936 .into();
937 let default_partition_type = default_spec.partition_type(current_schema)?;
938
939 let mut metadata = TableMetadata {
940 format_version: FormatVersion::V3,
941 table_uuid: value.table_uuid,
942 location: value.location,
943 last_sequence_number: value.last_sequence_number,
944 last_updated_ms: value.last_updated_ms,
945 last_column_id: value.last_column_id,
946 current_schema_id: value.current_schema_id,
947 schemas,
948 partition_specs,
949 default_partition_type,
950 default_spec,
951 last_partition_id: value.last_partition_id,
952 properties: value.properties.unwrap_or_default(),
953 current_snapshot_id,
954 snapshots: snapshots
955 .map(|snapshots| {
956 HashMap::from_iter(
957 snapshots
958 .into_iter()
959 .map(|x| (x.snapshot_id, Arc::new(x.into()))),
960 )
961 })
962 .unwrap_or_default(),
963 snapshot_log: value.snapshot_log.unwrap_or_default(),
964 metadata_log: value.metadata_log.unwrap_or_default(),
965 sort_orders: HashMap::from_iter(
966 value
967 .sort_orders
968 .into_iter()
969 .map(|x| (x.order_id, Arc::new(x))),
970 ),
971 default_sort_order_id: value.default_sort_order_id,
972 refs: value.refs.unwrap_or_else(|| {
973 if let Some(snapshot_id) = current_snapshot_id {
974 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
975 snapshot_id,
976 retention: SnapshotRetention::Branch {
977 min_snapshots_to_keep: None,
978 max_snapshot_age_ms: None,
979 max_ref_age_ms: None,
980 },
981 })])
982 } else {
983 HashMap::new()
984 }
985 }),
986 statistics: index_statistics(value.statistics),
987 partition_statistics: index_partition_statistics(value.partition_statistics),
988 encryption_keys: encryption_keys
989 .map(|keys| {
990 HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
991 })
992 .unwrap_or_default(),
993 next_row_id,
994 };
995
996 metadata.borrow_mut().try_normalize()?;
997 Ok(metadata)
998 }
999 }
1000
1001 impl TryFrom<TableMetadataV2> for TableMetadata {
1002 type Error = Error;
1003 fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
1004 let snapshots = value.snapshots;
1005 let value = value.shared;
1006 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1007 None
1008 } else {
1009 value.current_snapshot_id
1010 };
1011 let schemas = HashMap::from_iter(
1012 value
1013 .schemas
1014 .into_iter()
1015 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
1016 .collect::<Result<Vec<_>, Error>>()?,
1017 );
1018
1019 let current_schema: &SchemaRef =
1020 schemas.get(&value.current_schema_id).ok_or_else(|| {
1021 Error::new(
1022 ErrorKind::DataInvalid,
1023 format!(
1024 "No schema exists with the current schema id {}.",
1025 value.current_schema_id
1026 ),
1027 )
1028 })?;
1029 let partition_specs = HashMap::from_iter(
1030 value
1031 .partition_specs
1032 .into_iter()
1033 .map(|x| (x.spec_id(), Arc::new(x))),
1034 );
1035 let default_spec_id = value.default_spec_id;
1036 let default_spec: PartitionSpecRef = partition_specs
1037 .get(&value.default_spec_id)
1038 .map(|spec| (**spec).clone())
1039 .or_else(|| {
1040 (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
1041 .then(PartitionSpec::unpartition_spec)
1042 })
1043 .ok_or_else(|| {
1044 Error::new(
1045 ErrorKind::DataInvalid,
1046 format!("Default partition spec {default_spec_id} not found"),
1047 )
1048 })?
1049 .into();
1050 let default_partition_type = default_spec.partition_type(current_schema)?;
1051
1052 let mut metadata = TableMetadata {
1053 format_version: FormatVersion::V2,
1054 table_uuid: value.table_uuid,
1055 location: value.location,
1056 last_sequence_number: value.last_sequence_number,
1057 last_updated_ms: value.last_updated_ms,
1058 last_column_id: value.last_column_id,
1059 current_schema_id: value.current_schema_id,
1060 schemas,
1061 partition_specs,
1062 default_partition_type,
1063 default_spec,
1064 last_partition_id: value.last_partition_id,
1065 properties: value.properties.unwrap_or_default(),
1066 current_snapshot_id,
1067 snapshots: snapshots
1068 .map(|snapshots| {
1069 HashMap::from_iter(
1070 snapshots
1071 .into_iter()
1072 .map(|x| (x.snapshot_id, Arc::new(x.into()))),
1073 )
1074 })
1075 .unwrap_or_default(),
1076 snapshot_log: value.snapshot_log.unwrap_or_default(),
1077 metadata_log: value.metadata_log.unwrap_or_default(),
1078 sort_orders: HashMap::from_iter(
1079 value
1080 .sort_orders
1081 .into_iter()
1082 .map(|x| (x.order_id, Arc::new(x))),
1083 ),
1084 default_sort_order_id: value.default_sort_order_id,
1085 refs: value.refs.unwrap_or_else(|| {
1086 if let Some(snapshot_id) = current_snapshot_id {
1087 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1088 snapshot_id,
1089 retention: SnapshotRetention::Branch {
1090 min_snapshots_to_keep: None,
1091 max_snapshot_age_ms: None,
1092 max_ref_age_ms: None,
1093 },
1094 })])
1095 } else {
1096 HashMap::new()
1097 }
1098 }),
1099 statistics: index_statistics(value.statistics),
1100 partition_statistics: index_partition_statistics(value.partition_statistics),
1101 encryption_keys: HashMap::new(),
1102 next_row_id: INITIAL_ROW_ID,
1103 };
1104
1105 metadata.borrow_mut().try_normalize()?;
1106 Ok(metadata)
1107 }
1108 }
1109
1110 impl TryFrom<TableMetadataV1> for TableMetadata {
1111 type Error = Error;
1112 fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1113 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1114 None
1115 } else {
1116 value.current_snapshot_id
1117 };
1118
1119 let (schemas, current_schema_id, current_schema) =
1120 if let (Some(schemas_vec), Some(schema_id)) =
1121 (&value.schemas, value.current_schema_id)
1122 {
1123 let schema_map = HashMap::from_iter(
1125 schemas_vec
1126 .clone()
1127 .into_iter()
1128 .map(|schema| {
1129 let schema: Schema = schema.try_into()?;
1130 Ok((schema.schema_id(), Arc::new(schema)))
1131 })
1132 .collect::<Result<Vec<_>, Error>>()?,
1133 );
1134
1135 let schema = schema_map
1136 .get(&schema_id)
1137 .ok_or_else(|| {
1138 Error::new(
1139 ErrorKind::DataInvalid,
1140 format!("No schema exists with the current schema id {schema_id}."),
1141 )
1142 })?
1143 .clone();
1144 (schema_map, schema_id, schema)
1145 } else if let Some(schema) = value.schema {
1146 let schema: Schema = schema.try_into()?;
1148 let schema_id = schema.schema_id();
1149 let schema_arc = Arc::new(schema);
1150 let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1151 (schema_map, schema_id, schema_arc)
1152 } else {
1153 return Err(Error::new(
1155 ErrorKind::DataInvalid,
1156 "No valid schema configuration found in table metadata",
1157 ));
1158 };
1159
1160 let partition_specs = if let Some(specs_vec) = value.partition_specs {
1162 specs_vec
1164 .into_iter()
1165 .map(|x| (x.spec_id(), Arc::new(x)))
1166 .collect::<HashMap<_, _>>()
1167 } else if let Some(partition_spec) = value.partition_spec {
1168 let spec = PartitionSpec::builder(current_schema.clone())
1170 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1171 .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1172 .build()?;
1173
1174 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1175 } else {
1176 let spec = PartitionSpec::builder(current_schema.clone())
1178 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1179 .build()?;
1180
1181 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1182 };
1183
1184 let default_spec_id = value
1186 .default_spec_id
1187 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1188
1189 let default_spec: PartitionSpecRef = partition_specs
1191 .get(&default_spec_id)
1192 .map(|x| Arc::unwrap_or_clone(x.clone()))
1193 .ok_or_else(|| {
1194 Error::new(
1195 ErrorKind::DataInvalid,
1196 format!("Default partition spec {default_spec_id} not found"),
1197 )
1198 })?
1199 .into();
1200 let default_partition_type = default_spec.partition_type(¤t_schema)?;
1201
1202 let mut metadata = TableMetadata {
1203 format_version: FormatVersion::V1,
1204 table_uuid: value.table_uuid.unwrap_or_default(),
1205 location: value.location,
1206 last_sequence_number: 0,
1207 last_updated_ms: value.last_updated_ms,
1208 last_column_id: value.last_column_id,
1209 current_schema_id,
1210 default_spec,
1211 default_partition_type,
1212 last_partition_id: value
1213 .last_partition_id
1214 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1215 partition_specs,
1216 schemas,
1217 properties: value.properties.unwrap_or_default(),
1218 current_snapshot_id,
1219 snapshots: value
1220 .snapshots
1221 .map(|snapshots| {
1222 Ok::<_, Error>(HashMap::from_iter(
1223 snapshots
1224 .into_iter()
1225 .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1226 .collect::<Result<Vec<_>, Error>>()?,
1227 ))
1228 })
1229 .transpose()?
1230 .unwrap_or_default(),
1231 snapshot_log: value.snapshot_log.unwrap_or_default(),
1232 metadata_log: value.metadata_log.unwrap_or_default(),
1233 sort_orders: match value.sort_orders {
1234 Some(sort_orders) => HashMap::from_iter(
1235 sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1236 ),
1237 None => HashMap::new(),
1238 },
1239 default_sort_order_id: value
1240 .default_sort_order_id
1241 .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1242 refs: if let Some(snapshot_id) = current_snapshot_id {
1243 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1244 snapshot_id,
1245 retention: SnapshotRetention::Branch {
1246 min_snapshots_to_keep: None,
1247 max_snapshot_age_ms: None,
1248 max_ref_age_ms: None,
1249 },
1250 })])
1251 } else {
1252 HashMap::new()
1253 },
1254 statistics: index_statistics(value.statistics),
1255 partition_statistics: index_partition_statistics(value.partition_statistics),
1256 encryption_keys: HashMap::new(),
1257 next_row_id: INITIAL_ROW_ID, };
1259
1260 metadata.borrow_mut().try_normalize()?;
1261 Ok(metadata)
1262 }
1263 }
1264
1265 impl TryFrom<TableMetadata> for TableMetadataV3 {
1266 type Error = Error;
1267
1268 fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1269 let next_row_id = v.next_row_id;
1270 let encryption_keys = std::mem::take(&mut v.encryption_keys);
1271 let snapshots = std::mem::take(&mut v.snapshots);
1272 let shared = v.into();
1273
1274 Ok(TableMetadataV3 {
1275 format_version: VersionNumber::<3>,
1276 shared,
1277 next_row_id,
1278 encryption_keys: if encryption_keys.is_empty() {
1279 None
1280 } else {
1281 Some(encryption_keys.into_values().collect())
1282 },
1283 snapshots: if snapshots.is_empty() {
1284 None
1285 } else {
1286 Some(
1287 snapshots
1288 .into_values()
1289 .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1290 .collect::<Result<_, _>>()?,
1291 )
1292 },
1293 })
1294 }
1295 }
1296
1297 impl From<TableMetadata> for TableMetadataV2 {
1298 fn from(mut v: TableMetadata) -> Self {
1299 let snapshots = std::mem::take(&mut v.snapshots);
1300 let shared = v.into();
1301
1302 TableMetadataV2 {
1303 format_version: VersionNumber::<2>,
1304 shared,
1305 snapshots: if snapshots.is_empty() {
1306 None
1307 } else {
1308 Some(
1309 snapshots
1310 .into_values()
1311 .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1312 .collect(),
1313 )
1314 },
1315 }
1316 }
1317 }
1318
1319 impl From<TableMetadata> for TableMetadataV2V3Shared {
1320 fn from(v: TableMetadata) -> Self {
1321 TableMetadataV2V3Shared {
1322 table_uuid: v.table_uuid,
1323 location: v.location,
1324 last_sequence_number: v.last_sequence_number,
1325 last_updated_ms: v.last_updated_ms,
1326 last_column_id: v.last_column_id,
1327 schemas: v
1328 .schemas
1329 .into_values()
1330 .map(|x| {
1331 Arc::try_unwrap(x)
1332 .unwrap_or_else(|schema| schema.as_ref().clone())
1333 .into()
1334 })
1335 .collect(),
1336 current_schema_id: v.current_schema_id,
1337 partition_specs: v
1338 .partition_specs
1339 .into_values()
1340 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1341 .collect(),
1342 default_spec_id: v.default_spec.spec_id(),
1343 last_partition_id: v.last_partition_id,
1344 properties: if v.properties.is_empty() {
1345 None
1346 } else {
1347 Some(v.properties)
1348 },
1349 current_snapshot_id: v.current_snapshot_id,
1350 snapshot_log: if v.snapshot_log.is_empty() {
1351 None
1352 } else {
1353 Some(v.snapshot_log)
1354 },
1355 metadata_log: if v.metadata_log.is_empty() {
1356 None
1357 } else {
1358 Some(v.metadata_log)
1359 },
1360 sort_orders: v
1361 .sort_orders
1362 .into_values()
1363 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1364 .collect(),
1365 default_sort_order_id: v.default_sort_order_id,
1366 refs: Some(v.refs),
1367 statistics: v.statistics.into_values().collect(),
1368 partition_statistics: v.partition_statistics.into_values().collect(),
1369 }
1370 }
1371 }
1372
1373 impl TryFrom<TableMetadata> for TableMetadataV1 {
1374 type Error = Error;
1375 fn try_from(v: TableMetadata) -> Result<Self, Error> {
1376 Ok(TableMetadataV1 {
1377 format_version: VersionNumber::<1>,
1378 table_uuid: Some(v.table_uuid),
1379 location: v.location,
1380 last_updated_ms: v.last_updated_ms,
1381 last_column_id: v.last_column_id,
1382 schema: Some(
1383 v.schemas
1384 .get(&v.current_schema_id)
1385 .ok_or(Error::new(
1386 ErrorKind::Unexpected,
1387 "current_schema_id not found in schemas",
1388 ))?
1389 .as_ref()
1390 .clone()
1391 .into(),
1392 ),
1393 schemas: Some(
1394 v.schemas
1395 .into_values()
1396 .map(|x| {
1397 Arc::try_unwrap(x)
1398 .unwrap_or_else(|schema| schema.as_ref().clone())
1399 .into()
1400 })
1401 .collect(),
1402 ),
1403 current_schema_id: Some(v.current_schema_id),
1404 partition_spec: Some(v.default_spec.fields().to_vec()),
1405 partition_specs: Some(
1406 v.partition_specs
1407 .into_values()
1408 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1409 .collect(),
1410 ),
1411 default_spec_id: Some(v.default_spec.spec_id()),
1412 last_partition_id: Some(v.last_partition_id),
1413 properties: if v.properties.is_empty() {
1414 None
1415 } else {
1416 Some(v.properties)
1417 },
1418 current_snapshot_id: v.current_snapshot_id,
1419 snapshots: if v.snapshots.is_empty() {
1420 None
1421 } else {
1422 Some(
1423 v.snapshots
1424 .into_values()
1425 .map(|x| Snapshot::clone(&x).into())
1426 .collect(),
1427 )
1428 },
1429 snapshot_log: if v.snapshot_log.is_empty() {
1430 None
1431 } else {
1432 Some(v.snapshot_log)
1433 },
1434 metadata_log: if v.metadata_log.is_empty() {
1435 None
1436 } else {
1437 Some(v.metadata_log)
1438 },
1439 sort_orders: Some(
1440 v.sort_orders
1441 .into_values()
1442 .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1443 .collect(),
1444 ),
1445 default_sort_order_id: Some(v.default_sort_order_id),
1446 statistics: v.statistics.into_values().collect(),
1447 partition_statistics: v.partition_statistics.into_values().collect(),
1448 })
1449 }
1450 }
1451
1452 fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1453 statistics
1454 .into_iter()
1455 .rev()
1456 .map(|s| (s.snapshot_id, s))
1457 .collect()
1458 }
1459
1460 fn index_partition_statistics(
1461 statistics: Vec<PartitionStatisticsFile>,
1462 ) -> HashMap<i64, PartitionStatisticsFile> {
1463 statistics
1464 .into_iter()
1465 .rev()
1466 .map(|s| (s.snapshot_id, s))
1467 .collect()
1468 }
1469}
1470
/// Iceberg table format version.
///
/// Because of `Serialize_repr`/`Deserialize_repr` with `#[repr(u8)]`, a version is
/// (de)serialized as its bare integer discriminant (`1`, `2` or `3`) rather than
/// as the variant name.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
#[repr(u8)]
pub enum FormatVersion {
    /// Format version 1 (encoded as `1`).
    V1 = 1u8,
    /// Format version 2 (encoded as `2`).
    V2 = 2u8,
    /// Format version 3 (encoded as `3`).
    V3 = 3u8,
}
1482
1483impl PartialOrd for FormatVersion {
1484 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1485 Some(self.cmp(other))
1486 }
1487}
1488
1489impl Ord for FormatVersion {
1490 fn cmp(&self, other: &Self) -> Ordering {
1491 (*self as u8).cmp(&(*other as u8))
1492 }
1493}
1494
1495impl Display for FormatVersion {
1496 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1497 match self {
1498 FormatVersion::V1 => write!(f, "v1"),
1499 FormatVersion::V2 => write!(f, "v2"),
1500 FormatVersion::V3 => write!(f, "v3"),
1501 }
1502 }
1503}
1504
/// One entry of the table's metadata log: a metadata file location paired with a
/// timestamp.
///
/// Fields are (de)serialized with kebab-case keys (`metadata-file`, `timestamp-ms`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct MetadataLog {
    /// Location of the metadata file this entry refers to.
    pub metadata_file: String,
    /// Timestamp of the entry in milliseconds (presumably since the Unix epoch — confirm).
    pub timestamp_ms: i64,
}
1514
/// One entry of the table's snapshot log: a snapshot id paired with a timestamp.
///
/// Fields are (de)serialized with kebab-case keys (`snapshot-id`, `timestamp-ms`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotLog {
    /// Id of the snapshot this entry refers to.
    pub snapshot_id: i64,
    /// Timestamp of the entry in milliseconds; `SnapshotLog::timestamp` converts it
    /// with `timestamp_ms_to_utc`.
    pub timestamp_ms: i64,
}
1524
1525impl SnapshotLog {
1526 pub fn timestamp(self) -> Result<DateTime<Utc>> {
1528 timestamp_ms_to_utc(self.timestamp_ms)
1529 }
1530
1531 #[inline]
1533 pub fn timestamp_ms(&self) -> i64 {
1534 self.timestamp_ms
1535 }
1536}
1537
1538#[cfg(test)]
1539mod tests {
1540 use std::collections::HashMap;
1541 use std::fs;
1542 use std::io::Write as _;
1543 use std::sync::Arc;
1544
1545 use anyhow::Result;
1546 use base64::Engine as _;
1547 use pretty_assertions::assert_eq;
1548 use tempfile::TempDir;
1549 use uuid::Uuid;
1550
1551 use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1552 use crate::TableCreation;
1553 use crate::io::FileIOBuilder;
1554 use crate::spec::table_metadata::TableMetadata;
1555 use crate::spec::{
1556 BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1557 PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1558 SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1559 Summary, Transform, Type, UnboundPartitionField,
1560 };
1561
1562 fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1563 let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1564 assert_eq!(desered_type, expected_type);
1565
1566 let sered_json = serde_json::to_string(&expected_type).unwrap();
1567 let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1568
1569 assert_eq!(parsed_json_value, desered_type);
1570 }
1571
1572 fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1573 let path = format!("testdata/table_metadata/{file_name}");
1574 let metadata: String = fs::read_to_string(path).unwrap();
1575
1576 serde_json::from_str(&metadata).unwrap()
1577 }
1578
1579 #[test]
1580 fn test_table_data_v2() {
1581 let data = r#"
1582 {
1583 "format-version" : 2,
1584 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1585 "location": "s3://b/wh/data.db/table",
1586 "last-sequence-number" : 1,
1587 "last-updated-ms": 1515100955770,
1588 "last-column-id": 1,
1589 "schemas": [
1590 {
1591 "schema-id" : 1,
1592 "type" : "struct",
1593 "fields" :[
1594 {
1595 "id": 1,
1596 "name": "struct_name",
1597 "required": true,
1598 "type": "fixed[1]"
1599 },
1600 {
1601 "id": 4,
1602 "name": "ts",
1603 "required": true,
1604 "type": "timestamp"
1605 }
1606 ]
1607 }
1608 ],
1609 "current-schema-id" : 1,
1610 "partition-specs": [
1611 {
1612 "spec-id": 0,
1613 "fields": [
1614 {
1615 "source-id": 4,
1616 "field-id": 1000,
1617 "name": "ts_day",
1618 "transform": "day"
1619 }
1620 ]
1621 }
1622 ],
1623 "default-spec-id": 0,
1624 "last-partition-id": 1000,
1625 "properties": {
1626 "commit.retry.num-retries": "1"
1627 },
1628 "metadata-log": [
1629 {
1630 "metadata-file": "s3://bucket/.../v1.json",
1631 "timestamp-ms": 1515100
1632 }
1633 ],
1634 "refs": {},
1635 "sort-orders": [
1636 {
1637 "order-id": 0,
1638 "fields": []
1639 }
1640 ],
1641 "default-sort-order-id": 0
1642 }
1643 "#;
1644
1645 let schema = Schema::builder()
1646 .with_schema_id(1)
1647 .with_fields(vec![
1648 Arc::new(NestedField::required(
1649 1,
1650 "struct_name",
1651 Type::Primitive(PrimitiveType::Fixed(1)),
1652 )),
1653 Arc::new(NestedField::required(
1654 4,
1655 "ts",
1656 Type::Primitive(PrimitiveType::Timestamp),
1657 )),
1658 ])
1659 .build()
1660 .unwrap();
1661
1662 let partition_spec = PartitionSpec::builder(schema.clone())
1663 .with_spec_id(0)
1664 .add_unbound_field(UnboundPartitionField {
1665 name: "ts_day".to_string(),
1666 transform: Transform::Day,
1667 source_id: 4,
1668 field_id: Some(1000),
1669 })
1670 .unwrap()
1671 .build()
1672 .unwrap();
1673
1674 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1675 let expected = TableMetadata {
1676 format_version: FormatVersion::V2,
1677 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1678 location: "s3://b/wh/data.db/table".to_string(),
1679 last_updated_ms: 1515100955770,
1680 last_column_id: 1,
1681 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1682 current_schema_id: 1,
1683 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1684 default_partition_type,
1685 default_spec: partition_spec.into(),
1686 last_partition_id: 1000,
1687 default_sort_order_id: 0,
1688 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1689 snapshots: HashMap::default(),
1690 current_snapshot_id: None,
1691 last_sequence_number: 1,
1692 properties: HashMap::from_iter(vec![(
1693 "commit.retry.num-retries".to_string(),
1694 "1".to_string(),
1695 )]),
1696 snapshot_log: Vec::new(),
1697 metadata_log: vec![MetadataLog {
1698 metadata_file: "s3://bucket/.../v1.json".to_string(),
1699 timestamp_ms: 1515100,
1700 }],
1701 refs: HashMap::new(),
1702 statistics: HashMap::new(),
1703 partition_statistics: HashMap::new(),
1704 encryption_keys: HashMap::new(),
1705 next_row_id: INITIAL_ROW_ID,
1706 };
1707
1708 let expected_json_value = serde_json::to_value(&expected).unwrap();
1709 check_table_metadata_serde(data, expected);
1710
1711 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1712 assert_eq!(json_value, expected_json_value);
1713 }
1714
1715 #[test]
1716 fn test_table_data_v3() {
1717 let data = r#"
1718 {
1719 "format-version" : 3,
1720 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1721 "location": "s3://b/wh/data.db/table",
1722 "last-sequence-number" : 1,
1723 "last-updated-ms": 1515100955770,
1724 "last-column-id": 1,
1725 "next-row-id": 5,
1726 "schemas": [
1727 {
1728 "schema-id" : 1,
1729 "type" : "struct",
1730 "fields" :[
1731 {
1732 "id": 4,
1733 "name": "ts",
1734 "required": true,
1735 "type": "timestamp"
1736 }
1737 ]
1738 }
1739 ],
1740 "current-schema-id" : 1,
1741 "partition-specs": [
1742 {
1743 "spec-id": 0,
1744 "fields": [
1745 {
1746 "source-id": 4,
1747 "field-id": 1000,
1748 "name": "ts_day",
1749 "transform": "day"
1750 }
1751 ]
1752 }
1753 ],
1754 "default-spec-id": 0,
1755 "last-partition-id": 1000,
1756 "properties": {
1757 "commit.retry.num-retries": "1"
1758 },
1759 "metadata-log": [
1760 {
1761 "metadata-file": "s3://bucket/.../v1.json",
1762 "timestamp-ms": 1515100
1763 }
1764 ],
1765 "refs": {},
1766 "snapshots" : [ {
1767 "snapshot-id" : 1,
1768 "timestamp-ms" : 1662532818843,
1769 "sequence-number" : 0,
1770 "first-row-id" : 0,
1771 "added-rows" : 4,
1772 "key-id" : "key1",
1773 "summary" : {
1774 "operation" : "append"
1775 },
1776 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1777 "schema-id" : 0
1778 }
1779 ],
1780 "encryption-keys": [
1781 {
1782 "key-id": "key1",
1783 "encrypted-by-id": "KMS",
1784 "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
1785 "properties": {
1786 "p1": "v1"
1787 }
1788 }
1789 ],
1790 "sort-orders": [
1791 {
1792 "order-id": 0,
1793 "fields": []
1794 }
1795 ],
1796 "default-sort-order-id": 0
1797 }
1798 "#;
1799
1800 let schema = Schema::builder()
1801 .with_schema_id(1)
1802 .with_fields(vec![Arc::new(NestedField::required(
1803 4,
1804 "ts",
1805 Type::Primitive(PrimitiveType::Timestamp),
1806 ))])
1807 .build()
1808 .unwrap();
1809
1810 let partition_spec = PartitionSpec::builder(schema.clone())
1811 .with_spec_id(0)
1812 .add_unbound_field(UnboundPartitionField {
1813 name: "ts_day".to_string(),
1814 transform: Transform::Day,
1815 source_id: 4,
1816 field_id: Some(1000),
1817 })
1818 .unwrap()
1819 .build()
1820 .unwrap();
1821
1822 let snapshot = Snapshot::builder()
1823 .with_snapshot_id(1)
1824 .with_timestamp_ms(1662532818843)
1825 .with_sequence_number(0)
1826 .with_row_range(0, 4)
1827 .with_encryption_key_id(Some("key1".to_string()))
1828 .with_summary(Summary {
1829 operation: Operation::Append,
1830 additional_properties: HashMap::new(),
1831 })
1832 .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
1833 .with_schema_id(0)
1834 .build();
1835
1836 let encryption_key = EncryptedKey::builder()
1837 .key_id("key1".to_string())
1838 .encrypted_by_id("KMS".to_string())
1839 .encrypted_key_metadata(
1840 base64::prelude::BASE64_STANDARD
1841 .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
1842 .unwrap(),
1843 )
1844 .properties(HashMap::from_iter(vec![(
1845 "p1".to_string(),
1846 "v1".to_string(),
1847 )]))
1848 .build();
1849
1850 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1851 let expected = TableMetadata {
1852 format_version: FormatVersion::V3,
1853 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1854 location: "s3://b/wh/data.db/table".to_string(),
1855 last_updated_ms: 1515100955770,
1856 last_column_id: 1,
1857 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1858 current_schema_id: 1,
1859 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1860 default_partition_type,
1861 default_spec: partition_spec.into(),
1862 last_partition_id: 1000,
1863 default_sort_order_id: 0,
1864 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1865 snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
1866 current_snapshot_id: None,
1867 last_sequence_number: 1,
1868 properties: HashMap::from_iter(vec![(
1869 "commit.retry.num-retries".to_string(),
1870 "1".to_string(),
1871 )]),
1872 snapshot_log: Vec::new(),
1873 metadata_log: vec![MetadataLog {
1874 metadata_file: "s3://bucket/.../v1.json".to_string(),
1875 timestamp_ms: 1515100,
1876 }],
1877 refs: HashMap::new(),
1878 statistics: HashMap::new(),
1879 partition_statistics: HashMap::new(),
1880 encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
1881 next_row_id: 5,
1882 };
1883
1884 let expected_json_value = serde_json::to_value(&expected).unwrap();
1885 check_table_metadata_serde(data, expected);
1886
1887 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1888 assert_eq!(json_value, expected_json_value);
1889 }
1890
1891 #[test]
1892 fn test_table_data_v1() {
1893 let data = r#"
1894 {
1895 "format-version" : 1,
1896 "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
1897 "location" : "/home/iceberg/warehouse/nyc/taxis",
1898 "last-updated-ms" : 1662532818843,
1899 "last-column-id" : 5,
1900 "schema" : {
1901 "type" : "struct",
1902 "schema-id" : 0,
1903 "fields" : [ {
1904 "id" : 1,
1905 "name" : "vendor_id",
1906 "required" : false,
1907 "type" : "long"
1908 }, {
1909 "id" : 2,
1910 "name" : "trip_id",
1911 "required" : false,
1912 "type" : "long"
1913 }, {
1914 "id" : 3,
1915 "name" : "trip_distance",
1916 "required" : false,
1917 "type" : "float"
1918 }, {
1919 "id" : 4,
1920 "name" : "fare_amount",
1921 "required" : false,
1922 "type" : "double"
1923 }, {
1924 "id" : 5,
1925 "name" : "store_and_fwd_flag",
1926 "required" : false,
1927 "type" : "string"
1928 } ]
1929 },
1930 "partition-spec" : [ {
1931 "name" : "vendor_id",
1932 "transform" : "identity",
1933 "source-id" : 1,
1934 "field-id" : 1000
1935 } ],
1936 "last-partition-id" : 1000,
1937 "default-sort-order-id" : 0,
1938 "sort-orders" : [ {
1939 "order-id" : 0,
1940 "fields" : [ ]
1941 } ],
1942 "properties" : {
1943 "owner" : "root"
1944 },
1945 "current-snapshot-id" : 638933773299822130,
1946 "refs" : {
1947 "main" : {
1948 "snapshot-id" : 638933773299822130,
1949 "type" : "branch"
1950 }
1951 },
1952 "snapshots" : [ {
1953 "snapshot-id" : 638933773299822130,
1954 "timestamp-ms" : 1662532818843,
1955 "sequence-number" : 0,
1956 "summary" : {
1957 "operation" : "append",
1958 "spark.app.id" : "local-1662532784305",
1959 "added-data-files" : "4",
1960 "added-records" : "4",
1961 "added-files-size" : "6001"
1962 },
1963 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1964 "schema-id" : 0
1965 } ],
1966 "snapshot-log" : [ {
1967 "timestamp-ms" : 1662532818843,
1968 "snapshot-id" : 638933773299822130
1969 } ],
1970 "metadata-log" : [ {
1971 "timestamp-ms" : 1662532805245,
1972 "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
1973 } ]
1974 }
1975 "#;
1976
1977 let schema = Schema::builder()
1978 .with_fields(vec![
1979 Arc::new(NestedField::optional(
1980 1,
1981 "vendor_id",
1982 Type::Primitive(PrimitiveType::Long),
1983 )),
1984 Arc::new(NestedField::optional(
1985 2,
1986 "trip_id",
1987 Type::Primitive(PrimitiveType::Long),
1988 )),
1989 Arc::new(NestedField::optional(
1990 3,
1991 "trip_distance",
1992 Type::Primitive(PrimitiveType::Float),
1993 )),
1994 Arc::new(NestedField::optional(
1995 4,
1996 "fare_amount",
1997 Type::Primitive(PrimitiveType::Double),
1998 )),
1999 Arc::new(NestedField::optional(
2000 5,
2001 "store_and_fwd_flag",
2002 Type::Primitive(PrimitiveType::String),
2003 )),
2004 ])
2005 .build()
2006 .unwrap();
2007
2008 let schema = Arc::new(schema);
2009 let partition_spec = PartitionSpec::builder(schema.clone())
2010 .with_spec_id(0)
2011 .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
2012 .unwrap()
2013 .build()
2014 .unwrap();
2015
2016 let sort_order = SortOrder::builder()
2017 .with_order_id(0)
2018 .build_unbound()
2019 .unwrap();
2020
2021 let snapshot = Snapshot::builder()
2022 .with_snapshot_id(638933773299822130)
2023 .with_timestamp_ms(1662532818843)
2024 .with_sequence_number(0)
2025 .with_schema_id(0)
2026 .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
2027 .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
2028 .build();
2029
2030 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2031 let expected = TableMetadata {
2032 format_version: FormatVersion::V1,
2033 table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
2034 location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
2035 last_updated_ms: 1662532818843,
2036 last_column_id: 5,
2037 schemas: HashMap::from_iter(vec![(0, schema)]),
2038 current_schema_id: 0,
2039 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2040 default_partition_type,
2041 default_spec: Arc::new(partition_spec),
2042 last_partition_id: 1000,
2043 default_sort_order_id: 0,
2044 sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
2045 snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
2046 current_snapshot_id: Some(638933773299822130),
2047 last_sequence_number: 0,
2048 properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
2049 snapshot_log: vec![SnapshotLog {
2050 snapshot_id: 638933773299822130,
2051 timestamp_ms: 1662532818843,
2052 }],
2053 metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
2054 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
2055 statistics: HashMap::new(),
2056 partition_statistics: HashMap::new(),
2057 encryption_keys: HashMap::new(),
2058 next_row_id: INITIAL_ROW_ID,
2059 };
2060
2061 check_table_metadata_serde(data, expected);
2062 }
2063
2064 #[test]
2065 fn test_table_data_v2_no_snapshots() {
2066 let data = r#"
2067 {
2068 "format-version" : 2,
2069 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2070 "location": "s3://b/wh/data.db/table",
2071 "last-sequence-number" : 1,
2072 "last-updated-ms": 1515100955770,
2073 "last-column-id": 1,
2074 "schemas": [
2075 {
2076 "schema-id" : 1,
2077 "type" : "struct",
2078 "fields" :[
2079 {
2080 "id": 1,
2081 "name": "struct_name",
2082 "required": true,
2083 "type": "fixed[1]"
2084 }
2085 ]
2086 }
2087 ],
2088 "current-schema-id" : 1,
2089 "partition-specs": [
2090 {
2091 "spec-id": 0,
2092 "fields": []
2093 }
2094 ],
2095 "refs": {},
2096 "default-spec-id": 0,
2097 "last-partition-id": 1000,
2098 "metadata-log": [
2099 {
2100 "metadata-file": "s3://bucket/.../v1.json",
2101 "timestamp-ms": 1515100
2102 }
2103 ],
2104 "sort-orders": [
2105 {
2106 "order-id": 0,
2107 "fields": []
2108 }
2109 ],
2110 "default-sort-order-id": 0
2111 }
2112 "#;
2113
2114 let schema = Schema::builder()
2115 .with_schema_id(1)
2116 .with_fields(vec![Arc::new(NestedField::required(
2117 1,
2118 "struct_name",
2119 Type::Primitive(PrimitiveType::Fixed(1)),
2120 ))])
2121 .build()
2122 .unwrap();
2123
2124 let partition_spec = PartitionSpec::builder(schema.clone())
2125 .with_spec_id(0)
2126 .build()
2127 .unwrap();
2128
2129 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2130 let expected = TableMetadata {
2131 format_version: FormatVersion::V2,
2132 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
2133 location: "s3://b/wh/data.db/table".to_string(),
2134 last_updated_ms: 1515100955770,
2135 last_column_id: 1,
2136 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
2137 current_schema_id: 1,
2138 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2139 default_partition_type,
2140 default_spec: partition_spec.into(),
2141 last_partition_id: 1000,
2142 default_sort_order_id: 0,
2143 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2144 snapshots: HashMap::default(),
2145 current_snapshot_id: None,
2146 last_sequence_number: 1,
2147 properties: HashMap::new(),
2148 snapshot_log: Vec::new(),
2149 metadata_log: vec![MetadataLog {
2150 metadata_file: "s3://bucket/.../v1.json".to_string(),
2151 timestamp_ms: 1515100,
2152 }],
2153 refs: HashMap::new(),
2154 statistics: HashMap::new(),
2155 partition_statistics: HashMap::new(),
2156 encryption_keys: HashMap::new(),
2157 next_row_id: INITIAL_ROW_ID,
2158 };
2159
2160 let expected_json_value = serde_json::to_value(&expected).unwrap();
2161 check_table_metadata_serde(data, expected);
2162
2163 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
2164 assert_eq!(json_value, expected_json_value);
2165 }
2166
2167 #[test]
2168 fn test_current_snapshot_id_must_match_main_branch() {
2169 let data = r#"
2170 {
2171 "format-version" : 2,
2172 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2173 "location": "s3://b/wh/data.db/table",
2174 "last-sequence-number" : 1,
2175 "last-updated-ms": 1515100955770,
2176 "last-column-id": 1,
2177 "schemas": [
2178 {
2179 "schema-id" : 1,
2180 "type" : "struct",
2181 "fields" :[
2182 {
2183 "id": 1,
2184 "name": "struct_name",
2185 "required": true,
2186 "type": "fixed[1]"
2187 },
2188 {
2189 "id": 4,
2190 "name": "ts",
2191 "required": true,
2192 "type": "timestamp"
2193 }
2194 ]
2195 }
2196 ],
2197 "current-schema-id" : 1,
2198 "partition-specs": [
2199 {
2200 "spec-id": 0,
2201 "fields": [
2202 {
2203 "source-id": 4,
2204 "field-id": 1000,
2205 "name": "ts_day",
2206 "transform": "day"
2207 }
2208 ]
2209 }
2210 ],
2211 "default-spec-id": 0,
2212 "last-partition-id": 1000,
2213 "properties": {
2214 "commit.retry.num-retries": "1"
2215 },
2216 "metadata-log": [
2217 {
2218 "metadata-file": "s3://bucket/.../v1.json",
2219 "timestamp-ms": 1515100
2220 }
2221 ],
2222 "sort-orders": [
2223 {
2224 "order-id": 0,
2225 "fields": []
2226 }
2227 ],
2228 "default-sort-order-id": 0,
2229 "current-snapshot-id" : 1,
2230 "refs" : {
2231 "main" : {
2232 "snapshot-id" : 2,
2233 "type" : "branch"
2234 }
2235 },
2236 "snapshots" : [ {
2237 "snapshot-id" : 1,
2238 "timestamp-ms" : 1662532818843,
2239 "sequence-number" : 0,
2240 "summary" : {
2241 "operation" : "append",
2242 "spark.app.id" : "local-1662532784305",
2243 "added-data-files" : "4",
2244 "added-records" : "4",
2245 "added-files-size" : "6001"
2246 },
2247 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2248 "schema-id" : 0
2249 },
2250 {
2251 "snapshot-id" : 2,
2252 "timestamp-ms" : 1662532818844,
2253 "sequence-number" : 0,
2254 "summary" : {
2255 "operation" : "append",
2256 "spark.app.id" : "local-1662532784305",
2257 "added-data-files" : "4",
2258 "added-records" : "4",
2259 "added-files-size" : "6001"
2260 },
2261 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2262 "schema-id" : 0
2263 } ]
2264 }
2265 "#;
2266
2267 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2268 assert!(
2269 err.to_string()
2270 .contains("Current snapshot id does not match main branch")
2271 );
2272 }
2273
2274 #[test]
2275 fn test_main_without_current() {
2276 let data = r#"
2277 {
2278 "format-version" : 2,
2279 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2280 "location": "s3://b/wh/data.db/table",
2281 "last-sequence-number" : 1,
2282 "last-updated-ms": 1515100955770,
2283 "last-column-id": 1,
2284 "schemas": [
2285 {
2286 "schema-id" : 1,
2287 "type" : "struct",
2288 "fields" :[
2289 {
2290 "id": 1,
2291 "name": "struct_name",
2292 "required": true,
2293 "type": "fixed[1]"
2294 },
2295 {
2296 "id": 4,
2297 "name": "ts",
2298 "required": true,
2299 "type": "timestamp"
2300 }
2301 ]
2302 }
2303 ],
2304 "current-schema-id" : 1,
2305 "partition-specs": [
2306 {
2307 "spec-id": 0,
2308 "fields": [
2309 {
2310 "source-id": 4,
2311 "field-id": 1000,
2312 "name": "ts_day",
2313 "transform": "day"
2314 }
2315 ]
2316 }
2317 ],
2318 "default-spec-id": 0,
2319 "last-partition-id": 1000,
2320 "properties": {
2321 "commit.retry.num-retries": "1"
2322 },
2323 "metadata-log": [
2324 {
2325 "metadata-file": "s3://bucket/.../v1.json",
2326 "timestamp-ms": 1515100
2327 }
2328 ],
2329 "sort-orders": [
2330 {
2331 "order-id": 0,
2332 "fields": []
2333 }
2334 ],
2335 "default-sort-order-id": 0,
2336 "refs" : {
2337 "main" : {
2338 "snapshot-id" : 1,
2339 "type" : "branch"
2340 }
2341 },
2342 "snapshots" : [ {
2343 "snapshot-id" : 1,
2344 "timestamp-ms" : 1662532818843,
2345 "sequence-number" : 0,
2346 "summary" : {
2347 "operation" : "append",
2348 "spark.app.id" : "local-1662532784305",
2349 "added-data-files" : "4",
2350 "added-records" : "4",
2351 "added-files-size" : "6001"
2352 },
2353 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2354 "schema-id" : 0
2355 } ]
2356 }
2357 "#;
2358
2359 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2360 assert!(
2361 err.to_string()
2362 .contains("Current snapshot is not set, but main branch exists")
2363 );
2364 }
2365
/// A branch ref (`foo`) pointing at a snapshot id that is absent from
/// `snapshots` must make deserialization fail with a descriptive error.
#[test]
fn test_branch_snapshot_missing() {
    let json = r#"
{
"format-version" : 2,
"table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
"location": "s3://b/wh/data.db/table",
"last-sequence-number" : 1,
"last-updated-ms": 1515100955770,
"last-column-id": 1,
"schemas": [
{
"schema-id" : 1,
"type" : "struct",
"fields" :[
{
"id": 1,
"name": "struct_name",
"required": true,
"type": "fixed[1]"
},
{
"id": 4,
"name": "ts",
"required": true,
"type": "timestamp"
}
]
}
],
"current-schema-id" : 1,
"partition-specs": [
{
"spec-id": 0,
"fields": [
{
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}
]
}
],
"default-spec-id": 0,
"last-partition-id": 1000,
"properties": {
"commit.retry.num-retries": "1"
},
"metadata-log": [
{
"metadata-file": "s3://bucket/.../v1.json",
"timestamp-ms": 1515100
}
],
"sort-orders": [
{
"order-id": 0,
"fields": []
}
],
"default-sort-order-id": 0,
"refs" : {
"main" : {
"snapshot-id" : 1,
"type" : "branch"
},
"foo" : {
"snapshot-id" : 2,
"type" : "branch"
}
},
"snapshots" : [ {
"snapshot-id" : 1,
"timestamp-ms" : 1662532818843,
"sequence-number" : 0,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1662532784305",
"added-data-files" : "4",
"added-records" : "4",
"added-files-size" : "6001"
},
"manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
"schema-id" : 0
} ]
}
"#;

    // Only snapshot 1 exists, so the "foo" ref (snapshot 2) is dangling.
    let message = serde_json::from_str::<TableMetadata>(json)
        .unwrap_err()
        .to_string();
    let mentions_missing_ref = message
        .contains("Snapshot for reference foo does not exist in the existing snapshots list");
    assert!(mentions_missing_ref);
}
2462
/// A snapshot whose sequence number exceeds the table's `last-sequence-number`
/// is rejected; raising `last-sequence-number` to (or past) it makes the
/// document valid again.
#[test]
fn test_v2_wrong_max_snapshot_sequence_number() {
    let json = r#"
{
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://bucket/test/location",
"last-sequence-number": 1,
"last-updated-ms": 1602638573590,
"last-column-id": 3,
"current-schema-id": 0,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{
"id": 1,
"name": "x",
"required": true,
"type": "long"
}
]
}
],
"default-spec-id": 0,
"partition-specs": [
{
"spec-id": 0,
"fields": []
}
],
"last-partition-id": 1000,
"default-sort-order-id": 0,
"sort-orders": [
{
"order-id": 0,
"fields": []
}
],
"properties": {},
"current-snapshot-id": 3055729675574597004,
"snapshots": [
{
"snapshot-id": 3055729675574597004,
"timestamp-ms": 1555100955770,
"sequence-number": 4,
"summary": {
"operation": "append"
},
"manifest-list": "s3://a/b/2.avro",
"schema-id": 0
}
],
"statistics": [],
"snapshot-log": [],
"metadata-log": []
}
"#;

    // Snapshot sequence number 4 > last-sequence-number 1: must be rejected.
    let rejection = serde_json::from_str::<TableMetadata>(json).unwrap_err();
    assert!(rejection.to_string().contains(
        "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
    ));

    // Equal to the snapshot's sequence number: accepted.
    let equal_json = json.replace(
        r#""last-sequence-number": 1,"#,
        r#""last-sequence-number": 4,"#,
    );
    let equal = serde_json::from_str::<TableMetadata>(equal_json.as_str()).unwrap();
    assert_eq!(equal.last_sequence_number, 4);

    // Strictly greater than the snapshot's sequence number: also accepted.
    let greater_json = equal_json.replace(
        r#""last-sequence-number": 4,"#,
        r#""last-sequence-number": 5,"#,
    );
    let greater = serde_json::from_str::<TableMetadata>(greater_json.as_str()).unwrap();
    assert_eq!(greater.last_sequence_number, 5);
}
2544
/// Round-trips a v2 document carrying a single `statistics` entry and checks
/// that it maps to the expected `StatisticsFile` (keyed by snapshot id).
#[test]
fn test_statistic_files() {
    let json = r#"
{
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://bucket/test/location",
"last-sequence-number": 34,
"last-updated-ms": 1602638573590,
"last-column-id": 3,
"current-schema-id": 0,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{
"id": 1,
"name": "x",
"required": true,
"type": "long"
}
]
}
],
"default-spec-id": 0,
"partition-specs": [
{
"spec-id": 0,
"fields": []
}
],
"last-partition-id": 1000,
"default-sort-order-id": 0,
"sort-orders": [
{
"order-id": 0,
"fields": []
}
],
"properties": {},
"current-snapshot-id": 3055729675574597004,
"snapshots": [
{
"snapshot-id": 3055729675574597004,
"timestamp-ms": 1555100955770,
"sequence-number": 1,
"summary": {
"operation": "append"
},
"manifest-list": "s3://a/b/2.avro",
"schema-id": 0
}
],
"statistics": [
{
"snapshot-id": 3055729675574597004,
"statistics-path": "s3://a/b/stats.puffin",
"file-size-in-bytes": 413,
"file-footer-size-in-bytes": 42,
"blob-metadata": [
{
"type": "ndv",
"snapshot-id": 3055729675574597004,
"sequence-number": 1,
"fields": [
1
]
}
]
}
],
"snapshot-log": [],
"metadata-log": []
}
"#;

    // The one snapshot id shared by the snapshot, the main ref and the stats file.
    let snapshot_id: i64 = 3055729675574597004;

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(NestedField::required(
            1,
            "x",
            Type::Primitive(PrimitiveType::Long),
        ))])
        .build()
        .unwrap();
    let spec = Arc::new(
        PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap(),
    );
    let partition_type = spec.partition_type(&schema).unwrap();

    let snapshot = Arc::new(
        Snapshot::builder()
            .with_snapshot_id(snapshot_id)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build(),
    );

    let stats_file = StatisticsFile {
        snapshot_id,
        statistics_path: "s3://a/b/stats.puffin".to_string(),
        file_size_in_bytes: 413,
        file_footer_size_in_bytes: 42,
        key_metadata: None,
        blob_metadata: vec![BlobMetadata {
            snapshot_id,
            sequence_number: 1,
            fields: vec![1],
            r#type: "ndv".to_string(),
            properties: HashMap::new(),
        }],
    };

    let main_branch = SnapshotReference {
        snapshot_id,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_sequence_number: 34,
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::clone(&spec))]),
        default_spec: spec,
        default_partition_type: partition_type,
        last_partition_id: 1000,
        properties: HashMap::new(),
        current_snapshot_id: Some(snapshot_id),
        snapshots: HashMap::from([(snapshot_id, snapshot)]),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        sort_orders: HashMap::from([(0, Arc::new(SortOrder::unsorted_order()))]),
        default_sort_order_id: 0,
        refs: HashMap::from([("main".to_string(), main_branch)]),
        statistics: HashMap::from([(snapshot_id, stats_file)]),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(json, expected);
}
2697
/// Round-trips a v2 document carrying a single `partition-statistics` entry
/// and checks it maps to the expected `PartitionStatisticsFile`.
#[test]
fn test_partition_statistics_file() {
    let json = r#"
{
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://bucket/test/location",
"last-sequence-number": 34,
"last-updated-ms": 1602638573590,
"last-column-id": 3,
"current-schema-id": 0,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{
"id": 1,
"name": "x",
"required": true,
"type": "long"
}
]
}
],
"default-spec-id": 0,
"partition-specs": [
{
"spec-id": 0,
"fields": []
}
],
"last-partition-id": 1000,
"default-sort-order-id": 0,
"sort-orders": [
{
"order-id": 0,
"fields": []
}
],
"properties": {},
"current-snapshot-id": 3055729675574597004,
"snapshots": [
{
"snapshot-id": 3055729675574597004,
"timestamp-ms": 1555100955770,
"sequence-number": 1,
"summary": {
"operation": "append"
},
"manifest-list": "s3://a/b/2.avro",
"schema-id": 0
}
],
"partition-statistics": [
{
"snapshot-id": 3055729675574597004,
"statistics-path": "s3://a/b/partition-stats.parquet",
"file-size-in-bytes": 43
}
],
"snapshot-log": [],
"metadata-log": []
}
"#;

    let snapshot_id: i64 = 3055729675574597004;

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(NestedField::required(
            1,
            "x",
            Type::Primitive(PrimitiveType::Long),
        ))])
        .build()
        .unwrap();
    let spec = Arc::new(
        PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap(),
    );
    let partition_type = spec.partition_type(&schema).unwrap();

    let snapshot = Arc::new(
        Snapshot::builder()
            .with_snapshot_id(snapshot_id)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build(),
    );

    let partition_stats = PartitionStatisticsFile {
        snapshot_id,
        statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
        file_size_in_bytes: 43,
    };

    let main_branch = SnapshotReference {
        snapshot_id,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_sequence_number: 34,
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::clone(&spec))]),
        default_spec: spec,
        default_partition_type: partition_type,
        last_partition_id: 1000,
        properties: HashMap::new(),
        current_snapshot_id: Some(snapshot_id),
        snapshots: HashMap::from([(snapshot_id, snapshot)]),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        sort_orders: HashMap::from([(0, Arc::new(SortOrder::unsorted_order()))]),
        default_sort_order_id: 0,
        refs: HashMap::from([("main".to_string(), main_branch)]),
        statistics: HashMap::new(),
        partition_statistics: HashMap::from([(snapshot_id, partition_stats)]),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(json, expected);
}
2833
/// A malformed `table-uuid` must be rejected.
///
/// The document is otherwise complete and valid. Without that, deserialization
/// would fail on missing required fields and the test would pass even if UUID
/// validation were broken.
#[test]
fn test_invalid_table_uuid() -> Result<()> {
    let valid = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 4,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 4,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

    // Baseline: with a well-formed UUID the document parses. Otherwise the
    // negative assertion below could pass for an unrelated reason.
    assert!(serde_json::from_str::<TableMetadata>(valid).is_ok());

    let invalid = valid.replace(
        r#""table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1""#,
        r#""table-uuid": "xxxx""#,
    );
    // Guard against the replacement silently not matching.
    assert_ne!(invalid, valid);
    assert!(serde_json::from_str::<TableMetadata>(&invalid).is_err());
    Ok(())
}
2845
/// A bare `{"format-version": 1}` document is not a valid table metadata file.
#[test]
fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
    let json = r#"
{
"format-version" : 1
}
"#;
    let parsed = serde_json::from_str::<TableMetadata>(json);
    assert!(parsed.is_err());
    Ok(())
}
2856
/// Parses the minimal v3 fixture and checks it round-trips to the expected
/// in-memory `TableMetadata`, including v3 column defaults.
#[test]
fn test_table_metadata_v3_valid_minimal() {
    let metadata_str =
        fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();

    // Quick check before the full structural comparison below.
    let parsed = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
    assert_eq!(parsed.format_version, FormatVersion::V3);

    let long = || Type::Primitive(PrimitiveType::Long);
    let one = || Literal::Primitive(PrimitiveLiteral::Long(1));

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(
                NestedField::required(1, "x", long())
                    .with_initial_default(one())
                    .with_write_default(one()),
            ),
            Arc::new(NestedField::required(2, "y", long()).with_doc("comment")),
            Arc::new(NestedField::required(3, "z", long())),
        ])
        .build()
        .unwrap();

    let spec = Arc::new(
        PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap(),
    );
    let partition_type = spec.partition_type(&schema).unwrap();

    let ascending_y = SortField {
        source_id: 2,
        transform: Transform::Identity,
        direction: SortDirection::Ascending,
        null_order: NullOrder::First,
    };
    let descending_z_bucket = SortField {
        source_id: 3,
        transform: Transform::Bucket(4),
        direction: SortDirection::Descending,
        null_order: NullOrder::Last,
    };
    let sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(ascending_y)
        .with_sort_field(descending_z_bucket)
        .build_unbound()
        .unwrap();

    let expected = TableMetadata {
        format_version: FormatVersion::V3,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_sequence_number: 34,
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::clone(&spec))]),
        default_spec: spec,
        default_partition_type: partition_type,
        last_partition_id: 1000,
        properties: HashMap::new(),
        current_snapshot_id: None,
        snapshots: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        sort_orders: HashMap::from([(3, Arc::new(sort_order))]),
        default_sort_order_id: 3,
        refs: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
        // The v3 fixture is expected to carry next-row-id 0.
        next_row_id: 0,
    };

    check_table_metadata_serde(&metadata_str, expected);
}
2945
/// Parses the full v2 fixture (two schemas, two snapshots, a snapshot log and a
/// `main` branch) and checks it round-trips to the expected `TableMetadata`.
#[test]
fn test_table_metadata_v2_file_valid() {
    let metadata =
        fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

    let long = || Type::Primitive(PrimitiveType::Long);
    let first_snapshot_id: i64 = 3051729675574597004;
    let second_snapshot_id: i64 = 3055729675574597004;

    let initial_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(NestedField::required(1, "x", long()))])
        .build()
        .unwrap();

    let evolved_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            Arc::new(NestedField::required(1, "x", long())),
            Arc::new(NestedField::required(2, "y", long()).with_doc("comment")),
            Arc::new(NestedField::required(3, "z", long())),
        ])
        .with_identifier_field_ids(vec![1, 2])
        .build()
        .unwrap();

    let spec = Arc::new(
        PartitionSpec::builder(evolved_schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap(),
    );
    let partition_type = spec.partition_type(&evolved_schema).unwrap();

    let sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(SortField {
            source_id: 2,
            transform: Transform::Identity,
            direction: SortDirection::Ascending,
            null_order: NullOrder::First,
        })
        .with_sort_field(SortField {
            source_id: 3,
            transform: Transform::Bucket(4),
            direction: SortDirection::Descending,
            null_order: NullOrder::Last,
        })
        .build_unbound()
        .unwrap();

    let append_summary = || Summary {
        operation: Operation::Append,
        additional_properties: HashMap::new(),
    };

    let first_snapshot = Snapshot::builder()
        .with_snapshot_id(first_snapshot_id)
        .with_timestamp_ms(1515100955770)
        .with_sequence_number(0)
        .with_manifest_list("s3://a/b/1.avro")
        .with_summary(append_summary())
        .build();

    let second_snapshot = Snapshot::builder()
        .with_snapshot_id(second_snapshot_id)
        .with_parent_snapshot_id(Some(first_snapshot_id))
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_schema_id(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_summary(append_summary())
        .build();

    let main_branch = SnapshotReference {
        snapshot_id: second_snapshot_id,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_sequence_number: 34,
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([
            (0, Arc::new(initial_schema)),
            (1, Arc::new(evolved_schema)),
        ]),
        current_schema_id: 1,
        partition_specs: HashMap::from([(0, Arc::clone(&spec))]),
        default_spec: spec,
        default_partition_type: partition_type,
        last_partition_id: 1000,
        properties: HashMap::new(),
        current_snapshot_id: Some(second_snapshot_id),
        snapshots: HashMap::from([
            (first_snapshot_id, Arc::new(first_snapshot)),
            (second_snapshot_id, Arc::new(second_snapshot)),
        ]),
        snapshot_log: vec![
            SnapshotLog {
                snapshot_id: first_snapshot_id,
                timestamp_ms: 1515100955770,
            },
            SnapshotLog {
                snapshot_id: second_snapshot_id,
                timestamp_ms: 1555100955770,
            },
        ],
        metadata_log: Vec::new(),
        sort_orders: HashMap::from([(3, Arc::new(sort_order))]),
        default_sort_order_id: 3,
        refs: HashMap::from([("main".to_string(), main_branch)]),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(&metadata, expected);
}
3085
/// Parses the minimal v2 fixture (no snapshots, no refs) and checks it
/// round-trips to the expected `TableMetadata`.
#[test]
fn test_table_metadata_v2_file_valid_minimal() {
    let metadata =
        fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

    let long = || Type::Primitive(PrimitiveType::Long);

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(NestedField::required(1, "x", long())),
            Arc::new(NestedField::required(2, "y", long()).with_doc("comment")),
            Arc::new(NestedField::required(3, "z", long())),
        ])
        .build()
        .unwrap();

    let spec = Arc::new(
        PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap(),
    );
    let partition_type = spec.partition_type(&schema).unwrap();

    let sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(SortField {
            source_id: 2,
            transform: Transform::Identity,
            direction: SortDirection::Ascending,
            null_order: NullOrder::First,
        })
        .with_sort_field(SortField {
            source_id: 3,
            transform: Transform::Bucket(4),
            direction: SortDirection::Descending,
            null_order: NullOrder::Last,
        })
        .build_unbound()
        .unwrap();

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_sequence_number: 34,
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::clone(&spec))]),
        default_spec: spec,
        default_partition_type: partition_type,
        last_partition_id: 1000,
        properties: HashMap::new(),
        current_snapshot_id: None,
        snapshots: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        sort_orders: HashMap::from([(3, Arc::new(sort_order))]),
        default_sort_order_id: 3,
        refs: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(&metadata, expected);
}
3171
/// Parses the v1 fixture and checks it upgrades to the expected in-memory
/// `TableMetadata` (unsorted order, no snapshots, sequence number 0).
#[test]
fn test_table_metadata_v1_file_valid() {
    let metadata =
        fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

    let long = || Type::Primitive(PrimitiveType::Long);

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(NestedField::required(1, "x", long())),
            Arc::new(NestedField::required(2, "y", long()).with_doc("comment")),
            Arc::new(NestedField::required(3, "z", long())),
        ])
        .build()
        .unwrap();

    let spec = Arc::new(
        PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap(),
    );
    let partition_type = spec.partition_type(&schema).unwrap();

    let expected = TableMetadata {
        format_version: FormatVersion::V1,
        table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_sequence_number: 0,
        last_updated_ms: 1602638573874,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::clone(&spec))]),
        default_spec: spec,
        default_partition_type: partition_type,
        last_partition_id: 0,
        properties: HashMap::new(),
        current_snapshot_id: None,
        snapshots: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        sort_orders: HashMap::from([(0, Arc::new(SortOrder::unsorted_order()))]),
        default_sort_order_id: 0,
        refs: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
        next_row_id: INITIAL_ROW_ID,
    };

    check_table_metadata_serde(&metadata, expected);
}
3241
/// The v1 compatibility fixture deserializes and exposes the expected
/// version, UUID, location, timestamp and current schema id.
#[test]
fn test_table_metadata_v1_compat() {
    let path = "testdata/table_metadata/TableMetadataV1Compat.json";
    let metadata = fs::read_to_string(path).unwrap();

    let table: TableMetadata = serde_json::from_str(&metadata)
        .expect("Failed to deserialize TableMetadataV1Compat.json");

    let expected_uuid = Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap();
    assert_eq!(table.format_version(), FormatVersion::V1);
    assert_eq!(table.uuid(), expected_uuid);
    assert_eq!(
        table.location(),
        "s3://bucket/warehouse/iceberg/glue.db/table_name"
    );
    assert_eq!(table.last_updated_ms(), 1727773114005);
    assert_eq!(table.current_schema_id(), 0);
}
3264
/// A v1 fixture that (per its name) omits the current schema id must still
/// resolve a current schema with columns `x`, `y`, `z`.
#[test]
fn test_table_metadata_v1_schemas_without_current_id() {
    let path = "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json";
    let metadata = fs::read_to_string(path).unwrap();

    let table: TableMetadata = serde_json::from_str(&metadata)
        .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");

    assert_eq!(table.format_version(), FormatVersion::V1);
    assert_eq!(
        table.uuid(),
        Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
    );

    // Compare the full ordered list of top-level column names at once.
    let column_names: Vec<&str> = table
        .current_schema()
        .as_struct()
        .fields()
        .iter()
        .map(|field| field.name.as_str())
        .collect();
    assert_eq!(column_names, ["x", "y", "z"]);
}
3290
/// A v1 fixture without any usable schema must fail to deserialize with an
/// error that names the problem.
#[test]
fn test_table_metadata_v1_no_valid_schema() {
    let path = "testdata/table_metadata/TableMetadataV1NoValidSchema.json";
    let metadata = fs::read_to_string(path).unwrap();

    let outcome: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
    assert!(outcome.is_err());

    let error_message = outcome.unwrap_err().to_string();
    let names_the_problem = error_message.contains("No valid schema configuration found");
    assert!(
        names_the_problem,
        "Expected error about no valid schema configuration, got: {error_message}"
    );
}
3307
/// A v1 fixture with several partition specs but (per its name) no default
/// spec id must still resolve spec 2 as the default.
#[test]
fn test_table_metadata_v1_partition_specs_without_default_id() {
    let path = "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json";
    let metadata = fs::read_to_string(path).unwrap();

    let table: TableMetadata = serde_json::from_str(&metadata)
        .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");

    assert_eq!(table.format_version(), FormatVersion::V1);
    assert_eq!(
        table.uuid(),
        Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
    );

    // Two specs are loaded; the one with id 2 is expected to be the default.
    assert_eq!(table.default_partition_spec_id(), 2);
    assert_eq!(table.partition_specs.len(), 2);

    let spec = &table.default_spec;
    assert_eq!(spec.spec_id(), 2);

    let fields = spec.fields();
    assert_eq!(fields.len(), 1);
    let only_field = &fields[0];
    assert_eq!(only_field.name, "y");
    assert_eq!(only_field.transform, Transform::Identity);
    assert_eq!(only_field.source_id, 2);
}
3338
/// A v2 fixture whose current schema id has no matching schema is rejected
/// with an exact error message.
#[test]
fn test_table_metadata_v2_schema_not_found() {
    let path = "testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json";
    let metadata = fs::read_to_string(path).unwrap();

    let error = serde_json::from_str::<TableMetadata>(&metadata).unwrap_err();
    assert_eq!(
        error.to_string(),
        "DataInvalid => No schema exists with the current schema id 2."
    );
}
3352
/// A v2 fixture missing its sort orders matches no `TableMetadataEnum` variant.
#[test]
fn test_table_metadata_v2_missing_sort_order() {
    let path = "testdata/table_metadata/TableMetadataV2MissingSortOrder.json";
    let metadata = fs::read_to_string(path).unwrap();

    let error = serde_json::from_str::<TableMetadata>(&metadata).unwrap_err();
    assert_eq!(
        error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
3366
/// A v2 fixture missing its partition specs matches no `TableMetadataEnum` variant.
#[test]
fn test_table_metadata_v2_missing_partition_specs() {
    let path = "testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json";
    let metadata = fs::read_to_string(path).unwrap();

    let error = serde_json::from_str::<TableMetadata>(&metadata).unwrap_err();
    assert_eq!(
        error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
3380
/// A v2 fixture missing `last-partition-id` matches no `TableMetadataEnum` variant.
#[test]
fn test_table_metadata_v2_missing_last_partition_id() {
    let path = "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json";
    let metadata = fs::read_to_string(path).unwrap();

    let error = serde_json::from_str::<TableMetadata>(&metadata).unwrap_err();
    assert_eq!(
        error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
3395
/// A v2 fixture missing its schemas matches no `TableMetadataEnum` variant.
#[test]
fn test_table_metadata_v2_missing_schemas() {
    let path = "testdata/table_metadata/TableMetadataV2MissingSchemas.json";
    let metadata = fs::read_to_string(path).unwrap();

    let error = serde_json::from_str::<TableMetadata>(&metadata).unwrap_err();
    assert_eq!(
        error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
3409
/// A fixture with an unsupported format version matches no
/// `TableMetadataEnum` variant.
#[test]
fn test_table_metadata_v2_unsupported_version() {
    let path = "testdata/table_metadata/TableMetadataUnsupportedVersion.json";
    let metadata = fs::read_to_string(path).unwrap();

    let error = serde_json::from_str::<TableMetadata>(&metadata).unwrap_err();
    assert_eq!(
        error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
3423
/// Format versions must be totally ordered `V1 < V2 < V3`.
///
/// This file defines `MIN_FORMAT_VERSION_ROW_LINEAGE` as `V3`, so any
/// comparison against it relies on V3 ordering above the older versions. The
/// previous version of this test never exercised V3.
#[test]
fn test_order_of_format_version() {
    assert!(FormatVersion::V1 < FormatVersion::V2);
    assert!(FormatVersion::V2 < FormatVersion::V3);
    assert!(FormatVersion::V1 < FormatVersion::V3);
    assert_eq!(FormatVersion::V1, FormatVersion::V1);
    assert_eq!(FormatVersion::V2, FormatVersion::V2);
    assert_eq!(FormatVersion::V3, FormatVersion::V3);
}
3430
/// Making a spec the default and registering it under the same id must make
/// `default_partition_spec()`, `partition_spec_by_id()` and
/// `default_partition_spec_id()` agree.
#[test]
fn test_default_partition_spec() {
    let default_spec_id = 1234;
    let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");

    // Build the spec with `default_spec_id` as its own id. The previous version
    // inserted `PartitionSpec::unpartition_spec()` under key 1234 even though that
    // spec does not carry id 1234. That produced inconsistent metadata, and the
    // default spec id was never checked.
    let partition_spec = Arc::new(
        PartitionSpec::builder(table_meta_data.current_schema().clone())
            .with_spec_id(default_spec_id)
            .build()
            .unwrap(),
    );
    table_meta_data.default_spec = Arc::clone(&partition_spec);
    table_meta_data
        .partition_specs
        .insert(default_spec_id, partition_spec);

    assert_eq!(table_meta_data.default_partition_spec_id(), default_spec_id);
    // `&Arc<PartitionSpec>` compares by content; no need to clone and deref.
    assert_eq!(
        table_meta_data.default_partition_spec(),
        table_meta_data
            .partition_spec_by_id(default_spec_id)
            .unwrap()
    );
}
/// Pointing `default_sort_order_id` at a newly registered order makes
/// `default_sort_order()` return that order.
#[test]
fn test_default_sort_order() {
    let default_sort_order_id = 1234;
    let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");

    table_meta_data
        .sort_orders
        .insert(default_sort_order_id, Arc::new(SortOrder::default()));
    table_meta_data.default_sort_order_id = default_sort_order_id;

    let registered = table_meta_data
        .sort_orders
        .get(&default_sort_order_id)
        .unwrap();
    assert_eq!(table_meta_data.default_sort_order(), registered);
}
3467
/// Building metadata from a bare `TableCreation` (empty schema, no properties)
/// yields one empty schema, an unpartitioned spec 0 and an unsorted order 0.
#[test]
fn test_table_metadata_builder_from_table_creation() {
    let creation = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(Schema::builder().build().unwrap())
        .build();
    let build_result = TableMetadataBuilder::from_table_creation(creation)
        .unwrap()
        .build()
        .unwrap();
    let metadata = build_result.metadata;

    assert_eq!(metadata.location, "s3://db/table");
    assert_eq!(metadata.schemas.len(), 1);

    let initial_schema = metadata.schemas.get(&0).unwrap();
    assert_eq!(initial_schema.as_struct().fields().len(), 0);
    assert_eq!(metadata.properties.len(), 0);

    let expected_spec = PartitionSpec::builder(initial_schema.clone())
        .with_spec_id(0)
        .build()
        .unwrap();
    assert_eq!(
        metadata.partition_specs,
        HashMap::from([(0, Arc::new(expected_spec))])
    );

    let expected_order = SortOrder {
        order_id: 0,
        fields: vec![],
    };
    assert_eq!(
        metadata.sort_orders,
        HashMap::from([(0, Arc::new(expected_order))])
    );
}
3517
3518 #[tokio::test]
3519 async fn test_table_metadata_read_write() {
3520 let temp_dir = TempDir::new().unwrap();
3522 let temp_path = temp_dir.path().to_str().unwrap();
3523
3524 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3526
3527 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3529
3530 let metadata_location = format!("{temp_path}/metadata.json");
3532
3533 original_metadata
3535 .write_to(&file_io, &metadata_location)
3536 .await
3537 .unwrap();
3538
3539 assert!(fs::metadata(&metadata_location).is_ok());
3541
3542 let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3544 .await
3545 .unwrap();
3546
3547 assert_eq!(read_metadata, original_metadata);
3549 }
3550
3551 #[tokio::test]
3552 async fn test_table_metadata_read_compressed() {
3553 let temp_dir = TempDir::new().unwrap();
3554 let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3555
3556 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3557 let json = serde_json::to_string(&original_metadata).unwrap();
3558
3559 let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
3560 encoder.write_all(json.as_bytes()).unwrap();
3561 std::fs::write(&metadata_location, encoder.finish().unwrap())
3562 .expect("failed to write metadata");
3563
3564 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3566 let metadata_location = metadata_location.to_str().unwrap();
3567 let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3568 .await
3569 .unwrap();
3570
3571 assert_eq!(read_metadata, original_metadata);
3573 }
3574
3575 #[tokio::test]
3576 async fn test_table_metadata_read_nonexistent_file() {
3577 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3579
3580 let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3582
3583 assert!(result.is_err());
3585 }
3586
3587 #[test]
3588 fn test_partition_name_exists() {
3589 let schema = Schema::builder()
3590 .with_fields(vec![
3591 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3592 NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3593 .into(),
3594 ])
3595 .build()
3596 .unwrap();
3597
3598 let spec1 = PartitionSpec::builder(schema.clone())
3599 .with_spec_id(1)
3600 .add_partition_field("data", "data_partition", Transform::Identity)
3601 .unwrap()
3602 .build()
3603 .unwrap();
3604
3605 let spec2 = PartitionSpec::builder(schema.clone())
3606 .with_spec_id(2)
3607 .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3608 .unwrap()
3609 .build()
3610 .unwrap();
3611
3612 let metadata = TableMetadataBuilder::new(
3614 schema,
3615 spec1.clone().into_unbound(),
3616 SortOrder::unsorted_order(),
3617 "s3://test/location".to_string(),
3618 FormatVersion::V2,
3619 HashMap::new(),
3620 )
3621 .unwrap()
3622 .add_partition_spec(spec2.into_unbound())
3623 .unwrap()
3624 .build()
3625 .unwrap()
3626 .metadata;
3627
3628 assert!(metadata.partition_name_exists("data_partition"));
3629 assert!(metadata.partition_name_exists("partition_bucket"));
3630
3631 assert!(!metadata.partition_name_exists("nonexistent_field"));
3632 assert!(!metadata.partition_name_exists("data")); assert!(!metadata.partition_name_exists(""));
3634 }
3635
3636 #[test]
3637 fn test_partition_name_exists_empty_specs() {
3638 let schema = Schema::builder()
3640 .with_fields(vec![
3641 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3642 ])
3643 .build()
3644 .unwrap();
3645
3646 let metadata = TableMetadataBuilder::new(
3647 schema,
3648 PartitionSpec::unpartition_spec().into_unbound(),
3649 SortOrder::unsorted_order(),
3650 "s3://test/location".to_string(),
3651 FormatVersion::V2,
3652 HashMap::new(),
3653 )
3654 .unwrap()
3655 .build()
3656 .unwrap()
3657 .metadata;
3658
3659 assert!(!metadata.partition_name_exists("any_field"));
3660 assert!(!metadata.partition_name_exists("data"));
3661 }
3662
3663 #[test]
3664 fn test_name_exists_in_any_schema() {
3665 let schema1 = Schema::builder()
3667 .with_schema_id(1)
3668 .with_fields(vec![
3669 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3670 NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3671 ])
3672 .build()
3673 .unwrap();
3674
3675 let schema2 = Schema::builder()
3676 .with_schema_id(2)
3677 .with_fields(vec![
3678 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3679 NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3680 ])
3681 .build()
3682 .unwrap();
3683
3684 let metadata = TableMetadataBuilder::new(
3685 schema1,
3686 PartitionSpec::unpartition_spec().into_unbound(),
3687 SortOrder::unsorted_order(),
3688 "s3://test/location".to_string(),
3689 FormatVersion::V2,
3690 HashMap::new(),
3691 )
3692 .unwrap()
3693 .add_current_schema(schema2)
3694 .unwrap()
3695 .build()
3696 .unwrap()
3697 .metadata;
3698
3699 assert!(metadata.name_exists_in_any_schema("field1")); assert!(metadata.name_exists_in_any_schema("field2")); assert!(metadata.name_exists_in_any_schema("field3")); assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3704 assert!(!metadata.name_exists_in_any_schema("field4"));
3705 assert!(!metadata.name_exists_in_any_schema(""));
3706 }
3707
3708 #[test]
3709 fn test_name_exists_in_any_schema_empty_schemas() {
3710 let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3711
3712 let metadata = TableMetadataBuilder::new(
3713 schema,
3714 PartitionSpec::unpartition_spec().into_unbound(),
3715 SortOrder::unsorted_order(),
3716 "s3://test/location".to_string(),
3717 FormatVersion::V2,
3718 HashMap::new(),
3719 )
3720 .unwrap()
3721 .build()
3722 .unwrap()
3723 .metadata;
3724
3725 assert!(!metadata.name_exists_in_any_schema("any_field"));
3726 }
3727
3728 #[test]
3729 fn test_helper_methods_multi_version_scenario() {
3730 let initial_schema = Schema::builder()
3732 .with_fields(vec![
3733 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3734 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3735 NestedField::required(
3736 3,
3737 "deprecated_field",
3738 Type::Primitive(PrimitiveType::String),
3739 )
3740 .into(),
3741 ])
3742 .build()
3743 .unwrap();
3744
3745 let metadata = TableMetadataBuilder::new(
3746 initial_schema,
3747 PartitionSpec::unpartition_spec().into_unbound(),
3748 SortOrder::unsorted_order(),
3749 "s3://test/location".to_string(),
3750 FormatVersion::V2,
3751 HashMap::new(),
3752 )
3753 .unwrap();
3754
3755 let evolved_schema = Schema::builder()
3756 .with_fields(vec![
3757 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3758 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3759 NestedField::required(
3760 3,
3761 "deprecated_field",
3762 Type::Primitive(PrimitiveType::String),
3763 )
3764 .into(),
3765 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3766 .into(),
3767 ])
3768 .build()
3769 .unwrap();
3770
3771 let _final_schema = Schema::builder()
3773 .with_fields(vec![
3774 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3775 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3776 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3777 .into(),
3778 NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3779 .into(),
3780 ])
3781 .build()
3782 .unwrap();
3783
3784 let final_metadata = metadata
3785 .add_current_schema(evolved_schema)
3786 .unwrap()
3787 .build()
3788 .unwrap()
3789 .metadata;
3790
3791 assert!(!final_metadata.partition_name_exists("nonexistent_partition")); assert!(final_metadata.name_exists_in_any_schema("id")); assert!(final_metadata.name_exists_in_any_schema("name")); assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); assert!(final_metadata.name_exists_in_any_schema("new_field")); assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3798 }
3799}