1pub mod memory;
21mod metadata_location;
22pub(crate) mod utils;
23
24use std::collections::HashMap;
25use std::fmt::{Debug, Display};
26use std::future::Future;
27use std::mem::take;
28use std::ops::Deref;
29use std::str::FromStr;
30use std::sync::Arc;
31
32use _serde::{deserialize_snapshot, serialize_snapshot};
33use async_trait::async_trait;
34pub use memory::MemoryCatalog;
35pub use metadata_location::*;
36#[cfg(test)]
37use mockall::automock;
38use serde_derive::{Deserialize, Serialize};
39use typed_builder::TypedBuilder;
40use uuid::Uuid;
41
42use crate::io::StorageFactory;
43use crate::runtime::Runtime;
44use crate::spec::{
45 EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
46 SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
47 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
48};
49use crate::table::Table;
50use crate::{Error, ErrorKind, Result};
51
/// Asynchronous API for managing namespaces and tables.
///
/// Implementors must be `Debug + Sync + Send`. In test builds `mockall`'s
/// `automock` attribute is applied to the trait.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// Lists namespaces, optionally scoped by `parent`.
    ///
    /// NOTE(review): presumably `None` lists top-level namespaces and
    /// `Some(parent)` lists the children of `parent` — confirm with implementations.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Creates the namespace `namespace` with the given `properties` and
    /// returns the resulting [`Namespace`].
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Looks up the namespace identified by `namespace`.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Reports whether the namespace identified by `namespace` exists.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Updates the namespace identified by `namespace` using `properties`.
    ///
    /// NOTE(review): whether `properties` replaces or is merged into the
    /// existing properties is not visible from this trait — confirm.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drops the namespace identified by `namespace`.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// Lists the identifiers of the tables in `namespace`.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Creates a table in `namespace` as described by `creation` and returns it.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Loads the table identified by `table`.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drops the table identified by `table`.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Purges the table identified by `table`.
    ///
    /// NOTE(review): presumably this also removes the table's files, unlike
    /// `drop_table` — the difference is not visible here; confirm.
    async fn purge_table(&self, table: &TableIdent) -> Result<()>;

    /// Reports whether the table identified by `table` exists.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Renames the table `src` to `dest`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Registers a table under the identifier `table` from the metadata at
    /// `metadata_location` and returns it.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Applies `commit` to its target table and returns the resulting table.
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
123
/// Builder that is configured step by step and then asynchronously loads a
/// [`Catalog`].
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type produced by this builder.
    type C: Catalog;

    /// Returns the builder configured with `storage_factory`.
    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;

    /// Returns the builder configured with `runtime`.
    fn with_runtime(self, runtime: Runtime) -> Self;

    /// Consumes the builder and returns a `Send` future that resolves to the
    /// catalog, given a catalog `name` and configuration `props`.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
169
/// Identifier of a namespace, stored as an ordered list of name parts
/// (outermost first).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
177
178impl NamespaceIdent {
179 pub fn new(name: String) -> Self {
181 Self(vec![name])
182 }
183
184 pub fn from_vec(names: Vec<String>) -> Result<Self> {
186 if names.is_empty() {
187 return Err(Error::new(
188 ErrorKind::DataInvalid,
189 "Namespace identifier can't be empty!",
190 ));
191 }
192 Ok(Self(names))
193 }
194
195 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
197 Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
198 }
199
200 pub fn to_url_string(&self) -> String {
202 self.as_ref().join("\u{001f}")
203 }
204
205 pub fn inner(self) -> Vec<String> {
207 self.0
208 }
209
210 pub fn parent(&self) -> Option<Self> {
213 self.0.split_last().and_then(|(_, parent)| {
214 if parent.is_empty() {
215 None
216 } else {
217 Some(Self(parent.to_vec()))
218 }
219 })
220 }
221}
222
223impl AsRef<Vec<String>> for NamespaceIdent {
224 fn as_ref(&self) -> &Vec<String> {
225 &self.0
226 }
227}
228
229impl Deref for NamespaceIdent {
230 type Target = [String];
231
232 fn deref(&self) -> &Self::Target {
233 &self.0
234 }
235}
236
/// A namespace: its identifier together with its string properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    // Identifier of this namespace.
    name: NamespaceIdent,
    // Key/value properties attached to this namespace.
    properties: HashMap<String, String>,
}
243
244impl Namespace {
245 pub fn new(name: NamespaceIdent) -> Self {
247 Self::with_properties(name, HashMap::default())
248 }
249
250 pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
252 Self { name, properties }
253 }
254
255 pub fn name(&self) -> &NamespaceIdent {
257 &self.name
258 }
259
260 pub fn properties(&self) -> &HashMap<String, String> {
262 &self.properties
263 }
264}
265
266impl Display for NamespaceIdent {
267 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268 write!(f, "{}", self.0.join("."))
269 }
270}
271
/// Identifier of a table: the namespace it lives in plus its name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace of the table.
    pub namespace: NamespaceIdent,
    /// Name of the table within the namespace.
    pub name: String,
}
280
281impl TableIdent {
282 pub fn new(namespace: NamespaceIdent, name: String) -> Self {
284 Self { namespace, name }
285 }
286
287 pub fn namespace(&self) -> &NamespaceIdent {
289 &self.namespace
290 }
291
292 pub fn name(&self) -> &str {
294 &self.name
295 }
296
297 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
299 let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
300 let table_name = vec.pop().ok_or_else(|| {
301 Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
302 })?;
303 let namespace_ident = NamespaceIdent::from_vec(vec)?;
304
305 Ok(Self {
306 namespace: namespace_ident,
307 name: table_name,
308 })
309 }
310}
311
312impl Display for TableIdent {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 write!(f, "{}.{}", self.namespace, self.name)
315 }
316}
317
/// Parameters for creating a table, assembled through the generated typed
/// builder (`TableCreation::builder()`).
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// Name of the table (required).
    pub name: String,
    /// Optional table location; defaults to `None`. The builder also offers
    /// `location_opt` to pass an `Option` directly.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// Schema of the table (required).
    pub schema: Schema,
    /// Optional partition spec; defaults to `None`. The setter accepts
    /// anything convertible via `Into`, with `partition_spec_opt` as fallback.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order; defaults to `None`, with `sort_order_opt` as fallback setter.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// Table properties; defaults to empty. The setter accepts any iterator
    /// of `(String, String)` pairs and collects it into the map.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Format version of the table; defaults to `FormatVersion::V2`.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
