1use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24 DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25 ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef,
26 Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef,
27 StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID,
28 UnboundPartitionSpec,
29};
30use crate::error::{Error, ErrorKind, Result};
31use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
32use crate::{TableCreation, TableUpdate};
33
/// Field id given to the first field when `reassign_ids` renumbers the schema of a
/// newly created table (it is passed to `with_reassigned_field_ids`).
pub(crate) const FIRST_FIELD_ID: i32 = 1;
35
/// Builder that applies changes to a [`TableMetadata`] and records every applied change
/// as a [`TableUpdate`].
///
/// Start with `new` / `from_table_creation` for a new table or `new_from_metadata` for an
/// existing one, apply changes, then call `build`.
#[derive(Debug, Clone)]
pub struct TableMetadataBuilder {
    /// Metadata being built; every builder method mutates it in place.
    metadata: TableMetadata,
    /// Updates recorded so far, in application order.
    changes: Vec<TableUpdate>,
    /// Id of the schema most recently added; resolves `LAST_ADDED` in `set_current_schema`.
    last_added_schema_id: Option<i32>,
    /// Id of the partition spec most recently added; resolves `LAST_ADDED` for specs.
    last_added_spec_id: Option<i32>,
    /// Id of the sort order most recently added; resolves `LAST_ADDED` for sort orders.
    last_added_order_id: Option<i64>,
    /// Metadata-log entry describing the previous metadata file; `build` appends it to the log.
    previous_history_entry: Option<MetadataLog>,
    /// Timestamp to stamp on the built metadata; `build` uses the current time when `None`.
    last_updated_ms: Option<i64>,
}
59
/// Output of `TableMetadataBuilder::build`.
#[derive(Debug, Clone, PartialEq)]
pub struct TableMetadataBuildResult {
    /// The finished table metadata.
    pub metadata: TableMetadata,
    /// Every update the builder applied, in application order.
    pub changes: Vec<TableUpdate>,
    /// Oldest metadata-log entries removed because the log exceeded the configured
    /// maximum number of previous versions.
    pub expired_metadata_logs: Vec<MetadataLog>,
}
70
71impl TableMetadataBuilder {
    /// Sentinel id meaning "the item most recently added by this builder".
    ///
    /// Accepted by `set_current_schema`, `set_default_partition_spec` and (as `i64`)
    /// `set_default_sort_order`. It is also what those methods record in their updates when
    /// the target is the last-added item.
    pub const LAST_ADDED: i32 = -1;
74
    /// Creates a builder for a brand-new table.
    ///
    /// The field ids of `schema`, `spec` and `sort_order` are re-assigned by `reassign_ids`
    /// before being added. Partition and sort fields are matched to schema columns by name.
    /// The table gets a fresh v7 UUID.
    ///
    /// # Errors
    /// Fails if id re-assignment fails, if the spec or sort order cannot be bound to the
    /// schema, or if `properties` contains a reserved property.
    pub fn new(
        schema: Schema,
        spec: impl Into<UnboundPartitionSpec>,
        sort_order: SortOrder,
        location: String,
        format_version: FormatVersion,
        properties: HashMap<String, String>,
    ) -> Result<Self> {
        // Re-assign field ids, schema id, spec id and sort order id for a new table.
        let (fresh_schema, fresh_spec, fresh_sort_order) =
            Self::reassign_ids(schema, spec.into(), sort_order)?;
        let schema_id = fresh_schema.schema_id();

        let builder = Self {
            metadata: TableMetadata {
                format_version,
                table_uuid: Uuid::now_v7(),
                location: "".to_string(), // replaced by set_location below
                last_sequence_number: 0,
                last_updated_ms: 0, // replaced in build()
                last_column_id: -1, // replaced by add_current_schema below
                current_schema_id: -1, // replaced by add_current_schema below
                schemas: HashMap::new(),
                partition_specs: HashMap::new(),
                default_spec: Arc::new(
                    // Placeholder spec id; replaced by add_default_partition_spec below.
                    PartitionSpec::unpartition_spec().with_spec_id(-1),
                ),
                default_partition_type: StructType::new(vec![]),
                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
                properties: HashMap::new(),
                current_snapshot_id: None,
                snapshots: HashMap::new(),
                snapshot_log: vec![],
                sort_orders: HashMap::new(),
                metadata_log: vec![],
                default_sort_order_id: -1, // replaced by add_default_sort_order below
                refs: HashMap::default(),
                statistics: HashMap::new(),
                partition_statistics: HashMap::new(),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            },
            last_updated_ms: None,
            changes: vec![],
            last_added_schema_id: Some(schema_id),
            last_added_spec_id: None,
            last_added_order_id: None,
            previous_history_entry: None,
        };

        // Order matters: add_partition_spec and add_sort_order look up the current schema,
        // so the schema has to be added and made current first.
        builder
            .set_location(location)
            .add_current_schema(fresh_schema)?
            .add_default_partition_spec(fresh_spec.into_unbound())?
            .add_default_sort_order(fresh_sort_order)?
            .set_properties(properties)
    }
141
142 #[must_use]
148 pub fn new_from_metadata(
149 previous: TableMetadata,
150 current_file_location: Option<String>,
151 ) -> Self {
152 Self {
153 previous_history_entry: current_file_location.map(|l| MetadataLog {
154 metadata_file: l,
155 timestamp_ms: previous.last_updated_ms,
156 }),
157 metadata: previous,
158 changes: Vec::default(),
159 last_added_schema_id: None,
160 last_added_spec_id: None,
161 last_added_order_id: None,
162 last_updated_ms: None,
163 }
164 }
165
166 pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
168 let TableCreation {
169 name: _,
170 location,
171 schema,
172 partition_spec,
173 sort_order,
174 properties,
175 format_version,
176 } = table_creation;
177
178 let location = location.ok_or_else(|| {
179 Error::new(
180 ErrorKind::DataInvalid,
181 "Can't create table without location",
182 )
183 })?;
184 let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
185 spec_id: None,
186 fields: vec![],
187 });
188
189 Self::new(
190 schema,
191 partition_spec,
192 sort_order.unwrap_or(SortOrder::unsorted_order()),
193 location,
194 format_version,
195 properties,
196 )
197 }
198
199 pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
201 if self.metadata.table_uuid != uuid {
202 self.metadata.table_uuid = uuid;
203 self.changes.push(TableUpdate::AssignUuid { uuid });
204 }
205
206 self
207 }
208
209 pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
214 if format_version < self.metadata.format_version {
215 return Err(Error::new(
216 ErrorKind::DataInvalid,
217 format!(
218 "Cannot downgrade FormatVersion from {} to {}",
219 self.metadata.format_version, format_version
220 ),
221 ));
222 }
223
224 if format_version != self.metadata.format_version {
225 match format_version {
226 FormatVersion::V1 => {
227 }
229 FormatVersion::V2 => {
230 self.metadata.format_version = format_version;
231 self.changes
232 .push(TableUpdate::UpgradeFormatVersion { format_version });
233 }
234 FormatVersion::V3 => {
235 self.metadata.format_version = format_version;
236 self.metadata.next_row_id = INITIAL_ROW_ID;
238 self.changes
239 .push(TableUpdate::UpgradeFormatVersion { format_version });
240 }
241 }
242 }
243
244 Ok(self)
245 }
246
247 pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
256 let reserved_properties = properties
258 .keys()
259 .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
260 .map(ToString::to_string)
261 .collect::<Vec<_>>();
262
263 if !reserved_properties.is_empty() {
264 return Err(Error::new(
265 ErrorKind::DataInvalid,
266 format!(
267 "Table properties should not contain reserved properties, but got: [{}]",
268 reserved_properties.join(", ")
269 ),
270 ));
271 }
272
273 if properties.is_empty() {
274 return Ok(self);
275 }
276
277 self.metadata.properties.extend(properties.clone());
278 self.changes.push(TableUpdate::SetProperties {
279 updates: properties,
280 });
281
282 Ok(self)
283 }
284
285 pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
291 let properties = properties.iter().cloned().collect::<HashSet<_>>();
293
294 let reserved_properties = properties
296 .iter()
297 .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
298 .map(ToString::to_string)
299 .collect::<Vec<_>>();
300
301 if !reserved_properties.is_empty() {
302 return Err(Error::new(
303 ErrorKind::DataInvalid,
304 format!(
305 "Table properties to remove contain reserved properties: [{}]",
306 reserved_properties.join(", ")
307 ),
308 ));
309 }
310
311 for property in &properties {
312 self.metadata.properties.remove(property);
313 }
314
315 if !properties.is_empty() {
316 self.changes.push(TableUpdate::RemoveProperties {
317 removals: properties.into_iter().collect(),
318 });
319 }
320
321 Ok(self)
322 }
323
324 pub fn set_location(mut self, location: String) -> Self {
326 let location = location.trim_end_matches('/').to_string();
327 if self.metadata.location != location {
328 self.changes.push(TableUpdate::SetLocation {
329 location: location.clone(),
330 });
331 self.metadata.location = location;
332 }
333
334 self
335 }
336
    /// Adds a snapshot to the table without pointing any reference at it.
    ///
    /// Use `set_branch_snapshot` or `set_ref` to make the snapshot reachable. On success,
    /// `last_sequence_number` becomes the snapshot's sequence number, the builder's
    /// `last_updated_ms` becomes the snapshot's timestamp, and for row-lineage format versions
    /// `next_row_id` is advanced by the snapshot's added-rows count.
    ///
    /// # Errors
    /// Fails, without modifying the builder, if:
    /// - a snapshot with the same id already exists;
    /// - the format version is not V1, the snapshot has a parent, and its sequence number is
    ///   not greater than the table's last sequence number;
    /// - its timestamp is more than `ONE_MINUTE_MS` before the last snapshot-log entry or
    ///   before the last-updated timestamp;
    /// - the format version supports row lineage and the snapshot has no first-row-id, its
    ///   first-row-id is behind `next_row_id`, or advancing `next_row_id` overflows.
    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
        if self
            .metadata
            .snapshots
            .contains_key(&snapshot.snapshot_id())
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
            ));
        }

        // Sequence numbers must grow for snapshots with a parent (not checked for V1).
        if self.metadata.format_version != FormatVersion::V1
            && snapshot.sequence_number() <= self.metadata.last_sequence_number
            && snapshot.parent_snapshot_id().is_some()
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
                    snapshot.sequence_number(),
                    self.metadata.last_sequence_number
                ),
            ));
        }

        if let Some(last) = self.metadata.snapshot_log.last() {
            // Allow timestamps up to one minute older than the last log entry
            // (presumably to tolerate clock skew between concurrent writers).
            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
                        snapshot.timestamp_ms(),
                        last.timestamp_ms
                    ),
                ));
            }
        }

        // Compare against the later of the builder's pending timestamp and the stored one.
        let max_last_updated = self
            .last_updated_ms
            .unwrap_or_default()
            .max(self.metadata.last_updated_ms);
        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
                    snapshot.timestamp_ms(),
                    max_last_updated
                ),
            ));
        }

        // Row lineage: the snapshot must carry a row range starting at or after next_row_id.
        let mut added_rows = None;
        if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
            if let Some((first_row_id, added_rows_count)) = snapshot.row_range() {
                if first_row_id < self.metadata.next_row_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot add a snapshot, first-row-id is behind table next-row-id: {first_row_id} < {}",
                            self.metadata.next_row_id
                        ),
                    ));
                }

                added_rows = Some(added_rows_count);
            } else {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot add a snapshot: first-row-id is null. first-row-id must be set for format version >= {MIN_FORMAT_VERSION_ROW_LINEAGE}",
                    ),
                ));
            }
        }

        // Last fallible step: next_row_id is only overwritten when the addition succeeds.
        if let Some(added_rows) = added_rows {
            self.metadata.next_row_id = self
                .metadata
                .next_row_id
                .checked_add(added_rows)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Cannot add snapshot: next-row-id overflowed when adding added-rows",
                    )
                })?;
        }

        // Everything below is infallible.
        self.changes.push(TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        });

        self.last_updated_ms = Some(snapshot.timestamp_ms());
        self.metadata.last_sequence_number = snapshot.sequence_number();
        self.metadata
            .snapshots
            .insert(snapshot.snapshot_id(), snapshot.into());

        Ok(self)
    }
