1use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24 DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25 ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef,
26 Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef,
27 StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID,
28 UnboundPartitionSpec,
29};
30use crate::error::{Error, ErrorKind, Result};
31use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
32use crate::{TableCreation, TableUpdate};
33
/// First field id handed out when field ids are reassigned for a new table's schema.
pub(crate) const FIRST_FIELD_ID: i32 = 1;
35
/// Builder that mutates a [`TableMetadata`] while recording each applied
/// [`TableUpdate`], so the same mutations can later be replayed (e.g. against a catalog).
#[derive(Debug, Clone)]
pub struct TableMetadataBuilder {
    // Metadata under construction; normalized when `build()` is called.
    metadata: TableMetadata,
    // Every update applied so far, in application order.
    changes: Vec<TableUpdate>,
    // Id of the schema most recently added via `add_schema`, if any.
    last_added_schema_id: Option<i32>,
    // Id of the partition spec most recently added via `add_partition_spec`, if any.
    last_added_spec_id: Option<i32>,
    // Id of the sort order most recently added via `add_sort_order`, if any.
    last_added_order_id: Option<i64>,
    // Metadata-log entry for the previous metadata file; appended during `build()`.
    previous_history_entry: Option<MetadataLog>,
    // Timestamp stamped onto the built metadata; defaults to "now" when unset.
    last_updated_ms: Option<i64>,
}
59
/// Result of building a [`TableMetadata`] with a [`TableMetadataBuilder`].
#[derive(Debug, Clone, PartialEq)]
pub struct TableMetadataBuildResult {
    /// The newly built metadata.
    pub metadata: TableMetadata,
    /// The changes that were applied to produce `metadata`.
    pub changes: Vec<TableUpdate>,
    /// Metadata-log entries dropped while trimming the log to its configured maximum.
    pub expired_metadata_logs: Vec<MetadataLog>,
}
70
71impl TableMetadataBuilder {
    /// Sentinel id meaning "the most recently added" schema, partition spec, or sort order.
    pub const LAST_ADDED: i32 = -1;
74
    /// Creates a builder for a brand-new table.
    ///
    /// Field ids of `schema`, `spec` and `sort_order` are reassigned from
    /// [`FIRST_FIELD_ID`], then the fresh definitions are installed via the
    /// regular `add_*`/`set_*` methods so every step is recorded in `changes`.
    ///
    /// # Errors
    /// Fails if ids cannot be reassigned, or if any of the subsequent
    /// add/set steps rejects its input (e.g. reserved properties).
    pub fn new(
        schema: Schema,
        spec: impl Into<UnboundPartitionSpec>,
        sort_order: SortOrder,
        location: String,
        format_version: FormatVersion,
        properties: HashMap<String, String>,
    ) -> Result<Self> {
        let (fresh_schema, fresh_spec, fresh_sort_order) =
            Self::reassign_ids(schema, spec.into(), sort_order)?;
        let schema_id = fresh_schema.schema_id();

        // Start from deliberately-invalid placeholders (-1 ids, empty maps);
        // the builder calls below replace them and record the changes.
        let builder = Self {
            metadata: TableMetadata {
                format_version,
                table_uuid: Uuid::now_v7(),
                location: "".to_string(),
                last_sequence_number: 0,
                last_updated_ms: 0,
                last_column_id: -1,
                current_schema_id: -1,
                schemas: HashMap::new(),
                partition_specs: HashMap::new(),
                default_spec: Arc::new(
                    PartitionSpec::unpartition_spec().with_spec_id(-1),
                ),
                default_partition_type: StructType::new(vec![]),
                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
                properties: HashMap::new(),
                current_snapshot_id: None,
                snapshots: HashMap::new(),
                snapshot_log: vec![],
                sort_orders: HashMap::new(),
                metadata_log: vec![],
                default_sort_order_id: -1,
                refs: HashMap::default(),
                statistics: HashMap::new(),
                partition_statistics: HashMap::new(),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            },
            last_updated_ms: None,
            changes: vec![],
            last_added_schema_id: Some(schema_id),
            last_added_spec_id: None,
            last_added_order_id: None,
            previous_history_entry: None,
        };

        builder
            .set_location(location)
            .add_current_schema(fresh_schema)?
            .add_default_partition_spec(fresh_spec.into_unbound())?
            .add_default_sort_order(fresh_sort_order)?
            .set_properties(properties)
    }
141
142 #[must_use]
148 pub fn new_from_metadata(
149 previous: TableMetadata,
150 current_file_location: Option<String>,
151 ) -> Self {
152 Self {
153 previous_history_entry: current_file_location.map(|l| MetadataLog {
154 metadata_file: l,
155 timestamp_ms: previous.last_updated_ms,
156 }),
157 metadata: previous,
158 changes: Vec::default(),
159 last_added_schema_id: None,
160 last_added_spec_id: None,
161 last_added_order_id: None,
162 last_updated_ms: None,
163 }
164 }
165
166 pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
168 let TableCreation {
169 name: _,
170 location,
171 schema,
172 partition_spec,
173 sort_order,
174 properties,
175 format_version,
176 } = table_creation;
177
178 let location = location.ok_or_else(|| {
179 Error::new(
180 ErrorKind::DataInvalid,
181 "Can't create table without location",
182 )
183 })?;
184 let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
185 spec_id: None,
186 fields: vec![],
187 });
188
189 Self::new(
190 schema,
191 partition_spec,
192 sort_order.unwrap_or(SortOrder::unsorted_order()),
193 location,
194 format_version,
195 properties,
196 )
197 }
198
199 pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
201 if self.metadata.table_uuid != uuid {
202 self.metadata.table_uuid = uuid;
203 self.changes.push(TableUpdate::AssignUuid { uuid });
204 }
205
206 self
207 }
208
209 pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
214 if format_version < self.metadata.format_version {
215 return Err(Error::new(
216 ErrorKind::DataInvalid,
217 format!(
218 "Cannot downgrade FormatVersion from {} to {}",
219 self.metadata.format_version, format_version
220 ),
221 ));
222 }
223
224 if format_version != self.metadata.format_version {
225 match format_version {
226 FormatVersion::V1 => {
227 }
229 FormatVersion::V2 => {
230 self.metadata.format_version = format_version;
231 self.changes
232 .push(TableUpdate::UpgradeFormatVersion { format_version });
233 }
234 FormatVersion::V3 => {
235 self.metadata.format_version = format_version;
236 self.metadata.next_row_id = INITIAL_ROW_ID;
238 self.changes
239 .push(TableUpdate::UpgradeFormatVersion { format_version });
240 }
241 }
242 }
243
244 Ok(self)
245 }
246
247 pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
256 let reserved_properties = properties
258 .keys()
259 .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
260 .map(ToString::to_string)
261 .collect::<Vec<_>>();
262
263 if !reserved_properties.is_empty() {
264 return Err(Error::new(
265 ErrorKind::DataInvalid,
266 format!(
267 "Table properties should not contain reserved properties, but got: [{}]",
268 reserved_properties.join(", ")
269 ),
270 ));
271 }
272
273 if properties.is_empty() {
274 return Ok(self);
275 }
276
277 self.metadata.properties.extend(properties.clone());
278 self.changes.push(TableUpdate::SetProperties {
279 updates: properties,
280 });
281
282 Ok(self)
283 }
284
285 pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
291 let properties = properties.iter().cloned().collect::<HashSet<_>>();
293
294 let reserved_properties = properties
296 .iter()
297 .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
298 .map(ToString::to_string)
299 .collect::<Vec<_>>();
300
301 if !reserved_properties.is_empty() {
302 return Err(Error::new(
303 ErrorKind::DataInvalid,
304 format!(
305 "Table properties to remove contain reserved properties: [{}]",
306 reserved_properties.join(", ")
307 ),
308 ));
309 }
310
311 for property in &properties {
312 self.metadata.properties.remove(property);
313 }
314
315 if !properties.is_empty() {
316 self.changes.push(TableUpdate::RemoveProperties {
317 removals: properties.into_iter().collect(),
318 });
319 }
320
321 Ok(self)
322 }
323
324 pub fn set_location(mut self, location: String) -> Self {
326 let location = location.trim_end_matches('/').to_string();
327 if self.metadata.location != location {
328 self.changes.push(TableUpdate::SetLocation {
329 location: location.clone(),
330 });
331 self.metadata.location = location;
332 }
333
334 self
335 }
336
    /// Adds a snapshot to the table metadata and records an `AddSnapshot` change.
    ///
    /// Also advances `last_updated_ms` and `last_sequence_number`, and — for
    /// format versions with row lineage — advances `next_row_id` by the
    /// snapshot's added-row count.
    ///
    /// # Errors
    /// - A snapshot with the same id already exists.
    /// - (v2+) the sequence number is not newer than the table's, unless the
    ///   snapshot has no parent.
    /// - The timestamp is more than one minute older than the last snapshot-log
    ///   entry or the last-updated timestamp.
    /// - (row-lineage versions) the first-row-id is missing, behind the table's
    ///   `next_row_id`, or advancing `next_row_id` would overflow.
    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
        if self
            .metadata
            .snapshots
            .contains_key(&snapshot.snapshot_id())
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
            ));
        }

        // Sequence numbers must be monotonic for v2+; a parentless snapshot is
        // exempt (e.g. first snapshot after staging).
        if self.metadata.format_version != FormatVersion::V1
            && snapshot.sequence_number() <= self.metadata.last_sequence_number
            && snapshot.parent_snapshot_id().is_some()
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
                    snapshot.sequence_number(),
                    self.metadata.last_sequence_number
                ),
            ));
        }

        // Timestamps may drift backwards by at most one minute relative to the
        // latest snapshot-log entry.
        if let Some(last) = self.metadata.snapshot_log.last() {
            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
                        snapshot.timestamp_ms(),
                        last.timestamp_ms
                    ),
                ));
            }
        }

        // Same one-minute tolerance against the most recent update timestamp
        // (pending or already persisted).
        let max_last_updated = self
            .last_updated_ms
            .unwrap_or_default()
            .max(self.metadata.last_updated_ms);
        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
                    snapshot.timestamp_ms(),
                    max_last_updated
                ),
            ));
        }

        // Row lineage (format version >= MIN_FORMAT_VERSION_ROW_LINEAGE)
        // requires a valid row range on every snapshot.
        let mut added_rows = None;
        if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
            if let Some((first_row_id, added_rows_count)) = snapshot.row_range() {
                if first_row_id < self.metadata.next_row_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot add a snapshot, first-row-id is behind table next-row-id: {first_row_id} < {}",
                            self.metadata.next_row_id
                        ),
                    ));
                }

                added_rows = Some(added_rows_count);
            } else {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot add a snapshot: first-row-id is null. first-row-id must be set for format version >= {MIN_FORMAT_VERSION_ROW_LINEAGE}",
                    ),
                ));
            }
        }

        // Advance next-row-id; checked_add guards against i64 overflow.
        if let Some(added_rows) = added_rows {
            self.metadata.next_row_id = self
                .metadata
                .next_row_id
                .checked_add(added_rows)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Cannot add snapshot: next-row-id overflowed when adding added-rows",
                    )
                })?;
        }

        self.changes.push(TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        });

        self.last_updated_ms = Some(snapshot.timestamp_ms());
        self.metadata.last_sequence_number = snapshot.sequence_number();
        self.metadata
            .snapshots
            .insert(snapshot.snapshot_id(), snapshot.into());

        Ok(self)
    }