343
/// A pending change to a table: the requirements that must hold and the
/// updates to apply.
///
/// The generated builder's `build` method is only visible inside the crate.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    // Identifier of the table being changed.
    ident: TableIdent,
    // Requirements checked against the table metadata before updating.
    requirements: Vec<TableRequirement>,
    // Updates applied, in order, to the table metadata.
    updates: Vec<TableUpdate>,
}
361
362impl TableCommit {
363 pub fn identifier(&self) -> &TableIdent {
365 &self.ident
366 }
367
368 pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
370 take(&mut self.requirements)
371 }
372
373 pub fn take_updates(&mut self) -> Vec<TableUpdate> {
375 take(&mut self.updates)
376 }
377
378 pub fn apply(self, table: Table) -> Result<Table> {
384 for requirement in self.requirements {
386 requirement.check(Some(table.metadata()))?;
387 }
388
389 let current_metadata_location = table.metadata_location_result()?;
391
392 let mut metadata_builder = table
394 .metadata()
395 .clone()
396 .into_builder(Some(current_metadata_location.to_string()));
397 for update in self.updates {
398 metadata_builder = update.apply(metadata_builder)?;
399 }
400
401 let new_metadata = metadata_builder.build()?.metadata;
403
404 let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
405 .with_next_version()
406 .with_new_metadata(&new_metadata)
407 .to_string();
408
409 Ok(table
410 .with_metadata(Arc::new(new_metadata))
411 .with_metadata_location(new_metadata_location))
412 }
413}
414
/// A precondition on a table's state, serialized as an internally tagged
/// object whose `type` field selects the variant.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not already exist.
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must equal `uuid`.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Expected table UUID.
        uuid: Uuid,
    },
    /// The named ref must point at `snapshot_id`; when `snapshot_id` is
    /// `None`, the ref must not exist.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// Name of the branch or tag to check.
        r#ref: String,
        /// Expected snapshot id of the ref, or `None` if the ref must be absent.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last assigned column id must equal the given id.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// Expected last assigned field id.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must equal the given id.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Expected current schema id.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last assigned partition id must equal the given id.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Expected last assigned partition id.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default partition spec id must equal the given id.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Expected default partition spec id.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must equal the given id.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Expected default sort order id.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