451
452 pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
458 let reference = self.metadata.refs.get(branch).cloned();
459
460 let reference = if let Some(mut reference) = reference {
461 if !reference.is_branch() {
462 return Err(Error::new(
463 ErrorKind::DataInvalid,
464 format!("Cannot append snapshot to non-branch reference '{branch}'",),
465 ));
466 }
467
468 reference.snapshot_id = snapshot.snapshot_id();
469 reference
470 } else {
471 SnapshotReference {
472 snapshot_id: snapshot.snapshot_id(),
473 retention: SnapshotRetention::Branch {
474 min_snapshots_to_keep: None,
475 max_snapshot_age_ms: None,
476 max_ref_age_ms: None,
477 },
478 }
479 };
480
481 self.add_snapshot(snapshot)?.set_ref(branch, reference)
482 }
483
484 pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
488 let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
489
490 self.metadata.snapshots.retain(|k, _| {
491 if snapshot_ids.contains(k) {
492 removed_snapshots.push(*k);
493 false
494 } else {
495 true
496 }
497 });
498
499 if !removed_snapshots.is_empty() {
500 self.changes.push(TableUpdate::RemoveSnapshots {
501 snapshot_ids: removed_snapshots,
502 });
503 }
504
505 self.metadata
507 .refs
508 .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
509
510 self
511 }
512
    /// Points the reference `ref_name` at the snapshot `reference.snapshot_id`.
    ///
    /// Setting a reference to the value it already has is a no-op. If the snapshot was added
    /// by this builder, the builder's `last_updated_ms` is set to the snapshot's timestamp.
    /// For `MAIN_BRANCH`, the current snapshot id is updated and a snapshot-log entry is
    /// appended. That entry is timestamped with the builder's `last_updated_ms`, which is set
    /// to the current time if it is still unset.
    ///
    /// # Errors
    /// Fails when the referenced snapshot does not exist in the metadata.
    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
        // Nothing to do if the ref already has exactly this value.
        if self
            .metadata
            .refs
            .get(ref_name)
            .is_some_and(|snap_ref| snap_ref.eq(&reference))
        {
            return Ok(self);
        }

        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
                    reference.snapshot_id
                ),
            ));
        };

        // A snapshot added in this builder stamps the metadata with its own timestamp.
        let is_added_snapshot = self.changes.iter().any(|update| {
            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
        });
        if is_added_snapshot {
            self.last_updated_ms = Some(snapshot.timestamp_ms());
        }

        // Only the main branch drives the current snapshot and the snapshot log.
        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
            let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
                last_updated_ms
            } else {
                let last_updated_ms = chrono::Utc::now().timestamp_millis();
                self.last_updated_ms = Some(last_updated_ms);
                last_updated_ms
            };

            self.metadata.snapshot_log.push(SnapshotLog {
                snapshot_id: snapshot.snapshot_id(),
                timestamp_ms,
            });
        }

        self.changes.push(TableUpdate::SetSnapshotRef {
            ref_name: ref_name.to_string(),
            reference: reference.clone(),
        });
        self.metadata.refs.insert(ref_name.to_string(), reference);

        Ok(self)
    }
570
571 pub fn remove_ref(mut self, ref_name: &str) -> Self {
575 if ref_name == MAIN_BRANCH {
576 self.metadata.current_snapshot_id = None;
577 }
578
579 if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
580 self.changes.push(TableUpdate::RemoveSnapshotRef {
581 ref_name: ref_name.to_string(),
582 });
583 }
584
585 self
586 }
587
588 pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
590 self.metadata
591 .statistics
592 .insert(statistics.snapshot_id, statistics.clone());
593 self.changes.push(TableUpdate::SetStatistics {
594 statistics: statistics.clone(),
595 });
596 self
597 }
598
599 pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
601 let previous = self.metadata.statistics.remove(&snapshot_id);
602 if previous.is_some() {
603 self.changes
604 .push(TableUpdate::RemoveStatistics { snapshot_id });
605 }
606 self
607 }
608
609 pub fn set_partition_statistics(
611 mut self,
612 partition_statistics_file: PartitionStatisticsFile,
613 ) -> Self {
614 self.metadata.partition_statistics.insert(
615 partition_statistics_file.snapshot_id,
616 partition_statistics_file.clone(),
617 );
618 self.changes.push(TableUpdate::SetPartitionStatistics {
619 partition_statistics: partition_statistics_file,
620 });
621 self
622 }
623
624 pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
626 let previous = self.metadata.partition_statistics.remove(&snapshot_id);
627 if previous.is_some() {
628 self.changes
629 .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
630 }
631 self
632 }
633
634 pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
641 self.validate_schema_field_names(&schema)?;
643
644 let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
645 let schema_found = self.metadata.schemas.contains_key(&new_schema_id);
646
647 if schema_found {
648 if self.last_added_schema_id != Some(new_schema_id) {
649 self.changes.push(TableUpdate::AddSchema {
650 schema: schema.clone(),
651 });
652 self.last_added_schema_id = Some(new_schema_id);
653 }
654
655 return Ok(self);
656 }
657
658 self.metadata.last_column_id =
661 std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());
662
663 let schema = match new_schema_id == schema.schema_id() {
665 true => schema,
666 false => schema.with_schema_id(new_schema_id),
667 };
668
669 self.metadata
670 .schemas
671 .insert(new_schema_id, schema.clone().into());
672
673 self.changes.push(TableUpdate::AddSchema { schema });
674
675 self.last_added_schema_id = Some(new_schema_id);
676
677 Ok(self)
678 }
679
680 pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
688 if schema_id == Self::LAST_ADDED {
689 schema_id = self.last_added_schema_id.ok_or_else(|| {
690 Error::new(
691 ErrorKind::DataInvalid,
692 "Cannot set current schema to last added schema: no schema has been added.",
693 )
694 })?;
695 };
696 let schema_id = schema_id; if schema_id == self.metadata.current_schema_id {
699 return Ok(self);
700 }
701
702 let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
703 Error::new(
704 ErrorKind::DataInvalid,
705 format!("Cannot set current schema to unknown schema with id: '{schema_id}'"),
706 )
707 })?;
708
709 self.metadata.current_schema_id = schema_id;
715
716 if self.last_added_schema_id == Some(schema_id) {
717 self.changes.push(TableUpdate::SetCurrentSchema {
718 schema_id: Self::LAST_ADDED,
719 });
720 } else {
721 self.changes
722 .push(TableUpdate::SetCurrentSchema { schema_id });
723 }
724
725 Ok(self)
726 }
727
728 pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
730 self.add_schema(schema)?
731 .set_current_schema(Self::LAST_ADDED)
732 }
733
734 fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
743 if self.metadata.schemas.is_empty() {
744 return Ok(());
745 }
746
747 for field_name in schema.field_id_to_name_map().values() {
748 let has_partition_conflict = self.metadata.partition_name_exists(field_name);
749 let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);
750
751 if has_partition_conflict && is_new_field {
752 return Err(Error::new(
753 ErrorKind::DataInvalid,
754 format!(
755 "Cannot add schema field '{field_name}' because it conflicts with existing partition field name. \
756 Schema evolution cannot introduce field names that match existing partition field names."
757 ),
758 ));
759 }
760 }
761
762 Ok(())
763 }
764
765 fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
775 if self.metadata.schemas.is_empty() {
776 return Ok(());
777 }
778
779 let current_schema = self.get_current_schema()?;
780 for partition_field in unbound_spec.fields() {
781 let exists_in_any_schema = self
782 .metadata
783 .name_exists_in_any_schema(&partition_field.name);
784
785 if !exists_in_any_schema {
787 continue;
788 }
789
790 if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
792 let is_identity_transform =
793 partition_field.transform == crate::spec::Transform::Identity;
794 let has_matching_source_id = schema_field.id == partition_field.source_id;
795
796 if !is_identity_transform {
797 return Err(Error::new(
798 ErrorKind::DataInvalid,
799 format!(
800 "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
801 partition_field.name
802 ),
803 ));
804 }
805
806 if !has_matching_source_id {
807 return Err(Error::new(
808 ErrorKind::DataInvalid,
809 format!(
810 "Cannot create identity partition sourced from different field in schema. \
811 Field name '{}' has id `{}` in schema but partition source id is `{}`",
812 partition_field.name, schema_field.id, partition_field.source_id
813 ),
814 ));
815 }
816 }
817 }
818
819 Ok(())
820 }
821
    /// Binds `unbound_spec` to the current schema and adds it to the table without making it
    /// the default.
    ///
    /// Fields without an id reuse the id of an equivalent (same source id and transform)
    /// field from an existing spec. If a compatible spec already exists, its id is reused and
    /// an `AddSpec` update is recorded only if it was not the spec added last.
    ///
    /// # Errors
    /// Fails if partition names clash with schema columns, if the spec cannot be bound to the
    /// current schema, or if a V1 table would get non-sequential partition field ids.
    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        let schema = self.get_current_schema()?.clone();

        // Partition field names must not clash with schema field names.
        self.validate_partition_field_names(&unbound_spec)?;

        // Reuse field ids of equivalent fields from existing specs.
        let unbound_spec = self.reuse_partition_field_ids(unbound_spec)?;

        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
            .with_last_assigned_field_id(self.metadata.last_partition_id)
            .build()?;

        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
        let spec = spec.with_spec_id(new_spec_id);
        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);

        if spec_found {
            // Known spec: record it again only if it was not the one added last.
            if self.last_added_spec_id != Some(new_spec_id) {
                self.changes
                    .push(TableUpdate::AddSpec { spec: unbound_spec });
                self.last_added_spec_id = Some(new_spec_id);
            }

            return Ok(self);
        }

        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Cannot add partition spec with non-sequential field ids to format version 1 table",
            ));
        }

        let highest_field_id = spec
            .highest_field_id()
            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
        self.metadata
            .partition_specs
            .insert(new_spec_id, Arc::new(spec));
        self.changes
            .push(TableUpdate::AddSpec { spec: unbound_spec });

        self.last_added_spec_id = Some(new_spec_id);
        // last_partition_id only ever grows.
        self.metadata.last_partition_id =
            std::cmp::max(self.metadata.last_partition_id, highest_field_id);

        Ok(self)
    }
