1use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24 DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25 ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef,
26 Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef,
27 StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID,
28 UnboundPartitionSpec,
29};
30use crate::error::{Error, ErrorKind, Result};
31use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
32use crate::{TableCreation, TableUpdate};
33
34const FIRST_FIELD_ID: u32 = 1;
35
/// Incrementally builds [`TableMetadata`], recording every mutation as a
/// [`TableUpdate`] so the accumulated changes can be replayed or committed.
#[derive(Debug, Clone)]
pub struct TableMetadataBuilder {
    // The metadata being mutated in place.
    metadata: TableMetadata,
    // Updates recorded in the order they were applied.
    changes: Vec<TableUpdate>,
    // Id of the schema most recently added by this builder, if any.
    last_added_schema_id: Option<i32>,
    // Id of the partition spec most recently added by this builder, if any.
    last_added_spec_id: Option<i32>,
    // Id of the sort order most recently added by this builder, if any.
    last_added_order_id: Option<i64>,
    // Metadata-log entry for the previous metadata file; appended on `build()`.
    previous_history_entry: Option<MetadataLog>,
    // Pending last-updated timestamp; `build()` defaults it to "now".
    last_updated_ms: Option<i64>,
}
59
/// Outcome of [`TableMetadataBuilder::build`].
#[derive(Debug, Clone, PartialEq)]
pub struct TableMetadataBuildResult {
    /// The finalized, normalized table metadata.
    pub metadata: TableMetadata,
    /// All changes applied by the builder, in application order.
    pub changes: Vec<TableUpdate>,
    /// Metadata-log entries dropped to respect the configured history size.
    pub expired_metadata_logs: Vec<MetadataLog>,
}
70
impl TableMetadataBuilder {
    /// Sentinel id meaning "the schema/spec/order added last by this builder".
    pub const LAST_ADDED: i32 = -1;
    /// Creates a builder for a brand-new table.
    ///
    /// Field, spec and sort-order ids are reassigned to fresh values, a new
    /// v7 UUID is generated, and location/schema/spec/order/properties are
    /// applied through the regular builder methods so each is recorded as a
    /// change.
    ///
    /// # Errors
    /// Fails when ids cannot be reassigned or any delegated setter fails.
    pub fn new(
        schema: Schema,
        spec: impl Into<UnboundPartitionSpec>,
        sort_order: SortOrder,
        location: String,
        format_version: FormatVersion,
        properties: HashMap<String, String>,
    ) -> Result<Self> {
        // Normalize ids so they are independent of whatever the caller passed.
        let (fresh_schema, fresh_spec, fresh_sort_order) =
            Self::reassign_ids(schema, spec.into(), sort_order)?;
        let schema_id = fresh_schema.schema_id();

        let builder = Self {
            metadata: TableMetadata {
                format_version,
                table_uuid: Uuid::now_v7(),
                // Placeholder values below are overwritten by the setter
                // calls at the end of this function or by `build()`.
                location: "".to_string(),
                last_sequence_number: 0,
                last_updated_ms: 0,
                last_column_id: -1,
                current_schema_id: -1,
                schemas: HashMap::new(),
                partition_specs: HashMap::new(),
                default_spec: Arc::new(
                    PartitionSpec::unpartition_spec().with_spec_id(-1),
                ),
                default_partition_type: StructType::new(vec![]),
                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
                properties: HashMap::new(),
                current_snapshot_id: None,
                snapshots: HashMap::new(),
                snapshot_log: vec![],
                sort_orders: HashMap::new(),
                metadata_log: vec![],
                default_sort_order_id: -1,
                refs: HashMap::default(),
                statistics: HashMap::new(),
                partition_statistics: HashMap::new(),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            },
            last_updated_ms: None,
            changes: vec![],
            // The fresh schema counts as "last added" for LAST_ADDED lookups.
            last_added_schema_id: Some(schema_id),
            last_added_spec_id: None,
            last_added_order_id: None,
            previous_history_entry: None,
        };

        builder
            .set_location(location)
            .add_current_schema(fresh_schema)?
            .add_default_partition_spec(fresh_spec.into_unbound())?
            .add_default_sort_order(fresh_sort_order)?
            .set_properties(properties)
    }
141
142 #[must_use]
148 pub fn new_from_metadata(
149 previous: TableMetadata,
150 current_file_location: Option<String>,
151 ) -> Self {
152 Self {
153 previous_history_entry: current_file_location.map(|l| MetadataLog {
154 metadata_file: l,
155 timestamp_ms: previous.last_updated_ms,
156 }),
157 metadata: previous,
158 changes: Vec::default(),
159 last_added_schema_id: None,
160 last_added_spec_id: None,
161 last_added_order_id: None,
162 last_updated_ms: None,
163 }
164 }
165
166 pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
168 let TableCreation {
169 name: _,
170 location,
171 schema,
172 partition_spec,
173 sort_order,
174 properties,
175 format_version,
176 } = table_creation;
177
178 let location = location.ok_or_else(|| {
179 Error::new(
180 ErrorKind::DataInvalid,
181 "Can't create table without location",
182 )
183 })?;
184 let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
185 spec_id: None,
186 fields: vec![],
187 });
188
189 Self::new(
190 schema,
191 partition_spec,
192 sort_order.unwrap_or(SortOrder::unsorted_order()),
193 location,
194 format_version,
195 properties,
196 )
197 }
198
199 pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
201 if self.metadata.table_uuid != uuid {
202 self.metadata.table_uuid = uuid;
203 self.changes.push(TableUpdate::AssignUuid { uuid });
204 }
205
206 self
207 }
208
209 pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
214 if format_version < self.metadata.format_version {
215 return Err(Error::new(
216 ErrorKind::DataInvalid,
217 format!(
218 "Cannot downgrade FormatVersion from {} to {}",
219 self.metadata.format_version, format_version
220 ),
221 ));
222 }
223
224 if format_version != self.metadata.format_version {
225 match format_version {
226 FormatVersion::V1 => {
227 }
229 FormatVersion::V2 => {
230 self.metadata.format_version = format_version;
231 self.changes
232 .push(TableUpdate::UpgradeFormatVersion { format_version });
233 }
234 FormatVersion::V3 => {
235 self.metadata.format_version = format_version;
236 self.changes
237 .push(TableUpdate::UpgradeFormatVersion { format_version });
238 }
239 }
240 }
241
242 Ok(self)
243 }
244
245 pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
254 let reserved_properties = properties
256 .keys()
257 .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
258 .map(ToString::to_string)
259 .collect::<Vec<_>>();
260
261 if !reserved_properties.is_empty() {
262 return Err(Error::new(
263 ErrorKind::DataInvalid,
264 format!(
265 "Table properties should not contain reserved properties, but got: [{}]",
266 reserved_properties.join(", ")
267 ),
268 ));
269 }
270
271 if properties.is_empty() {
272 return Ok(self);
273 }
274
275 self.metadata.properties.extend(properties.clone());
276 self.changes.push(TableUpdate::SetProperties {
277 updates: properties,
278 });
279
280 Ok(self)
281 }
282
283 pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
289 let properties = properties.iter().cloned().collect::<HashSet<_>>();
291
292 let reserved_properties = properties
294 .iter()
295 .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
296 .map(ToString::to_string)
297 .collect::<Vec<_>>();
298
299 if !reserved_properties.is_empty() {
300 return Err(Error::new(
301 ErrorKind::DataInvalid,
302 format!(
303 "Table properties to remove contain reserved properties: [{}]",
304 reserved_properties.join(", ")
305 ),
306 ));
307 }
308
309 for property in &properties {
310 self.metadata.properties.remove(property);
311 }
312
313 if !properties.is_empty() {
314 self.changes.push(TableUpdate::RemoveProperties {
315 removals: properties.into_iter().collect(),
316 });
317 }
318
319 Ok(self)
320 }
321
322 pub fn set_location(mut self, location: String) -> Self {
324 let location = location.trim_end_matches('/').to_string();
325 if self.metadata.location != location {
326 self.changes.push(TableUpdate::SetLocation {
327 location: location.clone(),
328 });
329 self.metadata.location = location;
330 }
331
332 self
333 }
334
    /// Appends a snapshot, validating table invariants before insertion.
    ///
    /// # Errors
    /// - Duplicate snapshot id.
    /// - (v2+) sequence number not newer than the table's last sequence
    ///   number when the snapshot has a parent.
    /// - Timestamp more than one minute before the previous snapshot-log
    ///   entry or the last-updated timestamp.
    /// - (row-lineage versions) missing or regressing `first-row-id`, or
    ///   `next-row-id` overflow.
    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
        if self
            .metadata
            .snapshots
            .contains_key(&snapshot.snapshot_id())
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
            ));
        }

        // V1 does not enforce sequence numbers; a parentless snapshot is also
        // exempt (e.g. the first snapshot written onto a branch).
        if self.metadata.format_version != FormatVersion::V1
            && snapshot.sequence_number() <= self.metadata.last_sequence_number
            && snapshot.parent_snapshot_id().is_some()
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
                    snapshot.sequence_number(),
                    self.metadata.last_sequence_number
                ),
            ));
        }

        // Allow up to one minute of backwards clock drift relative to the
        // last snapshot-log entry.
        if let Some(last) = self.metadata.snapshot_log.last() {
            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
                        snapshot.timestamp_ms(),
                        last.timestamp_ms
                    ),
                ));
            }
        }

        // Same tolerance against the newer of the pending and persisted
        // last-updated timestamps.
        let max_last_updated = self
            .last_updated_ms
            .unwrap_or_default()
            .max(self.metadata.last_updated_ms);
        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
                    snapshot.timestamp_ms(),
                    max_last_updated
                ),
            ));
        }

        // Row lineage: the snapshot must carry a row range whose first-row-id
        // is at or beyond the table's next-row-id.
        let mut added_rows = None;
        if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
            if let Some((first_row_id, added_rows_count)) = snapshot.row_range() {
                if first_row_id < self.metadata.next_row_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot add a snapshot, first-row-id is behind table next-row-id: {first_row_id} < {}",
                            self.metadata.next_row_id
                        ),
                    ));
                }

                added_rows = Some(added_rows_count);
            } else {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot add a snapshot: first-row-id is null. first-row-id must be set for format version >= {MIN_FORMAT_VERSION_ROW_LINEAGE}",
                    ),
                ));
            }
        }

        // Advance next-row-id, guarding against integer overflow.
        if let Some(added_rows) = added_rows {
            self.metadata.next_row_id = self
                .metadata
                .next_row_id
                .checked_add(added_rows)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Cannot add snapshot: next-row-id overflowed when adding added-rows",
                    )
                })?;
        }

        self.changes.push(TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        });

        self.last_updated_ms = Some(snapshot.timestamp_ms());
        self.metadata.last_sequence_number = snapshot.sequence_number();
        self.metadata
            .snapshots
            .insert(snapshot.snapshot_id(), snapshot.into());

        Ok(self)
    }