476
/// A single change to table metadata, serialized as an internally tagged
/// object whose kebab-case `action` field selects the variant.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrade the table to the given format version.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format version.
        format_version: FormatVersion,
    },
    /// Assign a UUID to the table.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The UUID to assign.
        uuid: Uuid,
    },
    /// Add a schema to the table.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
    },
    /// Set the table's current schema.
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Id of the schema to make current.
        schema_id: i32,
    },
    /// Add a partition spec to the table.
    AddSpec {
        /// The (unbound) partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Set the table's default partition spec.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Id of the partition spec to make the default.
        spec_id: i32,
    },
    /// Add a sort order to the table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// The sort order to add.
        sort_order: SortOrder,
    },
    /// Set the table's default sort order.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Id of the sort order to make the default.
        sort_order_id: i64,
    },
    /// Add a snapshot to the table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// The snapshot to add; (de)serialized through the catalog-specific
        /// snapshot helpers in `_serde`.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Set a named snapshot reference.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of the reference to set.
        ref_name: String,
        /// The reference itself; its fields are flattened into this object.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Remove snapshots from the table.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Ids of the snapshots to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove a named snapshot reference.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of the reference to remove.
        ref_name: String,
    },
    /// Set the table's location.
    SetLocation {
        /// The new location.
        location: String,
    },
    /// Set table properties.
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Remove table properties.
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Remove partition specs from the table.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Ids of the partition specs to remove.
        spec_ids: Vec<i32>,
    },
    /// Set a statistics file; (de)serialized through `_serde_set_statistics`.
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// The statistics file to set.
        statistics: StatisticsFile,
    },
    /// Remove the statistics of a snapshot.
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Id of the snapshot whose statistics are removed.
        snapshot_id: i64,
    },
    /// Set a partition statistics file.
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// The partition statistics file to set.
        partition_statistics: PartitionStatisticsFile,
    },
    /// Remove the partition statistics of a snapshot.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Id of the snapshot whose partition statistics are removed.
        snapshot_id: i64,
    },
    /// Remove schemas from the table.
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Ids of the schemas to remove.
        schema_ids: Vec<i32>,
    },
    /// Add an encryption key to the table.
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// The encryption key to add.
        encryption_key: EncryptedKey,
    },
    /// Remove an encryption key from the table.
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// Id of the key to remove.
        key_id: String,
    },
}
624
625impl TableUpdate {
626 pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
628 match self {
629 TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
630 TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
631 TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
632 TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
633 TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
634 TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
635 TableUpdate::SetDefaultSortOrder { sort_order_id } => {
636 builder.set_default_sort_order(sort_order_id)
637 }
638 TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
639 TableUpdate::SetSnapshotRef {
640 ref_name,
641 reference,
642 } => builder.set_ref(&ref_name, reference),
643 TableUpdate::RemoveSnapshots { snapshot_ids } => {
644 Ok(builder.remove_snapshots(&snapshot_ids))
645 }
646 TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
647 TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
648 TableUpdate::SetProperties { updates } => builder.set_properties(updates),
649 TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
650 TableUpdate::UpgradeFormatVersion { format_version } => {
651 builder.upgrade_format_version(format_version)
652 }
653 TableUpdate::RemovePartitionSpecs { spec_ids } => {
654 builder.remove_partition_specs(&spec_ids)
655 }
656 TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
657 TableUpdate::RemoveStatistics { snapshot_id } => {
658 Ok(builder.remove_statistics(snapshot_id))
659 }
660 TableUpdate::SetPartitionStatistics {
661 partition_statistics,
662 } => Ok(builder.set_partition_statistics(partition_statistics)),
663 TableUpdate::RemovePartitionStatistics { snapshot_id } => {
664 Ok(builder.remove_partition_statistics(snapshot_id))
665 }
666 TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
667 TableUpdate::AddEncryptionKey { encryption_key } => {
668 Ok(builder.add_encryption_key(encryption_key))
669 }
670 TableUpdate::RemoveEncryptionKey { key_id } => {
671 Ok(builder.remove_encryption_key(&key_id))
672 }
673 }
674 }
675}
676
677impl TableRequirement {
678 pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
683 if let Some(metadata) = metadata {
684 match self {
685 TableRequirement::NotExist => {
686 return Err(Error::new(
687 ErrorKind::CatalogCommitConflicts,
688 format!(
689 "Requirement failed: Table with id {} already exists",
690 metadata.uuid()
691 ),
692 )
693 .with_retryable(true));
694 }
695 TableRequirement::UuidMatch { uuid } => {
696 if &metadata.uuid() != uuid {
697 return Err(Error::new(
698 ErrorKind::CatalogCommitConflicts,
699 "Requirement failed: Table UUID does not match",
700 )
701 .with_context("expected", *uuid)
702 .with_context("found", metadata.uuid())
703 .with_retryable(true));
704 }
705 }
706 TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
707 if metadata.current_schema_id != *current_schema_id {
709 return Err(Error::new(
710 ErrorKind::CatalogCommitConflicts,
711 "Requirement failed: Current schema id does not match",
712 )
713 .with_context("expected", current_schema_id.to_string())
714 .with_context("found", metadata.current_schema_id.to_string())
715 .with_retryable(true));
716 }
717 }
718 TableRequirement::DefaultSortOrderIdMatch {
719 default_sort_order_id,
720 } => {
721 if metadata.default_sort_order().order_id != *default_sort_order_id {
722 return Err(Error::new(
723 ErrorKind::CatalogCommitConflicts,
724 "Requirement failed: Default sort order id does not match",
725 )
726 .with_context("expected", default_sort_order_id.to_string())
727 .with_context("found", metadata.default_sort_order().order_id.to_string())
728 .with_retryable(true));
729 }
730 }
731 TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
732 let snapshot_ref = metadata.snapshot_for_ref(r#ref);
733 if let Some(snapshot_id) = snapshot_id {
734 let snapshot_ref = snapshot_ref.ok_or(
735 Error::new(
736 ErrorKind::CatalogCommitConflicts,
737 format!("Requirement failed: Branch or tag `{ref}` not found"),
738 )
739 .with_retryable(true),
740 )?;
741 if snapshot_ref.snapshot_id() != *snapshot_id {
742 return Err(Error::new(
743 ErrorKind::CatalogCommitConflicts,
744 format!(
745 "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
746 ),
747 )
748 .with_context("expected", snapshot_id.to_string())
749 .with_context("found", snapshot_ref.snapshot_id().to_string())
750 .with_retryable(true));
751 }
752 } else if snapshot_ref.is_some() {
753 return Err(Error::new(
755 ErrorKind::CatalogCommitConflicts,
756 format!("Requirement failed: Branch or tag `{ref}` already exists"),
757 )
758 .with_retryable(true));
759 }
760 }
761 TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
762 if metadata.default_partition_spec_id() != *default_spec_id {
764 return Err(Error::new(
765 ErrorKind::CatalogCommitConflicts,
766 "Requirement failed: Default partition spec id does not match",
767 )
768 .with_context("expected", default_spec_id.to_string())
769 .with_context("found", metadata.default_partition_spec_id().to_string())
770 .with_retryable(true));
771 }
772 }
773 TableRequirement::LastAssignedPartitionIdMatch {
774 last_assigned_partition_id,
775 } => {
776 if metadata.last_partition_id != *last_assigned_partition_id {
777 return Err(Error::new(
778 ErrorKind::CatalogCommitConflicts,
779 "Requirement failed: Last assigned partition id does not match",
780 )
781 .with_context("expected", last_assigned_partition_id.to_string())
782 .with_context("found", metadata.last_partition_id.to_string())
783 .with_retryable(true));
784 }
785 }
786 TableRequirement::LastAssignedFieldIdMatch {
787 last_assigned_field_id,
788 } => {
789 if &metadata.last_column_id != last_assigned_field_id {
790 return Err(Error::new(
791 ErrorKind::CatalogCommitConflicts,
792 "Requirement failed: Last assigned field id does not match",
793 )
794 .with_context("expected", last_assigned_field_id.to_string())
795 .with_context("found", metadata.last_column_id.to_string())
796 .with_retryable(true));
797 }
798 }
799 };
800 } else {
801 match self {
802 TableRequirement::NotExist => {}
803 _ => {
804 return Err(Error::new(
805 ErrorKind::TableNotFound,
806 "Requirement failed: Table does not exist",
807 ));
808 }
809 }
810 }
811
812 Ok(())
813 }
814}
815
816pub(super) mod _serde {
817 use serde::{Deserialize as _, Deserializer, Serialize as _};
818
819 use super::*;
820 use crate::spec::{SchemaId, Summary};
821
822 pub(super) fn deserialize_snapshot<'de, D>(
823 deserializer: D,
824 ) -> std::result::Result<Snapshot, D::Error>
825 where D: Deserializer<'de> {
826 let buf = CatalogSnapshot::deserialize(deserializer)?;
827 Ok(buf.into())
828 }
829
830 pub(super) fn serialize_snapshot<S>(
831 snapshot: &Snapshot,
832 serializer: S,
833 ) -> std::result::Result<S::Ok, S::Error>
834 where
835 S: serde::Serializer,
836 {
837 let buf: CatalogSnapshot = snapshot.clone().into();
838 buf.serialize(serializer)
839 }
840
841 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
842 #[serde(rename_all = "kebab-case")]
843 struct CatalogSnapshot {
847 snapshot_id: i64,
848 #[serde(skip_serializing_if = "Option::is_none")]
849 parent_snapshot_id: Option<i64>,
850 #[serde(default)]
851 sequence_number: i64,
852 timestamp_ms: i64,
853 manifest_list: String,
854 summary: Summary,
855 #[serde(skip_serializing_if = "Option::is_none")]
856 schema_id: Option<SchemaId>,
857 #[serde(skip_serializing_if = "Option::is_none")]
858 first_row_id: Option<u64>,
859 #[serde(skip_serializing_if = "Option::is_none")]
860 added_rows: Option<u64>,
861 #[serde(skip_serializing_if = "Option::is_none")]
862 key_id: Option<String>,
863 }
864
865 impl From<CatalogSnapshot> for Snapshot {
866 fn from(snapshot: CatalogSnapshot) -> Self {
867 let CatalogSnapshot {
868 snapshot_id,
869 parent_snapshot_id,
870 sequence_number,
871 timestamp_ms,
872 manifest_list,
873 schema_id,
874 summary,
875 first_row_id,
876 added_rows,
877 key_id,
878 } = snapshot;
879 let builder = Snapshot::builder()
880 .with_snapshot_id(snapshot_id)
881 .with_parent_snapshot_id(parent_snapshot_id)
882 .with_sequence_number(sequence_number)
883 .with_timestamp_ms(timestamp_ms)
884 .with_manifest_list(manifest_list)
885 .with_summary(summary)
886 .with_encryption_key_id(key_id);
887 let row_range = first_row_id.zip(added_rows);
888 match (schema_id, row_range) {
889 (None, None) => builder.build(),
890 (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
891 (None, Some((first_row_id, last_row_id))) => {
892 builder.with_row_range(first_row_id, last_row_id).build()
893 }
894 (Some(schema_id), Some((first_row_id, last_row_id))) => builder
895 .with_schema_id(schema_id)
896 .with_row_range(first_row_id, last_row_id)
897 .build(),
898 }
899 }
900 }
901
902 impl From<Snapshot> for CatalogSnapshot {
903 fn from(snapshot: Snapshot) -> Self {
904 let first_row_id = snapshot.first_row_id();
905 let added_rows = snapshot.added_rows_count();
906 let Snapshot {
907 snapshot_id,
908 parent_snapshot_id,
909 sequence_number,
910 timestamp_ms,
911 manifest_list,
912 summary,
913 schema_id,
914 row_range: _,
915 encryption_key_id: key_id,
916 } = snapshot;
917 CatalogSnapshot {
918 snapshot_id,
919 parent_snapshot_id,
920 sequence_number,
921 timestamp_ms,
922 manifest_list,
923 summary,
924 schema_id,
925 first_row_id,
926 added_rows,
927 key_id,
928 }
929 }
930 }
931}
932
/// Parameters for creating a view, assembled through the generated typed
/// builder (`ViewCreation::builder()`).
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// Name of the view (required).
    pub name: String,
    /// Location of the view (required).
    pub location: String,
    /// Representations of the view (required).
    pub representations: ViewRepresentations,
    /// Schema of the view (required).
    pub schema: Schema,
    /// View properties; defaults to empty.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// Default namespace (required).
    /// NOTE(review): presumably used to resolve unqualified names in the view definition — confirm.
    pub default_namespace: NamespaceIdent,
    /// Optional default catalog; defaults to `None`.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// Summary key/value pairs; defaults to empty.
    /// NOTE(review): presumably the summary of the initial view version — confirm.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
