1pub mod memory;
21mod metadata_location;
22pub(crate) mod utils;
23
24use std::collections::HashMap;
25use std::fmt::{Debug, Display};
26use std::future::Future;
27use std::mem::take;
28use std::ops::Deref;
29use std::str::FromStr;
30use std::sync::Arc;
31
32use _serde::{deserialize_snapshot, serialize_snapshot};
33use async_trait::async_trait;
34pub use memory::MemoryCatalog;
35pub use metadata_location::*;
36#[cfg(test)]
37use mockall::automock;
38use serde_derive::{Deserialize, Serialize};
39use typed_builder::TypedBuilder;
40use uuid::Uuid;
41
42use crate::io::StorageFactory;
43use crate::spec::{
44 EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
45 SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
46 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
47};
48use crate::table::Table;
49use crate::{Error, ErrorKind, Result};
50
51#[async_trait]
53#[cfg_attr(test, automock)]
54pub trait Catalog: Debug + Sync + Send {
55 async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
57 -> Result<Vec<NamespaceIdent>>;
58
59 async fn create_namespace(
61 &self,
62 namespace: &NamespaceIdent,
63 properties: HashMap<String, String>,
64 ) -> Result<Namespace>;
65
66 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;
68
69 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;
71
72 async fn update_namespace(
78 &self,
79 namespace: &NamespaceIdent,
80 properties: HashMap<String, String>,
81 ) -> Result<()>;
82
83 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;
85
86 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;
88
89 async fn create_table(
91 &self,
92 namespace: &NamespaceIdent,
93 creation: TableCreation,
94 ) -> Result<Table>;
95
96 async fn load_table(&self, table: &TableIdent) -> Result<Table>;
98
99 async fn drop_table(&self, table: &TableIdent) -> Result<()>;
101
102 async fn purge_table(&self, table: &TableIdent) -> Result<()>;
109
110 async fn table_exists(&self, table: &TableIdent) -> Result<bool>;
112
113 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;
115
116 async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;
118
119 async fn update_table(&self, commit: TableCommit) -> Result<Table>;
121}
122
123pub trait CatalogBuilder: Default + Debug + Send + Sync {
125 type C: Catalog;
127
128 fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;
153
154 fn load(
156 self,
157 name: impl Into<String>,
158 props: HashMap<String, String>,
159 ) -> impl Future<Output = Result<Self::C>> + Send;
160}
161
162#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
168pub struct NamespaceIdent(Vec<String>);
169
170impl NamespaceIdent {
171 pub fn new(name: String) -> Self {
173 Self(vec![name])
174 }
175
176 pub fn from_vec(names: Vec<String>) -> Result<Self> {
178 if names.is_empty() {
179 return Err(Error::new(
180 ErrorKind::DataInvalid,
181 "Namespace identifier can't be empty!",
182 ));
183 }
184 Ok(Self(names))
185 }
186
187 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
189 Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
190 }
191
192 pub fn to_url_string(&self) -> String {
194 self.as_ref().join("\u{001f}")
195 }
196
197 pub fn inner(self) -> Vec<String> {
199 self.0
200 }
201
202 pub fn parent(&self) -> Option<Self> {
205 self.0.split_last().and_then(|(_, parent)| {
206 if parent.is_empty() {
207 None
208 } else {
209 Some(Self(parent.to_vec()))
210 }
211 })
212 }
213}
214
215impl AsRef<Vec<String>> for NamespaceIdent {
216 fn as_ref(&self) -> &Vec<String> {
217 &self.0
218 }
219}
220
221impl Deref for NamespaceIdent {
222 type Target = [String];
223
224 fn deref(&self) -> &Self::Target {
225 &self.0
226 }
227}
228
229#[derive(Debug, Clone, PartialEq, Eq)]
231pub struct Namespace {
232 name: NamespaceIdent,
233 properties: HashMap<String, String>,
234}
235
236impl Namespace {
237 pub fn new(name: NamespaceIdent) -> Self {
239 Self::with_properties(name, HashMap::default())
240 }
241
242 pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
244 Self { name, properties }
245 }
246
247 pub fn name(&self) -> &NamespaceIdent {
249 &self.name
250 }
251
252 pub fn properties(&self) -> &HashMap<String, String> {
254 &self.properties
255 }
256}
257
258impl Display for NamespaceIdent {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 write!(f, "{}", self.0.join("."))
261 }
262}
263
264#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
266pub struct TableIdent {
267 pub namespace: NamespaceIdent,
269 pub name: String,
271}
272
273impl TableIdent {
274 pub fn new(namespace: NamespaceIdent, name: String) -> Self {
276 Self { namespace, name }
277 }
278
279 pub fn namespace(&self) -> &NamespaceIdent {
281 &self.namespace
282 }
283
284 pub fn name(&self) -> &str {
286 &self.name
287 }
288
289 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
291 let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
292 let table_name = vec.pop().ok_or_else(|| {
293 Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
294 })?;
295 let namespace_ident = NamespaceIdent::from_vec(vec)?;
296
297 Ok(Self {
298 namespace: namespace_ident,
299 name: table_name,
300 })
301 }
302}
303
304impl Display for TableIdent {
305 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
306 write!(f, "{}.{}", self.namespace, self.name)
307 }
308}
309
310#[derive(Debug, TypedBuilder)]
312pub struct TableCreation {
313 pub name: String,
315 #[builder(default, setter(strip_option(fallback = location_opt)))]
317 pub location: Option<String>,
318 pub schema: Schema,
320 #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
322 pub partition_spec: Option<UnboundPartitionSpec>,
323 #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
325 pub sort_order: Option<SortOrder>,
326 #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
328 props.into_iter().collect()
329 }))]
330 pub properties: HashMap<String, String>,
331 #[builder(default = FormatVersion::V2)]
333 pub format_version: FormatVersion,
334}
335
336#[derive(Debug, TypedBuilder)]
342#[builder(build_method(vis = "pub(crate)"))]
343pub struct TableCommit {
344 ident: TableIdent,
346 requirements: Vec<TableRequirement>,
350 updates: Vec<TableUpdate>,
352}
353
354impl TableCommit {
355 pub fn identifier(&self) -> &TableIdent {
357 &self.ident
358 }
359
360 pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
362 take(&mut self.requirements)
363 }
364
365 pub fn take_updates(&mut self) -> Vec<TableUpdate> {
367 take(&mut self.updates)
368 }
369
370 pub fn apply(self, table: Table) -> Result<Table> {
376 for requirement in self.requirements {
378 requirement.check(Some(table.metadata()))?;
379 }
380
381 let current_metadata_location = table.metadata_location_result()?;
383
384 let mut metadata_builder = table
386 .metadata()
387 .clone()
388 .into_builder(Some(current_metadata_location.to_string()));
389 for update in self.updates {
390 metadata_builder = update.apply(metadata_builder)?;
391 }
392
393 let new_metadata = metadata_builder.build()?.metadata;
395
396 let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
397 .with_next_version()
398 .with_new_metadata(&new_metadata)
399 .to_string();
400
401 Ok(table
402 .with_metadata(Arc::new(new_metadata))
403 .with_metadata_location(new_metadata_location))
404 }
405}
406
407#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
409#[serde(tag = "type")]
410pub enum TableRequirement {
411 #[serde(rename = "assert-create")]
413 NotExist,
414 #[serde(rename = "assert-table-uuid")]
416 UuidMatch {
417 uuid: Uuid,
419 },
420 #[serde(rename = "assert-ref-snapshot-id")]
423 RefSnapshotIdMatch {
424 r#ref: String,
426 #[serde(rename = "snapshot-id")]
429 snapshot_id: Option<i64>,
430 },
431 #[serde(rename = "assert-last-assigned-field-id")]
433 LastAssignedFieldIdMatch {
434 #[serde(rename = "last-assigned-field-id")]
436 last_assigned_field_id: i32,
437 },
438 #[serde(rename = "assert-current-schema-id")]
440 CurrentSchemaIdMatch {
441 #[serde(rename = "current-schema-id")]
443 current_schema_id: SchemaId,
444 },
445 #[serde(rename = "assert-last-assigned-partition-id")]
448 LastAssignedPartitionIdMatch {
449 #[serde(rename = "last-assigned-partition-id")]
451 last_assigned_partition_id: i32,
452 },
453 #[serde(rename = "assert-default-spec-id")]
455 DefaultSpecIdMatch {
456 #[serde(rename = "default-spec-id")]
458 default_spec_id: i32,
459 },
460 #[serde(rename = "assert-default-sort-order-id")]
462 DefaultSortOrderIdMatch {
463 #[serde(rename = "default-sort-order-id")]
465 default_sort_order_id: i64,
466 },
467}
468
469#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
471#[serde(tag = "action", rename_all = "kebab-case")]
472#[allow(clippy::large_enum_variant)]
473pub enum TableUpdate {
474 #[serde(rename_all = "kebab-case")]
476 UpgradeFormatVersion {
477 format_version: FormatVersion,
479 },
480 #[serde(rename_all = "kebab-case")]
482 AssignUuid {
483 uuid: Uuid,
485 },
486 #[serde(rename_all = "kebab-case")]
488 AddSchema {
489 schema: Schema,
491 },
492 #[serde(rename_all = "kebab-case")]
494 SetCurrentSchema {
495 schema_id: i32,
497 },
498 AddSpec {
500 spec: UnboundPartitionSpec,
502 },
503 #[serde(rename_all = "kebab-case")]
505 SetDefaultSpec {
506 spec_id: i32,
508 },
509 #[serde(rename_all = "kebab-case")]
511 AddSortOrder {
512 sort_order: SortOrder,
514 },
515 #[serde(rename_all = "kebab-case")]
517 SetDefaultSortOrder {
518 sort_order_id: i64,
520 },
521 #[serde(rename_all = "kebab-case")]
523 AddSnapshot {
524 #[serde(
526 deserialize_with = "deserialize_snapshot",
527 serialize_with = "serialize_snapshot"
528 )]
529 snapshot: Snapshot,
530 },
531 #[serde(rename_all = "kebab-case")]
533 SetSnapshotRef {
534 ref_name: String,
536 #[serde(flatten)]
538 reference: SnapshotReference,
539 },
540 #[serde(rename_all = "kebab-case")]
542 RemoveSnapshots {
543 snapshot_ids: Vec<i64>,
545 },
546 #[serde(rename_all = "kebab-case")]
548 RemoveSnapshotRef {
549 ref_name: String,
551 },
552 SetLocation {
554 location: String,
556 },
557 SetProperties {
559 updates: HashMap<String, String>,
561 },
562 RemoveProperties {
564 removals: Vec<String>,
566 },
567 #[serde(rename_all = "kebab-case")]
569 RemovePartitionSpecs {
570 spec_ids: Vec<i32>,
572 },
573 #[serde(with = "_serde_set_statistics")]
575 SetStatistics {
576 statistics: StatisticsFile,
578 },
579 #[serde(rename_all = "kebab-case")]
581 RemoveStatistics {
582 snapshot_id: i64,
584 },
585 #[serde(rename_all = "kebab-case")]
587 SetPartitionStatistics {
588 partition_statistics: PartitionStatisticsFile,
590 },
591 #[serde(rename_all = "kebab-case")]
593 RemovePartitionStatistics {
594 snapshot_id: i64,
596 },
597 #[serde(rename_all = "kebab-case")]
599 RemoveSchemas {
600 schema_ids: Vec<i32>,
602 },
603 #[serde(rename_all = "kebab-case")]
605 AddEncryptionKey {
606 encryption_key: EncryptedKey,
608 },
609 #[serde(rename_all = "kebab-case")]
611 RemoveEncryptionKey {
612 key_id: String,
614 },
615}
616
617impl TableUpdate {
618 pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
620 match self {
621 TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
622 TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
623 TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
624 TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
625 TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
626 TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
627 TableUpdate::SetDefaultSortOrder { sort_order_id } => {
628 builder.set_default_sort_order(sort_order_id)
629 }
630 TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
631 TableUpdate::SetSnapshotRef {
632 ref_name,
633 reference,
634 } => builder.set_ref(&ref_name, reference),
635 TableUpdate::RemoveSnapshots { snapshot_ids } => {
636 Ok(builder.remove_snapshots(&snapshot_ids))
637 }
638 TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
639 TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
640 TableUpdate::SetProperties { updates } => builder.set_properties(updates),
641 TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
642 TableUpdate::UpgradeFormatVersion { format_version } => {
643 builder.upgrade_format_version(format_version)
644 }
645 TableUpdate::RemovePartitionSpecs { spec_ids } => {
646 builder.remove_partition_specs(&spec_ids)
647 }
648 TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
649 TableUpdate::RemoveStatistics { snapshot_id } => {
650 Ok(builder.remove_statistics(snapshot_id))
651 }
652 TableUpdate::SetPartitionStatistics {
653 partition_statistics,
654 } => Ok(builder.set_partition_statistics(partition_statistics)),
655 TableUpdate::RemovePartitionStatistics { snapshot_id } => {
656 Ok(builder.remove_partition_statistics(snapshot_id))
657 }
658 TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
659 TableUpdate::AddEncryptionKey { encryption_key } => {
660 Ok(builder.add_encryption_key(encryption_key))
661 }
662 TableUpdate::RemoveEncryptionKey { key_id } => {
663 Ok(builder.remove_encryption_key(&key_id))
664 }
665 }
666 }
667}
668
669impl TableRequirement {
670 pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
675 if let Some(metadata) = metadata {
676 match self {
677 TableRequirement::NotExist => {
678 return Err(Error::new(
679 ErrorKind::CatalogCommitConflicts,
680 format!(
681 "Requirement failed: Table with id {} already exists",
682 metadata.uuid()
683 ),
684 )
685 .with_retryable(true));
686 }
687 TableRequirement::UuidMatch { uuid } => {
688 if &metadata.uuid() != uuid {
689 return Err(Error::new(
690 ErrorKind::CatalogCommitConflicts,
691 "Requirement failed: Table UUID does not match",
692 )
693 .with_context("expected", *uuid)
694 .with_context("found", metadata.uuid())
695 .with_retryable(true));
696 }
697 }
698 TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
699 if metadata.current_schema_id != *current_schema_id {
701 return Err(Error::new(
702 ErrorKind::CatalogCommitConflicts,
703 "Requirement failed: Current schema id does not match",
704 )
705 .with_context("expected", current_schema_id.to_string())
706 .with_context("found", metadata.current_schema_id.to_string())
707 .with_retryable(true));
708 }
709 }
710 TableRequirement::DefaultSortOrderIdMatch {
711 default_sort_order_id,
712 } => {
713 if metadata.default_sort_order().order_id != *default_sort_order_id {
714 return Err(Error::new(
715 ErrorKind::CatalogCommitConflicts,
716 "Requirement failed: Default sort order id does not match",
717 )
718 .with_context("expected", default_sort_order_id.to_string())
719 .with_context("found", metadata.default_sort_order().order_id.to_string())
720 .with_retryable(true));
721 }
722 }
723 TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
724 let snapshot_ref = metadata.snapshot_for_ref(r#ref);
725 if let Some(snapshot_id) = snapshot_id {
726 let snapshot_ref = snapshot_ref.ok_or(
727 Error::new(
728 ErrorKind::CatalogCommitConflicts,
729 format!("Requirement failed: Branch or tag `{ref}` not found"),
730 )
731 .with_retryable(true),
732 )?;
733 if snapshot_ref.snapshot_id() != *snapshot_id {
734 return Err(Error::new(
735 ErrorKind::CatalogCommitConflicts,
736 format!(
737 "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
738 ),
739 )
740 .with_context("expected", snapshot_id.to_string())
741 .with_context("found", snapshot_ref.snapshot_id().to_string())
742 .with_retryable(true));
743 }
744 } else if snapshot_ref.is_some() {
745 return Err(Error::new(
747 ErrorKind::CatalogCommitConflicts,
748 format!("Requirement failed: Branch or tag `{ref}` already exists"),
749 )
750 .with_retryable(true));
751 }
752 }
753 TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
754 if metadata.default_partition_spec_id() != *default_spec_id {
756 return Err(Error::new(
757 ErrorKind::CatalogCommitConflicts,
758 "Requirement failed: Default partition spec id does not match",
759 )
760 .with_context("expected", default_spec_id.to_string())
761 .with_context("found", metadata.default_partition_spec_id().to_string())
762 .with_retryable(true));
763 }
764 }
765 TableRequirement::LastAssignedPartitionIdMatch {
766 last_assigned_partition_id,
767 } => {
768 if metadata.last_partition_id != *last_assigned_partition_id {
769 return Err(Error::new(
770 ErrorKind::CatalogCommitConflicts,
771 "Requirement failed: Last assigned partition id does not match",
772 )
773 .with_context("expected", last_assigned_partition_id.to_string())
774 .with_context("found", metadata.last_partition_id.to_string())
775 .with_retryable(true));
776 }
777 }
778 TableRequirement::LastAssignedFieldIdMatch {
779 last_assigned_field_id,
780 } => {
781 if &metadata.last_column_id != last_assigned_field_id {
782 return Err(Error::new(
783 ErrorKind::CatalogCommitConflicts,
784 "Requirement failed: Last assigned field id does not match",
785 )
786 .with_context("expected", last_assigned_field_id.to_string())
787 .with_context("found", metadata.last_column_id.to_string())
788 .with_retryable(true));
789 }
790 }
791 };
792 } else {
793 match self {
794 TableRequirement::NotExist => {}
795 _ => {
796 return Err(Error::new(
797 ErrorKind::TableNotFound,
798 "Requirement failed: Table does not exist",
799 ));
800 }
801 }
802 }
803
804 Ok(())
805 }
806}
807
808pub(super) mod _serde {
809 use serde::{Deserialize as _, Deserializer, Serialize as _};
810
811 use super::*;
812 use crate::spec::{SchemaId, Summary};
813
814 pub(super) fn deserialize_snapshot<'de, D>(
815 deserializer: D,
816 ) -> std::result::Result<Snapshot, D::Error>
817 where D: Deserializer<'de> {
818 let buf = CatalogSnapshot::deserialize(deserializer)?;
819 Ok(buf.into())
820 }
821
822 pub(super) fn serialize_snapshot<S>(
823 snapshot: &Snapshot,
824 serializer: S,
825 ) -> std::result::Result<S::Ok, S::Error>
826 where
827 S: serde::Serializer,
828 {
829 let buf: CatalogSnapshot = snapshot.clone().into();
830 buf.serialize(serializer)
831 }
832
833 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
834 #[serde(rename_all = "kebab-case")]
835 struct CatalogSnapshot {
839 snapshot_id: i64,
840 #[serde(skip_serializing_if = "Option::is_none")]
841 parent_snapshot_id: Option<i64>,
842 #[serde(default)]
843 sequence_number: i64,
844 timestamp_ms: i64,
845 manifest_list: String,
846 summary: Summary,
847 #[serde(skip_serializing_if = "Option::is_none")]
848 schema_id: Option<SchemaId>,
849 #[serde(skip_serializing_if = "Option::is_none")]
850 first_row_id: Option<u64>,
851 #[serde(skip_serializing_if = "Option::is_none")]
852 added_rows: Option<u64>,
853 #[serde(skip_serializing_if = "Option::is_none")]
854 key_id: Option<String>,
855 }
856
857 impl From<CatalogSnapshot> for Snapshot {
858 fn from(snapshot: CatalogSnapshot) -> Self {
859 let CatalogSnapshot {
860 snapshot_id,
861 parent_snapshot_id,
862 sequence_number,
863 timestamp_ms,
864 manifest_list,
865 schema_id,
866 summary,
867 first_row_id,
868 added_rows,
869 key_id,
870 } = snapshot;
871 let builder = Snapshot::builder()
872 .with_snapshot_id(snapshot_id)
873 .with_parent_snapshot_id(parent_snapshot_id)
874 .with_sequence_number(sequence_number)
875 .with_timestamp_ms(timestamp_ms)
876 .with_manifest_list(manifest_list)
877 .with_summary(summary)
878 .with_encryption_key_id(key_id);
879 let row_range = first_row_id.zip(added_rows);
880 match (schema_id, row_range) {
881 (None, None) => builder.build(),
882 (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
883 (None, Some((first_row_id, last_row_id))) => {
884 builder.with_row_range(first_row_id, last_row_id).build()
885 }
886 (Some(schema_id), Some((first_row_id, last_row_id))) => builder
887 .with_schema_id(schema_id)
888 .with_row_range(first_row_id, last_row_id)
889 .build(),
890 }
891 }
892 }
893
894 impl From<Snapshot> for CatalogSnapshot {
895 fn from(snapshot: Snapshot) -> Self {
896 let first_row_id = snapshot.first_row_id();
897 let added_rows = snapshot.added_rows_count();
898 let Snapshot {
899 snapshot_id,
900 parent_snapshot_id,
901 sequence_number,
902 timestamp_ms,
903 manifest_list,
904 summary,
905 schema_id,
906 row_range: _,
907 encryption_key_id: key_id,
908 } = snapshot;
909 CatalogSnapshot {
910 snapshot_id,
911 parent_snapshot_id,
912 sequence_number,
913 timestamp_ms,
914 manifest_list,
915 summary,
916 schema_id,
917 first_row_id,
918 added_rows,
919 key_id,
920 }
921 }
922 }
923}
924
925#[derive(Debug, TypedBuilder)]
927pub struct ViewCreation {
928 pub name: String,
930 pub location: String,
932 pub representations: ViewRepresentations,
934 pub schema: Schema,
936 #[builder(default)]
938 pub properties: HashMap<String, String>,
939 pub default_namespace: NamespaceIdent,
941 #[builder(default)]
943 pub default_catalog: Option<String>,
944 #[builder(default)]
947 pub summary: HashMap<String, String>,
948}
949
950#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
952#[serde(tag = "action", rename_all = "kebab-case")]
953#[allow(clippy::large_enum_variant)]
954pub enum ViewUpdate {
955 #[serde(rename_all = "kebab-case")]
957 AssignUuid {
958 uuid: Uuid,
960 },
961 #[serde(rename_all = "kebab-case")]
963 UpgradeFormatVersion {
964 format_version: ViewFormatVersion,
966 },
967 #[serde(rename_all = "kebab-case")]
969 AddSchema {
970 schema: Schema,
972 last_column_id: Option<i32>,
974 },
975 #[serde(rename_all = "kebab-case")]
977 SetLocation {
978 location: String,
980 },
981 #[serde(rename_all = "kebab-case")]
985 SetProperties {
986 updates: HashMap<String, String>,
988 },
989 #[serde(rename_all = "kebab-case")]
991 RemoveProperties {
992 removals: Vec<String>,
994 },
995 #[serde(rename_all = "kebab-case")]
997 AddViewVersion {
998 view_version: ViewVersion,
1000 },
1001 #[serde(rename_all = "kebab-case")]
1003 SetCurrentViewVersion {
1004 view_version_id: i32,
1006 },
1007}
1008
1009mod _serde_set_statistics {
1010 use serde::{Deserialize, Deserializer, Serialize, Serializer};
1013
1014 use super::*;
1015
1016 #[derive(Debug, Serialize, Deserialize)]
1017 #[serde(rename_all = "kebab-case")]
1018 struct SetStatistics {
1019 snapshot_id: Option<i64>,
1020 statistics: StatisticsFile,
1021 }
1022
1023 pub fn serialize<S>(
1024 value: &StatisticsFile,
1025 serializer: S,
1026 ) -> std::result::Result<S::Ok, S::Error>
1027 where
1028 S: Serializer,
1029 {
1030 SetStatistics {
1031 snapshot_id: Some(value.snapshot_id),
1032 statistics: value.clone(),
1033 }
1034 .serialize(serializer)
1035 }
1036
1037 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1038 where D: Deserializer<'de> {
1039 let SetStatistics {
1040 snapshot_id,
1041 statistics,
1042 } = SetStatistics::deserialize(deserializer)?;
1043 if let Some(snapshot_id) = snapshot_id
1044 && snapshot_id != statistics.snapshot_id
1045 {
1046 return Err(serde::de::Error::custom(format!(
1047 "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1048 statistics.snapshot_id
1049 )));
1050 }
1051
1052 Ok(statistics)
1053 }
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058 use std::collections::HashMap;
1059 use std::fmt::Debug;
1060 use std::fs::File;
1061 use std::io::BufReader;
1062
1063 use base64::Engine as _;
1064 use serde::Serialize;
1065 use serde::de::DeserializeOwned;
1066 use uuid::uuid;
1067
1068 use super::ViewUpdate;
1069 use crate::io::FileIO;
1070 use crate::spec::{
1071 BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1072 PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1073 SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1074 StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1075 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1076 ViewVersion,
1077 };
1078 use crate::table::Table;
1079 use crate::{
1080 NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1081 };
1082
1083 #[test]
1084 fn test_parent_namespace() {
1085 let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1086 let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1087 let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1088
1089 assert_eq!(ns1.parent(), None);
1090 assert_eq!(ns2.parent(), Some(ns1.clone()));
1091 assert_eq!(ns3.parent(), Some(ns2.clone()));
1092 }
1093
1094 #[test]
1095 fn test_create_table_id() {
1096 let table_id = TableIdent {
1097 namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1098 name: "t1".to_string(),
1099 };
1100
1101 assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1102 }
1103
1104 #[test]
1105 fn test_table_creation_iterator_properties() {
1106 let builder = TableCreation::builder()
1107 .name("table".to_string())
1108 .schema(Schema::builder().build().unwrap());
1109
1110 fn s(k: &str, v: &str) -> (String, String) {
1111 (k.to_string(), v.to_string())
1112 }
1113
1114 let table_creation = builder
1115 .properties([s("key", "value"), s("foo", "bar")])
1116 .build();
1117
1118 assert_eq!(
1119 HashMap::from([s("key", "value"), s("foo", "bar")]),
1120 table_creation.properties
1121 );
1122 }
1123
1124 fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1125 json: impl ToString,
1126 expected: T,
1127 ) {
1128 let json_str = json.to_string();
1129 let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1130 assert_eq!(actual, expected, "Parsed value is not equal to expected");
1131
1132 let restored: T = serde_json::from_str(
1133 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1134 )
1135 .expect("Failed to parse from serialized json");
1136
1137 assert_eq!(
1138 restored, expected,
1139 "Parsed restored value is not equal to expected"
1140 );
1141 }
1142
1143 fn metadata() -> TableMetadata {
1144 let tbl_creation = TableCreation::builder()
1145 .name("table".to_string())
1146 .location("/path/to/table".to_string())
1147 .schema(Schema::builder().build().unwrap())
1148 .build();
1149
1150 TableMetadataBuilder::from_table_creation(tbl_creation)
1151 .unwrap()
1152 .assign_uuid(uuid::Uuid::nil())
1153 .build()
1154 .unwrap()
1155 .metadata
1156 }
1157
1158 #[test]
1159 fn test_check_requirement_not_exist() {
1160 let metadata = metadata();
1161 let requirement = TableRequirement::NotExist;
1162
1163 assert!(requirement.check(Some(&metadata)).is_err());
1164 assert!(requirement.check(None).is_ok());
1165 }
1166
1167 #[test]
1168 fn test_check_table_uuid() {
1169 let metadata = metadata();
1170
1171 let requirement = TableRequirement::UuidMatch {
1172 uuid: uuid::Uuid::now_v7(),
1173 };
1174 assert!(requirement.check(Some(&metadata)).is_err());
1175
1176 let requirement = TableRequirement::UuidMatch {
1177 uuid: uuid::Uuid::nil(),
1178 };
1179 assert!(requirement.check(Some(&metadata)).is_ok());
1180 }
1181
1182 #[test]
1183 fn test_check_ref_snapshot_id() {
1184 let metadata = metadata();
1185
1186 let requirement = TableRequirement::RefSnapshotIdMatch {
1188 r#ref: "my_branch".to_string(),
1189 snapshot_id: Some(1),
1190 };
1191 assert!(requirement.check(Some(&metadata)).is_err());
1192
1193 let requirement = TableRequirement::RefSnapshotIdMatch {
1195 r#ref: "my_branch".to_string(),
1196 snapshot_id: None,
1197 };
1198 assert!(requirement.check(Some(&metadata)).is_ok());
1199
1200 let snapshot = Snapshot::builder()
1202 .with_snapshot_id(3051729675574597004)
1203 .with_sequence_number(10)
1204 .with_timestamp_ms(9992191116217)
1205 .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1206 .with_schema_id(0)
1207 .with_summary(Summary {
1208 operation: Operation::Append,
1209 additional_properties: HashMap::new(),
1210 })
1211 .build();
1212
1213 let builder = metadata.into_builder(None);
1214 let builder = TableUpdate::AddSnapshot {
1215 snapshot: snapshot.clone(),
1216 }
1217 .apply(builder)
1218 .unwrap();
1219 let metadata = TableUpdate::SetSnapshotRef {
1220 ref_name: MAIN_BRANCH.to_string(),
1221 reference: SnapshotReference {
1222 snapshot_id: snapshot.snapshot_id(),
1223 retention: SnapshotRetention::Branch {
1224 min_snapshots_to_keep: Some(10),
1225 max_snapshot_age_ms: None,
1226 max_ref_age_ms: None,
1227 },
1228 },
1229 }
1230 .apply(builder)
1231 .unwrap()
1232 .build()
1233 .unwrap()
1234 .metadata;
1235
1236 let requirement = TableRequirement::RefSnapshotIdMatch {
1238 r#ref: "main".to_string(),
1239 snapshot_id: Some(3051729675574597004),
1240 };
1241 assert!(requirement.check(Some(&metadata)).is_ok());
1242
1243 let requirement = TableRequirement::RefSnapshotIdMatch {
1245 r#ref: "main".to_string(),
1246 snapshot_id: Some(1),
1247 };
1248 assert!(requirement.check(Some(&metadata)).is_err());
1249 }
1250
1251 #[test]
1252 fn test_check_last_assigned_field_id() {
1253 let metadata = metadata();
1254
1255 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1256 last_assigned_field_id: 1,
1257 };
1258 assert!(requirement.check(Some(&metadata)).is_err());
1259
1260 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1261 last_assigned_field_id: 0,
1262 };
1263 assert!(requirement.check(Some(&metadata)).is_ok());
1264 }
1265
1266 #[test]
1267 fn test_check_current_schema_id() {
1268 let metadata = metadata();
1269
1270 let requirement = TableRequirement::CurrentSchemaIdMatch {
1271 current_schema_id: 1,
1272 };
1273 assert!(requirement.check(Some(&metadata)).is_err());
1274
1275 let requirement = TableRequirement::CurrentSchemaIdMatch {
1276 current_schema_id: 0,
1277 };
1278 assert!(requirement.check(Some(&metadata)).is_ok());
1279 }
1280
1281 #[test]
1282 fn test_check_last_assigned_partition_id() {
1283 let metadata = metadata();
1284 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1285 last_assigned_partition_id: 0,
1286 };
1287 assert!(requirement.check(Some(&metadata)).is_err());
1288
1289 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1290 last_assigned_partition_id: 999,
1291 };
1292 assert!(requirement.check(Some(&metadata)).is_ok());
1293 }
1294
1295 #[test]
1296 fn test_check_default_spec_id() {
1297 let metadata = metadata();
1298
1299 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1300 assert!(requirement.check(Some(&metadata)).is_err());
1301
1302 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1303 assert!(requirement.check(Some(&metadata)).is_ok());
1304 }
1305
1306 #[test]
1307 fn test_check_default_sort_order_id() {
1308 let metadata = metadata();
1309
1310 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1311 default_sort_order_id: 1,
1312 };
1313 assert!(requirement.check(Some(&metadata)).is_err());
1314
1315 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1316 default_sort_order_id: 0,
1317 };
1318 assert!(requirement.check(Some(&metadata)).is_ok());
1319 }
1320
/// JSON round trip of the `assert-table-uuid` requirement.
#[test]
fn test_table_uuid() {
    let json = r#"
{
    "type": "assert-table-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
    "#;
    let expected = TableRequirement::UuidMatch {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(json, expected);
}
1335
/// JSON round trip of the `assert-create` requirement, which maps to
/// `TableRequirement::NotExist`.
#[test]
fn test_assert_table_not_exists() {
    let json = r#"
{
    "type": "assert-create"
}
    "#;

    test_serde_json(json, TableRequirement::NotExist);
}
1347
/// JSON round trip of `assert-ref-snapshot-id`, once with a `null` snapshot id
/// and once with a concrete one.
#[test]
fn test_assert_ref_snapshot_id() {
    // Case 1: `"snapshot-id": null` maps to `None`.
    let json_without_snapshot = r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": null
}
    "#;
    let expected_without_snapshot = TableRequirement::RefSnapshotIdMatch {
        r#ref: "snapshot-name".to_string(),
        snapshot_id: None,
    };
    test_serde_json(json_without_snapshot, expected_without_snapshot);

    // Case 2: a numeric snapshot id maps to `Some(id)`.
    let json_with_snapshot = r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": 1
}
    "#;
    let expected_with_snapshot = TableRequirement::RefSnapshotIdMatch {
        r#ref: "snapshot-name".to_string(),
        snapshot_id: Some(1),
    };
    test_serde_json(json_with_snapshot, expected_with_snapshot);
}
1378
/// JSON round trip of the `assert-last-assigned-field-id` requirement.
#[test]
fn test_assert_last_assigned_field_id() {
    let json = r#"
{
    "type": "assert-last-assigned-field-id",
    "last-assigned-field-id": 12
}
    "#;
    let expected = TableRequirement::LastAssignedFieldIdMatch {
        last_assigned_field_id: 12,
    };

    test_serde_json(json, expected);
}
1393
/// JSON round trip of the `assert-current-schema-id` requirement.
#[test]
fn test_assert_current_schema_id() {
    let json = r#"
{
    "type": "assert-current-schema-id",
    "current-schema-id": 4
}
    "#;
    let expected = TableRequirement::CurrentSchemaIdMatch {
        current_schema_id: 4,
    };

    test_serde_json(json, expected);
}
1408
/// JSON round trip of the `assert-last-assigned-partition-id` requirement.
#[test]
fn test_assert_last_assigned_partition_id() {
    let json = r#"
{
    "type": "assert-last-assigned-partition-id",
    "last-assigned-partition-id": 1004
}
    "#;
    let expected = TableRequirement::LastAssignedPartitionIdMatch {
        last_assigned_partition_id: 1004,
    };

    test_serde_json(json, expected);
}
1423
/// JSON round trip of the `assert-default-spec-id` requirement.
#[test]
fn test_assert_default_spec_id() {
    let json = r#"
{
    "type": "assert-default-spec-id",
    "default-spec-id": 5
}
    "#;
    let expected = TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 };

    test_serde_json(json, expected);
}
1436
/// JSON round trip of the `assert-default-sort-order-id` requirement.
#[test]
fn test_assert_default_sort_order() {
    test_serde_json(
        r#"
{
    "type": "assert-default-sort-order-id",
    "default-sort-order-id": 10
}
        "#,
        TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: 10,
        },
    );
}
1452
/// A requirement payload lacking the `type` tag must be rejected by the
/// deserializer.
#[test]
fn test_parse_assert_invalid() {
    let untagged_json = r#"
{
    "default-sort-order-id": 10
}
"#;
    let parsed = serde_json::from_str::<TableRequirement>(untagged_json);

    assert!(
        parsed.is_err(),
        "Table requirements should not be parsed without type."
    );
}
1467
/// JSON round trip of the `assign-uuid` table update.
#[test]
fn test_assign_uuid() {
    let json = r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
    "#;
    let expected = TableUpdate::AssignUuid {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(json, expected);
}
1482
/// JSON round trip of the `upgrade-format-version` table update (version 2).
#[test]
fn test_upgrade_format_version() {
    let json = r#"
{
    "action": "upgrade-format-version",
    "format-version": 2
}
    "#;
    let expected = TableUpdate::UpgradeFormatVersion {
        format_version: FormatVersion::V2,
    };

    test_serde_json(json, expected);
}
1497
/// JSON round trip of the `add-schema` table update.
///
/// Two payloads are exercised against the same expected schema: one that
/// carries an extra top-level `last-column-id` field and one that omits it.
/// Both are expected to produce `TableUpdate::AddSchema { schema }`.
#[test]
fn test_add_schema() {
    let test_schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    // Payload with the additional `last-column-id` field.
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
        TableUpdate::AddSchema {
            // The schema is needed again below, so this use must clone.
            schema: test_schema.clone(),
        },
    );

    // Payload without `last-column-id`.
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    }
}
        "#,
        TableUpdate::AddSchema {
            // Last use: move the schema instead of cloning it.
            schema: test_schema,
        },
    );
}
1587
/// JSON round trip of the `set-current-schema` table update.
#[test]
fn test_set_current_schema() {
    let json = r#"
{
    "action": "set-current-schema",
    "schema-id": 23
}
    "#;
    let expected = TableUpdate::SetCurrentSchema { schema_id: 23 };

    test_serde_json(json, expected);
}
1600
/// JSON round trip of the `add-spec` table update with three partition fields
/// (`day`, `bucket[16]`, `truncate[4]`).
#[test]
fn test_add_spec() {
    let json = r#"
{
    "action": "add-spec",
    "spec": {
        "fields": [
            {
                "source-id": 4,
                "name": "ts_day",
                "transform": "day"
            },
            {
                "source-id": 1,
                "name": "id_bucket",
                "transform": "bucket[16]"
            },
            {
                "source-id": 2,
                "name": "id_truncate",
                "transform": "truncate[4]"
            }
        ]
    }
}
    "#;

    // Build the expected spec field by field, in the same order as the JSON.
    let spec = UnboundPartitionSpec::builder()
        .add_partition_field(4, "ts_day".to_string(), Transform::Day)
        .unwrap()
        .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
        .unwrap()
        .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
        .unwrap()
        .build();

    test_serde_json(json, TableUpdate::AddSpec { spec });
}
1640
/// JSON round trip of the `set-default-spec` table update.
#[test]
fn test_set_default_spec() {
    let json = r#"
{
    "action": "set-default-spec",
    "spec-id": 1
}
    "#;
    let expected = TableUpdate::SetDefaultSpec { spec_id: 1 };

    test_serde_json(json, expected)
}
1653
/// JSON round trip of the `add-sort-order` table update with two sort fields.
#[test]
fn test_add_sort_order() {
    // Expected field for `"transform": "identity"` on source id 2.
    let identity_field = SortField::builder()
        .source_id(2)
        .direction(SortDirection::Ascending)
        .null_order(NullOrder::First)
        .transform(Transform::Identity)
        .build();

    // Expected field for `"transform": "bucket[4]"` on source id 3.
    let bucket_field = SortField::builder()
        .source_id(3)
        .direction(SortDirection::Descending)
        .null_order(NullOrder::Last)
        .transform(Transform::Bucket(4))
        .build();

    let sort_order = SortOrder::builder()
        .with_order_id(1)
        .with_sort_field(identity_field)
        .with_sort_field(bucket_field)
        .build_unbound()
        .unwrap();

    test_serde_json(
        r#"
{
    "action": "add-sort-order",
    "sort-order": {
        "order-id": 1,
        "fields": [
            {
                "transform": "identity",
                "source-id": 2,
                "direction": "asc",
                "null-order": "nulls-first"
            },
            {
                "transform": "bucket[4]",
                "source-id": 3,
                "direction": "desc",
                "null-order": "nulls-last"
            }
        ]
    }
}
        "#,
        TableUpdate::AddSortOrder { sort_order },
    );
}
1704
/// JSON round trip of the `set-default-sort-order` table update.
#[test]
fn test_set_default_order() {
    test_serde_json(
        r#"
{
    "action": "set-default-sort-order",
    "sort-order-id": 2
}
        "#,
        TableUpdate::SetDefaultSortOrder { sort_order_id: 2 },
    );
}
1717
/// JSON round trip of the `add-snapshot` table update for a snapshot that
/// carries a sequence number and a schema id.
#[test]
fn test_add_snapshot() {
    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let snapshot = Snapshot::builder()
        .with_snapshot_id(3055729675574597000)
        .with_parent_snapshot_id(Some(3051729675574597000))
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_schema_id(1)
        .with_summary(append_summary)
        .build();

    test_serde_json(
        r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "sequence-number": 1,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro",
        "schema-id": 1
    }
}
        "#,
        TableUpdate::AddSnapshot { snapshot },
    );
}
1754
/// Deserialization of an `add-snapshot` payload that has neither
/// `sequence-number` nor `schema-id`; the expected snapshot uses sequence
/// number 0. Only the parse direction is checked here.
#[test]
fn test_add_snapshot_v1() {
    let payload = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro"
    }
}
    "#;

    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let expected = TableUpdate::AddSnapshot {
        snapshot: Snapshot::builder()
            .with_snapshot_id(3055729675574597000)
            .with_parent_snapshot_id(Some(3051729675574597000))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/2.avro")
            .with_summary(append_summary)
            .build(),
    };

    let parsed: TableUpdate = serde_json::from_str(payload).expect("Failed to parse from json");
    assert_eq!(parsed, expected, "Parsed value is not equal to expected");
}
1789
/// Round trip of an `add-snapshot` payload carrying `first-row-id`,
/// `added-rows` and `key-id`: parse from a `serde_json::Value`, then
/// serialize and parse again, comparing against the expected update each time.
#[test]
fn test_add_snapshot_v3() {
    let payload = serde_json::json!({
        "action": "add-snapshot",
        "snapshot": {
            "snapshot-id": 3055729675574597000i64,
            "parent-snapshot-id": 3051729675574597000i64,
            "timestamp-ms": 1555100955770i64,
            "first-row-id": 0,
            "added-rows": 2,
            "key-id": "key123",
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://a/b/2.avro"
        }
    });

    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let expected = TableUpdate::AddSnapshot {
        snapshot: Snapshot::builder()
            .with_snapshot_id(3055729675574597000)
            .with_parent_snapshot_id(Some(3051729675574597000))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/2.avro")
            .with_row_range(0, 2)
            .with_encryption_key_id(Some("key123".to_string()))
            .with_summary(append_summary)
            .build(),
    };

    // Direction 1: JSON value -> TableUpdate.
    let parsed: TableUpdate = serde_json::from_value(payload).expect("Failed to parse from json");
    assert_eq!(parsed, expected, "Parsed value is not equal to expected");

    // Direction 2: TableUpdate -> JSON text -> TableUpdate.
    let serialized = serde_json::to_string(&parsed).expect("Failed to serialize to json");
    let reparsed: TableUpdate =
        serde_json::from_str(&serialized).expect("Failed to parse from serialized json");
    assert_eq!(reparsed, expected);
}
1833
/// JSON round trip of the `remove-snapshots` table update.
#[test]
fn test_remove_snapshots() {
    test_serde_json(
        r#"
{
    "action": "remove-snapshots",
    "snapshot-ids": [
        1,
        2
    ]
}
        "#,
        TableUpdate::RemoveSnapshots {
            snapshot_ids: vec![1, 2],
        },
    );
}
1851
/// JSON round trip of the `remove-snapshot-ref` table update.
#[test]
fn test_remove_snapshot_ref() {
    test_serde_json(
        r#"
{
    "action": "remove-snapshot-ref",
    "ref-name": "snapshot-ref"
}
        "#,
        TableUpdate::RemoveSnapshotRef {
            ref_name: "snapshot-ref".to_string(),
        },
    );
}
1866
/// JSON round trip of `set-snapshot-ref` for a reference of type `tag`.
#[test]
fn test_set_snapshot_ref_tag() {
    let tag_retention = SnapshotRetention::Tag {
        max_ref_age_ms: Some(1),
    };
    let expected = TableUpdate::SetSnapshotRef {
        ref_name: "hank".to_string(),
        reference: SnapshotReference {
            snapshot_id: 1,
            retention: tag_retention,
        },
    };

    test_serde_json(
        r#"
{
    "action": "set-snapshot-ref",
    "type": "tag",
    "ref-name": "hank",
    "snapshot-id": 1,
    "max-ref-age-ms": 1
}
        "#,
        expected,
    );
}
1891
/// JSON round trip of `set-snapshot-ref` for a reference of type `branch`
/// with all three retention settings present.
#[test]
fn test_set_snapshot_ref_branch() {
    let branch_retention = SnapshotRetention::Branch {
        min_snapshots_to_keep: Some(2),
        max_snapshot_age_ms: Some(3),
        max_ref_age_ms: Some(4),
    };
    let expected = TableUpdate::SetSnapshotRef {
        ref_name: "hank".to_string(),
        reference: SnapshotReference {
            snapshot_id: 1,
            retention: branch_retention,
        },
    };

    test_serde_json(
        r#"
{
    "action": "set-snapshot-ref",
    "type": "branch",
    "ref-name": "hank",
    "snapshot-id": 1,
    "min-snapshots-to-keep": 2,
    "max-snapshot-age-ms": 3,
    "max-ref-age-ms": 4
}
        "#,
        expected,
    );
}
1920
/// JSON round trip of the `set-properties` table update with two entries.
#[test]
fn test_set_properties() {
    // Same two key/value pairs as in the JSON payload below.
    let entries = [("prop1", "v1"), ("prop2", "v2")];
    let expected = TableUpdate::SetProperties {
        updates: entries
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect(),
    };

    test_serde_json(
        r#"
{
    "action": "set-properties",
    "updates": {
        "prop1": "v1",
        "prop2": "v2"
    }
}
        "#,
        expected,
    );
}
1944
/// JSON round trip of the `remove-properties` table update.
#[test]
fn test_remove_properties() {
    test_serde_json(
        r#"
{
    "action": "remove-properties",
    "removals": [
        "prop1",
        "prop2"
    ]
}
        "#,
        TableUpdate::RemoveProperties {
            removals: vec!["prop1".to_string(), "prop2".to_string()],
        },
    );
}
1963
/// JSON round trip of the `set-location` table update.
#[test]
fn test_set_location() {
    test_serde_json(
        r#"
{
    "action": "set-location",
    "location": "s3://bucket/warehouse/tbl_location"
}
        "#,
        TableUpdate::SetLocation {
            location: "s3://bucket/warehouse/tbl_location".to_string(),
        },
    );
}
1979
/// Applying `TableUpdate::AssignUuid` to a metadata builder must yield
/// metadata whose `uuid()` equals the assigned value.
#[test]
fn test_table_update_apply() {
    // Start from freshly created table metadata with an empty schema.
    let creation = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(Schema::builder().build().unwrap())
        .build();
    let initial_metadata = TableMetadataBuilder::from_table_creation(creation)
        .unwrap()
        .build()
        .unwrap()
        .metadata;
    let builder = TableMetadataBuilder::new_from_metadata(
        initial_metadata,
        Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
    );

    let new_uuid = uuid::Uuid::new_v4();
    let build_result = TableUpdate::AssignUuid { uuid: new_uuid }
        .apply(builder)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(build_result.metadata.uuid(), new_uuid);
}
2008
/// JSON round trip of the `assign-uuid` view update.
#[test]
fn test_view_assign_uuid() {
    let json = r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
    "#;
    let expected = ViewUpdate::AssignUuid {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(json, expected);
}
2023
/// JSON round trip of the `upgrade-format-version` view update (version 1).
#[test]
fn test_view_upgrade_format_version() {
    let json = r#"
{
    "action": "upgrade-format-version",
    "format-version": 1
}
    "#;
    let expected = ViewUpdate::UpgradeFormatVersion {
        format_version: ViewFormatVersion::V1,
    };

    test_serde_json(json, expected);
}
2038
/// JSON round trip of the `add-schema` view update, including the
/// `last-column-id` field, which maps to `last_column_id: Some(3)`.
#[test]
fn test_view_add_schema() {
    let test_schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
        ViewUpdate::AddSchema {
            // The schema is used only here, so it is moved rather than cloned.
            schema: test_schema,
            last_column_id: Some(3),
        },
    );
}
2091
/// JSON round trip of the `set-location` view update.
#[test]
fn test_view_set_location() {
    let json = r#"
{
    "action": "set-location",
    "location": "s3://db/view"
}
    "#;
    let expected = ViewUpdate::SetLocation {
        location: "s3://db/view".to_string(),
    };

    test_serde_json(json, expected);
}
2106
/// JSON round trip of the `set-properties` view update with two entries.
#[test]
fn test_view_set_properties() {
    let json = r#"
{
    "action": "set-properties",
    "updates": {
        "prop1": "v1",
        "prop2": "v2"
    }
}
    "#;

    // Same two key/value pairs as in the JSON payload above.
    let entries = [("prop1", "v1"), ("prop2", "v2")];
    let expected = ViewUpdate::SetProperties {
        updates: entries
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect(),
    };

    test_serde_json(json, expected);
}
2129
/// JSON round trip of the `remove-properties` view update.
#[test]
fn test_view_remove_properties() {
    let json = r#"
{
    "action": "remove-properties",
    "removals": [
        "prop1",
        "prop2"
    ]
}
    "#;
    let expected = ViewUpdate::RemoveProperties {
        removals: vec!["prop1".to_string(), "prop2".to_string()],
    };

    test_serde_json(json, expected);
}
2147
/// JSON round trip of the `add-view-version` view update with a single SQL
/// representation.
#[test]
fn test_view_add_view_version() {
    let json = r#"
{
    "action": "add-view-version",
    "view-version": {
        "version-id" : 1,
        "timestamp-ms" : 1573518431292,
        "schema-id" : 1,
        "default-catalog" : "prod",
        "default-namespace" : [ "default" ],
        "summary" : {
            "engine-name" : "Spark"
        },
        "representations" : [ {
            "type" : "sql",
            "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
            "dialect" : "spark"
        } ]
    }
}
    "#;

    // The single SQL representation expected from the payload.
    let sql_representation = ViewRepresentation::Sql(SqlViewRepresentation {
        sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
        dialect: "spark".to_string(),
    });

    let view_version = ViewVersion::builder()
        .with_version_id(1)
        .with_timestamp_ms(1573518431292)
        .with_schema_id(1)
        .with_default_catalog(Some("prod".to_string()))
        .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
        .with_summary(
            vec![("engine-name".to_string(), "Spark".to_string())]
                .into_iter()
                .collect(),
        )
        .with_representations(ViewRepresentations(vec![sql_representation]))
        .build();

    test_serde_json(json, ViewUpdate::AddViewVersion { view_version });
}
2191
/// JSON round trip of the `set-current-view-version` view update.
#[test]
fn test_view_set_current_view_version() {
    let json = r#"
{
    "action": "set-current-view-version",
    "view-version-id": 1
}
    "#;
    let expected = ViewUpdate::SetCurrentViewVersion { view_version_id: 1 };

    test_serde_json(json, expected);
}
2204
/// JSON round trip of the `remove-partition-specs` table update.
#[test]
fn test_remove_partition_specs_update() {
    let json = r#"
{
    "action": "remove-partition-specs",
    "spec-ids": [1, 2]
}
    "#;
    let expected = TableUpdate::RemovePartitionSpecs {
        spec_ids: vec![1, 2],
    };

    test_serde_json(json, expected);
}
2219
/// JSON round trip of the `set-statistics` table update with one blob
/// metadata entry.
#[test]
fn test_set_statistics_file() {
    let json = r#"
    {
        "action": "set-statistics",
        "snapshot-id": 1940541653261589030,
        "statistics": {
            "snapshot-id": 1940541653261589030,
            "statistics-path": "s3://bucket/warehouse/stats.puffin",
            "file-size-in-bytes": 124,
            "file-footer-size-in-bytes": 27,
            "blob-metadata": [
                {
                    "type": "boring-type",
                    "snapshot-id": 1940541653261589030,
                    "sequence-number": 2,
                    "fields": [
                        1
                    ],
                    "properties": {
                        "prop-key": "prop-value"
                    }
                }
            ]
        }
    }
    "#;

    // Expected counterpart of the single `blob-metadata` element.
    let blob = BlobMetadata {
        r#type: "boring-type".to_string(),
        snapshot_id: 1940541653261589030,
        sequence_number: 2,
        fields: vec![1],
        properties: vec![("prop-key".to_string(), "prop-value".to_string())]
            .into_iter()
            .collect(),
    };
    let statistics = StatisticsFile {
        snapshot_id: 1940541653261589030,
        statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
        file_size_in_bytes: 124,
        file_footer_size_in_bytes: 27,
        key_metadata: None,
        blob_metadata: vec![blob],
    };

    test_serde_json(json, TableUpdate::SetStatistics { statistics });
}
2268
/// JSON round trip of the `remove-statistics` table update.
#[test]
fn test_remove_statistics_file() {
    let json = r#"
    {
        "action": "remove-statistics",
        "snapshot-id": 1940541653261589030
    }
    "#;
    let expected = TableUpdate::RemoveStatistics {
        snapshot_id: 1940541653261589030,
    };

    test_serde_json(json, expected);
}
2283
/// JSON round trip of the `set-partition-statistics` table update.
#[test]
fn test_set_partition_statistics_file() {
    let json = r#"
    {
        "action": "set-partition-statistics",
        "partition-statistics": {
            "snapshot-id": 1940541653261589030,
            "statistics-path": "s3://bucket/warehouse/stats1.parquet",
            "file-size-in-bytes": 43
        }
    }
    "#;
    let partition_statistics = PartitionStatisticsFile {
        snapshot_id: 1940541653261589030,
        statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
        file_size_in_bytes: 43,
    };

    test_serde_json(
        json,
        TableUpdate::SetPartitionStatistics {
            partition_statistics,
        },
    )
}
2306
/// JSON round trip of the `remove-partition-statistics` table update.
#[test]
fn test_remove_partition_statistics_file() {
    let json = r#"
    {
        "action": "remove-partition-statistics",
        "snapshot-id": 1940541653261589030
    }
    "#;
    let expected = TableUpdate::RemovePartitionStatistics {
        snapshot_id: 1940541653261589030,
    };

    test_serde_json(json, expected)
}
2321
/// JSON round trip of the `remove-schemas` table update.
#[test]
fn test_remove_schema_update() {
    let json = r#"
    {
        "action": "remove-schemas",
        "schema-ids": [1, 2]
    }
    "#;
    let expected = TableUpdate::RemoveSchemas {
        schema_ids: vec![1, 2],
    };

    test_serde_json(json, expected);
}
2336
/// JSON round trip of the `add-encryption-key` table update. The JSON carries
/// the key metadata base64-encoded, while the expected value holds the raw
/// bytes.
#[test]
fn test_add_encryption_key() {
    let raw_key = "key".as_bytes();
    let encoded_key = base64::engine::general_purpose::STANDARD.encode(raw_key);

    // `{{` / `}}` are literal braces; `{encoded_key}` is interpolated.
    let json = format!(
        r#"
    {{
        "action": "add-encryption-key",
        "encryption-key": {{
            "key-id": "a",
            "encrypted-key-metadata": "{encoded_key}",
            "encrypted-by-id": "b"
        }}
    }}
    "#
    );

    let encryption_key = EncryptedKey::builder()
        .key_id("a")
        .encrypted_key_metadata(raw_key.to_vec())
        .encrypted_by_id("b")
        .build();

    test_serde_json(json, TableUpdate::AddEncryptionKey { encryption_key });
}
2363
/// JSON round trip of the `remove-encryption-key` table update.
#[test]
fn test_remove_encryption_key() {
    let json = r#"
    {
        "action": "remove-encryption-key",
        "key-id": "a"
    }
    "#;
    let expected = TableUpdate::RemoveEncryptionKey {
        key_id: "a".to_string(),
    };

    test_serde_json(json, expected);
}
2378
/// Applies a `TableCommit` (set-location + set-properties, guarded by a UUID
/// requirement) to a table loaded from `TableMetadataV2Valid.json` and checks
/// the resulting properties, metadata location and table location.
#[test]
fn test_table_commit() {
    // Load the fixture metadata from the crate's testdata directory.
    let fixture_path = format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV2Valid.json"
    );
    let fixture_file = File::open(fixture_path).unwrap();
    let fixture_metadata =
        serde_json::from_reader::<_, TableMetadata>(BufReader::new(fixture_file)).unwrap();

    let table = Table::builder()
        .metadata(fixture_metadata)
        .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
        .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
        .file_io(FileIO::new_with_memory())
        .build()
        .unwrap();

    let set_location = TableUpdate::SetLocation {
        location: "s3://bucket/test/new_location/data".to_string(),
    };
    let set_properties = TableUpdate::SetProperties {
        updates: [("prop1", "v1"), ("prop2", "v2")]
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect(),
    };
    let uuid_requirement = TableRequirement::UuidMatch {
        uuid: table.metadata().table_uuid,
    };

    let commit = TableCommit::builder()
        .ident(table.identifier().to_owned())
        .updates(vec![set_location, set_properties])
        .requirements(vec![uuid_requirement])
        .build();

    let committed = commit.apply(table).unwrap();

    // Both properties from the commit must be present on the new metadata.
    let properties = &committed.metadata().properties;
    assert_eq!(properties.get("prop1").unwrap(), "v1");
    assert_eq!(properties.get("prop2").unwrap(), "v2");

    // The input location used a `00000-` prefix; after the commit the
    // metadata location is expected to use `00001-`.
    let new_metadata_location = committed.metadata_location().unwrap();
    assert!(new_metadata_location.starts_with("s3://bucket/test/location/metadata/00001-"));

    assert_eq!(
        committed.metadata().location,
        "s3://bucket/test/new_location/data",
    );
}
2448}