449
450 pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
456 let reference = self.metadata.refs.get(branch).cloned();
457
458 let reference = if let Some(mut reference) = reference {
459 if !reference.is_branch() {
460 return Err(Error::new(
461 ErrorKind::DataInvalid,
462 format!("Cannot append snapshot to non-branch reference '{branch}'",),
463 ));
464 }
465
466 reference.snapshot_id = snapshot.snapshot_id();
467 reference
468 } else {
469 SnapshotReference {
470 snapshot_id: snapshot.snapshot_id(),
471 retention: SnapshotRetention::Branch {
472 min_snapshots_to_keep: None,
473 max_snapshot_age_ms: None,
474 max_ref_age_ms: None,
475 },
476 }
477 };
478
479 self.add_snapshot(snapshot)?.set_ref(branch, reference)
480 }
481
482 pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
486 let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
487
488 self.metadata.snapshots.retain(|k, _| {
489 if snapshot_ids.contains(k) {
490 removed_snapshots.push(*k);
491 false
492 } else {
493 true
494 }
495 });
496
497 if !removed_snapshots.is_empty() {
498 self.changes.push(TableUpdate::RemoveSnapshots {
499 snapshot_ids: removed_snapshots,
500 });
501 }
502
503 self.metadata
505 .refs
506 .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
507
508 self
509 }
510
    /// Points `ref_name` at a known snapshot; setting the `main` branch also
    /// updates the current snapshot id and appends to the snapshot log.
    ///
    /// # Errors
    /// Fails if the referenced snapshot id is unknown.
    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
        // Setting an identical ref is a no-op and records no change.
        if self
            .metadata
            .refs
            .get(ref_name)
            .is_some_and(|snap_ref| snap_ref.eq(&reference))
        {
            return Ok(self);
        }

        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
                    reference.snapshot_id
                ),
            ));
        };

        // A snapshot added by this builder drives the pending timestamp.
        let is_added_snapshot = self.changes.iter().any(|update| {
            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
        });
        if is_added_snapshot {
            self.last_updated_ms = Some(snapshot.timestamp_ms());
        }

        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
            // Fix the timestamp now so the log entry matches the metadata's
            // eventual last_updated_ms.
            let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
                last_updated_ms
            } else {
                let last_updated_ms = chrono::Utc::now().timestamp_millis();
                self.last_updated_ms = Some(last_updated_ms);
                last_updated_ms
            };

            self.metadata.snapshot_log.push(SnapshotLog {
                snapshot_id: snapshot.snapshot_id(),
                timestamp_ms,
            });
        }

        self.changes.push(TableUpdate::SetSnapshotRef {
            ref_name: ref_name.to_string(),
            reference: reference.clone(),
        });
        self.metadata.refs.insert(ref_name.to_string(), reference);

        Ok(self)
    }
568
569 pub fn remove_ref(mut self, ref_name: &str) -> Self {
573 if ref_name == MAIN_BRANCH {
574 self.metadata.current_snapshot_id = None;
575 self.metadata.snapshot_log.clear();
576 }
577
578 if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
579 self.changes.push(TableUpdate::RemoveSnapshotRef {
580 ref_name: ref_name.to_string(),
581 });
582 }
583
584 self
585 }
586
587 pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
589 self.metadata
590 .statistics
591 .insert(statistics.snapshot_id, statistics.clone());
592 self.changes.push(TableUpdate::SetStatistics {
593 statistics: statistics.clone(),
594 });
595 self
596 }
597
598 pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
600 let previous = self.metadata.statistics.remove(&snapshot_id);
601 if previous.is_some() {
602 self.changes
603 .push(TableUpdate::RemoveStatistics { snapshot_id });
604 }
605 self
606 }
607
608 pub fn set_partition_statistics(
610 mut self,
611 partition_statistics_file: PartitionStatisticsFile,
612 ) -> Self {
613 self.metadata.partition_statistics.insert(
614 partition_statistics_file.snapshot_id,
615 partition_statistics_file.clone(),
616 );
617 self.changes.push(TableUpdate::SetPartitionStatistics {
618 partition_statistics: partition_statistics_file,
619 });
620 self
621 }
622
623 pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
625 let previous = self.metadata.partition_statistics.remove(&snapshot_id);
626 if previous.is_some() {
627 self.changes
628 .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
629 }
630 self
631 }
632
    /// Adds a schema, reusing the id of an identical existing schema.
    ///
    /// # Errors
    /// Fails when a new field name conflicts with an existing partition field
    /// name (see `validate_schema_field_names`).
    pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
        self.validate_schema_field_names(&schema)?;

        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);

        if schema_found {
            // Already present: only record the AddSchema change (once) so
            // `set_current_schema(LAST_ADDED)` can refer to it.
            if self.last_added_schema_id != Some(new_schema_id) {
                self.changes.push(TableUpdate::AddSchema {
                    schema: schema.clone(),
                });
                self.last_added_schema_id = Some(new_schema_id);
            }

            return Ok(self);
        }

        // last_column_id only ever grows, even across schema removals.
        self.metadata.last_column_id =
            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());

        // Re-stamp the schema with its assigned id when they differ.
        let schema = match new_schema_id == schema.schema_id() {
            true => schema,
            false => schema.with_schema_id(new_schema_id),
        };

        self.metadata
            .schemas
            .insert(new_schema_id, schema.clone().into());

        self.changes.push(TableUpdate::AddSchema { schema });

        self.last_added_schema_id = Some(new_schema_id);

        Ok(self)
    }
678
679 pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
687 if schema_id == Self::LAST_ADDED {
688 schema_id = self.last_added_schema_id.ok_or_else(|| {
689 Error::new(
690 ErrorKind::DataInvalid,
691 "Cannot set current schema to last added schema: no schema has been added.",
692 )
693 })?;
694 };
695 let schema_id = schema_id; if schema_id == self.metadata.current_schema_id {
698 return Ok(self);
699 }
700
701 let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
702 Error::new(
703 ErrorKind::DataInvalid,
704 format!("Cannot set current schema to unknown schema with id: '{schema_id}'"),
705 )
706 })?;
707
708 self.metadata.current_schema_id = schema_id;
714
715 if self.last_added_schema_id == Some(schema_id) {
716 self.changes.push(TableUpdate::SetCurrentSchema {
717 schema_id: Self::LAST_ADDED,
718 });
719 } else {
720 self.changes
721 .push(TableUpdate::SetCurrentSchema { schema_id });
722 }
723
724 Ok(self)
725 }
726
727 pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
729 self.add_schema(schema)?
730 .set_current_schema(Self::LAST_ADDED)
731 }
732
733 fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
742 if self.metadata.schemas.is_empty() {
743 return Ok(());
744 }
745
746 for field_name in schema.field_id_to_name_map().values() {
747 let has_partition_conflict = self.metadata.partition_name_exists(field_name);
748 let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);
749
750 if has_partition_conflict && is_new_field {
751 return Err(Error::new(
752 ErrorKind::DataInvalid,
753 format!(
754 "Cannot add schema field '{field_name}' because it conflicts with existing partition field name. \
755 Schema evolution cannot introduce field names that match existing partition field names."
756 ),
757 ));
758 }
759 }
760
761 Ok(())
762 }
763
    /// Validates partition field names in `unbound_spec` against schema
    /// field names.
    ///
    /// A partition name matching a schema field is only allowed when the
    /// partition is an identity transform sourced from that same field.
    fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
        // A brand-new table (no schemas yet) has nothing to conflict with.
        if self.metadata.schemas.is_empty() {
            return Ok(());
        }

        let current_schema = self.get_current_schema()?;
        for partition_field in unbound_spec.fields() {
            let exists_in_any_schema = self
                .metadata
                .name_exists_in_any_schema(&partition_field.name);

            if !exists_in_any_schema {
                // Name unused anywhere: no conflict possible.
                continue;
            }

            if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
                let is_identity_transform =
                    partition_field.transform == crate::spec::Transform::Identity;
                let has_matching_source_id = schema_field.id == partition_field.source_id;

                if !is_identity_transform {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
                            partition_field.name
                        ),
                    ));
                }

                if !has_matching_source_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot create identity partition sourced from different field in schema. \
                            Field name '{}' has id `{}` in schema but partition source id is `{}`",
                            partition_field.name, schema_field.id, partition_field.source_id
                        ),
                    ));
                }
            }
        }

        Ok(())
    }