451
452 pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
458 let reference = self.metadata.refs.get(branch).cloned();
459
460 let reference = if let Some(mut reference) = reference {
461 if !reference.is_branch() {
462 return Err(Error::new(
463 ErrorKind::DataInvalid,
464 format!("Cannot append snapshot to non-branch reference '{branch}'",),
465 ));
466 }
467
468 reference.snapshot_id = snapshot.snapshot_id();
469 reference
470 } else {
471 SnapshotReference {
472 snapshot_id: snapshot.snapshot_id(),
473 retention: SnapshotRetention::Branch {
474 min_snapshots_to_keep: None,
475 max_snapshot_age_ms: None,
476 max_ref_age_ms: None,
477 },
478 }
479 };
480
481 self.add_snapshot(snapshot)?.set_ref(branch, reference)
482 }
483
484 pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
488 let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
489
490 self.metadata.snapshots.retain(|k, _| {
491 if snapshot_ids.contains(k) {
492 removed_snapshots.push(*k);
493 false
494 } else {
495 true
496 }
497 });
498
499 if !removed_snapshots.is_empty() {
500 self.changes.push(TableUpdate::RemoveSnapshots {
501 snapshot_ids: removed_snapshots,
502 });
503 }
504
505 self.metadata
507 .refs
508 .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
509
510 self
511 }
512
    /// Points the named reference at a snapshot, recording a `SetSnapshotRef` change.
    ///
    /// Setting a ref to its current value is a no-op. Setting `MAIN_BRANCH`
    /// also updates `current_snapshot_id` and appends to the snapshot log.
    ///
    /// # Errors
    /// Fails when the referenced snapshot id does not exist in the metadata.
    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
        // No-op when the ref already points at exactly this reference.
        if self
            .metadata
            .refs
            .get(ref_name)
            .is_some_and(|snap_ref| snap_ref.eq(&reference))
        {
            return Ok(self);
        }

        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
                    reference.snapshot_id
                ),
            ));
        };

        // If the target snapshot was added in this very builder session, use
        // its timestamp as the pending last-updated time.
        let is_added_snapshot = self.changes.iter().any(|update| {
            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
        });
        if is_added_snapshot {
            self.last_updated_ms = Some(snapshot.timestamp_ms());
        }

        // The main branch additionally drives current_snapshot_id and the
        // snapshot log.
        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
            let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
                last_updated_ms
            } else {
                // No pending timestamp yet: stamp "now" and remember it.
                let last_updated_ms = chrono::Utc::now().timestamp_millis();
                self.last_updated_ms = Some(last_updated_ms);
                last_updated_ms
            };

            self.metadata.snapshot_log.push(SnapshotLog {
                snapshot_id: snapshot.snapshot_id(),
                timestamp_ms,
            });
        }

        self.changes.push(TableUpdate::SetSnapshotRef {
            ref_name: ref_name.to_string(),
            reference: reference.clone(),
        });
        self.metadata.refs.insert(ref_name.to_string(), reference);

        Ok(self)
    }
570
571 pub fn remove_ref(mut self, ref_name: &str) -> Self {
575 if ref_name == MAIN_BRANCH {
576 self.metadata.current_snapshot_id = None;
577 }
578
579 if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
580 self.changes.push(TableUpdate::RemoveSnapshotRef {
581 ref_name: ref_name.to_string(),
582 });
583 }
584
585 self
586 }
587
588 pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
590 self.metadata
591 .statistics
592 .insert(statistics.snapshot_id, statistics.clone());
593 self.changes.push(TableUpdate::SetStatistics {
594 statistics: statistics.clone(),
595 });
596 self
597 }
598
599 pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
601 let previous = self.metadata.statistics.remove(&snapshot_id);
602 if previous.is_some() {
603 self.changes
604 .push(TableUpdate::RemoveStatistics { snapshot_id });
605 }
606 self
607 }
608
609 pub fn set_partition_statistics(
611 mut self,
612 partition_statistics_file: PartitionStatisticsFile,
613 ) -> Self {
614 self.metadata.partition_statistics.insert(
615 partition_statistics_file.snapshot_id,
616 partition_statistics_file.clone(),
617 );
618 self.changes.push(TableUpdate::SetPartitionStatistics {
619 partition_statistics: partition_statistics_file,
620 });
621 self
622 }
623
624 pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
626 let previous = self.metadata.partition_statistics.remove(&snapshot_id);
627 if previous.is_some() {
628 self.changes
629 .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
630 }
631 self
632 }
633
    /// Adds a schema, reusing the id of a structurally identical existing
    /// schema when possible, and records an `AddSchema` change.
    ///
    /// Updates `last_column_id` and `last_added_schema_id` as a side effect.
    ///
    /// # Errors
    /// Fails when a new field name collides with an existing partition field name.
    pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
        self.validate_schema_field_names(&schema)?;

        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);

        if schema_found {
            // The schema already exists; still record the AddSchema change
            // unless it was the one most recently added in this session.
            if self.last_added_schema_id != Some(new_schema_id) {
                self.changes.push(TableUpdate::AddSchema {
                    schema: schema.clone(),
                });
                self.last_added_schema_id = Some(new_schema_id);
            }

            return Ok(self);
        }

        // Track the highest field id ever assigned across all schemas.
        self.metadata.last_column_id =
            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());

        // Rewrite the schema id only when it differs from the assigned one.
        let schema = match new_schema_id == schema.schema_id() {
            true => schema,
            false => schema.with_schema_id(new_schema_id),
        };

        self.metadata
            .schemas
            .insert(new_schema_id, schema.clone().into());

        self.changes.push(TableUpdate::AddSchema { schema });

        self.last_added_schema_id = Some(new_schema_id);

        Ok(self)
    }