882
883 fn reuse_partition_field_ids(
888 &self,
889 unbound_spec: UnboundPartitionSpec,
890 ) -> Result<UnboundPartitionSpec> {
891 let equivalent_field_ids: HashMap<_, _> = self
893 .metadata
894 .partition_specs
895 .values()
896 .flat_map(|spec| spec.fields())
897 .map(|field| ((field.source_id, &field.transform), field.field_id))
898 .collect();
899
900 let fields = unbound_spec
902 .fields
903 .into_iter()
904 .map(|mut field| {
905 if field.field_id.is_none()
906 && let Some(&existing_field_id) =
907 equivalent_field_ids.get(&(field.source_id, &field.transform))
908 {
909 field.field_id = Some(existing_field_id);
910 }
911 field
912 })
913 .collect();
914
915 Ok(UnboundPartitionSpec {
916 spec_id: unbound_spec.spec_id,
917 fields,
918 })
919 }
920
921 pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
927 if spec_id == Self::LAST_ADDED {
928 spec_id = self.last_added_spec_id.ok_or_else(|| {
929 Error::new(
930 ErrorKind::DataInvalid,
931 "Cannot set default partition spec to last added spec: no spec has been added.",
932 )
933 })?;
934 }
935
936 if self.metadata.default_spec.spec_id() == spec_id {
937 return Ok(self);
938 }
939
940 if !self.metadata.partition_specs.contains_key(&spec_id) {
941 return Err(Error::new(
942 ErrorKind::DataInvalid,
943 format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
944 ));
945 }
946
947 let schemaless_spec = self
948 .metadata
949 .partition_specs
950 .get(&spec_id)
951 .ok_or_else(|| {
952 Error::new(
953 ErrorKind::DataInvalid,
954 format!(
955 "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
956 ),
957 )
958 })?
959 .clone();
960 let spec = Arc::unwrap_or_clone(schemaless_spec);
961 let spec_type = spec.partition_type(self.get_current_schema()?)?;
962 self.metadata.default_spec = Arc::new(spec);
963 self.metadata.default_partition_type = spec_type;
964
965 if self.last_added_spec_id == Some(spec_id) {
966 self.changes.push(TableUpdate::SetDefaultSpec {
967 spec_id: Self::LAST_ADDED,
968 });
969 } else {
970 self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
971 }
972
973 Ok(self)
974 }
975
976 pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
978 self.add_partition_spec(unbound_spec)?
979 .set_default_partition_spec(Self::LAST_ADDED)
980 }
981
982 pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
989 if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
990 return Err(Error::new(
991 ErrorKind::DataInvalid,
992 "Cannot remove default partition spec",
993 ));
994 }
995
996 let mut removed_specs = Vec::with_capacity(spec_ids.len());
997 spec_ids.iter().for_each(|id| {
998 if self.metadata.partition_specs.remove(id).is_some() {
999 removed_specs.push(*id);
1000 }
1001 });
1002
1003 if !removed_specs.is_empty() {
1004 self.changes.push(TableUpdate::RemovePartitionSpecs {
1005 spec_ids: removed_specs,
1006 });
1007 }
1008
1009 Ok(self)
1010 }
1011
1012 pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
1023 let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
1024 let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);
1025
1026 if sort_order_found {
1027 if self.last_added_order_id != Some(new_order_id) {
1028 self.changes.push(TableUpdate::AddSortOrder {
1029 sort_order: sort_order.clone().with_order_id(new_order_id),
1030 });
1031 self.last_added_order_id = Some(new_order_id);
1032 }
1033
1034 return Ok(self);
1035 }
1036
1037 let schema = self.get_current_schema()?.clone().as_ref().clone();
1038 let sort_order = SortOrder::builder()
1039 .with_order_id(new_order_id)
1040 .with_fields(sort_order.fields)
1041 .build(&schema)
1042 .map_err(|e| {
1043 Error::new(
1044 ErrorKind::DataInvalid,
1045 format!("Sort order to add is incompatible with current schema: {e}"),
1046 )
1047 .with_source(e)
1048 })?;
1049
1050 self.last_added_order_id = Some(new_order_id);
1051 self.metadata
1052 .sort_orders
1053 .insert(new_order_id, sort_order.clone().into());
1054 self.changes.push(TableUpdate::AddSortOrder { sort_order });
1055
1056 Ok(self)
1057 }
1058
1059 pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
1065 if sort_order_id == Self::LAST_ADDED as i64 {
1066 sort_order_id = self.last_added_order_id.ok_or_else(|| {
1067 Error::new(
1068 ErrorKind::DataInvalid,
1069 "Cannot set default sort order to last added order: no order has been added.",
1070 )
1071 })?;
1072 }
1073
1074 if self.metadata.default_sort_order_id == sort_order_id {
1075 return Ok(self);
1076 }
1077
1078 if !self.metadata.sort_orders.contains_key(&sort_order_id) {
1079 return Err(Error::new(
1080 ErrorKind::DataInvalid,
1081 format!(
1082 "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
1083 ),
1084 ));
1085 }
1086
1087 self.metadata.default_sort_order_id = sort_order_id;
1088
1089 if self.last_added_order_id == Some(sort_order_id) {
1090 self.changes.push(TableUpdate::SetDefaultSortOrder {
1091 sort_order_id: Self::LAST_ADDED as i64,
1092 });
1093 } else {
1094 self.changes
1095 .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
1096 }
1097
1098 Ok(self)
1099 }
1100
1101 fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
1103 self.add_sort_order(sort_order)?
1104 .set_default_sort_order(Self::LAST_ADDED as i64)
1105 }
1106
1107 pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
1109 let key_id = key.key_id().to_string();
1110 if self.metadata.encryption_keys.contains_key(&key_id) {
1111 return self;
1113 }
1114
1115 self.metadata.encryption_keys.insert(key_id, key.clone());
1116 self.changes.push(TableUpdate::AddEncryptionKey {
1117 encryption_key: key,
1118 });
1119 self
1120 }
1121
1122 pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
1124 if self.metadata.encryption_keys.remove(key_id).is_some() {
1125 self.changes.push(TableUpdate::RemoveEncryptionKey {
1126 key_id: key_id.to_string(),
1127 });
1128 }
1129 self
1130 }
1131
    /// Finalizes the metadata and returns it together with the recorded changes.
    ///
    /// Steps, in order:
    /// - stamp `last_updated_ms`, using the current time if no change set it;
    /// - re-bind the default spec to the current schema and recompute its partition type;
    /// - check the default sort order against the current schema;
    /// - clean up the snapshot log and normalize the metadata;
    /// - append the previous metadata file to the metadata log, then expire old entries.
    ///
    /// # Errors
    /// Fails if the current schema or default sort order is missing, if the default spec or
    /// sort order is incompatible with the current schema, if the snapshot log ends on a
    /// snapshot other than the current one, or if normalization fails.
    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
        self.metadata.last_updated_ms = self
            .last_updated_ms
            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());

        // Use the builder's fallible getters for the compatibility checks below.
        let schema = self.get_current_schema()?.clone();
        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);

        self.metadata.default_spec = Arc::new(
            Arc::unwrap_or_clone(self.metadata.default_spec)
                .into_unbound()
                .bind(schema.clone())?,
        );
        self.metadata.default_partition_type =
            self.metadata.default_spec.partition_type(&schema)?;
        // Built only to validate the default sort order against the current schema.
        SortOrder::builder()
            .with_fields(sort_order.fields)
            .build(&schema)?;

        self.update_snapshot_log()?;
        self.metadata.try_normalize()?;

        if let Some(hist_entry) = self.previous_history_entry.take() {
            self.metadata.metadata_log.push(hist_entry);
        }
        let expired_metadata_logs = self.expire_metadata_log();

        Ok(TableMetadataBuildResult {
            metadata: self.metadata,
            changes: self.changes,
            expired_metadata_logs,
        })
    }
1169
1170 fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1171 let max_size = self
1172 .metadata
1173 .properties
1174 .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1175 .and_then(|v| v.parse::<usize>().ok())
1176 .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1177 .max(1);
1178
1179 if self.metadata.metadata_log.len() > max_size {
1180 self.metadata
1181 .metadata_log
1182 .drain(0..self.metadata.metadata_log.len() - max_size)
1183 .collect()
1184 } else {
1185 Vec::new()
1186 }
1187 }
1188
    /// Cleans up the snapshot log before building.
    ///
    /// Entries for intermediate snapshots (see `get_intermediate_snapshots`) are dropped. If
    /// snapshots were removed, an entry for a snapshot that no longer exists discards all
    /// history up to and including that entry. Nothing is changed when there are neither
    /// intermediate snapshots nor removals.
    ///
    /// # Errors
    /// Fails when a current snapshot is set and the cleaned log does not end with it.
    fn update_snapshot_log(&mut self) -> Result<()> {
        let intermediate_snapshots = self.get_intermediate_snapshots();
        let has_removed_snapshots = self
            .changes
            .iter()
            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));

        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
            return Ok(());
        }

        let mut new_snapshot_log = Vec::new();
        for log_entry in &self.metadata.snapshot_log {
            let snapshot_id = log_entry.snapshot_id;
            if self.metadata.snapshots.contains_key(&snapshot_id) {
                if !intermediate_snapshots.contains(&snapshot_id) {
                    new_snapshot_log.push(log_entry.clone());
                }
            } else if has_removed_snapshots {
                // Drop all earlier history instead of leaving a gap. With a gap, the next
                // entry would appear to have been current during the interval when the
                // removed snapshot was actually current.
                new_snapshot_log.clear();
            }
        }

        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
            if last_id != Some(current_snapshot_id) {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
                ));
            }
        };

        self.metadata.snapshot_log = new_snapshot_log;
        Ok(())
    }
1230
1231 fn get_intermediate_snapshots(&self) -> HashSet<i64> {
1241 let added_snapshot_ids = self
1242 .changes
1243 .iter()
1244 .filter_map(|update| match update {
1245 TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
1246 _ => None,
1247 })
1248 .collect::<HashSet<_>>();
1249
1250 self.changes
1251 .iter()
1252 .filter_map(|update| match update {
1253 TableUpdate::SetSnapshotRef {
1254 ref_name,
1255 reference,
1256 } => {
1257 if added_snapshot_ids.contains(&reference.snapshot_id)
1258 && ref_name == MAIN_BRANCH
1259 && reference.snapshot_id
1260 != self
1261 .metadata
1262 .current_snapshot_id
1263 .unwrap_or(i64::from(Self::LAST_ADDED))
1264 {
1265 Some(reference.snapshot_id)
1266 } else {
1267 None
1268 }
1269 }
1270 _ => None,
1271 })
1272 .collect()
1273 }
1274
    /// Re-numbers a new table's schema and rebuilds its partition spec and sort order.
    ///
    /// The schema gets id `DEFAULT_SCHEMA_ID` and field ids starting at `FIRST_FIELD_ID`.
    /// Partition and sort fields are mapped to the new field ids through their source
    /// column's name in the original schema.
    ///
    /// # Errors
    /// Fails if a partition or sort field's source id is not in the original schema, or if
    /// rebuilding the schema, spec or sort order fails.
    fn reassign_ids(
        schema: Schema,
        spec: UnboundPartitionSpec,
        sort_order: SortOrder,
    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
        // Old field id -> name, captured before the ids are rewritten.
        let previous_id_to_name = schema.field_id_to_name_map().clone();
        let fresh_schema = schema
            .into_builder()
            .with_schema_id(DEFAULT_SCHEMA_ID)
            .with_reassigned_field_ids(FIRST_FIELD_ID)
            .build()?;

        // Rebuild the partition spec against the fresh schema, matching columns by name.
        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
        for field in spec.fields() {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for partition column {} in schema.",
                        field.source_id, field.name
                    ),
                )
            })?;
            fresh_spec =
                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
        }
        let fresh_spec = fresh_spec.build()?;

        // Rebuild the sort order, pointing each field at its column's new id.
        let mut fresh_order = SortOrder::builder();
        for mut field in sort_order.fields {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for sort column in schema.",
                        field.source_id
                    ),
                )
            })?;
            let new_field_id = fresh_schema
                .field_by_name(source_field_name)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!(
                            "Cannot find source column with name {source_field_name} for sort column in re-assigned schema."
                        ),
                    )
                })?.id;
            field.source_id = new_field_id;
            fresh_order.with_sort_field(field);
        }
        let fresh_sort_order = fresh_order.build(&fresh_schema)?;

        Ok((fresh_schema, fresh_spec, fresh_sort_order))
    }