820
821 pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
832 let schema = self.get_current_schema()?.clone();
833
834 self.validate_partition_field_names(&unbound_spec)?;
836
837 let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
838 .with_last_assigned_field_id(self.metadata.last_partition_id)
839 .build()?;
840
841 let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
842 let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
843 let spec = spec.with_spec_id(new_spec_id);
844 let unbound_spec = unbound_spec.with_spec_id(new_spec_id);
845
846 if spec_found {
847 if self.last_added_spec_id != Some(new_spec_id) {
848 self.changes
849 .push(TableUpdate::AddSpec { spec: unbound_spec });
850 self.last_added_spec_id = Some(new_spec_id);
851 }
852
853 return Ok(self);
854 }
855
856 if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
857 return Err(Error::new(
858 ErrorKind::DataInvalid,
859 "Cannot add partition spec with non-sequential field ids to format version 1 table",
860 ));
861 }
862
863 let highest_field_id = spec
864 .highest_field_id()
865 .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
866 self.metadata
867 .partition_specs
868 .insert(new_spec_id, Arc::new(spec));
869 self.changes
870 .push(TableUpdate::AddSpec { spec: unbound_spec });
871
872 self.last_added_spec_id = Some(new_spec_id);
873 self.metadata.last_partition_id =
874 std::cmp::max(self.metadata.last_partition_id, highest_field_id);
875
876 Ok(self)
877 }
878
879 pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
885 if spec_id == Self::LAST_ADDED {
886 spec_id = self.last_added_spec_id.ok_or_else(|| {
887 Error::new(
888 ErrorKind::DataInvalid,
889 "Cannot set default partition spec to last added spec: no spec has been added.",
890 )
891 })?;
892 }
893
894 if self.metadata.default_spec.spec_id() == spec_id {
895 return Ok(self);
896 }
897
898 if !self.metadata.partition_specs.contains_key(&spec_id) {
899 return Err(Error::new(
900 ErrorKind::DataInvalid,
901 format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
902 ));
903 }
904
905 let schemaless_spec = self
906 .metadata
907 .partition_specs
908 .get(&spec_id)
909 .ok_or_else(|| {
910 Error::new(
911 ErrorKind::DataInvalid,
912 format!(
913 "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
914 ),
915 )
916 })?
917 .clone();
918 let spec = Arc::unwrap_or_clone(schemaless_spec);
919 let spec_type = spec.partition_type(self.get_current_schema()?)?;
920 self.metadata.default_spec = Arc::new(spec);
921 self.metadata.default_partition_type = spec_type;
922
923 if self.last_added_spec_id == Some(spec_id) {
924 self.changes.push(TableUpdate::SetDefaultSpec {
925 spec_id: Self::LAST_ADDED,
926 });
927 } else {
928 self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
929 }
930
931 Ok(self)
932 }
933
934 pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
936 self.add_partition_spec(unbound_spec)?
937 .set_default_partition_spec(Self::LAST_ADDED)
938 }
939
940 pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
947 if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
948 return Err(Error::new(
949 ErrorKind::DataInvalid,
950 "Cannot remove default partition spec",
951 ));
952 }
953
954 let mut removed_specs = Vec::with_capacity(spec_ids.len());
955 spec_ids.iter().for_each(|id| {
956 if self.metadata.partition_specs.remove(id).is_some() {
957 removed_specs.push(*id);
958 }
959 });
960
961 if !removed_specs.is_empty() {
962 self.changes.push(TableUpdate::RemovePartitionSpecs {
963 spec_ids: removed_specs,
964 });
965 }
966
967 Ok(self)
968 }
969
970 pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
981 let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
982 let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);
983
984 if sort_order_found {
985 if self.last_added_order_id != Some(new_order_id) {
986 self.changes.push(TableUpdate::AddSortOrder {
987 sort_order: sort_order.clone().with_order_id(new_order_id),
988 });
989 self.last_added_order_id = Some(new_order_id);
990 }
991
992 return Ok(self);
993 }
994
995 let schema = self.get_current_schema()?.clone().as_ref().clone();
996 let sort_order = SortOrder::builder()
997 .with_order_id(new_order_id)
998 .with_fields(sort_order.fields)
999 .build(&schema)
1000 .map_err(|e| {
1001 Error::new(
1002 ErrorKind::DataInvalid,
1003 format!("Sort order to add is incompatible with current schema: {e}"),
1004 )
1005 .with_source(e)
1006 })?;
1007
1008 self.last_added_order_id = Some(new_order_id);
1009 self.metadata
1010 .sort_orders
1011 .insert(new_order_id, sort_order.clone().into());
1012 self.changes.push(TableUpdate::AddSortOrder { sort_order });
1013
1014 Ok(self)
1015 }
1016
1017 pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
1023 if sort_order_id == Self::LAST_ADDED as i64 {
1024 sort_order_id = self.last_added_order_id.ok_or_else(|| {
1025 Error::new(
1026 ErrorKind::DataInvalid,
1027 "Cannot set default sort order to last added order: no order has been added.",
1028 )
1029 })?;
1030 }
1031
1032 if self.metadata.default_sort_order_id == sort_order_id {
1033 return Ok(self);
1034 }
1035
1036 if !self.metadata.sort_orders.contains_key(&sort_order_id) {
1037 return Err(Error::new(
1038 ErrorKind::DataInvalid,
1039 format!(
1040 "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
1041 ),
1042 ));
1043 }
1044
1045 self.metadata.default_sort_order_id = sort_order_id;
1046
1047 if self.last_added_order_id == Some(sort_order_id) {
1048 self.changes.push(TableUpdate::SetDefaultSortOrder {
1049 sort_order_id: Self::LAST_ADDED as i64,
1050 });
1051 } else {
1052 self.changes
1053 .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
1054 }
1055
1056 Ok(self)
1057 }
1058
1059 fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
1061 self.add_sort_order(sort_order)?
1062 .set_default_sort_order(Self::LAST_ADDED as i64)
1063 }
1064
1065 pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
1067 let key_id = key.key_id().to_string();
1068 if self.metadata.encryption_keys.contains_key(&key_id) {
1069 return self;
1071 }
1072
1073 self.metadata.encryption_keys.insert(key_id, key.clone());
1074 self.changes.push(TableUpdate::AddEncryptionKey {
1075 encryption_key: key,
1076 });
1077 self
1078 }
1079
1080 pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
1082 if self.metadata.encryption_keys.remove(key_id).is_some() {
1083 self.changes.push(TableUpdate::RemoveEncryptionKey {
1084 key_id: key_id.to_string(),
1085 });
1086 }
1087 self
1088 }
1089
    /// Finalizes the builder into a [`TableMetadataBuildResult`].
    ///
    /// Stamps `last_updated_ms` (defaulting to now), re-binds the default
    /// spec and validates the default sort order against the current schema,
    /// rewrites the snapshot log, normalizes the metadata, appends the
    /// previous metadata-log entry, and expires old log entries per table
    /// properties.
    ///
    /// # Errors
    /// Fails when the metadata is internally inconsistent (dangling ids,
    /// unbindable spec/order, invalid snapshot log).
    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
        self.metadata.last_updated_ms = self
            .last_updated_ms
            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());

        let schema = self.get_current_schema()?.clone();
        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);

        // Re-bind the default spec to the (possibly changed) current schema.
        self.metadata.default_spec = Arc::new(
            Arc::unwrap_or_clone(self.metadata.default_spec)
                .into_unbound()
                .bind(schema.clone())?,
        );
        self.metadata.default_partition_type =
            self.metadata.default_spec.partition_type(&schema)?;
        // Build (and discard) the default sort order purely for validation.
        SortOrder::builder()
            .with_fields(sort_order.fields)
            .build(&schema)?;

        self.update_snapshot_log()?;
        self.metadata.try_normalize()?;

        if let Some(hist_entry) = self.previous_history_entry.take() {
            self.metadata.metadata_log.push(hist_entry);
        }
        let expired_metadata_logs = self.expire_metadata_log();

        Ok(TableMetadataBuildResult {
            metadata: self.metadata,
            changes: self.changes,
            expired_metadata_logs,
        })
    }
1127
1128 fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1129 let max_size = self
1130 .metadata
1131 .properties
1132 .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1133 .and_then(|v| v.parse::<usize>().ok())
1134 .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1135 .max(1);
1136
1137 if self.metadata.metadata_log.len() > max_size {
1138 self.metadata
1139 .metadata_log
1140 .drain(0..self.metadata.metadata_log.len() - max_size)
1141 .collect()
1142 } else {
1143 Vec::new()
1144 }
1145 }
1146
    /// Rebuilds the snapshot log: drops intermediate snapshots and, when
    /// snapshots were removed, discards history accumulated before a gap.
    ///
    /// # Errors
    /// Fails if the resulting log's latest entry is not the current snapshot.
    fn update_snapshot_log(&mut self) -> Result<()> {
        let intermediate_snapshots = self.get_intermediate_snapshots();
        let has_removed_snapshots = self
            .changes
            .iter()
            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));

        // Nothing to rewrite if no snapshot was superseded or removed.
        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
            return Ok(());
        }

        let mut new_snapshot_log = Vec::new();
        for log_entry in &self.metadata.snapshot_log {
            let snapshot_id = log_entry.snapshot_id;
            if self.metadata.snapshots.contains_key(&snapshot_id) {
                // Keep entries for live, non-intermediate snapshots.
                if !intermediate_snapshots.contains(&snapshot_id) {
                    new_snapshot_log.push(log_entry.clone());
                }
            } else if has_removed_snapshots {
                // A gap left by snapshot removal invalidates everything
                // collected so far; restart the log from here.
                new_snapshot_log.clear();
            }
        }

        // The log must end at the current snapshot (when one is set).
        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
            if last_id != Some(current_snapshot_id) {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
                ));
            }
        };

        self.metadata.snapshot_log = new_snapshot_log;
        Ok(())
    }