679
680 pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
688 if schema_id == Self::LAST_ADDED {
689 schema_id = self.last_added_schema_id.ok_or_else(|| {
690 Error::new(
691 ErrorKind::DataInvalid,
692 "Cannot set current schema to last added schema: no schema has been added.",
693 )
694 })?;
695 };
696 let schema_id = schema_id; if schema_id == self.metadata.current_schema_id {
699 return Ok(self);
700 }
701
702 let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
703 Error::new(
704 ErrorKind::DataInvalid,
705 format!("Cannot set current schema to unknown schema with id: '{schema_id}'"),
706 )
707 })?;
708
709 self.metadata.current_schema_id = schema_id;
715
716 if self.last_added_schema_id == Some(schema_id) {
717 self.changes.push(TableUpdate::SetCurrentSchema {
718 schema_id: Self::LAST_ADDED,
719 });
720 } else {
721 self.changes
722 .push(TableUpdate::SetCurrentSchema { schema_id });
723 }
724
725 Ok(self)
726 }
727
728 pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
730 self.add_schema(schema)?
731 .set_current_schema(Self::LAST_ADDED)
732 }
733
734 fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
743 if self.metadata.schemas.is_empty() {
744 return Ok(());
745 }
746
747 for field_name in schema.field_id_to_name_map().values() {
748 let has_partition_conflict = self.metadata.partition_name_exists(field_name);
749 let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);
750
751 if has_partition_conflict && is_new_field {
752 return Err(Error::new(
753 ErrorKind::DataInvalid,
754 format!(
755 "Cannot add schema field '{field_name}' because it conflicts with existing partition field name. \
756 Schema evolution cannot introduce field names that match existing partition field names."
757 ),
758 ));
759 }
760 }
761
762 Ok(())
763 }
764
765 fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
775 if self.metadata.schemas.is_empty() {
776 return Ok(());
777 }
778
779 let current_schema = self.get_current_schema()?;
780 for partition_field in unbound_spec.fields() {
781 let exists_in_any_schema = self
782 .metadata
783 .name_exists_in_any_schema(&partition_field.name);
784
785 if !exists_in_any_schema {
787 continue;
788 }
789
790 if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
792 let is_identity_transform =
793 partition_field.transform == crate::spec::Transform::Identity;
794 let has_matching_source_id = schema_field.id == partition_field.source_id;
795
796 if !is_identity_transform {
797 return Err(Error::new(
798 ErrorKind::DataInvalid,
799 format!(
800 "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
801 partition_field.name
802 ),
803 ));
804 }
805
806 if !has_matching_source_id {
807 return Err(Error::new(
808 ErrorKind::DataInvalid,
809 format!(
810 "Cannot create identity partition sourced from different field in schema. \
811 Field name '{}' has id `{}` in schema but partition source id is `{}`",
812 partition_field.name, schema_field.id, partition_field.source_id
813 ),
814 ));
815 }
816 }
817 }
818
819 Ok(())
820 }
821
    /// Adds a partition spec, binding it against the current schema and
    /// reusing the id of a compatible existing spec when possible.
    ///
    /// Records an `AddSpec` change and updates `last_added_spec_id` and
    /// `last_partition_id`.
    ///
    /// # Errors
    /// - Partition names conflict with schema fields (see
    ///   `validate_partition_field_names`).
    /// - The spec cannot be bound to the current schema.
    /// - (v1) the spec's field ids are not sequential.
    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        let schema = self.get_current_schema()?.clone();

        self.validate_partition_field_names(&unbound_spec)?;

        // Reuse field ids already assigned for the same (source, transform)
        // pairs in historic specs.
        let unbound_spec = self.reuse_partition_field_ids(unbound_spec)?;

        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
            .with_last_assigned_field_id(self.metadata.last_partition_id)
            .build()?;

        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
        let spec = spec.with_spec_id(new_spec_id);
        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);

        if spec_found {
            // The spec already exists; still record the AddSpec change unless
            // it was the one most recently added in this session.
            if self.last_added_spec_id != Some(new_spec_id) {
                self.changes
                    .push(TableUpdate::AddSpec { spec: unbound_spec });
                self.last_added_spec_id = Some(new_spec_id);
            }

            return Ok(self);
        }

        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Cannot add partition spec with non-sequential field ids to format version 1 table",
            ));
        }

        let highest_field_id = spec
            .highest_field_id()
            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
        self.metadata
            .partition_specs
            .insert(new_spec_id, Arc::new(spec));
        self.changes
            .push(TableUpdate::AddSpec { spec: unbound_spec });

        self.last_added_spec_id = Some(new_spec_id);
        // Never move last_partition_id backwards.
        self.metadata.last_partition_id =
            std::cmp::max(self.metadata.last_partition_id, highest_field_id);

        Ok(self)
    }
882
883 fn reuse_partition_field_ids(
888 &self,
889 unbound_spec: UnboundPartitionSpec,
890 ) -> Result<UnboundPartitionSpec> {
891 let equivalent_field_ids: HashMap<_, _> = self
893 .metadata
894 .partition_specs
895 .values()
896 .flat_map(|spec| spec.fields())
897 .map(|field| ((field.source_id, &field.transform), field.field_id))
898 .collect();
899
900 let fields = unbound_spec
902 .fields
903 .into_iter()
904 .map(|mut field| {
905 if field.field_id.is_none()
906 && let Some(&existing_field_id) =
907 equivalent_field_ids.get(&(field.source_id, &field.transform))
908 {
909 field.field_id = Some(existing_field_id);
910 }
911 field
912 })
913 .collect();
914
915 Ok(UnboundPartitionSpec {
916 spec_id: unbound_spec.spec_id,
917 fields,
918 })
919 }
920
921 pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
927 if spec_id == Self::LAST_ADDED {
928 spec_id = self.last_added_spec_id.ok_or_else(|| {
929 Error::new(
930 ErrorKind::DataInvalid,
931 "Cannot set default partition spec to last added spec: no spec has been added.",
932 )
933 })?;
934 }
935
936 if self.metadata.default_spec.spec_id() == spec_id {
937 return Ok(self);
938 }
939
940 if !self.metadata.partition_specs.contains_key(&spec_id) {
941 return Err(Error::new(
942 ErrorKind::DataInvalid,
943 format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
944 ));
945 }
946
947 let schemaless_spec = self
948 .metadata
949 .partition_specs
950 .get(&spec_id)
951 .ok_or_else(|| {
952 Error::new(
953 ErrorKind::DataInvalid,
954 format!(
955 "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
956 ),
957 )
958 })?
959 .clone();
960 let spec = Arc::unwrap_or_clone(schemaless_spec);
961 let spec_type = spec.partition_type(self.get_current_schema()?)?;
962 self.metadata.default_spec = Arc::new(spec);
963 self.metadata.default_partition_type = spec_type;
964
965 if self.last_added_spec_id == Some(spec_id) {
966 self.changes.push(TableUpdate::SetDefaultSpec {
967 spec_id: Self::LAST_ADDED,
968 });
969 } else {
970 self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
971 }
972
973 Ok(self)
974 }
975
976 pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
978 self.add_partition_spec(unbound_spec)?
979 .set_default_partition_spec(Self::LAST_ADDED)
980 }
981
982 pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
989 if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
990 return Err(Error::new(
991 ErrorKind::DataInvalid,
992 "Cannot remove default partition spec",
993 ));
994 }
995
996 let mut removed_specs = Vec::with_capacity(spec_ids.len());
997 spec_ids.iter().for_each(|id| {
998 if self.metadata.partition_specs.remove(id).is_some() {
999 removed_specs.push(*id);
1000 }
1001 });
1002
1003 if !removed_specs.is_empty() {
1004 self.changes.push(TableUpdate::RemovePartitionSpecs {
1005 spec_ids: removed_specs,
1006 });
1007 }
1008
1009 Ok(self)
1010 }
1011
1012 pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
1023 let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
1024 let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);
1025
1026 if sort_order_found {
1027 if self.last_added_order_id != Some(new_order_id) {
1028 self.changes.push(TableUpdate::AddSortOrder {
1029 sort_order: sort_order.clone().with_order_id(new_order_id),
1030 });
1031 self.last_added_order_id = Some(new_order_id);
1032 }
1033
1034 return Ok(self);
1035 }
1036
1037 let schema = self.get_current_schema()?.clone().as_ref().clone();
1038 let sort_order = SortOrder::builder()
1039 .with_order_id(new_order_id)
1040 .with_fields(sort_order.fields)
1041 .build(&schema)
1042 .map_err(|e| {
1043 Error::new(
1044 ErrorKind::DataInvalid,
1045 format!("Sort order to add is incompatible with current schema: {e}"),
1046 )
1047 .with_source(e)
1048 })?;
1049
1050 self.last_added_order_id = Some(new_order_id);
1051 self.metadata
1052 .sort_orders
1053 .insert(new_order_id, sort_order.clone().into());
1054 self.changes.push(TableUpdate::AddSortOrder { sort_order });
1055
1056 Ok(self)
1057 }
1058
1059 pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
1065 if sort_order_id == Self::LAST_ADDED as i64 {
1066 sort_order_id = self.last_added_order_id.ok_or_else(|| {
1067 Error::new(
1068 ErrorKind::DataInvalid,
1069 "Cannot set default sort order to last added order: no order has been added.",
1070 )
1071 })?;
1072 }
1073
1074 if self.metadata.default_sort_order_id == sort_order_id {
1075 return Ok(self);
1076 }
1077
1078 if !self.metadata.sort_orders.contains_key(&sort_order_id) {
1079 return Err(Error::new(
1080 ErrorKind::DataInvalid,
1081 format!(
1082 "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
1083 ),
1084 ));
1085 }
1086
1087 self.metadata.default_sort_order_id = sort_order_id;
1088
1089 if self.last_added_order_id == Some(sort_order_id) {
1090 self.changes.push(TableUpdate::SetDefaultSortOrder {
1091 sort_order_id: Self::LAST_ADDED as i64,
1092 });
1093 } else {
1094 self.changes
1095 .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
1096 }
1097
1098 Ok(self)
1099 }
1100
1101 fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
1103 self.add_sort_order(sort_order)?
1104 .set_default_sort_order(Self::LAST_ADDED as i64)
1105 }
1106
1107 pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
1109 let key_id = key.key_id().to_string();
1110 if self.metadata.encryption_keys.contains_key(&key_id) {
1111 return self;
1113 }
1114
1115 self.metadata.encryption_keys.insert(key_id, key.clone());
1116 self.changes.push(TableUpdate::AddEncryptionKey {
1117 encryption_key: key,
1118 });
1119 self
1120 }
1121
1122 pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
1124 if self.metadata.encryption_keys.remove(key_id).is_some() {
1125 self.changes.push(TableUpdate::RemoveEncryptionKey {
1126 key_id: key_id.to_string(),
1127 });
1128 }
1129 self
1130 }
1131
    /// Finalizes the metadata: stamps the last-updated time, re-binds the
    /// default spec and validates the default sort order against the current
    /// schema, normalizes the snapshot log, appends the previous metadata-log
    /// entry, and trims the metadata log.
    ///
    /// # Errors
    /// Fails when the current schema or default sort order is missing, when
    /// the default spec/sort order cannot be bound to the current schema, or
    /// when normalization rejects the metadata.
    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
        // Default the update timestamp to "now" if nothing set it earlier.
        self.metadata.last_updated_ms = self
            .last_updated_ms
            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());

        let schema = self.get_current_schema()?.clone();
        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);

        // Re-bind the default spec so it reflects the final current schema.
        self.metadata.default_spec = Arc::new(
            Arc::unwrap_or_clone(self.metadata.default_spec)
                .into_unbound()
                .bind(schema.clone())?,
        );
        self.metadata.default_partition_type =
            self.metadata.default_spec.partition_type(&schema)?;
        // Validation only: confirm the default sort order still binds.
        SortOrder::builder()
            .with_fields(sort_order.fields)
            .build(&schema)?;

        self.update_snapshot_log()?;
        self.metadata.try_normalize()?;

        // Append the previous metadata file to the log, then expire overflow.
        if let Some(hist_entry) = self.previous_history_entry.take() {
            self.metadata.metadata_log.push(hist_entry);
        }
        let expired_metadata_logs = self.expire_metadata_log();

        Ok(TableMetadataBuildResult {
            metadata: self.metadata,
            changes: self.changes,
            expired_metadata_logs,
        })
    }