957
/// A single change to view metadata, serialized as an internally tagged
/// object whose kebab-case `action` field selects the variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assign a UUID to the view.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The UUID to assign.
        uuid: Uuid,
    },
    /// Upgrade the view's format version.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format version.
        format_version: ViewFormatVersion,
    },
    /// Add a schema to the view.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
        /// Optional last column id accompanying the schema.
        /// NOTE(review): how this value is consumed is not visible here — confirm.
        last_column_id: Option<i32>,
    },
    /// Set the view's location.
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// The new location.
        location: String,
    },
    /// Set view properties.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Remove view properties.
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Add a view version.
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// The view version to add.
        view_version: ViewVersion,
    },
    /// Set the view's current version.
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// Id of the view version to make current.
        view_version_id: i32,
    },
}
1016
1017mod _serde_set_statistics {
1018 use serde::{Deserialize, Deserializer, Serialize, Serializer};
1021
1022 use super::*;
1023
1024 #[derive(Debug, Serialize, Deserialize)]
1025 #[serde(rename_all = "kebab-case")]
1026 struct SetStatistics {
1027 snapshot_id: Option<i64>,
1028 statistics: StatisticsFile,
1029 }
1030
1031 pub fn serialize<S>(
1032 value: &StatisticsFile,
1033 serializer: S,
1034 ) -> std::result::Result<S::Ok, S::Error>
1035 where
1036 S: Serializer,
1037 {
1038 SetStatistics {
1039 snapshot_id: Some(value.snapshot_id),
1040 statistics: value.clone(),
1041 }
1042 .serialize(serializer)
1043 }
1044
1045 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1046 where D: Deserializer<'de> {
1047 let SetStatistics {
1048 snapshot_id,
1049 statistics,
1050 } = SetStatistics::deserialize(deserializer)?;
1051 if let Some(snapshot_id) = snapshot_id
1052 && snapshot_id != statistics.snapshot_id
1053 {
1054 return Err(serde::de::Error::custom(format!(
1055 "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1056 statistics.snapshot_id
1057 )));
1058 }
1059
1060 Ok(statistics)
1061 }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use std::collections::HashMap;
1067 use std::fmt::Debug;
1068 use std::fs::File;
1069 use std::io::BufReader;
1070
1071 use base64::Engine as _;
1072 use serde::Serialize;
1073 use serde::de::DeserializeOwned;
1074 use uuid::uuid;
1075
1076 use super::ViewUpdate;
1077 use crate::io::FileIO;
1078 use crate::spec::{
1079 BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1080 PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1081 SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1082 StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1083 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1084 ViewVersion,
1085 };
1086 use crate::table::Table;
1087 use crate::test_utils::test_runtime;
1088 use crate::{
1089 NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1090 };
1091
1092 #[test]
1093 fn test_parent_namespace() {
1094 let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1095 let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1096 let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1097
1098 assert_eq!(ns1.parent(), None);
1099 assert_eq!(ns2.parent(), Some(ns1.clone()));
1100 assert_eq!(ns3.parent(), Some(ns2.clone()));
1101 }
1102
1103 #[test]
1104 fn test_create_table_id() {
1105 let table_id = TableIdent {
1106 namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1107 name: "t1".to_string(),
1108 };
1109
1110 assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1111 }
1112
1113 #[test]
1114 fn test_table_creation_iterator_properties() {
1115 let builder = TableCreation::builder()
1116 .name("table".to_string())
1117 .schema(Schema::builder().build().unwrap());
1118
1119 fn s(k: &str, v: &str) -> (String, String) {
1120 (k.to_string(), v.to_string())
1121 }
1122
1123 let table_creation = builder
1124 .properties([s("key", "value"), s("foo", "bar")])
1125 .build();
1126
1127 assert_eq!(
1128 HashMap::from([s("key", "value"), s("foo", "bar")]),
1129 table_creation.properties
1130 );
1131 }
1132
1133 fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1134 json: impl ToString,
1135 expected: T,
1136 ) {
1137 let json_str = json.to_string();
1138 let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1139 assert_eq!(actual, expected, "Parsed value is not equal to expected");
1140
1141 let restored: T = serde_json::from_str(
1142 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1143 )
1144 .expect("Failed to parse from serialized json");
1145
1146 assert_eq!(
1147 restored, expected,
1148 "Parsed restored value is not equal to expected"
1149 );
1150 }
1151
1152 fn metadata() -> TableMetadata {
1153 let tbl_creation = TableCreation::builder()
1154 .name("table".to_string())
1155 .location("/path/to/table".to_string())
1156 .schema(Schema::builder().build().unwrap())
1157 .build();
1158
1159 TableMetadataBuilder::from_table_creation(tbl_creation)
1160 .unwrap()
1161 .assign_uuid(uuid::Uuid::nil())
1162 .build()
1163 .unwrap()
1164 .metadata
1165 }
1166
1167 #[test]
1168 fn test_check_requirement_not_exist() {
1169 let metadata = metadata();
1170 let requirement = TableRequirement::NotExist;
1171
1172 assert!(requirement.check(Some(&metadata)).is_err());
1173 assert!(requirement.check(None).is_ok());
1174 }
1175
1176 #[test]
1177 fn test_check_table_uuid() {
1178 let metadata = metadata();
1179
1180 let requirement = TableRequirement::UuidMatch {
1181 uuid: uuid::Uuid::now_v7(),
1182 };
1183 assert!(requirement.check(Some(&metadata)).is_err());
1184
1185 let requirement = TableRequirement::UuidMatch {
1186 uuid: uuid::Uuid::nil(),
1187 };
1188 assert!(requirement.check(Some(&metadata)).is_ok());
1189 }
1190
1191 #[test]
1192 fn test_check_ref_snapshot_id() {
1193 let metadata = metadata();
1194
1195 let requirement = TableRequirement::RefSnapshotIdMatch {
1197 r#ref: "my_branch".to_string(),
1198 snapshot_id: Some(1),
1199 };
1200 assert!(requirement.check(Some(&metadata)).is_err());
1201
1202 let requirement = TableRequirement::RefSnapshotIdMatch {
1204 r#ref: "my_branch".to_string(),
1205 snapshot_id: None,
1206 };
1207 assert!(requirement.check(Some(&metadata)).is_ok());
1208
1209 let snapshot = Snapshot::builder()
1211 .with_snapshot_id(3051729675574597004)
1212 .with_sequence_number(10)
1213 .with_timestamp_ms(9992191116217)
1214 .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1215 .with_schema_id(0)
1216 .with_summary(Summary {
1217 operation: Operation::Append,
1218 additional_properties: HashMap::new(),
1219 })
1220 .build();
1221
1222 let builder = metadata.into_builder(None);
1223 let builder = TableUpdate::AddSnapshot {
1224 snapshot: snapshot.clone(),
1225 }
1226 .apply(builder)
1227 .unwrap();
1228 let metadata = TableUpdate::SetSnapshotRef {
1229 ref_name: MAIN_BRANCH.to_string(),
1230 reference: SnapshotReference {
1231 snapshot_id: snapshot.snapshot_id(),
1232 retention: SnapshotRetention::Branch {
1233 min_snapshots_to_keep: Some(10),
1234 max_snapshot_age_ms: None,
1235 max_ref_age_ms: None,
1236 },
1237 },
1238 }
1239 .apply(builder)
1240 .unwrap()
1241 .build()
1242 .unwrap()
1243 .metadata;
1244
1245 let requirement = TableRequirement::RefSnapshotIdMatch {
1247 r#ref: "main".to_string(),
1248 snapshot_id: Some(3051729675574597004),
1249 };
1250 assert!(requirement.check(Some(&metadata)).is_ok());
1251
1252 let requirement = TableRequirement::RefSnapshotIdMatch {
1254 r#ref: "main".to_string(),
1255 snapshot_id: Some(1),
1256 };
1257 assert!(requirement.check(Some(&metadata)).is_err());
1258 }
1259
1260 #[test]
1261 fn test_check_last_assigned_field_id() {
1262 let metadata = metadata();
1263
1264 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1265 last_assigned_field_id: 1,
1266 };
1267 assert!(requirement.check(Some(&metadata)).is_err());
1268
1269 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1270 last_assigned_field_id: 0,
1271 };
1272 assert!(requirement.check(Some(&metadata)).is_ok());
1273 }
1274
1275 #[test]
1276 fn test_check_current_schema_id() {
1277 let metadata = metadata();
1278
1279 let requirement = TableRequirement::CurrentSchemaIdMatch {
1280 current_schema_id: 1,
1281 };
1282 assert!(requirement.check(Some(&metadata)).is_err());
1283
1284 let requirement = TableRequirement::CurrentSchemaIdMatch {
1285 current_schema_id: 0,
1286 };
1287 assert!(requirement.check(Some(&metadata)).is_ok());
1288 }
1289
1290 #[test]
1291 fn test_check_last_assigned_partition_id() {
1292 let metadata = metadata();
1293 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1294 last_assigned_partition_id: 0,
1295 };
1296 assert!(requirement.check(Some(&metadata)).is_err());
1297
1298 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1299 last_assigned_partition_id: 999,
1300 };
1301 assert!(requirement.check(Some(&metadata)).is_ok());
1302 }
1303
1304 #[test]
1305 fn test_check_default_spec_id() {
1306 let metadata = metadata();
1307
1308 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1309 assert!(requirement.check(Some(&metadata)).is_err());
1310
1311 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1312 assert!(requirement.check(Some(&metadata)).is_ok());
1313 }
1314
1315 #[test]
1316 fn test_check_default_sort_order_id() {
1317 let metadata = metadata();
1318
1319 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1320 default_sort_order_id: 1,
1321 };
1322 assert!(requirement.check(Some(&metadata)).is_err());
1323
1324 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1325 default_sort_order_id: 0,
1326 };
1327 assert!(requirement.check(Some(&metadata)).is_ok());
1328 }
1329
    // JSON round trip of `TableRequirement::UuidMatch` (`"type": "assert-table-uuid"`).
    #[test]
    fn test_table_uuid() {
        test_serde_json(
            r#"
{
    "type": "assert-table-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#,
            TableRequirement::UuidMatch {
                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
            },
        );
    }
1344
/// Checks the `assert-create` JSON payload against
/// `TableRequirement::NotExist` via `test_serde_json`.
#[test]
fn test_assert_table_not_exists() {
    let json = r#"
{
    "type": "assert-create"
}
        "#;

    test_serde_json(json, TableRequirement::NotExist);
}
1356
/// Checks `assert-ref-snapshot-id` payloads against
/// `TableRequirement::RefSnapshotIdMatch`, once with a `null` snapshot id and
/// once with snapshot id 1.
#[test]
fn test_assert_ref_snapshot_id() {
    // `snapshot-id: null` is expected to map to `None`.
    let null_snapshot_json = r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": null
}
        "#;
    let null_snapshot = TableRequirement::RefSnapshotIdMatch {
        r#ref: String::from("snapshot-name"),
        snapshot_id: None,
    };
    test_serde_json(null_snapshot_json, null_snapshot);

    // A concrete snapshot id is expected to map to `Some(id)`.
    let with_snapshot_json = r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": 1
}
        "#;
    let with_snapshot = TableRequirement::RefSnapshotIdMatch {
        r#ref: String::from("snapshot-name"),
        snapshot_id: Some(1),
    };
    test_serde_json(with_snapshot_json, with_snapshot);
}
1387
/// Checks the `assert-last-assigned-field-id` JSON payload against
/// `TableRequirement::LastAssignedFieldIdMatch` via `test_serde_json`.
#[test]
fn test_assert_last_assigned_field_id() {
    let json = r#"
{
    "type": "assert-last-assigned-field-id",
    "last-assigned-field-id": 12
}
        "#;
    let expected = TableRequirement::LastAssignedFieldIdMatch {
        last_assigned_field_id: 12,
    };

    test_serde_json(json, expected);
}
1402
/// Checks the `assert-current-schema-id` JSON payload against
/// `TableRequirement::CurrentSchemaIdMatch` via `test_serde_json`.
#[test]
fn test_assert_current_schema_id() {
    let json = r#"
{
    "type": "assert-current-schema-id",
    "current-schema-id": 4
}
        "#;
    let expected = TableRequirement::CurrentSchemaIdMatch {
        current_schema_id: 4,
    };

    test_serde_json(json, expected);
}
1417
/// Checks the `assert-last-assigned-partition-id` JSON payload against
/// `TableRequirement::LastAssignedPartitionIdMatch` via `test_serde_json`.
#[test]
fn test_assert_last_assigned_partition_id() {
    let json = r#"
{
    "type": "assert-last-assigned-partition-id",
    "last-assigned-partition-id": 1004
}
        "#;
    let expected = TableRequirement::LastAssignedPartitionIdMatch {
        last_assigned_partition_id: 1004,
    };

    test_serde_json(json, expected);
}
1432
/// Checks the `assert-default-spec-id` JSON payload against
/// `TableRequirement::DefaultSpecIdMatch` via `test_serde_json`.
#[test]
fn test_assert_default_spec_id() {
    let json = r#"
{
    "type": "assert-default-spec-id",
    "default-spec-id": 5
}
        "#;
    let expected = TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 };

    test_serde_json(json, expected);
}
1445
/// Checks the `assert-default-sort-order-id` JSON payload against
/// `TableRequirement::DefaultSortOrderIdMatch` via `test_serde_json`.
#[test]
fn test_assert_default_sort_order() {
    test_serde_json(
        r#"
{
    "type": "assert-default-sort-order-id",
    "default-sort-order-id": 10
}
        "#,
        TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: 10,
        },
    );
}
1461
/// A requirement payload that has no `type` key must not deserialize into a
/// `TableRequirement`.
#[test]
fn test_parse_assert_invalid() {
    let untyped_json = r#"
{
    "default-sort-order-id": 10
}
"#;

    let parsed = serde_json::from_str::<TableRequirement>(untyped_json);

    assert!(
        parsed.is_err(),
        "Table requirements should not be parsed without type."
    );
}
1476
/// Checks the `assign-uuid` JSON payload against `TableUpdate::AssignUuid`
/// via `test_serde_json`.
#[test]
fn test_assign_uuid() {
    let json = r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#;
    let expected = TableUpdate::AssignUuid {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(json, expected);
}
1491
/// Checks the `upgrade-format-version` JSON payload (version 2) against
/// `TableUpdate::UpgradeFormatVersion` with `FormatVersion::V2`.
#[test]
fn test_upgrade_format_version() {
    let json = r#"
{
    "action": "upgrade-format-version",
    "format-version": 2
}
        "#;
    let expected = TableUpdate::UpgradeFormatVersion {
        format_version: FormatVersion::V2,
    };

    test_serde_json(json, expected);
}
1506
/// Checks two `add-schema` payloads against the same expected
/// `TableUpdate::AddSchema`: the first carries an extra top-level
/// `last-column-id` key, the second omits it.
#[test]
fn test_add_schema() {
    // Three-field schema (ids 1..=3) with field 2 as the identifier field.
    let test_schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    // Payload that includes `last-column-id`; the expected update has no
    // corresponding field.
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
        TableUpdate::AddSchema {
            schema: test_schema.clone(),
        },
    );

    // Same payload without `last-column-id`. This is the last use of
    // `test_schema`, so it is moved rather than cloned.
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    }
}
        "#,
        TableUpdate::AddSchema {
            schema: test_schema,
        },
    );
}
1596
/// Checks the `set-current-schema` JSON payload against
/// `TableUpdate::SetCurrentSchema` via `test_serde_json`.
#[test]
fn test_set_current_schema() {
    let json = r#"
{
    "action": "set-current-schema",
    "schema-id": 23
}
        "#;
    let expected = TableUpdate::SetCurrentSchema { schema_id: 23 };

    test_serde_json(json, expected);
}
1609
/// Checks an `add-spec` JSON payload with three partition fields against a
/// `TableUpdate::AddSpec` built through `UnboundPartitionSpec::builder()`.
#[test]
fn test_add_spec() {
    let json = r#"
{
    "action": "add-spec",
    "spec": {
        "fields": [
            {
                "source-id": 4,
                "name": "ts_day",
                "transform": "day"
            },
            {
                "source-id": 1,
                "name": "id_bucket",
                "transform": "bucket[16]"
            },
            {
                "source-id": 2,
                "name": "id_truncate",
                "transform": "truncate[4]"
            }
        ]
    }
}
        "#;

    // Add the fields in the same order as they appear in the payload.
    let spec = [
        (4, "ts_day", Transform::Day),
        (1, "id_bucket", Transform::Bucket(16)),
        (2, "id_truncate", Transform::Truncate(4)),
    ]
    .into_iter()
    .fold(
        UnboundPartitionSpec::builder(),
        |builder, (source_id, name, transform)| {
            builder
                .add_partition_field(source_id, name.to_string(), transform)
                .unwrap()
        },
    )
    .build();

    test_serde_json(json, TableUpdate::AddSpec { spec });
}
1649
/// Checks the `set-default-spec` JSON payload against
/// `TableUpdate::SetDefaultSpec` via `test_serde_json`.
#[test]
fn test_set_default_spec() {
    let json = r#"
{
    "action": "set-default-spec",
    "spec-id": 1
}
        "#;
    let expected = TableUpdate::SetDefaultSpec { spec_id: 1 };

    test_serde_json(json, expected);
}
1662
/// Checks an `add-sort-order` JSON payload with two sort fields against a
/// `TableUpdate::AddSortOrder` built through `SortOrder::builder()`.
#[test]
fn test_add_sort_order() {
    // Field on source id 2: identity transform, ascending, nulls first.
    let identity_asc = SortField::builder()
        .source_id(2)
        .direction(SortDirection::Ascending)
        .null_order(NullOrder::First)
        .transform(Transform::Identity)
        .build();

    // Field on source id 3: bucket[4] transform, descending, nulls last.
    let bucket_desc = SortField::builder()
        .source_id(3)
        .direction(SortDirection::Descending)
        .null_order(NullOrder::Last)
        .transform(Transform::Bucket(4))
        .build();

    let sort_order = SortOrder::builder()
        .with_order_id(1)
        .with_sort_field(identity_asc)
        .with_sort_field(bucket_desc)
        .build_unbound()
        .unwrap();

    test_serde_json(
        r#"
{
    "action": "add-sort-order",
    "sort-order": {
        "order-id": 1,
        "fields": [
            {
                "transform": "identity",
                "source-id": 2,
                "direction": "asc",
                "null-order": "nulls-first"
            },
            {
                "transform": "bucket[4]",
                "source-id": 3,
                "direction": "desc",
                "null-order": "nulls-last"
            }
        ]
    }
}
        "#,
        TableUpdate::AddSortOrder { sort_order },
    );
}
1713
/// Checks the `set-default-sort-order` JSON payload against
/// `TableUpdate::SetDefaultSortOrder` via `test_serde_json`.
#[test]
fn test_set_default_order() {
    test_serde_json(
        r#"
{
    "action": "set-default-sort-order",
    "sort-order-id": 2
}
        "#,
        TableUpdate::SetDefaultSortOrder { sort_order_id: 2 },
    );
}
1726
/// Checks an `add-snapshot` JSON payload that includes `sequence-number` and
/// `schema-id` against a `TableUpdate::AddSnapshot` built through
/// `Snapshot::builder()`.
#[test]
fn test_add_snapshot() {
    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let snapshot = Snapshot::builder()
        .with_snapshot_id(3055729675574597000)
        .with_parent_snapshot_id(Some(3051729675574597000))
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_schema_id(1)
        .with_summary(append_summary)
        .build();

    test_serde_json(
        r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "sequence-number": 1,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro",
        "schema-id": 1
    }
}
        "#,
        TableUpdate::AddSnapshot { snapshot },
    );
}
1763
/// Parses an `add-snapshot` payload that has neither `sequence-number` nor
/// `schema-id` and expects a snapshot whose sequence number is 0. Only
/// deserialization is checked here.
#[test]
fn test_add_snapshot_v1() {
    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let expected = TableUpdate::AddSnapshot {
        snapshot: Snapshot::builder()
            .with_snapshot_id(3055729675574597000)
            .with_parent_snapshot_id(Some(3051729675574597000))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/2.avro")
            .with_summary(append_summary)
            .build(),
    };

    let payload = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro"
    }
}
    "#;

    let parsed = serde_json::from_str::<TableUpdate>(payload).expect("Failed to parse from json");
    assert_eq!(parsed, expected, "Parsed value is not equal to expected");
}
1798
/// Parses an `add-snapshot` payload carrying `first-row-id`, `added-rows` and
/// `key-id`, compares it with the expected update, then serializes the parsed
/// value and parses it again to confirm the result is unchanged.
#[test]
fn test_add_snapshot_v3() {
    let expected = TableUpdate::AddSnapshot {
        snapshot: Snapshot::builder()
            .with_snapshot_id(3055729675574597000)
            .with_parent_snapshot_id(Some(3051729675574597000))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/2.avro")
            .with_row_range(0, 2)
            .with_encryption_key_id(Some("key123".to_string()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::default(),
            })
            .build(),
    };

    let payload = serde_json::json!(
    {
        "action": "add-snapshot",
        "snapshot": {
            "snapshot-id": 3055729675574597000i64,
            "parent-snapshot-id": 3051729675574597000i64,
            "timestamp-ms": 1555100955770i64,
            "first-row-id":0,
            "added-rows":2,
            "key-id":"key123",
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://a/b/2.avro"
        }
    });

    // JSON value -> TableUpdate.
    let parsed: TableUpdate = serde_json::from_value(payload).expect("Failed to parse from json");
    assert_eq!(parsed, expected, "Parsed value is not equal to expected");

    // TableUpdate -> JSON text -> TableUpdate.
    let reserialized = serde_json::to_string(&parsed).expect("Failed to serialize to json");
    let reparsed: TableUpdate =
        serde_json::from_str(&reserialized).expect("Failed to parse from serialized json");
    assert_eq!(reparsed, expected);
}
1842
/// Checks the `remove-snapshots` JSON payload against
/// `TableUpdate::RemoveSnapshots` via `test_serde_json`.
#[test]
fn test_remove_snapshots() {
    test_serde_json(
        r#"
{
    "action": "remove-snapshots",
    "snapshot-ids": [
        1,
        2
    ]
}
        "#,
        TableUpdate::RemoveSnapshots {
            snapshot_ids: vec![1, 2],
        },
    );
}
1860
/// Checks the `remove-snapshot-ref` JSON payload against
/// `TableUpdate::RemoveSnapshotRef` via `test_serde_json`.
#[test]
fn test_remove_snapshot_ref() {
    test_serde_json(
        r#"
{
    "action": "remove-snapshot-ref",
    "ref-name": "snapshot-ref"
}
        "#,
        TableUpdate::RemoveSnapshotRef {
            ref_name: String::from("snapshot-ref"),
        },
    );
}
1875
/// Checks a `set-snapshot-ref` payload of type `tag` against
/// `TableUpdate::SetSnapshotRef` with `SnapshotRetention::Tag`.
#[test]
fn test_set_snapshot_ref_tag() {
    let tag_reference = SnapshotReference {
        snapshot_id: 1,
        retention: SnapshotRetention::Tag {
            max_ref_age_ms: Some(1),
        },
    };

    test_serde_json(
        r#"
{
    "action": "set-snapshot-ref",
    "type": "tag",
    "ref-name": "hank",
    "snapshot-id": 1,
    "max-ref-age-ms": 1
}
        "#,
        TableUpdate::SetSnapshotRef {
            ref_name: String::from("hank"),
            reference: tag_reference,
        },
    );
}
1900
/// Checks a `set-snapshot-ref` payload of type `branch` against
/// `TableUpdate::SetSnapshotRef` with `SnapshotRetention::Branch`.
#[test]
fn test_set_snapshot_ref_branch() {
    let branch_reference = SnapshotReference {
        snapshot_id: 1,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: Some(2),
            max_snapshot_age_ms: Some(3),
            max_ref_age_ms: Some(4),
        },
    };

    test_serde_json(
        r#"
{
    "action": "set-snapshot-ref",
    "type": "branch",
    "ref-name": "hank",
    "snapshot-id": 1,
    "min-snapshots-to-keep": 2,
    "max-snapshot-age-ms": 3,
    "max-ref-age-ms": 4
}
        "#,
        TableUpdate::SetSnapshotRef {
            ref_name: String::from("hank"),
            reference: branch_reference,
        },
    );
}
1929
/// Checks the `set-properties` JSON payload against
/// `TableUpdate::SetProperties` holding the same two key/value pairs.
#[test]
fn test_set_properties() {
    let updates = [("prop1", "v1"), ("prop2", "v2")]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    test_serde_json(
        r#"
{
    "action": "set-properties",
    "updates": {
        "prop1": "v1",
        "prop2": "v2"
    }
}
        "#,
        TableUpdate::SetProperties { updates },
    );
}
1953
/// Checks the `remove-properties` JSON payload against
/// `TableUpdate::RemoveProperties` via `test_serde_json`.
#[test]
fn test_remove_properties() {
    let removals = vec![String::from("prop1"), String::from("prop2")];

    test_serde_json(
        r#"
{
    "action": "remove-properties",
    "removals": [
        "prop1",
        "prop2"
    ]
}
        "#,
        TableUpdate::RemoveProperties { removals },
    );
}
1972
/// Checks the `set-location` JSON payload against `TableUpdate::SetLocation`
/// via `test_serde_json`.
#[test]
fn test_set_location() {
    test_serde_json(
        r#"
{
    "action": "set-location",
    "location": "s3://bucket/warehouse/tbl_location"
}
        "#,
        TableUpdate::SetLocation {
            location: String::from("s3://bucket/warehouse/tbl_location"),
        },
    );
}
1988
/// Applies `TableUpdate::AssignUuid` to a metadata builder derived from a
/// fresh `TableCreation` and checks that the built metadata reports the
/// assigned UUID.
#[test]
fn test_table_update_apply() {
    // Build the starting metadata from a minimal table creation request.
    let creation = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(Schema::builder().build().unwrap())
        .build();
    let initial_metadata = TableMetadataBuilder::from_table_creation(creation)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    let builder = TableMetadataBuilder::new_from_metadata(
        initial_metadata,
        Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
    );

    // Apply the update and rebuild.
    let uuid = uuid::Uuid::new_v4();
    let applied = TableUpdate::AssignUuid { uuid }.apply(builder).unwrap();
    let updated_metadata = applied.build().unwrap().metadata;

    assert_eq!(updated_metadata.uuid(), uuid);
}
2017
/// Checks the `assign-uuid` JSON payload against `ViewUpdate::AssignUuid`
/// via `test_serde_json`.
#[test]
fn test_view_assign_uuid() {
    let json = r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#;
    let expected = ViewUpdate::AssignUuid {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(json, expected);
}
2032
/// Checks the `upgrade-format-version` JSON payload (version 1) against
/// `ViewUpdate::UpgradeFormatVersion` with `ViewFormatVersion::V1`.
#[test]
fn test_view_upgrade_format_version() {
    let json = r#"
{
    "action": "upgrade-format-version",
    "format-version": 1
}
        "#;
    let expected = ViewUpdate::UpgradeFormatVersion {
        format_version: ViewFormatVersion::V1,
    };

    test_serde_json(json, expected);
}
2047
/// Checks an `add-schema` JSON payload that includes `last-column-id` against
/// `ViewUpdate::AddSchema` with `last_column_id: Some(3)`.
#[test]
fn test_view_add_schema() {
    // Three-field schema (ids 1..=3) with field 2 as the identifier field.
    let test_schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
        ViewUpdate::AddSchema {
            // The schema is used only here, so it is moved instead of cloned.
            schema: test_schema,
            last_column_id: Some(3),
        },
    );
}
2100
/// Checks the `set-location` JSON payload against `ViewUpdate::SetLocation`
/// via `test_serde_json`.
#[test]
fn test_view_set_location() {
    let json = r#"
{
    "action": "set-location",
    "location": "s3://db/view"
}
        "#;
    let expected = ViewUpdate::SetLocation {
        location: String::from("s3://db/view"),
    };

    test_serde_json(json, expected);
}
2115
/// Checks the `set-properties` JSON payload against
/// `ViewUpdate::SetProperties` holding the same two key/value pairs.
#[test]
fn test_view_set_properties() {
    let json = r#"
{
    "action": "set-properties",
    "updates": {
        "prop1": "v1",
        "prop2": "v2"
    }
}
        "#;

    let updates = [("prop1", "v1"), ("prop2", "v2")]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    test_serde_json(json, ViewUpdate::SetProperties { updates });
}
2138
/// Checks the `remove-properties` JSON payload against
/// `ViewUpdate::RemoveProperties` via `test_serde_json`.
#[test]
fn test_view_remove_properties() {
    let json = r#"
{
    "action": "remove-properties",
    "removals": [
        "prop1",
        "prop2"
    ]
}
        "#;
    let removals = vec![String::from("prop1"), String::from("prop2")];

    test_serde_json(json, ViewUpdate::RemoveProperties { removals });
}
2156
/// Checks an `add-view-version` JSON payload with one SQL representation
/// against a `ViewUpdate::AddViewVersion` built through
/// `ViewVersion::builder()`.
#[test]
fn test_view_add_view_version() {
    let json = r#"
{
    "action": "add-view-version",
    "view-version": {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
              "engine-name" : "Spark"
            },
            "representations" : [ {
              "type" : "sql",
              "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
              "dialect" : "spark"
            } ]
    }
}
        "#;

    let summary = [("engine-name", "Spark")]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    let sql_representation = ViewRepresentation::Sql(SqlViewRepresentation {
        sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
        dialect: "spark".to_string(),
    });

    let view_version = ViewVersion::builder()
        .with_version_id(1)
        .with_timestamp_ms(1573518431292)
        .with_schema_id(1)
        .with_default_catalog(Some("prod".to_string()))
        .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
        .with_summary(summary)
        .with_representations(ViewRepresentations(vec![sql_representation]))
        .build();

    test_serde_json(json, ViewUpdate::AddViewVersion { view_version });
}
2200
/// Checks the `set-current-view-version` JSON payload against
/// `ViewUpdate::SetCurrentViewVersion` via `test_serde_json`.
#[test]
fn test_view_set_current_view_version() {
    let json = r#"
{
    "action": "set-current-view-version",
    "view-version-id": 1
}
        "#;
    let expected = ViewUpdate::SetCurrentViewVersion { view_version_id: 1 };

    test_serde_json(json, expected);
}
2213
/// Checks the `remove-partition-specs` JSON payload against
/// `TableUpdate::RemovePartitionSpecs` via `test_serde_json`.
#[test]
fn test_remove_partition_specs_update() {
    let json = r#"
{
    "action": "remove-partition-specs",
    "spec-ids": [1, 2]
}
        "#;
    let expected = TableUpdate::RemovePartitionSpecs {
        spec_ids: vec![1, 2],
    };

    test_serde_json(json, expected);
}
2228
/// Checks a `set-statistics` JSON payload with one blob-metadata entry
/// against `TableUpdate::SetStatistics` holding the matching `StatisticsFile`.
#[test]
fn test_set_statistics_file() {
    let json = r#"
        {
            "action": "set-statistics",
            "snapshot-id": 1940541653261589030,
            "statistics": {
                "snapshot-id": 1940541653261589030,
                "statistics-path": "s3://bucket/warehouse/stats.puffin",
                "file-size-in-bytes": 124,
                "file-footer-size-in-bytes": 27,
                "blob-metadata": [
                    {
                        "type": "boring-type",
                        "snapshot-id": 1940541653261589030,
                        "sequence-number": 2,
                        "fields": [
                            1
                        ],
                        "properties": {
                            "prop-key": "prop-value"
                        }
                    }
                ]
            }
        }
        "#;

    let blob = BlobMetadata {
        r#type: "boring-type".to_string(),
        snapshot_id: 1940541653261589030,
        sequence_number: 2,
        fields: vec![1],
        properties: [("prop-key", "prop-value")]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect(),
    };

    let statistics = StatisticsFile {
        snapshot_id: 1940541653261589030,
        statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
        file_size_in_bytes: 124,
        file_footer_size_in_bytes: 27,
        key_metadata: None,
        blob_metadata: vec![blob],
    };

    test_serde_json(json, TableUpdate::SetStatistics { statistics });
}
2277
/// Checks the `remove-statistics` JSON payload against
/// `TableUpdate::RemoveStatistics` via `test_serde_json`.
#[test]
fn test_remove_statistics_file() {
    let json = r#"
        {
            "action": "remove-statistics",
            "snapshot-id": 1940541653261589030
        }
        "#;
    let expected = TableUpdate::RemoveStatistics {
        snapshot_id: 1940541653261589030,
    };

    test_serde_json(json, expected);
}
2292
/// Checks the `set-partition-statistics` JSON payload against
/// `TableUpdate::SetPartitionStatistics` via `test_serde_json`.
#[test]
fn test_set_partition_statistics_file() {
    let json = r#"
            {
                "action": "set-partition-statistics",
                "partition-statistics": {
                    "snapshot-id": 1940541653261589030,
                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
                    "file-size-in-bytes": 43
                }
            }
            "#;
    let partition_statistics = PartitionStatisticsFile {
        snapshot_id: 1940541653261589030,
        statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
        file_size_in_bytes: 43,
    };

    test_serde_json(
        json,
        TableUpdate::SetPartitionStatistics {
            partition_statistics,
        },
    );
}
2315
/// Checks the `remove-partition-statistics` JSON payload against
/// `TableUpdate::RemovePartitionStatistics` via `test_serde_json`.
#[test]
fn test_remove_partition_statistics_file() {
    let json = r#"
            {
                "action": "remove-partition-statistics",
                "snapshot-id": 1940541653261589030
            }
            "#;
    let expected = TableUpdate::RemovePartitionStatistics {
        snapshot_id: 1940541653261589030,
    };

    test_serde_json(json, expected);
}
2330
/// Checks the `remove-schemas` JSON payload against
/// `TableUpdate::RemoveSchemas` via `test_serde_json`.
#[test]
fn test_remove_schema_update() {
    let json = r#"
                {
                    "action": "remove-schemas",
                    "schema-ids": [1, 2]
                }
            "#;
    let expected = TableUpdate::RemoveSchemas {
        schema_ids: vec![1, 2],
    };

    test_serde_json(json, expected);
}
2345
/// Checks an `add-encryption-key` JSON payload, whose
/// `encrypted-key-metadata` is the standard-base64 encoding of the bytes of
/// `"key"`, against `TableUpdate::AddEncryptionKey` built from the raw bytes.
#[test]
fn test_add_encryption_key() {
    let raw_key: &[u8] = b"key";
    let encoded_key = base64::engine::general_purpose::STANDARD.encode(raw_key);

    let json = format!(
        r#"
            {{
                "action": "add-encryption-key",
                "encryption-key": {{
                    "key-id": "a",
                    "encrypted-key-metadata": "{encoded_key}",
                    "encrypted-by-id": "b"
                }}
            }}
        "#
    );

    let encryption_key = EncryptedKey::builder()
        .key_id("a")
        .encrypted_key_metadata(raw_key.to_vec())
        .encrypted_by_id("b")
        .build();

    test_serde_json(json, TableUpdate::AddEncryptionKey { encryption_key });
}
2372
/// Checks the `remove-encryption-key` JSON payload against
/// `TableUpdate::RemoveEncryptionKey` via `test_serde_json`.
#[test]
fn test_remove_encryption_key() {
    let json = r#"
            {
                "action": "remove-encryption-key",
                "key-id": "a"
            }
        "#;
    let expected = TableUpdate::RemoveEncryptionKey {
        key_id: String::from("a"),
    };

    test_serde_json(json, expected);
}
2387
/// Applies a `TableCommit` (set-location plus set-properties, guarded by a
/// UUID requirement) to a table loaded from `TableMetadataV2Valid.json`, then
/// checks the resulting properties, metadata location and table location.
#[test]
fn test_table_commit() {
    // Load the fixture metadata from the crate's testdata directory.
    let fixture_path = format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV2Valid.json"
    );
    let fixture_file = File::open(fixture_path).unwrap();
    let loaded_metadata =
        serde_json::from_reader::<_, TableMetadata>(BufReader::new(fixture_file)).unwrap();

    let table = Table::builder()
        .metadata(loaded_metadata)
        .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json")
        .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
        .file_io(FileIO::new_with_memory())
        .runtime(test_runtime())
        .build()
        .unwrap();

    let set_location = TableUpdate::SetLocation {
        location: "s3://bucket/test/new_location/data".to_string(),
    };
    let set_properties = TableUpdate::SetProperties {
        updates: [("prop1", "v1"), ("prop2", "v2")]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect(),
    };

    // The commit only applies if the table still has the UUID we loaded.
    let uuid_guard = TableRequirement::UuidMatch {
        uuid: table.metadata().table_uuid,
    };

    let table_commit = TableCommit::builder()
        .ident(table.identifier().to_owned())
        .updates(vec![set_location, set_properties])
        .requirements(vec![uuid_guard])
        .build();

    let updated_table = table_commit.apply(table).unwrap();

    // Both properties must be present with the committed values.
    let properties = &updated_table.metadata().properties;
    assert_eq!(properties.get("prop1").unwrap(), "v1");
    assert_eq!(properties.get("prop2").unwrap(), "v2");

    // The metadata file sequence is expected to move from 00000 to 00001.
    let new_metadata_location = updated_table.metadata_location().unwrap();
    assert!(new_metadata_location.starts_with("s3://bucket/test/location/metadata/00001-"));

    assert_eq!(
        updated_table.metadata().location,
        "s3://bucket/test/new_location/data",
    );
}
2458}