1188
    /// Ids of snapshots that were added *and* pointed at `main` by this
    /// builder but superseded before build — they never become the final
    /// current snapshot and their log entries are dropped.
    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
        let added_snapshot_ids = self
            .changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
                _ => None,
            })
            .collect::<HashSet<_>>();

        self.changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::SetSnapshotRef {
                    ref_name,
                    reference,
                } => {
                    // Intermediate = added in this round, pointed at main,
                    // and not the snapshot that ended up current.
                    if added_snapshot_ids.contains(&reference.snapshot_id)
                        && ref_name == MAIN_BRANCH
                        && reference.snapshot_id
                            != self
                                .metadata
                                .current_snapshot_id
                                .unwrap_or(i64::from(Self::LAST_ADDED))
                    {
                        Some(reference.snapshot_id)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .collect()
    }
1232
    /// Normalizes a caller-supplied schema/spec/sort order for a new table:
    /// schema field ids are reassigned from [`FIRST_FIELD_ID`], and the spec
    /// and sort order are re-bound by source-column *name* so their source
    /// ids follow the new assignment.
    ///
    /// # Errors
    /// Fails when a spec or sort field references a column id that does not
    /// exist in the schema.
    fn reassign_ids(
        schema: Schema,
        spec: UnboundPartitionSpec,
        sort_order: SortOrder,
    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
        // Capture the old id -> name mapping before ids are rewritten.
        let previous_id_to_name = schema.field_id_to_name_map().clone();
        let fresh_schema = schema
            .into_builder()
            .with_schema_id(DEFAULT_SCHEMA_ID)
            .with_reassigned_field_ids(FIRST_FIELD_ID)
            .build()?;

        // Re-add each partition field by source-column name so it picks up
        // the newly assigned field id.
        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
        for field in spec.fields() {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for partition column {} in schema.",
                        field.source_id, field.name
                    ),
                )
            })?;
            fresh_spec =
                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
        }
        let fresh_spec = fresh_spec.build()?;

        // Same remapping for sort fields: old id -> name -> new id.
        let mut fresh_order = SortOrder::builder();
        for mut field in sort_order.fields {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for sort column in schema.",
                        field.source_id
                    ),
                )
            })?;
            let new_field_id = fresh_schema
                .field_by_name(source_field_name)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!(
                            "Cannot find source column with name {source_field_name} for sort column in re-assigned schema."
                        ),
                    )
                })?.id;
            field.source_id = new_field_id;
            fresh_order.with_sort_field(field);
        }
        let fresh_sort_order = fresh_order.build(&fresh_schema)?;

        Ok((fresh_schema, fresh_spec, fresh_sort_order))
    }
1292
1293 fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1294 self.metadata
1295 .schemas
1296 .iter()
1297 .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1298 .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1299 }
1300
1301 fn get_highest_schema_id(&self) -> i32 {
1302 *self
1303 .metadata
1304 .schemas
1305 .keys()
1306 .max()
1307 .unwrap_or(&self.metadata.current_schema_id)
1308 }
1309
1310 fn get_current_schema(&self) -> Result<&SchemaRef> {
1311 self.metadata
1312 .schemas
1313 .get(&self.metadata.current_schema_id)
1314 .ok_or_else(|| {
1315 Error::new(
1316 ErrorKind::DataInvalid,
1317 format!(
1318 "Current schema with id '{}' not found in table metadata.",
1319 self.metadata.current_schema_id
1320 ),
1321 )
1322 })
1323 }
1324
1325 fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1326 self.metadata
1327 .sort_orders
1328 .get(&self.metadata.default_sort_order_id)
1329 .cloned()
1330 .ok_or_else(|| {
1331 Error::new(
1332 ErrorKind::DataInvalid,
1333 format!(
1334 "Default sort order with id '{}' not found in table metadata.",
1335 self.metadata.default_sort_order_id
1336 ),
1337 )
1338 })
1339 }
1340
1341 fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1343 self.metadata
1344 .partition_specs
1345 .iter()
1346 .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1347 .unwrap_or_else(|| {
1348 self.get_highest_spec_id()
1349 .map(|id| id + 1)
1350 .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1351 })
1352 }
1353
1354 fn get_highest_spec_id(&self) -> Option<i32> {
1355 self.metadata.partition_specs.keys().max().copied()
1356 }
1357
1358 fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1360 if new_sort_order.is_unsorted() {
1361 return SortOrder::unsorted_order().order_id;
1362 }
1363
1364 self.metadata
1365 .sort_orders
1366 .iter()
1367 .find_map(|(id, sort_order)| {
1368 sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1369 })
1370 .unwrap_or_else(|| {
1371 self.highest_sort_order_id()
1372 .unwrap_or(SortOrder::unsorted_order().order_id)
1373 + 1
1374 })
1375 }
1376
1377 fn highest_sort_order_id(&self) -> Option<i64> {
1378 self.metadata.sort_orders.keys().max().copied()
1379 }
1380
1381 pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1384 if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1385 return Err(Error::new(
1386 ErrorKind::DataInvalid,
1387 "Cannot remove current schema",
1388 ));
1389 }
1390
1391 if schema_id_to_remove.is_empty() {
1392 return Ok(self);
1393 }
1394
1395 let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1396 self.metadata.schemas.retain(|id, _schema| {
1397 if schema_id_to_remove.contains(id) {
1398 removed_schemas.push(*id);
1399 false
1400 } else {
1401 true
1402 }
1403 });
1404
1405 self.changes.push(TableUpdate::RemoveSchemas {
1406 schema_ids: removed_schemas,
1407 });
1408
1409 Ok(self)
1410 }
1411}
1412
1413impl From<TableMetadataBuildResult> for TableMetadata {
1414 fn from(result: TableMetadataBuildResult) -> Self {
1415 result.metadata
1416 }
1417}
1418
1419#[cfg(test)]
1420mod tests {
1421 use std::fs::File;
1422 use std::io::BufReader;
1423 use std::thread::sleep;
1424
1425 use super::*;
1426 use crate::TableIdent;
1427 use crate::io::FileIOBuilder;
1428 use crate::spec::{
1429 BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1430 SnapshotRetention, SortDirection, SortField, StructType, Summary, TableProperties,
1431 Transform, Type, UnboundPartitionField,
1432 };
1433 use crate::table::Table;
1434
    // Table location shared by all tests in this module.
    const TEST_LOCATION: &str = "s3://bucket/test/location";
    // Highest column id assigned by `schema()` (fields 1..=3).
    const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1437
1438 fn schema() -> Schema {
1439 Schema::builder()
1440 .with_fields(vec![
1441 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1442 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1443 NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1444 ])
1445 .build()
1446 .unwrap()
1447 }
1448
1449 fn sort_order() -> SortOrder {
1450 let schema = schema();
1451 SortOrder::builder()
1452 .with_order_id(1)
1453 .with_sort_field(SortField {
1454 source_id: 3,
1455 transform: Transform::Bucket(4),
1456 direction: SortDirection::Descending,
1457 null_order: NullOrder::First,
1458 })
1459 .build(&schema)
1460 .unwrap()
1461 }
1462
1463 fn partition_spec() -> UnboundPartitionSpec {
1464 UnboundPartitionSpec::builder()
1465 .with_spec_id(0)
1466 .add_partition_field(2, "y", Transform::Identity)
1467 .unwrap()
1468 .build()
1469 }
1470
1471 fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
1472 TableMetadataBuilder::new(
1473 schema(),
1474 partition_spec(),
1475 sort_order(),
1476 TEST_LOCATION.to_string(),
1477 format_version,
1478 HashMap::new(),
1479 )
1480 .unwrap()
1481 .build()
1482 .unwrap()
1483 .metadata
1484 .into_builder(Some(
1485 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1486 ))
1487 }
1488
1489 #[test]
1490 fn test_minimal_build() {
1491 let metadata = TableMetadataBuilder::new(
1492 schema(),
1493 partition_spec(),
1494 sort_order(),
1495 TEST_LOCATION.to_string(),
1496 FormatVersion::V1,
1497 HashMap::new(),
1498 )
1499 .unwrap()
1500 .build()
1501 .unwrap()
1502 .metadata;
1503
1504 assert_eq!(metadata.format_version, FormatVersion::V1);
1505 assert_eq!(metadata.location, TEST_LOCATION);
1506 assert_eq!(metadata.current_schema_id, 0);
1507 assert_eq!(metadata.default_spec.spec_id(), 0);
1508 assert_eq!(metadata.default_sort_order_id, 1);
1509 assert_eq!(metadata.last_partition_id, 1000);
1510 assert_eq!(metadata.last_column_id, 3);
1511 assert_eq!(metadata.snapshots.len(), 0);
1512 assert_eq!(metadata.current_snapshot_id, None);
1513 assert_eq!(metadata.refs.len(), 0);
1514 assert_eq!(metadata.properties.len(), 0);
1515 assert_eq!(metadata.metadata_log.len(), 0);
1516 assert_eq!(metadata.last_sequence_number, 0);
1517 assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);
1518
1519 let _ = serde_json::to_string(&metadata).unwrap();
1521
1522 let metadata = metadata
1524 .into_builder(Some(
1525 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1526 ))
1527 .upgrade_format_version(FormatVersion::V2)
1528 .unwrap()
1529 .build()
1530 .unwrap()
1531 .metadata;
1532
1533 assert_eq!(metadata.format_version, FormatVersion::V2);
1534 let _ = serde_json::to_string(&metadata).unwrap();
1535 }
1536
    /// An empty schema with the unpartitioned spec and unsorted order builds
    /// cleanly and yields the reserved default ids.
    #[test]
    fn test_build_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let metadata = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        // The unsorted order always has id 0.
        assert_eq!(metadata.default_sort_order_id, 0);
        assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
        // No columns were defined, so no column ids were assigned.
        assert_eq!(metadata.last_column_id, 0);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.current_snapshot_id, None);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
    }