1169
1170 fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1171 let max_size = self
1172 .metadata
1173 .properties
1174 .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1175 .and_then(|v| v.parse::<usize>().ok())
1176 .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1177 .max(1);
1178
1179 if self.metadata.metadata_log.len() > max_size {
1180 self.metadata
1181 .metadata_log
1182 .drain(0..self.metadata.metadata_log.len() - max_size)
1183 .collect()
1184 } else {
1185 Vec::new()
1186 }
1187 }
1188
    /// Rewrites the snapshot log, dropping entries for intermediate snapshots
    /// (added and superseded within this session) and, once a removed snapshot
    /// is hit, discarding all earlier history.
    ///
    /// # Errors
    /// Fails when, after rewriting, the latest log entry does not match the
    /// current snapshot id.
    fn update_snapshot_log(&mut self) -> Result<()> {
        let intermediate_snapshots = self.get_intermediate_snapshots();
        let has_removed_snapshots = self
            .changes
            .iter()
            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));

        // Fast path: no rewrites needed.
        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
            return Ok(());
        }

        let mut new_snapshot_log = Vec::new();
        for log_entry in &self.metadata.snapshot_log {
            let snapshot_id = log_entry.snapshot_id;
            if self.metadata.snapshots.contains_key(&snapshot_id) {
                // Keep the entry unless it refers to an intermediate snapshot.
                if !intermediate_snapshots.contains(&snapshot_id) {
                    new_snapshot_log.push(log_entry.clone());
                }
            } else if has_removed_snapshots {
                // A gap caused by snapshot removal invalidates earlier history.
                new_snapshot_log.clear();
            }
        }

        // The rewritten log must still end at the current snapshot.
        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
            if last_id != Some(current_snapshot_id) {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
                ));
            }
        };

        self.metadata.snapshot_log = new_snapshot_log;
        Ok(())
    }
1230
    /// Returns ids of "intermediate" snapshots: snapshots that were both added
    /// and set as the main-branch ref within this builder session but are no
    /// longer the current snapshot (they were superseded before `build()`).
    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
        // All snapshots added in this session.
        let added_snapshot_ids = self
            .changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
                _ => None,
            })
            .collect::<HashSet<_>>();

        self.changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::SetSnapshotRef {
                    ref_name,
                    reference,
                } => {
                    // Intermediate = added here + pointed at by main + not the
                    // (final) current snapshot. LAST_ADDED (-1) stands in when
                    // no current snapshot is set, so the comparison never
                    // matches a real id.
                    if added_snapshot_ids.contains(&reference.snapshot_id)
                        && ref_name == MAIN_BRANCH
                        && reference.snapshot_id
                            != self
                                .metadata
                                .current_snapshot_id
                                .unwrap_or(i64::from(Self::LAST_ADDED))
                    {
                        Some(reference.snapshot_id)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .collect()
    }
1274
    /// Reassigns all field ids of a new table's schema starting from
    /// [`FIRST_FIELD_ID`], then rebuilds the partition spec and sort order so
    /// their source ids reference the reassigned schema (resolved by name).
    ///
    /// # Errors
    /// Fails when a partition or sort field references a source id that does
    /// not exist in the original schema, or when rebuilding either fails.
    fn reassign_ids(
        schema: Schema,
        spec: UnboundPartitionSpec,
        sort_order: SortOrder,
    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
        // Keep the old id -> name mapping so old source ids can be translated
        // into the fresh schema by name.
        let previous_id_to_name = schema.field_id_to_name_map().clone();
        let fresh_schema = schema
            .into_builder()
            .with_schema_id(DEFAULT_SCHEMA_ID)
            .with_reassigned_field_ids(FIRST_FIELD_ID)
            .build()?;

        // Rebuild the partition spec against the fresh schema, field by field.
        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
        for field in spec.fields() {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for partition column {} in schema.",
                        field.source_id, field.name
                    ),
                )
            })?;
            fresh_spec =
                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
        }
        let fresh_spec = fresh_spec.build()?;

        // Rebuild the sort order, translating each source id via its old name.
        let mut fresh_order = SortOrder::builder();
        for mut field in sort_order.fields {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for sort column in schema.",
                        field.source_id
                    ),
                )
            })?;
            let new_field_id = fresh_schema
                .field_by_name(source_field_name)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!(
                            "Cannot find source column with name {source_field_name} for sort column in re-assigned schema."
                        ),
                    )
                })?.id;
            field.source_id = new_field_id;
            fresh_order.with_sort_field(field);
        }
        let fresh_sort_order = fresh_order.build(&fresh_schema)?;

        Ok((fresh_schema, fresh_spec, fresh_sort_order))
    }
1334
1335 fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1336 self.metadata
1337 .schemas
1338 .iter()
1339 .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1340 .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1341 }
1342
1343 fn get_highest_schema_id(&self) -> i32 {
1344 *self
1345 .metadata
1346 .schemas
1347 .keys()
1348 .max()
1349 .unwrap_or(&self.metadata.current_schema_id)
1350 }
1351
1352 fn get_current_schema(&self) -> Result<&SchemaRef> {
1353 self.metadata
1354 .schemas
1355 .get(&self.metadata.current_schema_id)
1356 .ok_or_else(|| {
1357 Error::new(
1358 ErrorKind::DataInvalid,
1359 format!(
1360 "Current schema with id '{}' not found in table metadata.",
1361 self.metadata.current_schema_id
1362 ),
1363 )
1364 })
1365 }
1366
1367 fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1368 self.metadata
1369 .sort_orders
1370 .get(&self.metadata.default_sort_order_id)
1371 .cloned()
1372 .ok_or_else(|| {
1373 Error::new(
1374 ErrorKind::DataInvalid,
1375 format!(
1376 "Default sort order with id '{}' not found in table metadata.",
1377 self.metadata.default_sort_order_id
1378 ),
1379 )
1380 })
1381 }
1382
1383 fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1385 self.metadata
1386 .partition_specs
1387 .iter()
1388 .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1389 .unwrap_or_else(|| {
1390 self.get_highest_spec_id()
1391 .map(|id| id + 1)
1392 .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1393 })
1394 }
1395
1396 fn get_highest_spec_id(&self) -> Option<i32> {
1397 self.metadata.partition_specs.keys().max().copied()
1398 }
1399
1400 fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1402 if new_sort_order.is_unsorted() {
1403 return SortOrder::unsorted_order().order_id;
1404 }
1405
1406 self.metadata
1407 .sort_orders
1408 .iter()
1409 .find_map(|(id, sort_order)| {
1410 sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1411 })
1412 .unwrap_or_else(|| {
1413 self.highest_sort_order_id()
1414 .unwrap_or(SortOrder::unsorted_order().order_id)
1415 + 1
1416 })
1417 }
1418
1419 fn highest_sort_order_id(&self) -> Option<i64> {
1420 self.metadata.sort_orders.keys().max().copied()
1421 }
1422
1423 pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1426 if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1427 return Err(Error::new(
1428 ErrorKind::DataInvalid,
1429 "Cannot remove current schema",
1430 ));
1431 }
1432
1433 if schema_id_to_remove.is_empty() {
1434 return Ok(self);
1435 }
1436
1437 let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1438 self.metadata.schemas.retain(|id, _schema| {
1439 if schema_id_to_remove.contains(id) {
1440 removed_schemas.push(*id);
1441 false
1442 } else {
1443 true
1444 }
1445 });
1446
1447 self.changes.push(TableUpdate::RemoveSchemas {
1448 schema_ids: removed_schemas,
1449 });
1450
1451 Ok(self)
1452 }
1453}
1454
1455impl From<TableMetadataBuildResult> for TableMetadata {
1456 fn from(result: TableMetadataBuildResult) -> Self {
1457 result.metadata
1458 }
1459}
1460
1461#[cfg(test)]
1462mod tests {
1463 use std::fs::File;
1464 use std::io::BufReader;
1465 use std::thread::sleep;
1466
1467 use super::*;
1468 use crate::TableIdent;
1469 use crate::io::FileIOBuilder;
1470 use crate::spec::{
1471 BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1472 SnapshotRetention, SortDirection, SortField, StructType, Summary, TableProperties,
1473 Transform, Type, UnboundPartitionField,
1474 };
1475 use crate::table::Table;
1476
    // Base table location shared by all table-creation tests.
    const TEST_LOCATION: &str = "s3://bucket/test/location";
    // Highest field id produced by `schema()` (fields x, y, z -> ids 1..=3).
    const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1479
1480 fn schema() -> Schema {
1481 Schema::builder()
1482 .with_fields(vec![
1483 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1484 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1485 NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1486 ])
1487 .build()
1488 .unwrap()
1489 }
1490
1491 fn sort_order() -> SortOrder {
1492 let schema = schema();
1493 SortOrder::builder()
1494 .with_order_id(1)
1495 .with_sort_field(SortField {
1496 source_id: 3,
1497 transform: Transform::Bucket(4),
1498 direction: SortDirection::Descending,
1499 null_order: NullOrder::First,
1500 })
1501 .build(&schema)
1502 .unwrap()
1503 }
1504
1505 fn partition_spec() -> UnboundPartitionSpec {
1506 UnboundPartitionSpec::builder()
1507 .with_spec_id(0)
1508 .add_partition_field(2, "y", Transform::Identity)
1509 .unwrap()
1510 .build()
1511 }
1512
    /// Creates a table with the shared schema/spec/sort-order, builds it, and
    /// re-opens the resulting metadata as a builder with a previous
    /// metadata-file location — yielding a builder whose change list is empty.
    fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
        TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            format_version,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata
        .into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ))
    }