1334
1335 fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1336 self.metadata
1337 .schemas
1338 .iter()
1339 .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1340 .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1341 }
1342
1343 fn get_highest_schema_id(&self) -> i32 {
1344 *self
1345 .metadata
1346 .schemas
1347 .keys()
1348 .max()
1349 .unwrap_or(&self.metadata.current_schema_id)
1350 }
1351
1352 fn get_current_schema(&self) -> Result<&SchemaRef> {
1353 self.metadata
1354 .schemas
1355 .get(&self.metadata.current_schema_id)
1356 .ok_or_else(|| {
1357 Error::new(
1358 ErrorKind::DataInvalid,
1359 format!(
1360 "Current schema with id '{}' not found in table metadata.",
1361 self.metadata.current_schema_id
1362 ),
1363 )
1364 })
1365 }
1366
1367 fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1368 self.metadata
1369 .sort_orders
1370 .get(&self.metadata.default_sort_order_id)
1371 .cloned()
1372 .ok_or_else(|| {
1373 Error::new(
1374 ErrorKind::DataInvalid,
1375 format!(
1376 "Default sort order with id '{}' not found in table metadata.",
1377 self.metadata.default_sort_order_id
1378 ),
1379 )
1380 })
1381 }
1382
1383 fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1385 self.metadata
1386 .partition_specs
1387 .iter()
1388 .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1389 .unwrap_or_else(|| {
1390 self.get_highest_spec_id()
1391 .map(|id| id + 1)
1392 .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1393 })
1394 }
1395
1396 fn get_highest_spec_id(&self) -> Option<i32> {
1397 self.metadata.partition_specs.keys().max().copied()
1398 }
1399
1400 fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1402 if new_sort_order.is_unsorted() {
1403 return SortOrder::unsorted_order().order_id;
1404 }
1405
1406 self.metadata
1407 .sort_orders
1408 .iter()
1409 .find_map(|(id, sort_order)| {
1410 sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1411 })
1412 .unwrap_or_else(|| {
1413 self.highest_sort_order_id()
1414 .unwrap_or(SortOrder::unsorted_order().order_id)
1415 + 1
1416 })
1417 }
1418
1419 fn highest_sort_order_id(&self) -> Option<i64> {
1420 self.metadata.sort_orders.keys().max().copied()
1421 }
1422
1423 pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1426 if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1427 return Err(Error::new(
1428 ErrorKind::DataInvalid,
1429 "Cannot remove current schema",
1430 ));
1431 }
1432
1433 if schema_id_to_remove.is_empty() {
1434 return Ok(self);
1435 }
1436
1437 let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1438 self.metadata.schemas.retain(|id, _schema| {
1439 if schema_id_to_remove.contains(id) {
1440 removed_schemas.push(*id);
1441 false
1442 } else {
1443 true
1444 }
1445 });
1446
1447 self.changes.push(TableUpdate::RemoveSchemas {
1448 schema_ids: removed_schemas,
1449 });
1450
1451 Ok(self)
1452 }
1453}
1454
1455impl From<TableMetadataBuildResult> for TableMetadata {
1456 fn from(result: TableMetadataBuildResult) -> Self {
1457 result.metadata
1458 }
1459}
1460
1461#[cfg(test)]
1462mod tests {
1463 use std::fs::File;
1464 use std::io::BufReader;
1465 use std::thread::sleep;
1466
1467 use super::*;
1468 use crate::TableIdent;
1469 use crate::io::FileIO;
1470 use crate::spec::{
1471 BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1472 SnapshotRetention, SortDirection, SortField, StructType, Summary, TableProperties,
1473 Transform, Type, UnboundPartitionField,
1474 };
1475 use crate::table::Table;
1476 use crate::test_utils::test_runtime;
1477
1478 const TEST_LOCATION: &str = "s3://bucket/test/location";
1479 const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1480
1481 fn schema() -> Schema {
1482 Schema::builder()
1483 .with_fields(vec![
1484 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1485 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1486 NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1487 ])
1488 .build()
1489 .unwrap()
1490 }
1491
1492 fn sort_order() -> SortOrder {
1493 let schema = schema();
1494 SortOrder::builder()
1495 .with_order_id(1)
1496 .with_sort_field(SortField {
1497 source_id: 3,
1498 transform: Transform::Bucket(4),
1499 direction: SortDirection::Descending,
1500 null_order: NullOrder::First,
1501 })
1502 .build(&schema)
1503 .unwrap()
1504 }
1505
1506 fn partition_spec() -> UnboundPartitionSpec {
1507 UnboundPartitionSpec::builder()
1508 .with_spec_id(0)
1509 .add_partition_field(2, "y", Transform::Identity)
1510 .unwrap()
1511 .build()
1512 }
1513
1514 fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
1515 TableMetadataBuilder::new(
1516 schema(),
1517 partition_spec(),
1518 sort_order(),
1519 TEST_LOCATION.to_string(),
1520 format_version,
1521 HashMap::new(),
1522 )
1523 .unwrap()
1524 .build()
1525 .unwrap()
1526 .metadata
1527 .into_builder(Some(
1528 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1529 ))
1530 }
1531
1532 #[test]
1533 fn test_minimal_build() {
1534 let metadata = TableMetadataBuilder::new(
1535 schema(),
1536 partition_spec(),
1537 sort_order(),
1538 TEST_LOCATION.to_string(),
1539 FormatVersion::V1,
1540 HashMap::new(),
1541 )
1542 .unwrap()
1543 .build()
1544 .unwrap()
1545 .metadata;
1546
1547 assert_eq!(metadata.format_version, FormatVersion::V1);
1548 assert_eq!(metadata.location, TEST_LOCATION);
1549 assert_eq!(metadata.current_schema_id, 0);
1550 assert_eq!(metadata.default_spec.spec_id(), 0);
1551 assert_eq!(metadata.default_sort_order_id, 1);
1552 assert_eq!(metadata.last_partition_id, 1000);
1553 assert_eq!(metadata.last_column_id, 3);
1554 assert_eq!(metadata.snapshots.len(), 0);
1555 assert_eq!(metadata.current_snapshot_id, None);
1556 assert_eq!(metadata.refs.len(), 0);
1557 assert_eq!(metadata.properties.len(), 0);
1558 assert_eq!(metadata.metadata_log.len(), 0);
1559 assert_eq!(metadata.last_sequence_number, 0);
1560 assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);
1561
1562 let _ = serde_json::to_string(&metadata).unwrap();
1564
1565 let metadata = metadata
1567 .into_builder(Some(
1568 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1569 ))
1570 .upgrade_format_version(FormatVersion::V2)
1571 .unwrap()
1572 .build()
1573 .unwrap()
1574 .metadata;
1575
1576 assert_eq!(metadata.format_version, FormatVersion::V2);
1577 let _ = serde_json::to_string(&metadata).unwrap();
1578 }
1579
1580 #[test]
1581 fn test_build_unpartitioned_unsorted() {
1582 let schema = Schema::builder().build().unwrap();
1583 let metadata = TableMetadataBuilder::new(
1584 schema.clone(),
1585 PartitionSpec::unpartition_spec(),
1586 SortOrder::unsorted_order(),
1587 TEST_LOCATION.to_string(),
1588 FormatVersion::V2,
1589 HashMap::new(),
1590 )
1591 .unwrap()
1592 .build()
1593 .unwrap()
1594 .metadata;
1595
1596 assert_eq!(metadata.format_version, FormatVersion::V2);
1597 assert_eq!(metadata.location, TEST_LOCATION);
1598 assert_eq!(metadata.current_schema_id, 0);
1599 assert_eq!(metadata.default_spec.spec_id(), 0);
1600 assert_eq!(metadata.default_sort_order_id, 0);
1601 assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
1602 assert_eq!(metadata.last_column_id, 0);
1603 assert_eq!(metadata.snapshots.len(), 0);
1604 assert_eq!(metadata.current_snapshot_id, None);
1605 assert_eq!(metadata.refs.len(), 0);
1606 assert_eq!(metadata.properties.len(), 0);
1607 assert_eq!(metadata.metadata_log.len(), 0);
1608 assert_eq!(metadata.last_sequence_number, 0);
1609 }
1610
1611 #[test]
1612 fn test_reassigns_ids() {
1613 let schema = Schema::builder()
1614 .with_schema_id(10)
1615 .with_fields(vec![
1616 NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
1617 NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
1618 NestedField::required(
1619 13,
1620 "struct",
1621 Type::Struct(StructType::new(vec![
1622 NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
1623 .into(),
1624 ])),
1625 )
1626 .into(),
1627 NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
1628 ])
1629 .build()
1630 .unwrap();
1631 let spec = PartitionSpec::builder(schema.clone())
1632 .with_spec_id(20)
1633 .add_partition_field("a", "a", Transform::Identity)
1634 .unwrap()
1635 .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
1636 .unwrap()
1637 .build()
1638 .unwrap();
1639 let sort_order = SortOrder::builder()
1640 .with_fields(vec![SortField {
1641 source_id: 11,
1642 transform: Transform::Identity,
1643 direction: SortDirection::Ascending,
1644 null_order: NullOrder::First,
1645 }])
1646 .with_order_id(10)
1647 .build(&schema)
1648 .unwrap();
1649
1650 let (fresh_schema, fresh_spec, fresh_sort_order) =
1651 TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();
1652
1653 let expected_schema = Schema::builder()
1654 .with_fields(vec![
1655 NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
1656 NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
1657 NestedField::required(
1658 3,
1659 "struct",
1660 Type::Struct(StructType::new(vec![
1661 NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
1662 .into(),
1663 ])),
1664 )
1665 .into(),
1666 NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
1667 ])
1668 .build()
1669 .unwrap();
1670
1671 let expected_spec = PartitionSpec::builder(expected_schema.clone())
1672 .with_spec_id(0)
1673 .add_partition_field("a", "a", Transform::Identity)
1674 .unwrap()
1675 .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
1676 .unwrap()
1677 .build()
1678 .unwrap();
1679
1680 let expected_sort_order = SortOrder::builder()
1681 .with_fields(vec![SortField {
1682 source_id: 1,
1683 transform: Transform::Identity,
1684 direction: SortDirection::Ascending,
1685 null_order: NullOrder::First,
1686 }])
1687 .with_order_id(1)
1688 .build(&expected_schema)
1689 .unwrap();
1690
1691 assert_eq!(fresh_schema, expected_schema);
1692 assert_eq!(fresh_spec, expected_spec);
1693 assert_eq!(fresh_sort_order, expected_sort_order);
1694 }
1695
1696 #[test]
1697 fn test_ids_are_reassigned_for_new_metadata() {
1698 let schema = schema().into_builder().with_schema_id(10).build().unwrap();
1699
1700 let metadata = TableMetadataBuilder::new(
1701 schema,
1702 partition_spec(),
1703 sort_order(),
1704 TEST_LOCATION.to_string(),
1705 FormatVersion::V1,
1706 HashMap::new(),
1707 )
1708 .unwrap()
1709 .build()
1710 .unwrap()
1711 .metadata;
1712
1713 assert_eq!(metadata.current_schema_id, 0);
1714 assert_eq!(metadata.current_schema().schema_id(), 0);
1715 }
1716
1717 #[test]
1718 fn test_new_metadata_changes() {
1719 let changes = TableMetadataBuilder::new(
1720 schema(),
1721 partition_spec(),
1722 sort_order(),
1723 TEST_LOCATION.to_string(),
1724 FormatVersion::V1,
1725 HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
1726 )
1727 .unwrap()
1728 .build()
1729 .unwrap()
1730 .changes;
1731
1732 pretty_assertions::assert_eq!(changes, vec![
1733 TableUpdate::SetLocation {
1734 location: TEST_LOCATION.to_string()
1735 },
1736 TableUpdate::AddSchema { schema: schema() },
1737 TableUpdate::SetCurrentSchema { schema_id: -1 },
1738 TableUpdate::AddSpec {
1739 spec: PartitionSpec::builder(schema())
1742 .with_spec_id(0)
1743 .add_unbound_field(UnboundPartitionField {
1744 name: "y".to_string(),
1745 transform: Transform::Identity,
1746 source_id: 2,
1747 field_id: Some(1000)
1748 })
1749 .unwrap()
1750 .build()
1751 .unwrap()
1752 .into_unbound(),
1753 },
1754 TableUpdate::SetDefaultSpec { spec_id: -1 },
1755 TableUpdate::AddSortOrder {
1756 sort_order: sort_order(),
1757 },
1758 TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
1759 TableUpdate::SetProperties {
1760 updates: HashMap::from_iter(vec![(
1761 "property 1".to_string(),
1762 "value 1".to_string()
1763 )]),
1764 }
1765 ]);
1766 }
1767
1768 #[test]
1769 fn test_new_metadata_changes_unpartitioned_unsorted() {
1770 let schema = Schema::builder().build().unwrap();
1771 let changes = TableMetadataBuilder::new(
1772 schema.clone(),
1773 PartitionSpec::unpartition_spec().into_unbound(),
1774 SortOrder::unsorted_order(),
1775 TEST_LOCATION.to_string(),
1776 FormatVersion::V1,
1777 HashMap::new(),
1778 )
1779 .unwrap()
1780 .build()
1781 .unwrap()
1782 .changes;
1783
1784 pretty_assertions::assert_eq!(changes, vec![
1785 TableUpdate::SetLocation {
1786 location: TEST_LOCATION.to_string()
1787 },
1788 TableUpdate::AddSchema {
1789 schema: Schema::builder().build().unwrap(),
1790 },
1791 TableUpdate::SetCurrentSchema { schema_id: -1 },
1792 TableUpdate::AddSpec {
1793 spec: PartitionSpec::builder(schema)
1796 .with_spec_id(0)
1797 .build()
1798 .unwrap()
1799 .into_unbound(),
1800 },
1801 TableUpdate::SetDefaultSpec { spec_id: -1 },
1802 TableUpdate::AddSortOrder {
1803 sort_order: SortOrder::unsorted_order(),
1804 },
1805 TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
1806 ]);
1807 }
1808
1809 #[test]
1810 fn test_add_partition_spec() {
1811 let builder = builder_without_changes(FormatVersion::V2);
1812
1813 let added_spec = UnboundPartitionSpec::builder()
1814 .with_spec_id(10)
1815 .add_partition_fields(vec![
1816 UnboundPartitionField {
1817 name: "y".to_string(),
1819 transform: Transform::Identity,
1820 source_id: 2,
1821 field_id: Some(1000),
1822 },
1823 UnboundPartitionField {
1824 name: "z".to_string(),
1826 transform: Transform::Identity,
1827 source_id: 3,
1828 field_id: None,
1829 },
1830 ])
1831 .unwrap()
1832 .build();
1833
1834 let build_result = builder
1835 .add_partition_spec(added_spec.clone())
1836 .unwrap()
1837 .build()
1838 .unwrap();
1839
1840 let expected_change = added_spec.with_spec_id(1);
1842 let expected_spec = PartitionSpec::builder(schema())
1843 .with_spec_id(1)
1844 .add_unbound_field(UnboundPartitionField {
1845 name: "y".to_string(),
1846 transform: Transform::Identity,
1847 source_id: 2,
1848 field_id: Some(1000),
1849 })
1850 .unwrap()
1851 .add_unbound_field(UnboundPartitionField {
1852 name: "z".to_string(),
1853 transform: Transform::Identity,
1854 source_id: 3,
1855 field_id: Some(1001),
1856 })
1857 .unwrap()
1858 .build()
1859 .unwrap();
1860
1861 assert_eq!(build_result.changes.len(), 1);
1862 assert_eq!(
1863 build_result.metadata.partition_spec_by_id(1),
1864 Some(&Arc::new(expected_spec))
1865 );
1866 assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
1867 assert_eq!(build_result.metadata.last_partition_id, 1001);
1868 pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
1869 spec: expected_change
1870 });
1871
1872 let build_result = build_result
1874 .metadata
1875 .into_builder(Some(
1876 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1877 ))
1878 .remove_partition_specs(&[1])
1879 .unwrap()
1880 .build()
1881 .unwrap();
1882
1883 assert_eq!(build_result.changes.len(), 1);
1884 assert_eq!(build_result.metadata.partition_specs.len(), 1);
1885 assert!(build_result.metadata.partition_spec_by_id(1).is_none());
1886 }
1887
1888 #[test]
1889 fn test_set_default_partition_spec() {
1890 let builder = builder_without_changes(FormatVersion::V2);
1891 let schema = builder.get_current_schema().unwrap().clone();
1892 let added_spec = UnboundPartitionSpec::builder()
1893 .with_spec_id(10)
1894 .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
1895 .unwrap()
1896 .build();
1897
1898 let build_result = builder
1899 .add_partition_spec(added_spec.clone())
1900 .unwrap()
1901 .set_default_partition_spec(-1)
1902 .unwrap()
1903 .build()
1904 .unwrap();
1905
1906 let expected_spec = PartitionSpec::builder(schema)
1907 .with_spec_id(1)
1908 .add_unbound_field(UnboundPartitionField {
1909 name: "y_bucket[2]".to_string(),
1910 transform: Transform::Bucket(2),
1911 source_id: 1,
1912 field_id: Some(1001),
1913 })
1914 .unwrap()
1915 .build()
1916 .unwrap();
1917
1918 assert_eq!(build_result.changes.len(), 2);
1919 assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
1920 assert_eq!(build_result.changes, vec![
1921 TableUpdate::AddSpec {
1922 spec: added_spec.with_spec_id(1)
1924 },
1925 TableUpdate::SetDefaultSpec { spec_id: -1 }
1926 ]);
1927 }
1928
1929 #[test]
1930 fn test_set_existing_default_partition_spec() {
1931 let builder = builder_without_changes(FormatVersion::V2);
1932 let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
1934 let build_result = builder
1935 .add_partition_spec(unbound_spec.clone())
1936 .unwrap()
1937 .set_default_partition_spec(-1)
1938 .unwrap()
1939 .build()
1940 .unwrap();
1941
1942 assert_eq!(build_result.changes.len(), 2);
1943 assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
1944 spec: unbound_spec.clone()
1945 });
1946 assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
1947 spec_id: -1
1948 });
1949 assert_eq!(
1950 build_result.metadata.default_spec,
1951 Arc::new(
1952 unbound_spec
1953 .bind(build_result.metadata.current_schema().clone())
1954 .unwrap()
1955 )
1956 );
1957
1958 let build_result = build_result
1960 .metadata
1961 .into_builder(Some(
1962 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1963 ))
1964 .set_default_partition_spec(0)
1965 .unwrap()
1966 .build()
1967 .unwrap();
1968
1969 assert_eq!(build_result.changes.len(), 1);
1970 assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
1971 spec_id: 0
1972 });
1973 assert_eq!(
1974 build_result.metadata.default_spec,
1975 Arc::new(
1976 partition_spec()
1977 .bind(build_result.metadata.current_schema().clone())
1978 .unwrap()
1979 )
1980 );
1981 }
1982
1983 #[test]
1984 fn test_add_sort_order() {
1985 let builder = builder_without_changes(FormatVersion::V2);
1986
1987 let added_sort_order = SortOrder::builder()
1988 .with_order_id(10)
1989 .with_fields(vec![SortField {
1990 source_id: 1,
1991 transform: Transform::Identity,
1992 direction: SortDirection::Ascending,
1993 null_order: NullOrder::First,
1994 }])
1995 .build(&schema())
1996 .unwrap();
1997
1998 let build_result = builder
1999 .add_sort_order(added_sort_order.clone())
2000 .unwrap()
2001 .build()
2002 .unwrap();
2003
2004 let expected_sort_order = added_sort_order.with_order_id(2);
2005
2006 assert_eq!(build_result.changes.len(), 1);
2007 assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
2008 pretty_assertions::assert_eq!(
2009 build_result.metadata.sort_order_by_id(2),
2010 Some(&Arc::new(expected_sort_order.clone()))
2011 );
2012 pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
2013 sort_order: expected_sort_order
2014 });
2015 }
2016
2017 #[test]
2018 fn test_add_compatible_schema() {
2019 let builder = builder_without_changes(FormatVersion::V2);
2020
2021 let added_schema = Schema::builder()
2022 .with_schema_id(1)
2023 .with_fields(vec![
2024 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
2025 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
2026 NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
2027 NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
2028 ])
2029 .build()
2030 .unwrap();
2031
2032 let build_result = builder
2033 .add_current_schema(added_schema.clone())
2034 .unwrap()
2035 .build()
2036 .unwrap();
2037
2038 assert_eq!(build_result.changes.len(), 2);
2039 assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
2040 pretty_assertions::assert_eq!(
2041 build_result.metadata.schema_by_id(1),
2042 Some(&Arc::new(added_schema.clone()))
2043 );
2044 pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
2045 schema: added_schema
2046 });
2047 assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
2048 schema_id: -1
2049 });
2050 }
2051
2052 #[test]
2053 fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
2054 let builder = builder_without_changes(FormatVersion::V2);
2055
2056 let added_schema = Schema::builder()
2057 .with_schema_id(1)
2058 .with_fields(vec![
2059 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
2060 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
2061 NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
2062 NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
2063 ])
2064 .build()
2065 .unwrap();
2066
2067 let build_result = builder
2068 .add_schema(added_schema.clone())
2069 .unwrap()
2070 .set_current_schema(1)
2071 .unwrap()
2072 .build()
2073 .unwrap();
2074
2075 assert_eq!(build_result.changes.len(), 2);
2076 assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
2077 schema_id: -1
2078 });
2079 }
2080
2081 #[test]
2082 fn test_no_metadata_log_for_create_table() {
2083 let build_result = TableMetadataBuilder::new(
2084 schema(),
2085 partition_spec(),
2086 sort_order(),
2087 TEST_LOCATION.to_string(),
2088 FormatVersion::V2,
2089 HashMap::new(),
2090 )
2091 .unwrap()
2092 .build()
2093 .unwrap();
2094
2095 assert_eq!(build_result.metadata.metadata_log.len(), 0);
2096 }
2097
2098 #[test]
2099 fn test_no_metadata_log_entry_for_no_previous_location() {
2100 let metadata = builder_without_changes(FormatVersion::V2)
2102 .build()
2103 .unwrap()
2104 .metadata;
2105 assert_eq!(metadata.metadata_log.len(), 1);
2106
2107 let build_result = metadata
2108 .into_builder(None)
2109 .set_properties(HashMap::from_iter(vec![(
2110 "foo".to_string(),
2111 "bar".to_string(),
2112 )]))
2113 .unwrap()
2114 .build()
2115 .unwrap();
2116
2117 assert_eq!(build_result.metadata.metadata_log.len(), 1);
2118 }
2119
2120 #[test]
2121 fn test_from_metadata_generates_metadata_log() {
2122 let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
2123 let builder = TableMetadataBuilder::new(
2124 schema(),
2125 partition_spec(),
2126 sort_order(),
2127 TEST_LOCATION.to_string(),
2128 FormatVersion::V2,
2129 HashMap::new(),
2130 )
2131 .unwrap()
2132 .build()
2133 .unwrap()
2134 .metadata
2135 .into_builder(Some(metadata_path.to_string()));
2136
2137 let builder = builder
2138 .add_default_sort_order(SortOrder::unsorted_order())
2139 .unwrap();
2140
2141 let build_result = builder.build().unwrap();
2142
2143 assert_eq!(build_result.metadata.metadata_log.len(), 1);
2144 assert_eq!(
2145 build_result.metadata.metadata_log[0].metadata_file,
2146 metadata_path
2147 );
2148 }
2149
2150 #[test]
2151 fn test_set_ref() {
2152 let builder = builder_without_changes(FormatVersion::V2);
2153
2154 let snapshot = Snapshot::builder()
2155 .with_snapshot_id(1)
2156 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2157 .with_sequence_number(0)
2158 .with_schema_id(0)
2159 .with_manifest_list("/snap-1.avro")
2160 .with_summary(Summary {
2161 operation: Operation::Append,
2162 additional_properties: HashMap::from_iter(vec![
2163 (
2164 "spark.app.id".to_string(),
2165 "local-1662532784305".to_string(),
2166 ),
2167 ("added-data-files".to_string(), "4".to_string()),
2168 ("added-records".to_string(), "4".to_string()),
2169 ("added-files-size".to_string(), "6001".to_string()),
2170 ]),
2171 })
2172 .build();
2173
2174 let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2175
2176 assert!(
2177 builder
2178 .clone()
2179 .set_ref(MAIN_BRANCH, SnapshotReference {
2180 snapshot_id: 10,
2181 retention: SnapshotRetention::Branch {
2182 min_snapshots_to_keep: Some(10),
2183 max_snapshot_age_ms: None,
2184 max_ref_age_ms: None,
2185 },
2186 })
2187 .unwrap_err()
2188 .to_string()
2189 .contains("Cannot set 'main' to unknown snapshot: '10'")
2190 );
2191
2192 let build_result = builder
2193 .set_ref(MAIN_BRANCH, SnapshotReference {
2194 snapshot_id: 1,
2195 retention: SnapshotRetention::Branch {
2196 min_snapshots_to_keep: Some(10),
2197 max_snapshot_age_ms: None,
2198 max_ref_age_ms: None,
2199 },
2200 })
2201 .unwrap()
2202 .build()
2203 .unwrap();
2204 assert_eq!(build_result.metadata.snapshots.len(), 1);
2205 assert_eq!(
2206 build_result.metadata.snapshot_by_id(1),
2207 Some(&Arc::new(snapshot.clone()))
2208 );
2209 assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
2210 snapshot_id: 1,
2211 timestamp_ms: snapshot.timestamp_ms()
2212 }])
2213 }
2214
2215 #[test]
2216 fn test_snapshot_log_skips_intermediates() {
2217 let builder = builder_without_changes(FormatVersion::V2);
2218
2219 let snapshot_1 = Snapshot::builder()
2220 .with_snapshot_id(1)
2221 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2222 .with_sequence_number(0)
2223 .with_schema_id(0)
2224 .with_manifest_list("/snap-1.avro")
2225 .with_summary(Summary {
2226 operation: Operation::Append,
2227 additional_properties: HashMap::from_iter(vec![
2228 (
2229 "spark.app.id".to_string(),
2230 "local-1662532784305".to_string(),
2231 ),
2232 ("added-data-files".to_string(), "4".to_string()),
2233 ("added-records".to_string(), "4".to_string()),
2234 ("added-files-size".to_string(), "6001".to_string()),
2235 ]),
2236 })
2237 .build();
2238
2239 let snapshot_2 = Snapshot::builder()
2240 .with_snapshot_id(2)
2241 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2242 .with_sequence_number(0)
2243 .with_schema_id(0)
2244 .with_manifest_list("/snap-1.avro")
2245 .with_summary(Summary {
2246 operation: Operation::Append,
2247 additional_properties: HashMap::from_iter(vec![
2248 (
2249 "spark.app.id".to_string(),
2250 "local-1662532784305".to_string(),
2251 ),
2252 ("added-data-files".to_string(), "4".to_string()),
2253 ("added-records".to_string(), "4".to_string()),
2254 ("added-files-size".to_string(), "6001".to_string()),
2255 ]),
2256 })
2257 .build();
2258
2259 let result = builder
2260 .add_snapshot(snapshot_1)
2261 .unwrap()
2262 .set_ref(MAIN_BRANCH, SnapshotReference {
2263 snapshot_id: 1,
2264 retention: SnapshotRetention::Branch {
2265 min_snapshots_to_keep: Some(10),
2266 max_snapshot_age_ms: None,
2267 max_ref_age_ms: None,
2268 },
2269 })
2270 .unwrap()
2271 .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
2272 .unwrap()
2273 .build()
2274 .unwrap();
2275
2276 assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
2277 snapshot_id: 2,
2278 timestamp_ms: snapshot_2.timestamp_ms()
2279 }]);
2280 assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
2281 }
2282
2283 #[test]
2284 fn test_remove_main_ref_keeps_snapshot_log() {
2285 let builder = builder_without_changes(FormatVersion::V2);
2286
2287 let snapshot = Snapshot::builder()
2288 .with_snapshot_id(1)
2289 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2290 .with_sequence_number(0)
2291 .with_schema_id(0)
2292 .with_manifest_list("/snap-1.avro")
2293 .with_summary(Summary {
2294 operation: Operation::Append,
2295 additional_properties: HashMap::from_iter(vec![
2296 (
2297 "spark.app.id".to_string(),
2298 "local-1662532784305".to_string(),
2299 ),
2300 ("added-data-files".to_string(), "4".to_string()),
2301 ("added-records".to_string(), "4".to_string()),
2302 ("added-files-size".to_string(), "6001".to_string()),
2303 ]),
2304 })
2305 .build();
2306
2307 let result = builder
2308 .add_snapshot(snapshot.clone())
2309 .unwrap()
2310 .set_ref(MAIN_BRANCH, SnapshotReference {
2311 snapshot_id: 1,
2312 retention: SnapshotRetention::Branch {
2313 min_snapshots_to_keep: Some(10),
2314 max_snapshot_age_ms: None,
2315 max_ref_age_ms: None,
2316 },
2317 })
2318 .unwrap()
2319 .build()
2320 .unwrap();
2321
2322 assert_eq!(result.metadata.snapshot_log.len(), 1);
2324 assert_eq!(result.metadata.snapshot_log[0].snapshot_id, 1);
2325 assert_eq!(result.metadata.current_snapshot_id, Some(1));
2326
2327 let result_after_remove = result
2329 .metadata
2330 .into_builder(Some(
2331 "s3://bucket/test/location/metadata/metadata2.json".to_string(),
2332 ))
2333 .remove_ref(MAIN_BRANCH)
2334 .build()
2335 .unwrap();
2336
2337 assert_eq!(result_after_remove.metadata.snapshot_log.len(), 1);
2339 assert_eq!(result_after_remove.metadata.snapshot_log[0].snapshot_id, 1);
2340 assert_eq!(result_after_remove.metadata.current_snapshot_id, None);
2341 assert_eq!(result_after_remove.changes.len(), 1);
2342 assert_eq!(
2343 result_after_remove.changes[0],
2344 TableUpdate::RemoveSnapshotRef {
2345 ref_name: MAIN_BRANCH.to_string()
2346 }
2347 );
2348 }
2349
2350 #[test]
2351 fn test_set_branch_snapshot_creates_branch_if_not_exists() {
2352 let builder = builder_without_changes(FormatVersion::V2);
2353
2354 let snapshot = Snapshot::builder()
2355 .with_snapshot_id(2)
2356 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2357 .with_sequence_number(0)
2358 .with_schema_id(0)
2359 .with_manifest_list("/snap-1.avro")
2360 .with_summary(Summary {
2361 operation: Operation::Append,
2362 additional_properties: HashMap::new(),
2363 })
2364 .build();
2365
2366 let build_result = builder
2367 .set_branch_snapshot(snapshot.clone(), "new_branch")
2368 .unwrap()
2369 .build()
2370 .unwrap();
2371
2372 let reference = SnapshotReference {
2373 snapshot_id: 2,
2374 retention: SnapshotRetention::Branch {
2375 min_snapshots_to_keep: None,
2376 max_snapshot_age_ms: None,
2377 max_ref_age_ms: None,
2378 },
2379 };
2380
2381 assert_eq!(build_result.metadata.refs.len(), 1);
2382 assert_eq!(
2383 build_result.metadata.refs.get("new_branch"),
2384 Some(&reference)
2385 );
2386 assert_eq!(build_result.changes, vec![
2387 TableUpdate::AddSnapshot { snapshot },
2388 TableUpdate::SetSnapshotRef {
2389 ref_name: "new_branch".to_string(),
2390 reference
2391 }
2392 ]);
2393 }
2394
2395 #[test]
2396 fn test_cannot_add_duplicate_snapshot_id() {
2397 let builder = builder_without_changes(FormatVersion::V2);
2398
2399 let snapshot = Snapshot::builder()
2400 .with_snapshot_id(2)
2401 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2402 .with_sequence_number(0)
2403 .with_schema_id(0)
2404 .with_manifest_list("/snap-1.avro")
2405 .with_summary(Summary {
2406 operation: Operation::Append,
2407 additional_properties: HashMap::from_iter(vec![
2408 (
2409 "spark.app.id".to_string(),
2410 "local-1662532784305".to_string(),
2411 ),
2412 ("added-data-files".to_string(), "4".to_string()),
2413 ("added-records".to_string(), "4".to_string()),
2414 ("added-files-size".to_string(), "6001".to_string()),
2415 ]),
2416 })
2417 .build();
2418
2419 let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2420 builder.add_snapshot(snapshot).unwrap_err();
2421 }
2422
2423 #[test]
2424 fn test_add_incompatible_current_schema_fails() {
2425 let builder = builder_without_changes(FormatVersion::V2);
2426
2427 let added_schema = Schema::builder()
2428 .with_schema_id(1)
2429 .with_fields(vec![])
2430 .build()
2431 .unwrap();
2432
2433 let err = builder
2434 .add_current_schema(added_schema)
2435 .unwrap()
2436 .build()
2437 .unwrap_err();
2438
2439 assert!(
2440 err.to_string()
2441 .contains("Cannot find partition source field")
2442 );
2443 }
2444
2445 #[test]
2446 fn test_add_partition_spec_for_v1_requires_sequential_ids() {
2447 let builder = builder_without_changes(FormatVersion::V1);
2448
2449 let added_spec = UnboundPartitionSpec::builder()
2450 .with_spec_id(10)
2451 .add_partition_fields(vec![
2452 UnboundPartitionField {
2453 name: "y".to_string(),
2454 transform: Transform::Identity,
2455 source_id: 2,
2456 field_id: Some(1000),
2457 },
2458 UnboundPartitionField {
2459 name: "z".to_string(),
2460 transform: Transform::Identity,
2461 source_id: 3,
2462 field_id: Some(1002),
2463 },
2464 ])
2465 .unwrap()
2466 .build();
2467
2468 let err = builder.add_partition_spec(added_spec).unwrap_err();
2469 assert!(err.to_string().contains(
2470 "Cannot add partition spec with non-sequential field ids to format version 1 table"
2471 ));
2472 }
2473
2474 #[test]
2475 fn test_expire_metadata_log() {
2476 let builder = builder_without_changes(FormatVersion::V2);
2477 let metadata = builder
2478 .set_properties(HashMap::from_iter(vec![(
2479 TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
2480 "2".to_string(),
2481 )]))
2482 .unwrap()
2483 .build()
2484 .unwrap();
2485 assert_eq!(metadata.metadata.metadata_log.len(), 1);
2486 assert_eq!(metadata.expired_metadata_logs.len(), 0);
2487
2488 let metadata = metadata
2489 .metadata
2490 .into_builder(Some("path2".to_string()))
2491 .set_properties(HashMap::from_iter(vec![(
2492 "change_nr".to_string(),
2493 "1".to_string(),
2494 )]))
2495 .unwrap()
2496 .build()
2497 .unwrap();
2498
2499 assert_eq!(metadata.metadata.metadata_log.len(), 2);
2500 assert_eq!(metadata.expired_metadata_logs.len(), 0);
2501
2502 let metadata = metadata
2503 .metadata
2504 .into_builder(Some("path2".to_string()))
2505 .set_properties(HashMap::from_iter(vec![(
2506 "change_nr".to_string(),
2507 "2".to_string(),
2508 )]))
2509 .unwrap()
2510 .build()
2511 .unwrap();
2512 assert_eq!(metadata.metadata.metadata_log.len(), 2);
2513 assert_eq!(metadata.expired_metadata_logs.len(), 1);
2514 }
2515
2516 #[test]
2517 fn test_v2_sequence_number_cannot_decrease() {
2518 let builder = builder_without_changes(FormatVersion::V2);
2519
2520 let snapshot = Snapshot::builder()
2521 .with_snapshot_id(1)
2522 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2523 .with_sequence_number(1)
2524 .with_schema_id(0)
2525 .with_manifest_list("/snap-1")
2526 .with_summary(Summary {
2527 operation: Operation::Append,
2528 additional_properties: HashMap::new(),
2529 })
2530 .build();
2531
2532 let builder = builder
2533 .add_snapshot(snapshot.clone())
2534 .unwrap()
2535 .set_ref(MAIN_BRANCH, SnapshotReference {
2536 snapshot_id: 1,
2537 retention: SnapshotRetention::Branch {
2538 min_snapshots_to_keep: Some(10),
2539 max_snapshot_age_ms: None,
2540 max_ref_age_ms: None,
2541 },
2542 })
2543 .unwrap();
2544
2545 let snapshot = Snapshot::builder()
2546 .with_snapshot_id(2)
2547 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2548 .with_sequence_number(0)
2549 .with_schema_id(0)
2550 .with_manifest_list("/snap-0")
2551 .with_parent_snapshot_id(Some(1))
2552 .with_summary(Summary {
2553 operation: Operation::Append,
2554 additional_properties: HashMap::new(),
2555 })
2556 .build();
2557
2558 let err = builder
2559 .set_branch_snapshot(snapshot, MAIN_BRANCH)
2560 .unwrap_err();
2561 assert!(
2562 err.to_string()
2563 .contains("Cannot add snapshot with sequence number")
2564 );
2565 }
2566
2567 #[test]
2568 fn test_default_spec_cannot_be_removed() {
2569 let builder = builder_without_changes(FormatVersion::V2);
2570
2571 builder.remove_partition_specs(&[0]).unwrap_err();
2572 }
2573
2574 #[test]
2575 fn test_statistics() {
2576 let builder = builder_without_changes(FormatVersion::V2);
2577
2578 let statistics = StatisticsFile {
2579 snapshot_id: 3055729675574597004,
2580 statistics_path: "s3://a/b/stats.puffin".to_string(),
2581 file_size_in_bytes: 413,
2582 file_footer_size_in_bytes: 42,
2583 key_metadata: None,
2584 blob_metadata: vec![BlobMetadata {
2585 snapshot_id: 3055729675574597004,
2586 sequence_number: 1,
2587 fields: vec![1],
2588 r#type: "ndv".to_string(),
2589 properties: HashMap::new(),
2590 }],
2591 };
2592 let build_result = builder.set_statistics(statistics.clone()).build().unwrap();
2593
2594 assert_eq!(
2595 build_result.metadata.statistics,
2596 HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2597 );
2598 assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
2599 statistics: statistics.clone()
2600 }]);
2601
2602 let builder = build_result.metadata.into_builder(None);
2604 let build_result = builder
2605 .remove_statistics(statistics.snapshot_id)
2606 .build()
2607 .unwrap();
2608
2609 assert_eq!(build_result.metadata.statistics.len(), 0);
2610 assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
2611 snapshot_id: statistics.snapshot_id
2612 }]);
2613
2614 let builder = build_result.metadata.into_builder(None);
2616 let build_result = builder
2617 .remove_statistics(statistics.snapshot_id)
2618 .build()
2619 .unwrap();
2620 assert_eq!(build_result.metadata.statistics.len(), 0);
2621 assert_eq!(build_result.changes.len(), 0);
2622 }
2623
2624 #[test]
2625 fn test_add_partition_statistics() {
2626 let builder = builder_without_changes(FormatVersion::V2);
2627
2628 let statistics = PartitionStatisticsFile {
2629 snapshot_id: 3055729675574597004,
2630 statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2631 file_size_in_bytes: 43,
2632 };
2633
2634 let build_result = builder
2635 .set_partition_statistics(statistics.clone())
2636 .build()
2637 .unwrap();
2638 assert_eq!(
2639 build_result.metadata.partition_statistics,
2640 HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2641 );
2642 assert_eq!(build_result.changes, vec![
2643 TableUpdate::SetPartitionStatistics {
2644 partition_statistics: statistics.clone()
2645 }
2646 ]);
2647
2648 let builder = build_result.metadata.into_builder(None);
2650 let build_result = builder
2651 .remove_partition_statistics(statistics.snapshot_id)
2652 .build()
2653 .unwrap();
2654 assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2655 assert_eq!(build_result.changes, vec![
2656 TableUpdate::RemovePartitionStatistics {
2657 snapshot_id: statistics.snapshot_id
2658 }
2659 ]);
2660
2661 let builder = build_result.metadata.into_builder(None);
2663 let build_result = builder
2664 .remove_partition_statistics(statistics.snapshot_id)
2665 .build()
2666 .unwrap();
2667 assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2668 assert_eq!(build_result.changes.len(), 0);
2669 }
2670
2671 #[test]
2672 fn last_update_increased_for_property_only_update() {
2673 let builder = builder_without_changes(FormatVersion::V2);
2674
2675 let metadata = builder.build().unwrap().metadata;
2676 let last_updated_ms = metadata.last_updated_ms;
2677 sleep(std::time::Duration::from_millis(2));
2678
2679 let build_result = metadata
2680 .into_builder(Some(
2681 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2682 ))
2683 .set_properties(HashMap::from_iter(vec![(
2684 "foo".to_string(),
2685 "bar".to_string(),
2686 )]))
2687 .unwrap()
2688 .build()
2689 .unwrap();
2690
2691 assert!(
2692 build_result.metadata.last_updated_ms > last_updated_ms,
2693 "{} > {}",
2694 build_result.metadata.last_updated_ms,
2695 last_updated_ms
2696 );
2697 }
2698
2699 #[test]
2700 fn test_construct_default_main_branch() {
2701 let file = File::open(format!(
2703 "{}/testdata/table_metadata/{}",
2704 env!("CARGO_MANIFEST_DIR"),
2705 "TableMetadataV2Valid.json"
2706 ))
2707 .unwrap();
2708 let reader = BufReader::new(file);
2709 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2710
2711 let table = Table::builder()
2712 .metadata(resp)
2713 .metadata_location("s3://bucket/test/location/metadata/v1.json")
2714 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2715 .file_io(FileIO::new_with_memory())
2716 .runtime(test_runtime())
2717 .build()
2718 .unwrap();
2719
2720 assert_eq!(
2721 table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2722 table.metadata().current_snapshot_id().unwrap()
2723 );
2724 }
2725
2726 #[test]
2727 fn test_active_schema_cannot_be_removed() {
2728 let builder = builder_without_changes(FormatVersion::V2);
2729 builder.remove_schemas(&[0]).unwrap_err();
2730 }
2731
2732 #[test]
2733 fn test_remove_schemas() {
2734 let file = File::open(format!(
2735 "{}/testdata/table_metadata/{}",
2736 env!("CARGO_MANIFEST_DIR"),
2737 "TableMetadataV2Valid.json"
2738 ))
2739 .unwrap();
2740 let reader = BufReader::new(file);
2741 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2742
2743 let table = Table::builder()
2744 .metadata(resp)
2745 .metadata_location("s3://bucket/test/location/metadata/v1.json")
2746 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2747 .file_io(FileIO::new_with_memory())
2748 .runtime(test_runtime())
2749 .build()
2750 .unwrap();
2751
2752 assert_eq!(2, table.metadata().schemas.len());
2753
2754 {
2755 let meta_data_builder = table.metadata().clone().into_builder(None);
2757 meta_data_builder.remove_schemas(&[1]).unwrap_err();
2758 }
2759
2760 let mut meta_data_builder = table.metadata().clone().into_builder(None);
2761 meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
2762 let build_result = meta_data_builder.build().unwrap();
2763 assert_eq!(1, build_result.metadata.schemas.len());
2764 assert_eq!(1, build_result.metadata.current_schema_id);
2765 assert_eq!(1, build_result.metadata.current_schema().schema_id());
2766 assert_eq!(1, build_result.changes.len());
2767
2768 let remove_schema_ids =
2769 if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
2770 schema_ids
2771 } else {
2772 unreachable!("Expected RemoveSchema change")
2773 };
2774 assert_eq!(remove_schema_ids, &[0]);
2775 }
2776
2777 #[test]
2778 fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
2779 let initial_schema = Schema::builder()
2780 .with_fields(vec![
2781 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2782 ])
2783 .build()
2784 .unwrap();
2785
2786 let partition_spec_with_bucket = UnboundPartitionSpec::builder()
2787 .with_spec_id(0)
2788 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2789 .unwrap()
2790 .build();
2791
2792 let metadata = TableMetadataBuilder::new(
2793 initial_schema,
2794 partition_spec_with_bucket,
2795 SortOrder::unsorted_order(),
2796 TEST_LOCATION.to_string(),
2797 FormatVersion::V2,
2798 HashMap::new(),
2799 )
2800 .unwrap()
2801 .build()
2802 .unwrap()
2803 .metadata;
2804
2805 let partition_field_names: Vec<String> = metadata
2806 .default_partition_spec()
2807 .fields()
2808 .iter()
2809 .map(|f| f.name.clone())
2810 .collect();
2811 assert!(partition_field_names.contains(&"bucket_data".to_string()));
2812
2813 let evolved_schema = Schema::builder()
2814 .with_fields(vec![
2815 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2816 NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
2818 ])
2819 .build()
2820 .unwrap();
2821
2822 let builder = metadata.into_builder(Some(
2823 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2824 ));
2825
2826 let result = builder.add_current_schema(evolved_schema);
2828
2829 assert!(result.is_err());
2830 let error = result.unwrap_err();
2831 let error_message = error.message();
2832 assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2833 assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
2834 }
2835
2836 #[test]
2837 fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2838 let initial_schema = Schema::builder()
2839 .with_fields(vec![
2840 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2841 ])
2842 .build()
2843 .unwrap();
2844
2845 let partition_spec = UnboundPartitionSpec::builder()
2846 .with_spec_id(0)
2847 .add_partition_field(1, "partition_col", Transform::Bucket(16))
2848 .unwrap()
2849 .build();
2850
2851 let metadata = TableMetadataBuilder::new(
2852 initial_schema,
2853 partition_spec,
2854 SortOrder::unsorted_order(),
2855 TEST_LOCATION.to_string(),
2856 FormatVersion::V2,
2857 HashMap::new(),
2858 )
2859 .unwrap()
2860 .build()
2861 .unwrap()
2862 .metadata;
2863
2864 let non_conflicting_schema = Schema::builder()
2865 .with_fields(vec![
2866 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2867 NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2868 ])
2869 .build()
2870 .unwrap();
2871
2872 let result = metadata
2874 .clone()
2875 .into_builder(Some("test_location".to_string()))
2876 .add_current_schema(non_conflicting_schema)
2877 .unwrap()
2878 .build();
2879
2880 assert!(result.is_ok());
2881 }
2882
2883 #[test]
2884 fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
2885 let initial_schema = Schema::builder()
2886 .with_fields(vec![
2887 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2888 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2889 .into(),
2890 ])
2891 .build()
2892 .unwrap();
2893
2894 let partition_spec = UnboundPartitionSpec::builder()
2895 .with_spec_id(0)
2896 .add_partition_field(1, "data_bucket", Transform::Bucket(16))
2897 .unwrap()
2898 .build();
2899
2900 let metadata = TableMetadataBuilder::new(
2901 initial_schema,
2902 partition_spec,
2903 SortOrder::unsorted_order(),
2904 TEST_LOCATION.to_string(),
2905 FormatVersion::V2,
2906 HashMap::new(),
2907 )
2908 .unwrap()
2909 .build()
2910 .unwrap()
2911 .metadata;
2912
2913 let builder = metadata.into_builder(Some(
2914 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2915 ));
2916
2917 let conflicting_partition_spec = UnboundPartitionSpec::builder()
2918 .with_spec_id(1)
2919 .add_partition_field(1, "existing_field", Transform::Bucket(8))
2920 .unwrap()
2921 .build();
2922
2923 let result = builder.add_partition_spec(conflicting_partition_spec);
2924
2925 assert!(result.is_err());
2926 let error = result.unwrap_err();
2927 let error_message = error.message();
2928 assert!(error_message.contains(
2930 "Cannot create partition with name 'existing_field' that conflicts with schema field"
2931 ));
2932 assert!(error_message.contains("and is not an identity transform"));
2933 }
2934
2935 #[test]
2936 fn test_schema_evolution_validates_against_all_historical_schemas() {
2937 let initial_schema = Schema::builder()
2939 .with_fields(vec![
2940 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2941 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2942 .into(),
2943 ])
2944 .build()
2945 .unwrap();
2946
2947 let partition_spec = UnboundPartitionSpec::builder()
2948 .with_spec_id(0)
2949 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2950 .unwrap()
2951 .build();
2952
2953 let metadata = TableMetadataBuilder::new(
2954 initial_schema,
2955 partition_spec,
2956 SortOrder::unsorted_order(),
2957 TEST_LOCATION.to_string(),
2958 FormatVersion::V2,
2959 HashMap::new(),
2960 )
2961 .unwrap()
2962 .build()
2963 .unwrap()
2964 .metadata;
2965
2966 let second_schema = Schema::builder()
2968 .with_fields(vec![
2969 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2970 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2971 .into(),
2972 ])
2973 .build()
2974 .unwrap();
2975
2976 let metadata = metadata
2977 .into_builder(Some("test_location".to_string()))
2978 .add_current_schema(second_schema)
2979 .unwrap()
2980 .build()
2981 .unwrap()
2982 .metadata;
2983
2984 let third_schema = Schema::builder()
2988 .with_fields(vec![
2989 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2990 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2991 .into(),
2992 NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
2993 .into(),
2994 ])
2995 .build()
2996 .unwrap();
2997
2998 let builder = metadata
2999 .clone()
3000 .into_builder(Some("test_location".to_string()));
3001
3002 let result = builder.add_current_schema(third_schema);
3004 assert!(result.is_ok());
3005
3006 let conflicting_schema = Schema::builder()
3009 .with_fields(vec![
3010 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3011 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
3012 .into(),
3013 NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
3014 .into(),
3015 NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
3016 .into(), ])
3018 .build()
3019 .unwrap();
3020
3021 let builder2 = metadata.into_builder(Some("test_location".to_string()));
3022 let result2 = builder2.add_current_schema(conflicting_schema);
3023
3024 assert!(result2.is_err());
3027 let error = result2.unwrap_err();
3028 assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
3029 }
3030
3031 #[test]
3032 fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
3033 let initial_schema = Schema::builder()
3035 .with_fields(vec![
3036 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3037 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
3038 .into(),
3039 ])
3040 .build()
3041 .unwrap();
3042
3043 let partition_spec = UnboundPartitionSpec::builder()
3044 .with_spec_id(0)
3045 .add_partition_field(2, "partition_data", Transform::Identity)
3046 .unwrap()
3047 .build();
3048
3049 let metadata = TableMetadataBuilder::new(
3050 initial_schema,
3051 partition_spec,
3052 SortOrder::unsorted_order(),
3053 TEST_LOCATION.to_string(),
3054 FormatVersion::V2,
3055 HashMap::new(),
3056 )
3057 .unwrap()
3058 .build()
3059 .unwrap()
3060 .metadata;
3061
3062 let evolved_schema = Schema::builder()
3064 .with_fields(vec![
3065 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3066 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
3067 .into(),
3068 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
3069 .into(),
3070 ])
3071 .build()
3072 .unwrap();
3073
3074 let result = metadata
3076 .into_builder(Some("test_location".to_string()))
3077 .add_current_schema(evolved_schema);
3078
3079 assert!(result.is_ok());
3080 }
3081
3082 #[test]
3083 fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
3084 let initial_schema = Schema::builder()
3086 .with_fields(vec![
3087 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3088 ])
3089 .build()
3090 .unwrap();
3091
3092 let partition_spec = UnboundPartitionSpec::builder()
3093 .with_spec_id(0)
3094 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
3095 .unwrap()
3096 .build();
3097
3098 let metadata = TableMetadataBuilder::new(
3099 initial_schema,
3100 partition_spec,
3101 SortOrder::unsorted_order(),
3102 TEST_LOCATION.to_string(),
3103 FormatVersion::V2,
3104 HashMap::new(),
3105 )
3106 .unwrap()
3107 .build()
3108 .unwrap()
3109 .metadata;
3110
3111 let conflicting_schema = Schema::builder()
3113 .with_fields(vec![
3114 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3115 NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
3117 ])
3118 .build()
3119 .unwrap();
3120
3121 let builder = metadata.into_builder(Some("test_location".to_string()));
3122 let result = builder.add_current_schema(conflicting_schema);
3123
3124 assert!(result.is_err());
3127 let error = result.unwrap_err();
3128 assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
3129 }
3130
3131 #[test]
3132 fn test_partition_spec_evolution_allows_non_conflicting_names() {
3133 let initial_schema = Schema::builder()
3134 .with_fields(vec![
3135 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3136 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
3137 .into(),
3138 ])
3139 .build()
3140 .unwrap();
3141
3142 let partition_spec = UnboundPartitionSpec::builder()
3143 .with_spec_id(0)
3144 .add_partition_field(1, "data_bucket", Transform::Bucket(16))
3145 .unwrap()
3146 .build();
3147
3148 let metadata = TableMetadataBuilder::new(
3149 initial_schema,
3150 partition_spec,
3151 SortOrder::unsorted_order(),
3152 TEST_LOCATION.to_string(),
3153 FormatVersion::V2,
3154 HashMap::new(),
3155 )
3156 .unwrap()
3157 .build()
3158 .unwrap()
3159 .metadata;
3160
3161 let builder = metadata.into_builder(Some(
3162 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3163 ));
3164
3165 let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
3167 .with_spec_id(1)
3168 .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
3169 .unwrap()
3170 .build();
3171
3172 let result = builder.add_partition_spec(non_conflicting_partition_spec);
3173
3174 assert!(result.is_ok());
3175 }
3176
3177 #[test]
3178 fn test_row_lineage_addition() {
3179 let new_rows = 30;
3180 let base = builder_without_changes(FormatVersion::V3)
3181 .build()
3182 .unwrap()
3183 .metadata;
3184 let add_rows = Snapshot::builder()
3185 .with_snapshot_id(0)
3186 .with_timestamp_ms(base.last_updated_ms + 1)
3187 .with_sequence_number(0)
3188 .with_schema_id(0)
3189 .with_manifest_list("foo")
3190 .with_parent_snapshot_id(None)
3191 .with_summary(Summary {
3192 operation: Operation::Append,
3193 additional_properties: HashMap::new(),
3194 })
3195 .with_row_range(base.next_row_id(), new_rows)
3196 .build();
3197
3198 let first_addition = base
3199 .into_builder(None)
3200 .add_snapshot(add_rows.clone())
3201 .unwrap()
3202 .build()
3203 .unwrap()
3204 .metadata;
3205
3206 assert_eq!(first_addition.next_row_id(), new_rows);
3207
3208 let add_more_rows = Snapshot::builder()
3209 .with_snapshot_id(1)
3210 .with_timestamp_ms(first_addition.last_updated_ms + 1)
3211 .with_sequence_number(1)
3212 .with_schema_id(0)
3213 .with_manifest_list("foo")
3214 .with_parent_snapshot_id(Some(0))
3215 .with_summary(Summary {
3216 operation: Operation::Append,
3217 additional_properties: HashMap::new(),
3218 })
3219 .with_row_range(first_addition.next_row_id(), new_rows)
3220 .build();
3221
3222 let second_addition = first_addition
3223 .into_builder(None)
3224 .add_snapshot(add_more_rows)
3225 .unwrap()
3226 .build()
3227 .unwrap()
3228 .metadata;
3229 assert_eq!(second_addition.next_row_id(), new_rows * 2);
3230 }
3231
3232 #[test]
3233 fn test_row_lineage_invalid_snapshot() {
3234 let new_rows = 30;
3235 let base = builder_without_changes(FormatVersion::V3)
3236 .build()
3237 .unwrap()
3238 .metadata;
3239
3240 let add_rows = Snapshot::builder()
3242 .with_snapshot_id(0)
3243 .with_timestamp_ms(base.last_updated_ms + 1)
3244 .with_sequence_number(0)
3245 .with_schema_id(0)
3246 .with_manifest_list("foo")
3247 .with_parent_snapshot_id(None)
3248 .with_summary(Summary {
3249 operation: Operation::Append,
3250 additional_properties: HashMap::new(),
3251 })
3252 .with_row_range(base.next_row_id(), new_rows)
3253 .build();
3254
3255 let added = base
3256 .into_builder(None)
3257 .add_snapshot(add_rows)
3258 .unwrap()
3259 .build()
3260 .unwrap()
3261 .metadata;
3262
3263 let invalid_new_rows = Snapshot::builder()
3264 .with_snapshot_id(1)
3265 .with_timestamp_ms(added.last_updated_ms + 1)
3266 .with_sequence_number(1)
3267 .with_schema_id(0)
3268 .with_manifest_list("foo")
3269 .with_parent_snapshot_id(Some(0))
3270 .with_summary(Summary {
3271 operation: Operation::Append,
3272 additional_properties: HashMap::new(),
3273 })
3274 .with_row_range(added.next_row_id() - 1, 10)
3276 .build();
3277
3278 let err = added
3279 .into_builder(None)
3280 .add_snapshot(invalid_new_rows)
3281 .unwrap_err();
3282 assert!(
3283 err.to_string().contains(
3284 "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"
3285 )
3286 );
3287 }
3288
3289 #[test]
3290 fn test_row_lineage_append_branch() {
3291 let branch = "some_branch";
3295
3296 let base = builder_without_changes(FormatVersion::V3)
3298 .build()
3299 .unwrap()
3300 .metadata;
3301
3302 assert_eq!(base.next_row_id(), 0);
3304
3305 let branch_snapshot_1 = Snapshot::builder()
3307 .with_snapshot_id(1)
3308 .with_timestamp_ms(base.last_updated_ms + 1)
3309 .with_sequence_number(0)
3310 .with_schema_id(0)
3311 .with_manifest_list("foo")
3312 .with_parent_snapshot_id(None)
3313 .with_summary(Summary {
3314 operation: Operation::Append,
3315 additional_properties: HashMap::new(),
3316 })
3317 .with_row_range(base.next_row_id(), 30)
3318 .build();
3319
3320 let table_after_branch_1 = base
3321 .into_builder(None)
3322 .set_branch_snapshot(branch_snapshot_1.clone(), branch)
3323 .unwrap()
3324 .build()
3325 .unwrap()
3326 .metadata;
3327
3328 assert!(table_after_branch_1.current_snapshot().is_none());
3330
3331 let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
3333 let branch_snap_1 = table_after_branch_1
3334 .snapshots
3335 .get(&branch_ref.snapshot_id)
3336 .unwrap();
3337 assert_eq!(branch_snap_1.first_row_id(), Some(0));
3338
3339 assert_eq!(table_after_branch_1.next_row_id(), 30);
3341
3342 let main_snapshot = Snapshot::builder()
3344 .with_snapshot_id(2)
3345 .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
3346 .with_sequence_number(1)
3347 .with_schema_id(0)
3348 .with_manifest_list("bar")
3349 .with_parent_snapshot_id(None)
3350 .with_summary(Summary {
3351 operation: Operation::Append,
3352 additional_properties: HashMap::new(),
3353 })
3354 .with_row_range(table_after_branch_1.next_row_id(), 28)
3355 .build();
3356
3357 let table_after_main = table_after_branch_1
3358 .into_builder(None)
3359 .add_snapshot(main_snapshot.clone())
3360 .unwrap()
3361 .set_ref(MAIN_BRANCH, SnapshotReference {
3362 snapshot_id: main_snapshot.snapshot_id(),
3363 retention: SnapshotRetention::Branch {
3364 min_snapshots_to_keep: None,
3365 max_snapshot_age_ms: None,
3366 max_ref_age_ms: None,
3367 },
3368 })
3369 .unwrap()
3370 .build()
3371 .unwrap()
3372 .metadata;
3373
3374 let current_snapshot = table_after_main.current_snapshot().unwrap();
3376 assert_eq!(current_snapshot.first_row_id(), Some(30));
3377
3378 assert_eq!(table_after_main.next_row_id(), 58);
3380
3381 let branch_snapshot_2 = Snapshot::builder()
3383 .with_snapshot_id(3)
3384 .with_timestamp_ms(table_after_main.last_updated_ms + 1)
3385 .with_sequence_number(2)
3386 .with_schema_id(0)
3387 .with_manifest_list("baz")
3388 .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
3389 .with_summary(Summary {
3390 operation: Operation::Append,
3391 additional_properties: HashMap::new(),
3392 })
3393 .with_row_range(table_after_main.next_row_id(), 21)
3394 .build();
3395
3396 let table_after_branch_2 = table_after_main
3397 .into_builder(None)
3398 .set_branch_snapshot(branch_snapshot_2.clone(), branch)
3399 .unwrap()
3400 .build()
3401 .unwrap()
3402 .metadata;
3403
3404 let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
3406 let branch_snap_2 = table_after_branch_2
3407 .snapshots
3408 .get(&branch_ref_2.snapshot_id)
3409 .unwrap();
3410 assert_eq!(branch_snap_2.first_row_id(), Some(58));
3411
3412 assert_eq!(table_after_branch_2.next_row_id(), 79);
3414 }
3415
3416 #[test]
3417 fn test_encryption_keys() {
3418 let builder = builder_without_changes(FormatVersion::V2);
3419
3420 let encryption_key_1 = EncryptedKey::builder()
3422 .key_id("key-1")
3423 .encrypted_key_metadata(vec![1, 2, 3, 4])
3424 .encrypted_by_id("encryption-service-1")
3425 .properties(HashMap::from_iter(vec![(
3426 "algorithm".to_string(),
3427 "AES-256".to_string(),
3428 )]))
3429 .build();
3430
3431 let encryption_key_2 = EncryptedKey::builder()
3432 .key_id("key-2")
3433 .encrypted_key_metadata(vec![5, 6, 7, 8])
3434 .encrypted_by_id("encryption-service-2")
3435 .properties(HashMap::new())
3436 .build();
3437
3438 let build_result = builder
3440 .add_encryption_key(encryption_key_1.clone())
3441 .build()
3442 .unwrap();
3443
3444 assert_eq!(build_result.changes.len(), 1);
3445 assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3446 assert_eq!(
3447 build_result.metadata.encryption_key("key-1"),
3448 Some(&encryption_key_1)
3449 );
3450 assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3451 encryption_key: encryption_key_1.clone()
3452 });
3453
3454 let build_result = build_result
3456 .metadata
3457 .into_builder(Some(
3458 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3459 ))
3460 .add_encryption_key(encryption_key_2.clone())
3461 .build()
3462 .unwrap();
3463
3464 assert_eq!(build_result.changes.len(), 1);
3465 assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3466 assert_eq!(
3467 build_result.metadata.encryption_key("key-1"),
3468 Some(&encryption_key_1)
3469 );
3470 assert_eq!(
3471 build_result.metadata.encryption_key("key-2"),
3472 Some(&encryption_key_2)
3473 );
3474 assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3475 encryption_key: encryption_key_2.clone()
3476 });
3477
3478 let build_result = build_result
3480 .metadata
3481 .into_builder(Some(
3482 "s3://bucket/test/location/metadata/metadata2.json".to_string(),
3483 ))
3484 .add_encryption_key(encryption_key_1.clone())
3485 .build()
3486 .unwrap();
3487
3488 assert_eq!(build_result.changes.len(), 0);
3489 assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3490
3491 let build_result = build_result
3493 .metadata
3494 .into_builder(Some(
3495 "s3://bucket/test/location/metadata/metadata3.json".to_string(),
3496 ))
3497 .remove_encryption_key("key-1")
3498 .build()
3499 .unwrap();
3500
3501 assert_eq!(build_result.changes.len(), 1);
3502 assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3503 assert_eq!(build_result.metadata.encryption_key("key-1"), None);
3504 assert_eq!(
3505 build_result.metadata.encryption_key("key-2"),
3506 Some(&encryption_key_2)
3507 );
3508 assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3509 key_id: "key-1".to_string()
3510 });
3511
3512 let build_result = build_result
3514 .metadata
3515 .into_builder(Some(
3516 "s3://bucket/test/location/metadata/metadata4.json".to_string(),
3517 ))
3518 .remove_encryption_key("non-existent-key")
3519 .build()
3520 .unwrap();
3521
3522 assert_eq!(build_result.changes.len(), 0);
3523 assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3524
3525 let keys = build_result
3527 .metadata
3528 .encryption_keys_iter()
3529 .collect::<Vec<_>>();
3530 assert_eq!(keys.len(), 1);
3531 assert_eq!(keys[0], &encryption_key_2);
3532
3533 let build_result = build_result
3535 .metadata
3536 .into_builder(Some(
3537 "s3://bucket/test/location/metadata/metadata5.json".to_string(),
3538 ))
3539 .remove_encryption_key("key-2")
3540 .build()
3541 .unwrap();
3542
3543 assert_eq!(build_result.changes.len(), 1);
3544 assert_eq!(build_result.metadata.encryption_keys.len(), 0);
3545 assert_eq!(build_result.metadata.encryption_key("key-2"), None);
3546 assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3547 key_id: "key-2".to_string()
3548 });
3549
3550 let keys = build_result.metadata.encryption_keys_iter();
3552 assert_eq!(keys.len(), 0);
3553 }
3554
3555 #[test]
3556 fn test_partition_field_id_reuse_across_specs() {
3557 let schema = Schema::builder()
3558 .with_fields(vec![
3559 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3560 NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
3561 NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
3562 .into(),
3563 ])
3564 .build()
3565 .unwrap();
3566
3567 let initial_spec = UnboundPartitionSpec::builder()
3569 .add_partition_field(1, "id", Transform::Identity)
3570 .unwrap()
3571 .build();
3572
3573 let mut metadata = TableMetadataBuilder::new(
3574 schema,
3575 initial_spec,
3576 SortOrder::unsorted_order(),
3577 "s3://bucket/table".to_string(),
3578 FormatVersion::V2,
3579 HashMap::new(),
3580 )
3581 .unwrap()
3582 .build()
3583 .unwrap()
3584 .metadata;
3585
3586 let spec1 = UnboundPartitionSpec::builder()
3588 .add_partition_field(2, "data_bucket", Transform::Bucket(10))
3589 .unwrap()
3590 .build();
3591 let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v1.json".to_string()));
3592 let result = builder.add_partition_spec(spec1).unwrap().build().unwrap();
3593 metadata = result.metadata;
3594
3595 let spec2 = UnboundPartitionSpec::builder()
3598 .add_partition_field(1, "id", Transform::Identity) .unwrap()
3600 .add_partition_field(2, "data_bucket", Transform::Bucket(10)) .unwrap()
3602 .add_partition_field(3, "year", Transform::Year) .unwrap()
3604 .build();
3605 let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v2.json".to_string()));
3606 let result = builder.add_partition_spec(spec2).unwrap().build().unwrap();
3607
3608 let spec2 = result.metadata.partition_spec_by_id(2).unwrap();
3610 let field_ids: Vec<i32> = spec2.fields().iter().map(|f| f.field_id).collect();
3611 assert_eq!(field_ids, vec![1000, 1001, 1002]); }
3613}