1567
    /// `reassign_ids` renumbers field ids (top-level fields first, nested
    /// fields afterwards), resets the spec id to 0 and the sort order id to
    /// 1, and rewrites the source ids referenced by the spec and sort order.
    #[test]
    fn test_reassigns_ids() {
        let schema = Schema::builder()
            .with_schema_id(10)
            .with_fields(vec![
                NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    13,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();
        let spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(20)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 11,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(10)
            .build(&schema)
            .unwrap();

        let (fresh_schema, fresh_spec, fresh_sort_order) =
            TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();

        // Top-level fields get ids 1..=4; the nested field is assigned 5
        // after all top-level fields.
        let expected_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    3,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let expected_spec = PartitionSpec::builder(expected_schema.clone())
            .with_spec_id(0)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let expected_sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(1)
            .build(&expected_schema)
            .unwrap();

        assert_eq!(fresh_schema, expected_schema);
        assert_eq!(fresh_spec, expected_spec);
        assert_eq!(fresh_sort_order, expected_sort_order);
    }
1652
    /// A user-supplied schema id is ignored for new tables: the initial
    /// schema always ends up with id 0.
    #[test]
    fn test_ids_are_reassigned_for_new_metadata() {
        let schema = schema().into_builder().with_schema_id(10).build().unwrap();

        let metadata = TableMetadataBuilder::new(
            schema,
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.current_schema().schema_id(), 0);
    }
1673
    /// Creating a table records the full sequence of `TableUpdate` changes.
    /// `-1` ids reference the item added within the same change set
    /// (`TableMetadataBuilder::LAST_ADDED`).
    #[test]
    fn test_new_metadata_changes() {
        let changes = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema { schema: schema() },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // The recorded spec carries the bound field id (1000).
                spec: PartitionSpec::builder(schema())
                    .with_spec_id(0)
                    .add_unbound_field(UnboundPartitionField {
                        name: "y".to_string(),
                        transform: Transform::Identity,
                        source_id: 2,
                        field_id: Some(1000)
                    })
                    .unwrap()
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: sort_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
            TableUpdate::SetProperties {
                updates: HashMap::from_iter(vec![(
                    "property 1".to_string(),
                    "value 1".to_string()
                )]),
            }
        ]);
    }
1724
    /// Like `test_new_metadata_changes`, but for an unpartitioned, unsorted
    /// table with no properties: no `SetProperties` change is recorded.
    #[test]
    fn test_new_metadata_changes_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let changes = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema {
                schema: Schema::builder().build().unwrap(),
            },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                spec: PartitionSpec::builder(schema)
                    .with_spec_id(0)
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: SortOrder::unsorted_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
        ]);
    }
1765
    /// Adding a spec reassigns its spec id to the next free one (1), fills in
    /// missing partition field ids sequentially, and leaves the default spec
    /// untouched. The spec can then be removed again.
    #[test]
    fn test_add_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    // Unset field id: the builder must assign the next one.
                    field_id: None,
                },
            ])
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .build()
            .unwrap();

        // The recorded change carries the reassigned spec id.
        let expected_change = added_spec.with_spec_id(1);
        let expected_spec = PartitionSpec::builder(schema())
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y".to_string(),
                transform: Transform::Identity,
                source_id: 2,
                field_id: Some(1000),
            })
            .unwrap()
            .add_unbound_field(UnboundPartitionField {
                name: "z".to_string(),
                transform: Transform::Identity,
                source_id: 3,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(
            build_result.metadata.partition_spec_by_id(1),
            Some(&Arc::new(expected_spec))
        );
        assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
        assert_eq!(build_result.metadata.last_partition_id, 1001);
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: expected_change
        });

        // A non-default spec may be removed again.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .remove_partition_specs(&[1])
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.partition_specs.len(), 1);
        assert!(build_result.metadata.partition_spec_by_id(1).is_none());
    }
1844
    /// `set_default_partition_spec(-1)` resolves to the spec added in the
    /// same change set.
    #[test]
    fn test_set_default_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);
        let schema = builder.get_current_schema().unwrap().clone();
        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        // Field id continues after the initial spec's highest (1000), so the
        // new field gets 1001.
        let expected_spec = PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y_bucket[2]".to_string(),
                transform: Transform::Bucket(2),
                source_id: 1,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSpec {
                spec: added_spec.with_spec_id(1)
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 }
        ]);
    }
1885
    /// Setting the default spec works both for a spec added in the same
    /// change set (`-1`) and for a pre-existing spec referenced by its
    /// concrete id.
    #[test]
    fn test_set_existing_default_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);
        // Add a new (empty) spec and make it the default.
        let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
        let build_result = builder
            .add_partition_spec(unbound_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: unbound_spec.clone()
        });
        assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
            spec_id: -1
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                unbound_spec
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );

        // Switch back to the original spec by its concrete id.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .set_default_partition_spec(0)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
            spec_id: 0
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                partition_spec()
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );
    }
1939
    /// Adding a sort order reassigns its order id to the next free one (2,
    /// since the initial order took 1).
    #[test]
    fn test_add_sort_order() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_sort_order = SortOrder::builder()
            .with_order_id(10)
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .build(&schema())
            .unwrap();

        let build_result = builder
            .add_sort_order(added_sort_order.clone())
            .unwrap()
            .build()
            .unwrap();

        let expected_sort_order = added_sort_order.with_order_id(2);

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
        pretty_assertions::assert_eq!(
            build_result.metadata.sort_order_by_id(2),
            Some(&Arc::new(expected_sort_order.clone()))
        );
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
            sort_order: expected_sort_order
        });
    }
1973
    /// A schema that extends the current one (adds column `a`) can be added
    /// and made current in one step; the change set records the add plus a
    /// `SetCurrentSchema` with the `-1` last-added marker.
    #[test]
    fn test_add_compatible_schema() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let build_result = builder
            .add_current_schema(added_schema.clone())
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
        pretty_assertions::assert_eq!(
            build_result.metadata.schema_by_id(1),
            Some(&Arc::new(added_schema.clone()))
        );
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
            schema: added_schema
        });
        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
            schema_id: -1
        });
    }
2008
    /// Even when `set_current_schema` is called with the concrete id (1), the
    /// recorded change uses `-1` because that schema was added in the same
    /// change set.
    #[test]
    fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let build_result = builder
            .add_schema(added_schema.clone())
            .unwrap()
            .set_current_schema(1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
            schema_id: -1
        });
    }
2037
    /// A newly created table has no previous metadata file, so its metadata
    /// log starts empty.
    #[test]
    fn test_no_metadata_log_for_create_table() {
        let build_result = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap();

        assert_eq!(build_result.metadata.metadata_log.len(), 0);
    }
2054
    /// `into_builder(None)` (no previous metadata file location) does not
    /// append a new metadata log entry, so the log length stays at 1.
    #[test]
    fn test_no_metadata_log_entry_for_no_previous_location() {
        // `builder_without_changes` passed a location, so one entry exists.
        let metadata = builder_without_changes(FormatVersion::V2)
            .build()
            .unwrap()
            .metadata;
        assert_eq!(metadata.metadata_log.len(), 1);

        let build_result = metadata
            .into_builder(None)
            .set_properties(HashMap::from_iter(vec![(
                "foo".to_string(),
                "bar".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();

        // No previous file was provided, so no additional entry is appended.
        assert_eq!(build_result.metadata.metadata_log.len(), 1);
    }
2076
    /// `into_builder(Some(path))` records the previous metadata file as a
    /// metadata log entry on the next build.
    #[test]
    fn test_from_metadata_generates_metadata_log() {
        let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
        let builder = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata
        .into_builder(Some(metadata_path.to_string()));

        // Any change works; this one swaps in the unsorted order.
        let builder = builder
            .add_default_sort_order(SortOrder::unsorted_order())
            .unwrap();

        let build_result = builder.build().unwrap();

        assert_eq!(build_result.metadata.metadata_log.len(), 1);
        assert_eq!(
            build_result.metadata.metadata_log[0].metadata_file,
            metadata_path
        );
    }
2106
    /// `set_ref` rejects unknown snapshot ids and, for a known id, records
    /// the snapshot in the snapshot log.
    #[test]
    fn test_set_ref() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let builder = builder.add_snapshot(snapshot.clone()).unwrap();

        // Snapshot id 10 was never added; setting the ref must fail.
        assert!(
            builder
                .clone()
                .set_ref(MAIN_BRANCH, SnapshotReference {
                    snapshot_id: 10,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: Some(10),
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                })
                .unwrap_err()
                .to_string()
                .contains("Cannot set 'main' to unknown snapshot: '10'")
        );

        let build_result = builder
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.snapshots.len(), 1);
        assert_eq!(
            build_result.metadata.snapshot_by_id(1),
            Some(&Arc::new(snapshot.clone()))
        );
        assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 1,
            timestamp_ms: snapshot.timestamp_ms()
        }])
    }