1530
    // Builds a minimal V1 table, checks every defaulted metadata field,
    // round-trips through serde, then upgrades to V2 and serializes again.
    #[test]
    fn test_minimal_build() {
        let metadata = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V1);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        assert_eq!(metadata.default_sort_order_id, 1);
        assert_eq!(metadata.last_partition_id, 1000);
        assert_eq!(metadata.last_column_id, 3);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.current_snapshot_id, None);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
        assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);

        // Metadata must be serializable.
        let _ = serde_json::to_string(&metadata).unwrap();

        // Upgrade the same metadata to V2 and ensure it still serializes.
        let metadata = metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .upgrade_format_version(FormatVersion::V2)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        let _ = serde_json::to_string(&metadata).unwrap();
    }
1578
    // An empty schema with unpartitioned spec and unsorted order builds cleanly
    // with all ids at their defaults (sort order id 0, last column id 0).
    #[test]
    fn test_build_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let metadata = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        assert_eq!(metadata.default_sort_order_id, 0);
        assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
        assert_eq!(metadata.last_column_id, 0);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.current_snapshot_id, None);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
    }
1609
    // Exercises `reassign_ids` directly: schema/spec/sort-order with arbitrary
    // ids come out with schema id 0, spec id 0, order id 1, and field ids
    // re-assigned from 1 while spec/sort sources follow their columns by name.
    #[test]
    fn test_reassigns_ids() {
        let schema = Schema::builder()
            .with_schema_id(10)
            .with_fields(vec![
                NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    13,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();
        let spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(20)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 11,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(10)
            .build(&schema)
            .unwrap();

        let (fresh_schema, fresh_spec, fresh_sort_order) =
            TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();

        // Top-level fields claim ids 1-4; the nested field gets 5.
        let expected_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    3,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let expected_spec = PartitionSpec::builder(expected_schema.clone())
            .with_spec_id(0)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let expected_sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(1)
            .build(&expected_schema)
            .unwrap();

        assert_eq!(fresh_schema, expected_schema);
        assert_eq!(fresh_spec, expected_spec);
        assert_eq!(fresh_sort_order, expected_sort_order);
    }
1694
1695 #[test]
1696 fn test_ids_are_reassigned_for_new_metadata() {
1697 let schema = schema().into_builder().with_schema_id(10).build().unwrap();
1698
1699 let metadata = TableMetadataBuilder::new(
1700 schema,
1701 partition_spec(),
1702 sort_order(),
1703 TEST_LOCATION.to_string(),
1704 FormatVersion::V1,
1705 HashMap::new(),
1706 )
1707 .unwrap()
1708 .build()
1709 .unwrap()
1710 .metadata;
1711
1712 assert_eq!(metadata.current_schema_id, 0);
1713 assert_eq!(metadata.current_schema().schema_id(), 0);
1714 }
1715
    // Creating a table records the full sequence of TableUpdate changes, with
    // -1 as the "last added" sentinel for schema/spec/sort-order references.
    #[test]
    fn test_new_metadata_changes() {
        let changes = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema { schema: schema() },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // The recorded spec carries the builder-assigned field id 1000.
                spec: PartitionSpec::builder(schema())
                    .with_spec_id(0)
                    .add_unbound_field(UnboundPartitionField {
                        name: "y".to_string(),
                        transform: Transform::Identity,
                        source_id: 2,
                        field_id: Some(1000)
                    })
                    .unwrap()
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: sort_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
            TableUpdate::SetProperties {
                updates: HashMap::from_iter(vec![(
                    "property 1".to_string(),
                    "value 1".to_string()
                )]),
            }
        ]);
    }
1766
    // Same as test_new_metadata_changes but for an empty schema, unpartitioned
    // spec and unsorted order — no SetProperties change is recorded.
    #[test]
    fn test_new_metadata_changes_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let changes = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema {
                schema: Schema::builder().build().unwrap(),
            },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                spec: PartitionSpec::builder(schema)
                    .with_spec_id(0)
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: SortOrder::unsorted_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
        ]);
    }
1807
    // Adding a spec reassigns its id to the next free one (1), fills missing
    // partition field ids sequentially, and leaves the default spec untouched;
    // the added spec can then be removed again.
    #[test]
    fn test_add_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    // The previous field - has field_id set
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    // A new field without an id - should still be without an id in the change
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    field_id: None,
                },
            ])
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .build()
            .unwrap();

        // Spec id 10 is rewritten to the next free id, 1.
        let expected_change = added_spec.with_spec_id(1);
        let expected_spec = PartitionSpec::builder(schema())
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y".to_string(),
                transform: Transform::Identity,
                source_id: 2,
                field_id: Some(1000),
            })
            .unwrap()
            .add_unbound_field(UnboundPartitionField {
                name: "z".to_string(),
                transform: Transform::Identity,
                source_id: 3,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(
            build_result.metadata.partition_spec_by_id(1),
            Some(&Arc::new(expected_spec))
        );
        assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
        assert_eq!(build_result.metadata.last_partition_id, 1001);
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: expected_change
        });

        // Remove the spec we just added.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .remove_partition_specs(&[1])
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.partition_specs.len(), 1);
        assert!(build_result.metadata.partition_spec_by_id(1).is_none());
    }
1886
    // `set_default_partition_spec(-1)` binds the default to the spec added in
    // the same round of changes.
    #[test]
    fn test_set_default_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);
        let schema = builder.get_current_schema().unwrap().clone();
        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        // New spec gets id 1 and the next free partition field id (1001).
        let expected_spec = PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y_bucket[2]".to_string(),
                transform: Transform::Bucket(2),
                source_id: 1,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSpec {
                // Should be the spec with the reassigned spec id
                spec: added_spec.with_spec_id(1)
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 }
        ]);
    }
1927
    // Setting the default to a newly-added spec records spec_id -1; setting it
    // back to an already-existing spec records the concrete id.
    #[test]
    fn test_set_existing_default_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);
        // Add and set an unbound spec as default.
        let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
        let build_result = builder
            .add_partition_spec(unbound_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: unbound_spec.clone()
        });
        assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
            spec_id: -1
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                unbound_spec
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );

        // Set the original spec (id 0) as default again.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .set_default_partition_spec(0)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
            spec_id: 0
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                partition_spec()
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );
    }
1981
    // Adding a sort order reassigns its id to the next free one (2, since the
    // table already has order 1) and records the reassigned order as a change.
    #[test]
    fn test_add_sort_order() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_sort_order = SortOrder::builder()
            .with_order_id(10)
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .build(&schema())
            .unwrap();

        let build_result = builder
            .add_sort_order(added_sort_order.clone())
            .unwrap()
            .build()
            .unwrap();

        let expected_sort_order = added_sort_order.with_order_id(2);

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
        pretty_assertions::assert_eq!(
            build_result.metadata.sort_order_by_id(2),
            Some(&Arc::new(expected_sort_order.clone()))
        );
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
            sort_order: expected_sort_order
        });
    }
2015
    // Adding a compatible (superset) schema as current keeps its id and records
    // AddSchema plus SetCurrentSchema with the -1 "last added" sentinel.
    #[test]
    fn test_add_compatible_schema() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let build_result = builder
            .add_current_schema(added_schema.clone())
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
        pretty_assertions::assert_eq!(
            build_result.metadata.schema_by_id(1),
            Some(&Arc::new(added_schema.clone()))
        );
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
            schema: added_schema
        });
        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
            schema_id: -1
        });
    }
2050
    // Even when `set_current_schema` is called with the concrete id, the
    // recorded change uses -1 if that schema was added in the same round.
    #[test]
    fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let build_result = builder
            .add_schema(added_schema.clone())
            .unwrap()
            .set_current_schema(1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
            schema_id: -1
        });
    }