2171
    /// When a branch moves through an intermediate snapshot within one change
    /// set, only the final snapshot lands in the snapshot log.
    #[test]
    fn test_snapshot_log_skips_intermediates() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot_1 = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let snapshot_2 = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        // Point main at snapshot 1, then immediately advance it to 2.
        let result = builder
            .add_snapshot(snapshot_1)
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
            .unwrap()
            .build()
            .unwrap();

        // Snapshot 1 was intermediate; only snapshot 2 is logged.
        assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 2,
            timestamp_ms: snapshot_2.timestamp_ms()
        }]);
        assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
    }
2239
    /// `set_branch_snapshot` on an unknown branch name creates the branch
    /// with a default (unbounded) retention policy.
    #[test]
    fn test_set_branch_snapshot_creates_branch_if_not_exists() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let build_result = builder
            .set_branch_snapshot(snapshot.clone(), "new_branch")
            .unwrap()
            .build()
            .unwrap();

        // The implicitly created branch has no retention limits set.
        let reference = SnapshotReference {
            snapshot_id: 2,
            retention: SnapshotRetention::Branch {
                min_snapshots_to_keep: None,
                max_snapshot_age_ms: None,
                max_ref_age_ms: None,
            },
        };

        assert_eq!(build_result.metadata.refs.len(), 1);
        assert_eq!(
            build_result.metadata.refs.get("new_branch"),
            Some(&reference)
        );
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef {
                ref_name: "new_branch".to_string(),
                reference
            }
        ]);
    }
2284
    /// Adding the same snapshot id twice is rejected.
    #[test]
    fn test_cannot_add_duplicate_snapshot_id() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
        // Second add with the same id must fail.
        builder.add_snapshot(snapshot).unwrap_err();
    }
2312
    /// Making a schema current that lacks the partition source column causes
    /// the build to fail when re-binding the default spec.
    #[test]
    fn test_add_incompatible_current_schema_fails() {
        let builder = builder_without_changes(FormatVersion::V2);

        // Empty schema: the default spec's source column `y` no longer exists.
        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![])
            .build()
            .unwrap();

        let err = builder
            .add_current_schema(added_schema)
            .unwrap()
            .build()
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("Cannot find partition source field")
        );
    }
2334
    /// V1 tables require sequential partition field ids; a gap (1000 then
    /// 1002) is rejected.
    #[test]
    fn test_add_partition_spec_for_v1_requires_sequential_ids() {
        let builder = builder_without_changes(FormatVersion::V1);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    // Gap: 1001 is skipped, which V1 does not allow.
                    field_id: Some(1002),
                },
            ])
            .unwrap()
            .build();

        let err = builder.add_partition_spec(added_spec).unwrap_err();
        assert!(err.to_string().contains(
            "Cannot add partition spec with non-sequential field ids to format version 1 table"
        ));
    }
2363
    /// With `previous-versions-max` set to 2, the oldest metadata log entry
    /// is expired once a third entry would be added, and is surfaced in
    /// `expired_metadata_logs`.
    #[test]
    fn test_expire_metadata_log() {
        let builder = builder_without_changes(FormatVersion::V2);
        let metadata = builder
            .set_properties(HashMap::from_iter(vec![(
                TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(metadata.metadata.metadata_log.len(), 1);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "1".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();

        // At the configured max (2): nothing expired yet.
        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        // Third entry pushes the oldest one out.
        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 1);
    }
2405
    /// A V2 branch snapshot whose sequence number is lower than its parent's
    /// is rejected.
    #[test]
    fn test_v2_sequence_number_cannot_decrease() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("/snap-1")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let builder = builder
            .add_snapshot(snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap();

        // Child snapshot with sequence number 0 < parent's 1.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-0")
            .with_parent_snapshot_id(Some(1))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let err = builder
            .set_branch_snapshot(snapshot, MAIN_BRANCH)
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("Cannot add snapshot with sequence number")
        );
    }
2456
    /// Removing the default partition spec (id 0) must fail.
    #[test]
    fn test_default_spec_cannot_be_removed() {
        let builder = builder_without_changes(FormatVersion::V2);

        builder.remove_partition_specs(&[0]).unwrap_err();
    }
2463
    /// Statistics files can be set and removed; removing statistics for a
    /// snapshot that has none is a no-op that records no change.
    #[test]
    fn test_statistics() {
        let builder = builder_without_changes(FormatVersion::V2);

        let statistics = StatisticsFile {
            snapshot_id: 3055729675574597004,
            statistics_path: "s3://a/b/stats.puffin".to_string(),
            file_size_in_bytes: 413,
            file_footer_size_in_bytes: 42,
            key_metadata: None,
            blob_metadata: vec![BlobMetadata {
                snapshot_id: 3055729675574597004,
                sequence_number: 1,
                fields: vec![1],
                r#type: "ndv".to_string(),
                properties: HashMap::new(),
            }],
        };
        let build_result = builder.set_statistics(statistics.clone()).build().unwrap();

        assert_eq!(
            build_result.metadata.statistics,
            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
        );
        assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
            statistics: statistics.clone()
        }]);

        // Remove the statistics again.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();

        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
            snapshot_id: statistics.snapshot_id
        }]);

        // Removing again is a no-op: no change is recorded.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes.len(), 0);
    }