2079
2080 #[test]
2081 fn test_no_metadata_log_for_create_table() {
2082 let build_result = TableMetadataBuilder::new(
2083 schema(),
2084 partition_spec(),
2085 sort_order(),
2086 TEST_LOCATION.to_string(),
2087 FormatVersion::V2,
2088 HashMap::new(),
2089 )
2090 .unwrap()
2091 .build()
2092 .unwrap();
2093
2094 assert_eq!(build_result.metadata.metadata_log.len(), 0);
2095 }
2096
2097 #[test]
2098 fn test_no_metadata_log_entry_for_no_previous_location() {
2099 let metadata = builder_without_changes(FormatVersion::V2)
2101 .build()
2102 .unwrap()
2103 .metadata;
2104 assert_eq!(metadata.metadata_log.len(), 1);
2105
2106 let build_result = metadata
2107 .into_builder(None)
2108 .set_properties(HashMap::from_iter(vec![(
2109 "foo".to_string(),
2110 "bar".to_string(),
2111 )]))
2112 .unwrap()
2113 .build()
2114 .unwrap();
2115
2116 assert_eq!(build_result.metadata.metadata_log.len(), 1);
2117 }
2118
    // Re-opening metadata with a previous file location produces a
    // metadata-log entry pointing at that file.
    #[test]
    fn test_from_metadata_generates_metadata_log() {
        let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
        let builder = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata
        .into_builder(Some(metadata_path.to_string()));

        // Any change suffices to trigger a new build.
        let builder = builder
            .add_default_sort_order(SortOrder::unsorted_order())
            .unwrap();

        let build_result = builder.build().unwrap();

        assert_eq!(build_result.metadata.metadata_log.len(), 1);
        assert_eq!(
            build_result.metadata.metadata_log[0].metadata_file,
            metadata_path
        );
    }
2148
    // `set_ref` rejects unknown snapshot ids; pointing `main` at an added
    // snapshot makes it current and appends a snapshot-log entry.
    #[test]
    fn test_set_ref() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let builder = builder.add_snapshot(snapshot.clone()).unwrap();

        // Referencing a snapshot id that was never added must fail.
        assert!(
            builder
                .clone()
                .set_ref(MAIN_BRANCH, SnapshotReference {
                    snapshot_id: 10,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: Some(10),
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                })
                .unwrap_err()
                .to_string()
                .contains("Cannot set 'main' to unknown snapshot: '10'")
        );

        let build_result = builder
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.snapshots.len(), 1);
        assert_eq!(
            build_result.metadata.snapshot_by_id(1),
            Some(&Arc::new(snapshot.clone()))
        );
        assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 1,
            timestamp_ms: snapshot.timestamp_ms()
        }])
    }
2213
    // When two snapshots land on `main` within one build, only the final one
    // appears in the snapshot log and becomes the current snapshot.
    #[test]
    fn test_snapshot_log_skips_intermediates() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot_1 = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let snapshot_2 = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let result = builder
            .add_snapshot(snapshot_1)
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
            .unwrap()
            .build()
            .unwrap();

        // Snapshot 1 was an intermediate on `main` and is skipped in the log.
        assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 2,
            timestamp_ms: snapshot_2.timestamp_ms()
        }]);
        assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
    }
2281
    // Removing the `main` ref clears the current snapshot id but preserves the
    // existing snapshot-log history.
    #[test]
    fn test_remove_main_ref_keeps_snapshot_log() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let result = builder
            .add_snapshot(snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();

        // Precondition: one log entry and a current snapshot.
        assert_eq!(result.metadata.snapshot_log.len(), 1);
        assert_eq!(result.metadata.snapshot_log[0].snapshot_id, 1);
        assert_eq!(result.metadata.current_snapshot_id, Some(1));

        // Remove the `main` ref.
        let result_after_remove = result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
            ))
            .remove_ref(MAIN_BRANCH)
            .build()
            .unwrap();

        // The log survives; only the current snapshot pointer is cleared.
        assert_eq!(result_after_remove.metadata.snapshot_log.len(), 1);
        assert_eq!(result_after_remove.metadata.snapshot_log[0].snapshot_id, 1);
        assert_eq!(result_after_remove.metadata.current_snapshot_id, None);
        assert_eq!(result_after_remove.changes.len(), 1);
        assert_eq!(
            result_after_remove.changes[0],
            TableUpdate::RemoveSnapshotRef {
                ref_name: MAIN_BRANCH.to_string()
            }
        );
    }
2348
    // `set_branch_snapshot` on an unknown branch creates it with default
    // branch retention and records AddSnapshot + SetSnapshotRef.
    #[test]
    fn test_set_branch_snapshot_creates_branch_if_not_exists() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let build_result = builder
            .set_branch_snapshot(snapshot.clone(), "new_branch")
            .unwrap()
            .build()
            .unwrap();

        // The implicitly created branch has no retention limits set.
        let reference = SnapshotReference {
            snapshot_id: 2,
            retention: SnapshotRetention::Branch {
                min_snapshots_to_keep: None,
                max_snapshot_age_ms: None,
                max_ref_age_ms: None,
            },
        };

        assert_eq!(build_result.metadata.refs.len(), 1);
        assert_eq!(
            build_result.metadata.refs.get("new_branch"),
            Some(&reference)
        );
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef {
                ref_name: "new_branch".to_string(),
                reference
            }
        ]);
    }
2393
2394 #[test]
2395 fn test_cannot_add_duplicate_snapshot_id() {
2396 let builder = builder_without_changes(FormatVersion::V2);
2397
2398 let snapshot = Snapshot::builder()
2399 .with_snapshot_id(2)
2400 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2401 .with_sequence_number(0)
2402 .with_schema_id(0)
2403 .with_manifest_list("/snap-1.avro")
2404 .with_summary(Summary {
2405 operation: Operation::Append,
2406 additional_properties: HashMap::from_iter(vec![
2407 (
2408 "spark.app.id".to_string(),
2409 "local-1662532784305".to_string(),
2410 ),
2411 ("added-data-files".to_string(), "4".to_string()),
2412 ("added-records".to_string(), "4".to_string()),
2413 ("added-files-size".to_string(), "6001".to_string()),
2414 ]),
2415 })
2416 .build();
2417
2418 let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2419 builder.add_snapshot(snapshot).unwrap_err();
2420 }
2421
2422 #[test]
2423 fn test_add_incompatible_current_schema_fails() {
2424 let builder = builder_without_changes(FormatVersion::V2);
2425
2426 let added_schema = Schema::builder()
2427 .with_schema_id(1)
2428 .with_fields(vec![])
2429 .build()
2430 .unwrap();
2431
2432 let err = builder
2433 .add_current_schema(added_schema)
2434 .unwrap()
2435 .build()
2436 .unwrap_err();
2437
2438 assert!(
2439 err.to_string()
2440 .contains("Cannot find partition source field")
2441 );
2442 }
2443
    // V1 tables require partition field ids to be sequential; a gap
    // (1000 then 1002) is rejected.
    #[test]
    fn test_add_partition_spec_for_v1_requires_sequential_ids() {
        let builder = builder_without_changes(FormatVersion::V1);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    field_id: Some(1002),
                },
            ])
            .unwrap()
            .build();

        let err = builder.add_partition_spec(added_spec).unwrap_err();
        assert!(err.to_string().contains(
            "Cannot add partition spec with non-sequential field ids to format version 1 table"
        ));
    }
2472
    // With `metadata.previous-versions-max` = 2, the third metadata version
    // expires the oldest log entry into `expired_metadata_logs`.
    #[test]
    fn test_expire_metadata_log() {
        let builder = builder_without_changes(FormatVersion::V2);
        let metadata = builder
            .set_properties(HashMap::from_iter(vec![(
                TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(metadata.metadata.metadata_log.len(), 1);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        // Second version: log grows to the maximum of 2, nothing expires yet.
        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "1".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        // Third version: the log is capped at 2 and one entry expires.
        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 1);
    }
2514
    #[test]
    fn test_v2_sequence_number_cannot_decrease() {
        let builder = builder_without_changes(FormatVersion::V2);

        // Baseline snapshot at sequence number 1, set as head of main.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("/snap-1")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let builder = builder
            .add_snapshot(snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap();

        // Child snapshot whose sequence number (0) is lower than its
        // parent's (1) — appending it to the branch must fail.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-0")
            .with_parent_snapshot_id(Some(1))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let err = builder
            .set_branch_snapshot(snapshot, MAIN_BRANCH)
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("Cannot add snapshot with sequence number")
        );
    }
2565
2566 #[test]
2567 fn test_default_spec_cannot_be_removed() {
2568 let builder = builder_without_changes(FormatVersion::V2);
2569
2570 builder.remove_partition_specs(&[0]).unwrap_err();
2571 }
2572
    #[test]
    fn test_statistics() {
        let builder = builder_without_changes(FormatVersion::V2);

        let statistics = StatisticsFile {
            snapshot_id: 3055729675574597004,
            statistics_path: "s3://a/b/stats.puffin".to_string(),
            file_size_in_bytes: 413,
            file_footer_size_in_bytes: 42,
            key_metadata: None,
            blob_metadata: vec![BlobMetadata {
                snapshot_id: 3055729675574597004,
                sequence_number: 1,
                fields: vec![1],
                r#type: "ndv".to_string(),
                properties: HashMap::new(),
            }],
        };
        // Setting statistics stores them keyed by snapshot id and records a
        // SetStatistics change.
        let build_result = builder.set_statistics(statistics.clone()).build().unwrap();

        assert_eq!(
            build_result.metadata.statistics,
            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
        );
        assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
            statistics: statistics.clone()
        }]);

        // Removal clears the entry and records RemoveStatistics.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();

        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
            snapshot_id: statistics.snapshot_id
        }]);

        // Removing already-absent statistics is a no-op: no change emitted.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes.len(), 0);
    }
2622
    #[test]
    fn test_add_partition_statistics() {
        let builder = builder_without_changes(FormatVersion::V2);

        let statistics = PartitionStatisticsFile {
            snapshot_id: 3055729675574597004,
            statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
            file_size_in_bytes: 43,
        };

        // Setting partition statistics stores them keyed by snapshot id and
        // records a SetPartitionStatistics change.
        let build_result = builder
            .set_partition_statistics(statistics.clone())
            .build()
            .unwrap();
        assert_eq!(
            build_result.metadata.partition_statistics,
            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
        );
        assert_eq!(build_result.changes, vec![
            TableUpdate::SetPartitionStatistics {
                partition_statistics: statistics.clone()
            }
        ]);

        // Removal clears the entry and records RemovePartitionStatistics.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_partition_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
        assert_eq!(build_result.changes, vec![
            TableUpdate::RemovePartitionStatistics {
                snapshot_id: statistics.snapshot_id
            }
        ]);

        // Removing already-absent statistics is a no-op: no change emitted.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_partition_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
        assert_eq!(build_result.changes.len(), 0);
    }
2669
    #[test]
    fn last_update_increased_for_property_only_update() {
        let builder = builder_without_changes(FormatVersion::V2);

        let metadata = builder.build().unwrap().metadata;
        let last_updated_ms = metadata.last_updated_ms;
        // Let the wall clock advance past the recorded timestamp so a change
        // in last_updated_ms is observable.
        sleep(std::time::Duration::from_millis(2));

        let build_result = metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .set_properties(HashMap::from_iter(vec![(
                "foo".to_string(),
                "bar".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();

        // Even a property-only change must refresh last_updated_ms.
        assert!(
            build_result.metadata.last_updated_ms > last_updated_ms,
            "{} > {}",
            build_result.metadata.last_updated_ms,
            last_updated_ms
        );
    }
2697
2698 #[test]
2699 fn test_construct_default_main_branch() {
2700 let file = File::open(format!(
2702 "{}/testdata/table_metadata/{}",
2703 env!("CARGO_MANIFEST_DIR"),
2704 "TableMetadataV2Valid.json"
2705 ))
2706 .unwrap();
2707 let reader = BufReader::new(file);
2708 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2709
2710 let table = Table::builder()
2711 .metadata(resp)
2712 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2713 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2714 .file_io(FileIOBuilder::new("memory").build().unwrap())
2715 .build()
2716 .unwrap();
2717
2718 assert_eq!(
2719 table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2720 table.metadata().current_snapshot_id().unwrap()
2721 );
2722 }
2723
2724 #[test]
2725 fn test_active_schema_cannot_be_removed() {
2726 let builder = builder_without_changes(FormatVersion::V2);
2727 builder.remove_schemas(&[0]).unwrap_err();
2728 }
2729
    #[test]
    fn test_remove_schemas() {
        let file = File::open(format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            "TableMetadataV2Valid.json"
        ))
        .unwrap();
        let reader = BufReader::new(file);
        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();

        let table = Table::builder()
            .metadata(resp)
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIOBuilder::new("memory").build().unwrap())
            .build()
            .unwrap();

        // The fixture ships with two schemas (ids 0 and 1).
        assert_eq!(2, table.metadata().schemas.len());

        {
            // Removing the current schema (id 1, per the assertions below)
            // must fail.
            let meta_data_builder = table.metadata().clone().into_builder(None);
            meta_data_builder.remove_schemas(&[1]).unwrap_err();
        }

        // Removing the non-current schema (id 0) succeeds and leaves only
        // schema 1, which stays current.
        let mut meta_data_builder = table.metadata().clone().into_builder(None);
        meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
        let build_result = meta_data_builder.build().unwrap();
        assert_eq!(1, build_result.metadata.schemas.len());
        assert_eq!(1, build_result.metadata.current_schema_id);
        assert_eq!(1, build_result.metadata.current_schema().schema_id());
        assert_eq!(1, build_result.changes.len());

        // The single recorded change is a RemoveSchemas for id 0.
        let remove_schema_ids =
            if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
                schema_ids
            } else {
                unreachable!("Expected RemoveSchema change")
            };
        assert_eq!(remove_schema_ids, &[0]);
    }
2773
    #[test]
    fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        // Partition field "bucket_data" uses a non-identity (bucket) transform.
        let partition_spec_with_bucket = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec_with_bucket,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Sanity check: the active spec really contains "bucket_data".
        let partition_field_names: Vec<String> = metadata
            .default_partition_spec()
            .fields()
            .iter()
            .map(|f| f.name.clone())
            .collect();
        assert!(partition_field_names.contains(&"bucket_data".to_string()));

        // The evolved schema introduces a COLUMN named "bucket_data", which
        // collides with the existing non-identity partition field name.
        let evolved_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();

        let builder = metadata.into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ));

        // The conflict must be detected already at add_current_schema time.
        let result = builder.add_current_schema(evolved_schema);

        assert!(result.is_err());
        let error = result.unwrap_err();
        let error_message = error.message();
        assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
        assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
    }
2832
2833 #[test]
2834 fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2835 let initial_schema = Schema::builder()
2836 .with_fields(vec![
2837 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2838 ])
2839 .build()
2840 .unwrap();
2841
2842 let partition_spec = UnboundPartitionSpec::builder()
2843 .with_spec_id(0)
2844 .add_partition_field(1, "partition_col", Transform::Bucket(16))
2845 .unwrap()
2846 .build();
2847
2848 let metadata = TableMetadataBuilder::new(
2849 initial_schema,
2850 partition_spec,
2851 SortOrder::unsorted_order(),
2852 TEST_LOCATION.to_string(),
2853 FormatVersion::V2,
2854 HashMap::new(),
2855 )
2856 .unwrap()
2857 .build()
2858 .unwrap()
2859 .metadata;
2860
2861 let non_conflicting_schema = Schema::builder()
2862 .with_fields(vec![
2863 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2864 NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2865 ])
2866 .build()
2867 .unwrap();
2868
2869 let result = metadata
2871 .clone()
2872 .into_builder(Some("test_location".to_string()))
2873 .add_current_schema(non_conflicting_schema)
2874 .unwrap()
2875 .build();
2876
2877 assert!(result.is_ok());
2878 }
2879
    #[test]
    fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        let builder = metadata.into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ));

        // A new spec whose partition field reuses the schema column name
        // "existing_field" with a NON-identity transform (bucket) is illegal.
        let conflicting_partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(1)
            .add_partition_field(1, "existing_field", Transform::Bucket(8))
            .unwrap()
            .build();

        let result = builder.add_partition_spec(conflicting_partition_spec);

        assert!(result.is_err());
        let error = result.unwrap_err();
        let error_message = error.message();
        assert!(error_message.contains(
            "Cannot create partition with name 'existing_field' that conflicts with schema field"
        ));
        assert!(error_message.contains("and is not an identity transform"));
    }
2931
    #[test]
    fn test_schema_evolution_validates_against_all_historical_schemas() {
        // Schema 0: "data" + "existing_field", partitioned by bucketed "bucket_data".
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Schema 1 drops "existing_field" and adds "new_field"; the table now
        // carries two historical schemas.
        let second_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap();

        let metadata = metadata
            .into_builder(Some("test_location".to_string()))
            .add_current_schema(second_schema)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Re-introducing "existing_field" (a name known from a historical
        // schema, not a partition field name) is allowed.
        let third_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let builder = metadata
            .clone()
            .into_builder(Some("test_location".to_string()));

        let result = builder.add_current_schema(third_schema);
        assert!(result.is_ok());

        // But a new COLUMN named "bucket_data" collides with the non-identity
        // partition field name and must be rejected.
        let conflicting_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
                NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap();

        let builder2 = metadata.into_builder(Some("test_location".to_string()));
        let result2 = builder2.add_current_schema(conflicting_schema);

        assert!(result2.is_err());
        let error = result2.unwrap_err();
        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
    }
3027
3028 #[test]
3029 fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
3030 let initial_schema = Schema::builder()
3032 .with_fields(vec![
3033 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3034 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
3035 .into(),
3036 ])
3037 .build()
3038 .unwrap();
3039
3040 let partition_spec = UnboundPartitionSpec::builder()
3041 .with_spec_id(0)
3042 .add_partition_field(2, "partition_data", Transform::Identity)
3043 .unwrap()
3044 .build();
3045
3046 let metadata = TableMetadataBuilder::new(
3047 initial_schema,
3048 partition_spec,
3049 SortOrder::unsorted_order(),
3050 TEST_LOCATION.to_string(),
3051 FormatVersion::V2,
3052 HashMap::new(),
3053 )
3054 .unwrap()
3055 .build()
3056 .unwrap()
3057 .metadata;
3058
3059 let evolved_schema = Schema::builder()
3061 .with_fields(vec![
3062 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3063 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
3064 .into(),
3065 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
3066 .into(),
3067 ])
3068 .build()
3069 .unwrap();
3070
3071 let result = metadata
3073 .into_builder(Some("test_location".to_string()))
3074 .add_current_schema(evolved_schema);
3075
3076 assert!(result.is_ok());
3077 }
3078
    #[test]
    fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        // Partition by a bucket transform named "bucket_data" (not identity).
        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // A new schema column named after the non-identity partition field
        // must be rejected during schema evolution.
        let conflicting_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();

        let builder = metadata.into_builder(Some("test_location".to_string()));
        let result = builder.add_current_schema(conflicting_schema);

        assert!(result.is_err());
        let error = result.unwrap_err();
        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
    }