2513
2514 #[test]
2515 fn test_add_partition_statistics() {
2516 let builder = builder_without_changes(FormatVersion::V2);
2517
2518 let statistics = PartitionStatisticsFile {
2519 snapshot_id: 3055729675574597004,
2520 statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2521 file_size_in_bytes: 43,
2522 };
2523
2524 let build_result = builder
2525 .set_partition_statistics(statistics.clone())
2526 .build()
2527 .unwrap();
2528 assert_eq!(
2529 build_result.metadata.partition_statistics,
2530 HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2531 );
2532 assert_eq!(build_result.changes, vec![
2533 TableUpdate::SetPartitionStatistics {
2534 partition_statistics: statistics.clone()
2535 }
2536 ]);
2537
2538 let builder = build_result.metadata.into_builder(None);
2540 let build_result = builder
2541 .remove_partition_statistics(statistics.snapshot_id)
2542 .build()
2543 .unwrap();
2544 assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2545 assert_eq!(build_result.changes, vec![
2546 TableUpdate::RemovePartitionStatistics {
2547 snapshot_id: statistics.snapshot_id
2548 }
2549 ]);
2550
2551 let builder = build_result.metadata.into_builder(None);
2553 let build_result = builder
2554 .remove_partition_statistics(statistics.snapshot_id)
2555 .build()
2556 .unwrap();
2557 assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2558 assert_eq!(build_result.changes.len(), 0);
2559 }
2560
2561 #[test]
2562 fn last_update_increased_for_property_only_update() {
2563 let builder = builder_without_changes(FormatVersion::V2);
2564
2565 let metadata = builder.build().unwrap().metadata;
2566 let last_updated_ms = metadata.last_updated_ms;
2567 sleep(std::time::Duration::from_millis(2));
2568
2569 let build_result = metadata
2570 .into_builder(Some(
2571 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2572 ))
2573 .set_properties(HashMap::from_iter(vec![(
2574 "foo".to_string(),
2575 "bar".to_string(),
2576 )]))
2577 .unwrap()
2578 .build()
2579 .unwrap();
2580
2581 assert!(
2582 build_result.metadata.last_updated_ms > last_updated_ms,
2583 "{} > {}",
2584 build_result.metadata.last_updated_ms,
2585 last_updated_ms
2586 );
2587 }
2588
2589 #[test]
2590 fn test_construct_default_main_branch() {
2591 let file = File::open(format!(
2593 "{}/testdata/table_metadata/{}",
2594 env!("CARGO_MANIFEST_DIR"),
2595 "TableMetadataV2Valid.json"
2596 ))
2597 .unwrap();
2598 let reader = BufReader::new(file);
2599 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2600
2601 let table = Table::builder()
2602 .metadata(resp)
2603 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2604 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2605 .file_io(FileIOBuilder::new("memory").build().unwrap())
2606 .build()
2607 .unwrap();
2608
2609 assert_eq!(
2610 table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2611 table.metadata().current_snapshot_id().unwrap()
2612 );
2613 }
2614
2615 #[test]
2616 fn test_active_schema_cannot_be_removed() {
2617 let builder = builder_without_changes(FormatVersion::V2);
2618 builder.remove_schemas(&[0]).unwrap_err();
2619 }
2620
2621 #[test]
2622 fn test_remove_schemas() {
2623 let file = File::open(format!(
2624 "{}/testdata/table_metadata/{}",
2625 env!("CARGO_MANIFEST_DIR"),
2626 "TableMetadataV2Valid.json"
2627 ))
2628 .unwrap();
2629 let reader = BufReader::new(file);
2630 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2631
2632 let table = Table::builder()
2633 .metadata(resp)
2634 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2635 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2636 .file_io(FileIOBuilder::new("memory").build().unwrap())
2637 .build()
2638 .unwrap();
2639
2640 assert_eq!(2, table.metadata().schemas.len());
2641
2642 {
2643 let meta_data_builder = table.metadata().clone().into_builder(None);
2645 meta_data_builder.remove_schemas(&[1]).unwrap_err();
2646 }
2647
2648 let mut meta_data_builder = table.metadata().clone().into_builder(None);
2649 meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
2650 let build_result = meta_data_builder.build().unwrap();
2651 assert_eq!(1, build_result.metadata.schemas.len());
2652 assert_eq!(1, build_result.metadata.current_schema_id);
2653 assert_eq!(1, build_result.metadata.current_schema().schema_id());
2654 assert_eq!(1, build_result.changes.len());
2655
2656 let remove_schema_ids =
2657 if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
2658 schema_ids
2659 } else {
2660 unreachable!("Expected RemoveSchema change")
2661 };
2662 assert_eq!(remove_schema_ids, &[0]);
2663 }
2664
2665 #[test]
2666 fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
2667 let initial_schema = Schema::builder()
2668 .with_fields(vec![
2669 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2670 ])
2671 .build()
2672 .unwrap();
2673
2674 let partition_spec_with_bucket = UnboundPartitionSpec::builder()
2675 .with_spec_id(0)
2676 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2677 .unwrap()
2678 .build();
2679
2680 let metadata = TableMetadataBuilder::new(
2681 initial_schema,
2682 partition_spec_with_bucket,
2683 SortOrder::unsorted_order(),
2684 TEST_LOCATION.to_string(),
2685 FormatVersion::V2,
2686 HashMap::new(),
2687 )
2688 .unwrap()
2689 .build()
2690 .unwrap()
2691 .metadata;
2692
2693 let partition_field_names: Vec<String> = metadata
2694 .default_partition_spec()
2695 .fields()
2696 .iter()
2697 .map(|f| f.name.clone())
2698 .collect();
2699 assert!(partition_field_names.contains(&"bucket_data".to_string()));
2700
2701 let evolved_schema = Schema::builder()
2702 .with_fields(vec![
2703 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2704 NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
2706 ])
2707 .build()
2708 .unwrap();
2709
2710 let builder = metadata.into_builder(Some(
2711 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2712 ));
2713
2714 let result = builder.add_current_schema(evolved_schema);
2716
2717 assert!(result.is_err());
2718 let error = result.unwrap_err();
2719 let error_message = error.message();
2720 assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2721 assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
2722 }
2723
2724 #[test]
2725 fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2726 let initial_schema = Schema::builder()
2727 .with_fields(vec![
2728 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2729 ])
2730 .build()
2731 .unwrap();
2732
2733 let partition_spec = UnboundPartitionSpec::builder()
2734 .with_spec_id(0)
2735 .add_partition_field(1, "partition_col", Transform::Bucket(16))
2736 .unwrap()
2737 .build();
2738
2739 let metadata = TableMetadataBuilder::new(
2740 initial_schema,
2741 partition_spec,
2742 SortOrder::unsorted_order(),
2743 TEST_LOCATION.to_string(),
2744 FormatVersion::V2,
2745 HashMap::new(),
2746 )
2747 .unwrap()
2748 .build()
2749 .unwrap()
2750 .metadata;
2751
2752 let non_conflicting_schema = Schema::builder()
2753 .with_fields(vec![
2754 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2755 NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2756 ])
2757 .build()
2758 .unwrap();
2759
2760 let result = metadata
2762 .clone()
2763 .into_builder(Some("test_location".to_string()))
2764 .add_current_schema(non_conflicting_schema)
2765 .unwrap()
2766 .build();
2767
2768 assert!(result.is_ok());
2769 }
2770
2771 #[test]
2772 fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
2773 let initial_schema = Schema::builder()
2774 .with_fields(vec![
2775 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2776 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2777 .into(),
2778 ])
2779 .build()
2780 .unwrap();
2781
2782 let partition_spec = UnboundPartitionSpec::builder()
2783 .with_spec_id(0)
2784 .add_partition_field(1, "data_bucket", Transform::Bucket(16))
2785 .unwrap()
2786 .build();
2787
2788 let metadata = TableMetadataBuilder::new(
2789 initial_schema,
2790 partition_spec,
2791 SortOrder::unsorted_order(),
2792 TEST_LOCATION.to_string(),
2793 FormatVersion::V2,
2794 HashMap::new(),
2795 )
2796 .unwrap()
2797 .build()
2798 .unwrap()
2799 .metadata;
2800
2801 let builder = metadata.into_builder(Some(
2802 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2803 ));
2804
2805 let conflicting_partition_spec = UnboundPartitionSpec::builder()
2806 .with_spec_id(1)
2807 .add_partition_field(1, "existing_field", Transform::Bucket(8))
2808 .unwrap()
2809 .build();
2810
2811 let result = builder.add_partition_spec(conflicting_partition_spec);
2812
2813 assert!(result.is_err());
2814 let error = result.unwrap_err();
2815 let error_message = error.message();
2816 assert!(error_message.contains(
2818 "Cannot create partition with name 'existing_field' that conflicts with schema field"
2819 ));
2820 assert!(error_message.contains("and is not an identity transform"));
2821 }
2822
    /// Schema field names are validated against partition field names from
    /// all historical specs: a field name that was never a partition field
    /// may be re-introduced, but one matching a (historical) non-identity
    /// partition field name is rejected.
    #[test]
    fn test_schema_evolution_validates_against_all_historical_schemas() {
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        // Spec 0 partitions on a bucket of "data", named "bucket_data".
        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Second schema drops "existing_field" and adds "new_field".
        let second_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap();

        let metadata = metadata
            .into_builder(Some("test_location".to_string()))
            .add_current_schema(second_schema)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Re-introducing "existing_field" (present in a historical schema,
        // never used as a partition field name) must be accepted.
        let third_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let builder = metadata
            .clone()
            .into_builder(Some("test_location".to_string()));

        let result = builder.add_current_schema(third_schema);
        assert!(result.is_ok());

        // "bucket_data" collides with the (still registered) partition field
        // name from spec 0 and must be rejected.
        let conflicting_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
                NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap();

        let builder2 = metadata.into_builder(Some("test_location".to_string()));
        let result2 = builder2.add_current_schema(conflicting_schema);

        assert!(result2.is_err());
        let error = result2.unwrap_err();
        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
    }
2918
2919 #[test]
2920 fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
2921 let initial_schema = Schema::builder()
2923 .with_fields(vec![
2924 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2925 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
2926 .into(),
2927 ])
2928 .build()
2929 .unwrap();
2930
2931 let partition_spec = UnboundPartitionSpec::builder()
2932 .with_spec_id(0)
2933 .add_partition_field(2, "partition_data", Transform::Identity)
2934 .unwrap()
2935 .build();
2936
2937 let metadata = TableMetadataBuilder::new(
2938 initial_schema,
2939 partition_spec,
2940 SortOrder::unsorted_order(),
2941 TEST_LOCATION.to_string(),
2942 FormatVersion::V2,
2943 HashMap::new(),
2944 )
2945 .unwrap()
2946 .build()
2947 .unwrap()
2948 .metadata;
2949
2950 let evolved_schema = Schema::builder()
2952 .with_fields(vec![
2953 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2954 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
2955 .into(),
2956 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2957 .into(),
2958 ])
2959 .build()
2960 .unwrap();
2961
2962 let result = metadata
2964 .into_builder(Some("test_location".to_string()))
2965 .add_current_schema(evolved_schema);
2966
2967 assert!(result.is_ok());
2968 }
2969
2970 #[test]
2971 fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
2972 let initial_schema = Schema::builder()
2974 .with_fields(vec![
2975 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2976 ])
2977 .build()
2978 .unwrap();
2979
2980 let partition_spec = UnboundPartitionSpec::builder()
2981 .with_spec_id(0)
2982 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2983 .unwrap()
2984 .build();
2985
2986 let metadata = TableMetadataBuilder::new(
2987 initial_schema,
2988 partition_spec,
2989 SortOrder::unsorted_order(),
2990 TEST_LOCATION.to_string(),
2991 FormatVersion::V2,
2992 HashMap::new(),
2993 )
2994 .unwrap()
2995 .build()
2996 .unwrap()
2997 .metadata;
2998
2999 let conflicting_schema = Schema::builder()
3001 .with_fields(vec![
3002 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3003 NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
3005 ])
3006 .build()
3007 .unwrap();
3008
3009 let builder = metadata.into_builder(Some("test_location".to_string()));
3010 let result = builder.add_current_schema(conflicting_schema);
3011
3012 assert!(result.is_err());
3015 let error = result.unwrap_err();
3016 assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
3017 }
3018
3019 #[test]
3020 fn test_partition_spec_evolution_allows_non_conflicting_names() {
3021 let initial_schema = Schema::builder()
3022 .with_fields(vec![
3023 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3024 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
3025 .into(),
3026 ])
3027 .build()
3028 .unwrap();
3029
3030 let partition_spec = UnboundPartitionSpec::builder()
3031 .with_spec_id(0)
3032 .add_partition_field(1, "data_bucket", Transform::Bucket(16))
3033 .unwrap()
3034 .build();
3035
3036 let metadata = TableMetadataBuilder::new(
3037 initial_schema,
3038 partition_spec,
3039 SortOrder::unsorted_order(),
3040 TEST_LOCATION.to_string(),
3041 FormatVersion::V2,
3042 HashMap::new(),
3043 )
3044 .unwrap()
3045 .build()
3046 .unwrap()
3047 .metadata;
3048
3049 let builder = metadata.into_builder(Some(
3050 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3051 ));
3052
3053 let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
3055 .with_spec_id(1)
3056 .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
3057 .unwrap()
3058 .build();
3059
3060 let result = builder.add_partition_spec(non_conflicting_partition_spec);
3061
3062 assert!(result.is_ok());
3063 }
3064
3065 #[test]
3066 fn test_row_lineage_addition() {
3067 let new_rows = 30;
3068 let base = builder_without_changes(FormatVersion::V3)
3069 .build()
3070 .unwrap()
3071 .metadata;
3072 let add_rows = Snapshot::builder()
3073 .with_snapshot_id(0)
3074 .with_timestamp_ms(base.last_updated_ms + 1)
3075 .with_sequence_number(0)
3076 .with_schema_id(0)
3077 .with_manifest_list("foo")
3078 .with_parent_snapshot_id(None)
3079 .with_summary(Summary {
3080 operation: Operation::Append,
3081 additional_properties: HashMap::new(),
3082 })
3083 .with_row_range(base.next_row_id(), new_rows)
3084 .build();
3085
3086 let first_addition = base
3087 .into_builder(None)
3088 .add_snapshot(add_rows.clone())
3089 .unwrap()
3090 .build()
3091 .unwrap()
3092 .metadata;
3093
3094 assert_eq!(first_addition.next_row_id(), new_rows);
3095
3096 let add_more_rows = Snapshot::builder()
3097 .with_snapshot_id(1)
3098 .with_timestamp_ms(first_addition.last_updated_ms + 1)
3099 .with_sequence_number(1)
3100 .with_schema_id(0)
3101 .with_manifest_list("foo")
3102 .with_parent_snapshot_id(Some(0))
3103 .with_summary(Summary {
3104 operation: Operation::Append,
3105 additional_properties: HashMap::new(),
3106 })
3107 .with_row_range(first_addition.next_row_id(), new_rows)
3108 .build();
3109
3110 let second_addition = first_addition
3111 .into_builder(None)
3112 .add_snapshot(add_more_rows)
3113 .unwrap()
3114 .build()
3115 .unwrap()
3116 .metadata;
3117 assert_eq!(second_addition.next_row_id(), new_rows * 2);
3118 }
3119
3120 #[test]
3121 fn test_row_lineage_invalid_snapshot() {
3122 let new_rows = 30;
3123 let base = builder_without_changes(FormatVersion::V3)
3124 .build()
3125 .unwrap()
3126 .metadata;
3127
3128 let add_rows = Snapshot::builder()
3130 .with_snapshot_id(0)
3131 .with_timestamp_ms(base.last_updated_ms + 1)
3132 .with_sequence_number(0)
3133 .with_schema_id(0)
3134 .with_manifest_list("foo")
3135 .with_parent_snapshot_id(None)
3136 .with_summary(Summary {
3137 operation: Operation::Append,
3138 additional_properties: HashMap::new(),
3139 })
3140 .with_row_range(base.next_row_id(), new_rows)
3141 .build();
3142
3143 let added = base
3144 .into_builder(None)
3145 .add_snapshot(add_rows)
3146 .unwrap()
3147 .build()
3148 .unwrap()
3149 .metadata;
3150
3151 let invalid_new_rows = Snapshot::builder()
3152 .with_snapshot_id(1)
3153 .with_timestamp_ms(added.last_updated_ms + 1)
3154 .with_sequence_number(1)
3155 .with_schema_id(0)
3156 .with_manifest_list("foo")
3157 .with_parent_snapshot_id(Some(0))
3158 .with_summary(Summary {
3159 operation: Operation::Append,
3160 additional_properties: HashMap::new(),
3161 })
3162 .with_row_range(added.next_row_id() - 1, 10)
3164 .build();
3165
3166 let err = added
3167 .into_builder(None)
3168 .add_snapshot(invalid_new_rows)
3169 .unwrap_err();
3170 assert!(
3171 err.to_string().contains(
3172 "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"
3173 )
3174 );
3175 }
3176
    /// Row lineage across branches: `first_row_id`/`next_row_id` are assigned
    /// table-wide, so appends to a side branch and to main share one counter.
    #[test]
    fn test_row_lineage_append_branch() {
        let branch = "some_branch";

        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;

        // A fresh V3 table starts assigning row ids at 0.
        assert_eq!(base.next_row_id(), 0);

        // Append 30 rows on the side branch.
        let branch_snapshot_1 = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(base.next_row_id(), 30)
            .build();

        let table_after_branch_1 = base
            .into_builder(None)
            .set_branch_snapshot(branch_snapshot_1.clone(), branch)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // A side-branch append does not set the table's current snapshot.
        assert!(table_after_branch_1.current_snapshot().is_none());

        // The branch snapshot was assigned rows starting at 0.
        let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
        let branch_snap_1 = table_after_branch_1
            .snapshots
            .get(&branch_ref.snapshot_id)
            .unwrap();
        assert_eq!(branch_snap_1.first_row_id(), Some(0));

        // The table-wide counter advanced past the branch's rows.
        assert_eq!(table_after_branch_1.next_row_id(), 30);

        // Append 28 rows on main.
        let main_snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("bar")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(table_after_branch_1.next_row_id(), 28)
            .build();

        let table_after_main = table_after_branch_1
            .into_builder(None)
            .add_snapshot(main_snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: main_snapshot.snapshot_id(),
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Main's snapshot continues from where the branch left off (30).
        let current_snapshot = table_after_main.current_snapshot().unwrap();
        assert_eq!(current_snapshot.first_row_id(), Some(30));

        // 30 (branch) + 28 (main) rows assigned so far.
        assert_eq!(table_after_main.next_row_id(), 58);

        // Append 21 more rows on the side branch.
        let branch_snapshot_2 = Snapshot::builder()
            .with_snapshot_id(3)
            .with_timestamp_ms(table_after_main.last_updated_ms + 1)
            .with_sequence_number(2)
            .with_schema_id(0)
            .with_manifest_list("baz")
            .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(table_after_main.next_row_id(), 21)
            .build();

        let table_after_branch_2 = table_after_main
            .into_builder(None)
            .set_branch_snapshot(branch_snapshot_2.clone(), branch)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // The second branch snapshot continues from main's rows (58).
        let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
        let branch_snap_2 = table_after_branch_2
            .snapshots
            .get(&branch_ref_2.snapshot_id)
            .unwrap();
        assert_eq!(branch_snap_2.first_row_id(), Some(58));

        // 30 + 28 + 21 rows assigned in total.
        assert_eq!(table_after_branch_2.next_row_id(), 79);
    }
3303
    /// Encryption keys: adding, re-adding (no-op), removing, and removing a
    /// non-existent key, checking both the stored keys and recorded changes.
    #[test]
    fn test_encryption_keys() {
        let builder = builder_without_changes(FormatVersion::V2);

        let encryption_key_1 = EncryptedKey::builder()
            .key_id("key-1")
            .encrypted_key_metadata(vec![1, 2, 3, 4])
            .encrypted_by_id("encryption-service-1")
            .properties(HashMap::from_iter(vec![(
                "algorithm".to_string(),
                "AES-256".to_string(),
            )]))
            .build();

        let encryption_key_2 = EncryptedKey::builder()
            .key_id("key-2")
            .encrypted_key_metadata(vec![5, 6, 7, 8])
            .encrypted_by_id("encryption-service-2")
            .properties(HashMap::new())
            .build();

        // Adding the first key stores it and records one AddEncryptionKey.
        let build_result = builder
            .add_encryption_key(encryption_key_1.clone())
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
        assert_eq!(
            build_result.metadata.encryption_key("key-1"),
            Some(&encryption_key_1)
        );
        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
            encryption_key: encryption_key_1.clone()
        });

        // Adding a second key keeps the first and records another change.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .add_encryption_key(encryption_key_2.clone())
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
        assert_eq!(
            build_result.metadata.encryption_key("key-1"),
            Some(&encryption_key_1)
        );
        assert_eq!(
            build_result.metadata.encryption_key("key-2"),
            Some(&encryption_key_2)
        );
        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
            encryption_key: encryption_key_2.clone()
        });

        // Re-adding an identical key is a no-op: no change recorded.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
            ))
            .add_encryption_key(encryption_key_1.clone())
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 0);
        assert_eq!(build_result.metadata.encryption_keys.len(), 2);

        // Removing key-1 drops only that key and records RemoveEncryptionKey.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata3.json".to_string(),
            ))
            .remove_encryption_key("key-1")
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
        assert_eq!(build_result.metadata.encryption_key("key-1"), None);
        assert_eq!(
            build_result.metadata.encryption_key("key-2"),
            Some(&encryption_key_2)
        );
        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
            key_id: "key-1".to_string()
        });

        // Removing an unknown key is a no-op: no change recorded.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata4.json".to_string(),
            ))
            .remove_encryption_key("non-existent-key")
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 0);
        assert_eq!(build_result.metadata.encryption_keys.len(), 1);

        // Iterator view exposes the single remaining key.
        let keys = build_result
            .metadata
            .encryption_keys_iter()
            .collect::<Vec<_>>();
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0], &encryption_key_2);

        // Removing the last key empties the table's key set.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata5.json".to_string(),
            ))
            .remove_encryption_key("key-2")
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 0);
        assert_eq!(build_result.metadata.encryption_key("key-2"), None);
        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
            key_id: "key-2".to_string()
        });

        let keys = build_result.metadata.encryption_keys_iter();
        assert_eq!(keys.len(), 0);
    }
3442}