3127
3128 #[test]
3129 fn test_partition_spec_evolution_allows_non_conflicting_names() {
3130 let initial_schema = Schema::builder()
3131 .with_fields(vec![
3132 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3133 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
3134 .into(),
3135 ])
3136 .build()
3137 .unwrap();
3138
3139 let partition_spec = UnboundPartitionSpec::builder()
3140 .with_spec_id(0)
3141 .add_partition_field(1, "data_bucket", Transform::Bucket(16))
3142 .unwrap()
3143 .build();
3144
3145 let metadata = TableMetadataBuilder::new(
3146 initial_schema,
3147 partition_spec,
3148 SortOrder::unsorted_order(),
3149 TEST_LOCATION.to_string(),
3150 FormatVersion::V2,
3151 HashMap::new(),
3152 )
3153 .unwrap()
3154 .build()
3155 .unwrap()
3156 .metadata;
3157
3158 let builder = metadata.into_builder(Some(
3159 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3160 ));
3161
3162 let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
3164 .with_spec_id(1)
3165 .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
3166 .unwrap()
3167 .build();
3168
3169 let result = builder.add_partition_spec(non_conflicting_partition_spec);
3170
3171 assert!(result.is_ok());
3172 }
3173
    #[test]
    fn test_row_lineage_addition() {
        let new_rows = 30;
        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;
        // First append claims `new_rows` rows starting at the table's
        // current next-row-id (0 for a fresh table).
        let add_rows = Snapshot::builder()
            .with_snapshot_id(0)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(base.next_row_id(), new_rows)
            .build();

        let first_addition = base
            .into_builder(None)
            .add_snapshot(add_rows.clone())
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // next-row-id advances by the number of added rows.
        assert_eq!(first_addition.next_row_id(), new_rows);

        // Second append continues from the updated next-row-id.
        let add_more_rows = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(first_addition.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(Some(0))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(first_addition.next_row_id(), new_rows)
            .build();

        let second_addition = first_addition
            .into_builder(None)
            .add_snapshot(add_more_rows)
            .unwrap()
            .build()
            .unwrap()
            .metadata;
        assert_eq!(second_addition.next_row_id(), new_rows * 2);
    }
3228
    #[test]
    fn test_row_lineage_invalid_snapshot() {
        let new_rows = 30;
        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;

        // Valid first append: claims rows starting at next-row-id (0), so the
        // table's next-row-id becomes 30 afterwards.
        let add_rows = Snapshot::builder()
            .with_snapshot_id(0)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(base.next_row_id(), new_rows)
            .build();

        let added = base
            .into_builder(None)
            .add_snapshot(add_rows)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Invalid follow-up: its first-row-id (29) is behind the table's
        // next-row-id (30), which would re-assign already-claimed row ids.
        let invalid_new_rows = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(added.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(Some(0))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(added.next_row_id() - 1, 10)
            .build();

        let err = added
            .into_builder(None)
            .add_snapshot(invalid_new_rows)
            .unwrap_err();
        assert!(
            err.to_string().contains(
                "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"
            )
        );
    }
3285
    #[test]
    fn test_row_lineage_append_branch() {
        let branch = "some_branch";

        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;

        // Fresh table: no row ids assigned yet.
        assert_eq!(base.next_row_id(), 0);

        // Append 30 rows to a non-main branch.
        let branch_snapshot_1 = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(base.next_row_id(), 30)
            .build();

        let table_after_branch_1 = base
            .into_builder(None)
            .set_branch_snapshot(branch_snapshot_1.clone(), branch)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Branch-only snapshots do not set a current (main) snapshot.
        assert!(table_after_branch_1.current_snapshot().is_none());

        let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
        let branch_snap_1 = table_after_branch_1
            .snapshots
            .get(&branch_ref.snapshot_id)
            .unwrap();
        assert_eq!(branch_snap_1.first_row_id(), Some(0));

        // Row ids are assigned table-wide, even for branch snapshots.
        assert_eq!(table_after_branch_1.next_row_id(), 30);

        // Append 28 rows to main; its first-row-id continues after the branch's.
        let main_snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("bar")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(table_after_branch_1.next_row_id(), 28)
            .build();

        let table_after_main = table_after_branch_1
            .into_builder(None)
            .add_snapshot(main_snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: main_snapshot.snapshot_id(),
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Main's snapshot starts where the branch snapshot left off.
        let current_snapshot = table_after_main.current_snapshot().unwrap();
        assert_eq!(current_snapshot.first_row_id(), Some(30));

        // 30 (branch) + 28 (main).
        assert_eq!(table_after_main.next_row_id(), 58);

        // Append another 21 rows to the branch; ids continue after main's.
        let branch_snapshot_2 = Snapshot::builder()
            .with_snapshot_id(3)
            .with_timestamp_ms(table_after_main.last_updated_ms + 1)
            .with_sequence_number(2)
            .with_schema_id(0)
            .with_manifest_list("baz")
            .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(table_after_main.next_row_id(), 21)
            .build();

        let table_after_branch_2 = table_after_main
            .into_builder(None)
            .set_branch_snapshot(branch_snapshot_2.clone(), branch)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
        let branch_snap_2 = table_after_branch_2
            .snapshots
            .get(&branch_ref_2.snapshot_id)
            .unwrap();
        assert_eq!(branch_snap_2.first_row_id(), Some(58));

        // 58 + 21.
        assert_eq!(table_after_branch_2.next_row_id(), 79);
    }
3412
    #[test]
    fn test_encryption_keys() {
        let builder = builder_without_changes(FormatVersion::V2);

        let encryption_key_1 = EncryptedKey::builder()
            .key_id("key-1")
            .encrypted_key_metadata(vec![1, 2, 3, 4])
            .encrypted_by_id("encryption-service-1")
            .properties(HashMap::from_iter(vec![(
                "algorithm".to_string(),
                "AES-256".to_string(),
            )]))
            .build();

        let encryption_key_2 = EncryptedKey::builder()
            .key_id("key-2")
            .encrypted_key_metadata(vec![5, 6, 7, 8])
            .encrypted_by_id("encryption-service-2")
            .properties(HashMap::new())
            .build();

        // Adding a key stores it and records an AddEncryptionKey change.
        let build_result = builder
            .add_encryption_key(encryption_key_1.clone())
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
        assert_eq!(
            build_result.metadata.encryption_key("key-1"),
            Some(&encryption_key_1)
        );
        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
            encryption_key: encryption_key_1.clone()
        });

        // A second, distinct key is stored alongside the first.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .add_encryption_key(encryption_key_2.clone())
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
        assert_eq!(
            build_result.metadata.encryption_key("key-1"),
            Some(&encryption_key_1)
        );
        assert_eq!(
            build_result.metadata.encryption_key("key-2"),
            Some(&encryption_key_2)
        );
        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
            encryption_key: encryption_key_2.clone()
        });

        // Re-adding an already-present key is a no-op: no change recorded.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
            ))
            .add_encryption_key(encryption_key_1.clone())
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 0);
        assert_eq!(build_result.metadata.encryption_keys.len(), 2);

        // Removing an existing key deletes it and records RemoveEncryptionKey.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata3.json".to_string(),
            ))
            .remove_encryption_key("key-1")
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
        assert_eq!(build_result.metadata.encryption_key("key-1"), None);
        assert_eq!(
            build_result.metadata.encryption_key("key-2"),
            Some(&encryption_key_2)
        );
        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
            key_id: "key-1".to_string()
        });

        // Removing an unknown key is a no-op.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata4.json".to_string(),
            ))
            .remove_encryption_key("non-existent-key")
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 0);
        assert_eq!(build_result.metadata.encryption_keys.len(), 1);

        // The iterator view reflects the single remaining key.
        let keys = build_result
            .metadata
            .encryption_keys_iter()
            .collect::<Vec<_>>();
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0], &encryption_key_2);

        // Removing the last key empties the key set.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata5.json".to_string(),
            ))
            .remove_encryption_key("key-2")
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 0);
        assert_eq!(build_result.metadata.encryption_key("key-2"), None);
        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
            key_id: "key-2".to_string()
        });

        let keys = build_result.metadata.encryption_keys_iter();
        assert_eq!(keys.len(), 0);
    }
3551
    #[test]
    fn test_partition_field_id_reuse_across_specs() {
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
                    .into(),
            ])
            .build()
            .unwrap();

        // Spec 0: identity on "id".
        let initial_spec = UnboundPartitionSpec::builder()
            .add_partition_field(1, "id", Transform::Identity)
            .unwrap()
            .build();

        let mut metadata = TableMetadataBuilder::new(
            schema,
            initial_spec,
            SortOrder::unsorted_order(),
            "s3://bucket/table".to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Spec 1: bucket on "data".
        let spec1 = UnboundPartitionSpec::builder()
            .add_partition_field(2, "data_bucket", Transform::Bucket(10))
            .unwrap()
            .build();
        let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v1.json".to_string()));
        let result = builder.add_partition_spec(spec1).unwrap().build().unwrap();
        metadata = result.metadata;

        // Spec 2 repeats the fields of both earlier specs plus one new field.
        let spec2 = UnboundPartitionSpec::builder()
            .add_partition_field(1, "id", Transform::Identity)
            .unwrap()
            .add_partition_field(2, "data_bucket", Transform::Bucket(10))
            .unwrap()
            .add_partition_field(3, "year", Transform::Year)
            .unwrap()
            .build();
        let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v2.json".to_string()));
        let result = builder.add_partition_spec(spec2).unwrap().build().unwrap();

        // Fields matching earlier specs keep their previously assigned ids
        // (1000, 1001); only the genuinely new field gets the next id (1002).
        let spec2 = result.metadata.partition_spec_by_id(2).unwrap();
        let field_ids: Vec<i32> = spec2.fields().iter().map(|f| f.field_id).collect();
        assert_eq!(field_ids, vec![1000, 1001, 1002]);
    }
3610}