1pub mod memory;
21mod metadata_location;
22pub(crate) mod utils;
23
24use std::collections::HashMap;
25use std::fmt::{Debug, Display};
26use std::future::Future;
27use std::mem::take;
28use std::ops::Deref;
29use std::str::FromStr;
30use std::sync::Arc;
31
32use _serde::{deserialize_snapshot, serialize_snapshot};
33use async_trait::async_trait;
34pub use memory::MemoryCatalog;
35pub use metadata_location::*;
36#[cfg(test)]
37use mockall::automock;
38use serde_derive::{Deserialize, Serialize};
39use typed_builder::TypedBuilder;
40use uuid::Uuid;
41
42use crate::io::StorageFactory;
43use crate::spec::{
44 EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
45 SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
46 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
47};
48use crate::table::Table;
49use crate::{Error, ErrorKind, Result};
50
/// Asynchronous interface for working with namespaces and tables in a catalog.
///
/// Implementors must be `Debug + Sync + Send`. In test builds a mock
/// implementation is generated through `mockall::automock`.
///
/// NOTE(review): the exact semantics of each operation are defined by the
/// concrete catalog implementations, which are not visible from this trait;
/// the per-method notes below describe only what the signatures show.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// Lists namespaces and returns their identifiers.
    ///
    /// `parent` is optional; presumably it restricts the listing to the
    /// children of that namespace — confirm against the implementation.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Creates the namespace `namespace` with the given `properties` and
    /// returns the resulting [`Namespace`].
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Fetches the [`Namespace`] identified by `namespace`.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Reports whether the namespace identified by `namespace` exists.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Updates the namespace identified by `namespace` using `properties`.
    ///
    /// NOTE(review): whether `properties` replaces or is merged into the
    /// existing properties is not visible here — confirm per implementation.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drops the namespace identified by `namespace`.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// Lists the identifiers of the tables in `namespace`.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Creates a table in `namespace` from the settings in `creation` and
    /// returns it.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Loads the table identified by `table`.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drops the table identified by `table`.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Purges the table identified by `table`.
    ///
    /// NOTE(review): presumably this differs from [`Catalog::drop_table`] by
    /// also deleting the table's files — confirm against the implementation.
    async fn purge_table(&self, table: &TableIdent) -> Result<()>;

    /// Reports whether the table identified by `table` exists.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Renames the table `src` to `dest`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Registers a table under the identifier `table` using the metadata
    /// file at `metadata_location`, and returns it.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Applies the given [`TableCommit`] and returns the resulting table.
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
122
/// Builder interface for constructing a [`Catalog`].
///
/// A builder must be default-constructible, `Debug`, `Send` and `Sync`.
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type produced by [`CatalogBuilder::load`].
    type C: Catalog;

    /// Supplies the [`StorageFactory`] for this builder and returns the
    /// builder for further chaining.
    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;

    /// Consumes the builder and asynchronously produces the catalog.
    ///
    /// `name` is the catalog name and `props` a string-to-string map of
    /// configuration; the recognised keys are implementation-specific and not
    /// visible here. The returned future is `Send`.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
162
/// Identifier of a namespace, stored as an ordered list of name parts.
///
/// The wrapped vector is private; use the inherent constructors to build a
/// value and `AsRef`/`Deref` to read the parts.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
170
171impl NamespaceIdent {
172 pub fn new(name: String) -> Self {
174 Self(vec![name])
175 }
176
177 pub fn from_vec(names: Vec<String>) -> Result<Self> {
179 if names.is_empty() {
180 return Err(Error::new(
181 ErrorKind::DataInvalid,
182 "Namespace identifier can't be empty!",
183 ));
184 }
185 Ok(Self(names))
186 }
187
188 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
190 Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
191 }
192
193 pub fn to_url_string(&self) -> String {
195 self.as_ref().join("\u{001f}")
196 }
197
198 pub fn inner(self) -> Vec<String> {
200 self.0
201 }
202
203 pub fn parent(&self) -> Option<Self> {
206 self.0.split_last().and_then(|(_, parent)| {
207 if parent.is_empty() {
208 None
209 } else {
210 Some(Self(parent.to_vec()))
211 }
212 })
213 }
214}
215
216impl AsRef<Vec<String>> for NamespaceIdent {
217 fn as_ref(&self) -> &Vec<String> {
218 &self.0
219 }
220}
221
222impl Deref for NamespaceIdent {
223 type Target = [String];
224
225 fn deref(&self) -> &Self::Target {
226 &self.0
227 }
228}
229
/// A namespace: its identifier together with a map of string properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    // Identifier of this namespace.
    name: NamespaceIdent,
    // Key/value properties attached to this namespace.
    properties: HashMap<String, String>,
}
236
237impl Namespace {
238 pub fn new(name: NamespaceIdent) -> Self {
240 Self::with_properties(name, HashMap::default())
241 }
242
243 pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
245 Self { name, properties }
246 }
247
248 pub fn name(&self) -> &NamespaceIdent {
250 &self.name
251 }
252
253 pub fn properties(&self) -> &HashMap<String, String> {
255 &self.properties
256 }
257}
258
259impl Display for NamespaceIdent {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 write!(f, "{}", self.0.join("."))
262 }
263}
264
/// Identifier of a table: the namespace it belongs to plus the table name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace the table belongs to.
    pub namespace: NamespaceIdent,
    /// Name of the table within `namespace`.
    pub name: String,
}
273
274impl TableIdent {
275 pub fn new(namespace: NamespaceIdent, name: String) -> Self {
277 Self { namespace, name }
278 }
279
280 pub fn namespace(&self) -> &NamespaceIdent {
282 &self.namespace
283 }
284
285 pub fn name(&self) -> &str {
287 &self.name
288 }
289
290 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
292 let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
293 let table_name = vec.pop().ok_or_else(|| {
294 Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
295 })?;
296 let namespace_ident = NamespaceIdent::from_vec(vec)?;
297
298 Ok(Self {
299 namespace: namespace_ident,
300 name: table_name,
301 })
302 }
303}
304
305impl Display for TableIdent {
306 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307 write!(f, "{}.{}", self.namespace, self.name)
308 }
309}
310
/// Settings for creating a table, assembled through the builder generated by
/// `TypedBuilder`.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// Name of the table to create.
    pub name: String,
    /// Optional table location; `None` unless set on the builder.
    /// The `location_opt` fallback setter is declared for passing the
    /// `Option` directly.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// Schema of the table.
    pub schema: Schema,
    /// Optional partition spec; `None` unless set on the builder.
    /// The setter is declared with `into`, and `partition_spec_opt` is the
    /// fallback setter for passing the `Option` directly.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order; `None` unless set on the builder.
    /// `sort_order_opt` is the fallback setter for passing the `Option`
    /// directly.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// Table properties; empty by default. The setter accepts any iterator of
    /// `(String, String)` pairs and collects it into the map.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Format version of the table; defaults to `FormatVersion::V2`.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
336
/// A pending change to a table: the requirements to check plus the updates to
/// apply.
///
/// The generated builder's `build` method is only visible inside this crate
/// (`pub(crate)`), so external code cannot construct a `TableCommit` directly.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    // Identifier of the table this commit targets.
    ident: TableIdent,
    // Requirements checked against the table metadata in `apply`.
    requirements: Vec<TableRequirement>,
    // Updates applied, in order, to the table metadata in `apply`.
    updates: Vec<TableUpdate>,
}
354
355impl TableCommit {
356 pub fn identifier(&self) -> &TableIdent {
358 &self.ident
359 }
360
361 pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
363 take(&mut self.requirements)
364 }
365
366 pub fn take_updates(&mut self) -> Vec<TableUpdate> {
368 take(&mut self.updates)
369 }
370
371 pub fn apply(self, table: Table) -> Result<Table> {
377 for requirement in self.requirements {
379 requirement.check(Some(table.metadata()))?;
380 }
381
382 let current_metadata_location = table.metadata_location_result()?;
384
385 let mut metadata_builder = table
387 .metadata()
388 .clone()
389 .into_builder(Some(current_metadata_location.to_string()));
390 for update in self.updates {
391 metadata_builder = update.apply(metadata_builder)?;
392 }
393
394 let new_metadata = metadata_builder.build()?.metadata;
396
397 let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
398 .with_next_version()
399 .with_new_metadata(&new_metadata)
400 .to_string();
401
402 Ok(table
403 .with_metadata(Arc::new(new_metadata))
404 .with_metadata_location(new_metadata_location))
405 }
406}
407
/// A precondition that table metadata must satisfy, evaluated by
/// `TableRequirement::check`.
///
/// Serialized as an internally tagged enum: the `type` field carries the
/// variant's `assert-*` name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not exist yet: the check fails whenever metadata is
    /// present.
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must equal `uuid`.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Expected table UUID.
        uuid: Uuid,
    },
    /// The named ref must point at `snapshot_id`; when `snapshot_id` is
    /// `None`, the ref must not exist.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// Name of the ref (the error messages call it a branch or tag).
        r#ref: String,
        /// Expected snapshot id of the ref, or `None` to require its absence.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last column id must equal `last_assigned_field_id`.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// Expected last assigned field id.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must equal `current_schema_id`.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Expected current schema id.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last partition id must equal
    /// `last_assigned_partition_id`.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Expected last assigned partition id.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default partition spec id must equal `default_spec_id`.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Expected default partition spec id.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must equal `default_sort_order_id`.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Expected default sort order id.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
469
/// A single change to table metadata, applied through
/// `TableUpdate::apply` onto a `TableMetadataBuilder`.
///
/// Serialized as an internally tagged enum: the `action` field carries the
/// kebab-case variant name, and most variants also use kebab-case field names.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrades the table to `format_version`.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format version.
        format_version: FormatVersion,
    },
    /// Assigns `uuid` to the table.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Adds `schema` to the table.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
    },
    /// Sets the current schema to the one identified by `schema_id`.
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Id of the schema to make current.
        schema_id: i32,
    },
    /// Adds the partition spec `spec` to the table.
    AddSpec {
        /// Partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Sets the default partition spec to the one identified by `spec_id`.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Id of the partition spec to make default.
        spec_id: i32,
    },
    /// Adds `sort_order` to the table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Sets the default sort order to the one identified by `sort_order_id`.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Id of the sort order to make default.
        sort_order_id: i64,
    },
    /// Adds `snapshot` to the table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add; (de)serialized through the helpers in `_serde`.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Sets the ref `ref_name` to `reference`.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of the ref to set.
        ref_name: String,
        /// Reference data; its fields are flattened into this variant's
        /// serialized form.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Removes the snapshots identified by `snapshot_ids`.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Ids of the snapshots to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Removes the ref `ref_name`.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of the ref to remove.
        ref_name: String,
    },
    /// Sets the table location.
    SetLocation {
        /// New table location.
        location: String,
    },
    /// Sets table properties.
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Removes table properties.
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Removes the partition specs identified by `spec_ids`.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Ids of the partition specs to remove.
        spec_ids: Vec<i32>,
    },
    /// Sets a statistics file. (De)serialized through
    /// `_serde_set_statistics`, which also carries a top-level snapshot id
    /// next to the statistics payload.
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// Statistics file to set.
        statistics: StatisticsFile,
    },
    /// Removes the statistics for the snapshot `snapshot_id`.
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Id of the snapshot whose statistics are removed.
        snapshot_id: i64,
    },
    /// Sets a partition statistics file.
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// Partition statistics file to set.
        partition_statistics: PartitionStatisticsFile,
    },
    /// Removes the partition statistics for the snapshot `snapshot_id`.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Id of the snapshot whose partition statistics are removed.
        snapshot_id: i64,
    },
    /// Removes the schemas identified by `schema_ids`.
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Ids of the schemas to remove.
        schema_ids: Vec<i32>,
    },
    /// Adds an encryption key.
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// Encryption key to add.
        encryption_key: EncryptedKey,
    },
    /// Removes the encryption key identified by `key_id`.
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// Id of the encryption key to remove.
        key_id: String,
    },
}
617
618impl TableUpdate {
619 pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
621 match self {
622 TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
623 TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
624 TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
625 TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
626 TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
627 TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
628 TableUpdate::SetDefaultSortOrder { sort_order_id } => {
629 builder.set_default_sort_order(sort_order_id)
630 }
631 TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
632 TableUpdate::SetSnapshotRef {
633 ref_name,
634 reference,
635 } => builder.set_ref(&ref_name, reference),
636 TableUpdate::RemoveSnapshots { snapshot_ids } => {
637 Ok(builder.remove_snapshots(&snapshot_ids))
638 }
639 TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
640 TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
641 TableUpdate::SetProperties { updates } => builder.set_properties(updates),
642 TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
643 TableUpdate::UpgradeFormatVersion { format_version } => {
644 builder.upgrade_format_version(format_version)
645 }
646 TableUpdate::RemovePartitionSpecs { spec_ids } => {
647 builder.remove_partition_specs(&spec_ids)
648 }
649 TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
650 TableUpdate::RemoveStatistics { snapshot_id } => {
651 Ok(builder.remove_statistics(snapshot_id))
652 }
653 TableUpdate::SetPartitionStatistics {
654 partition_statistics,
655 } => Ok(builder.set_partition_statistics(partition_statistics)),
656 TableUpdate::RemovePartitionStatistics { snapshot_id } => {
657 Ok(builder.remove_partition_statistics(snapshot_id))
658 }
659 TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
660 TableUpdate::AddEncryptionKey { encryption_key } => {
661 Ok(builder.add_encryption_key(encryption_key))
662 }
663 TableUpdate::RemoveEncryptionKey { key_id } => {
664 Ok(builder.remove_encryption_key(&key_id))
665 }
666 }
667 }
668}
669
670impl TableRequirement {
671 pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
676 if let Some(metadata) = metadata {
677 match self {
678 TableRequirement::NotExist => {
679 return Err(Error::new(
680 ErrorKind::CatalogCommitConflicts,
681 format!(
682 "Requirement failed: Table with id {} already exists",
683 metadata.uuid()
684 ),
685 )
686 .with_retryable(true));
687 }
688 TableRequirement::UuidMatch { uuid } => {
689 if &metadata.uuid() != uuid {
690 return Err(Error::new(
691 ErrorKind::CatalogCommitConflicts,
692 "Requirement failed: Table UUID does not match",
693 )
694 .with_context("expected", *uuid)
695 .with_context("found", metadata.uuid())
696 .with_retryable(true));
697 }
698 }
699 TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
700 if metadata.current_schema_id != *current_schema_id {
702 return Err(Error::new(
703 ErrorKind::CatalogCommitConflicts,
704 "Requirement failed: Current schema id does not match",
705 )
706 .with_context("expected", current_schema_id.to_string())
707 .with_context("found", metadata.current_schema_id.to_string())
708 .with_retryable(true));
709 }
710 }
711 TableRequirement::DefaultSortOrderIdMatch {
712 default_sort_order_id,
713 } => {
714 if metadata.default_sort_order().order_id != *default_sort_order_id {
715 return Err(Error::new(
716 ErrorKind::CatalogCommitConflicts,
717 "Requirement failed: Default sort order id does not match",
718 )
719 .with_context("expected", default_sort_order_id.to_string())
720 .with_context("found", metadata.default_sort_order().order_id.to_string())
721 .with_retryable(true));
722 }
723 }
724 TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
725 let snapshot_ref = metadata.snapshot_for_ref(r#ref);
726 if let Some(snapshot_id) = snapshot_id {
727 let snapshot_ref = snapshot_ref.ok_or(
728 Error::new(
729 ErrorKind::CatalogCommitConflicts,
730 format!("Requirement failed: Branch or tag `{ref}` not found"),
731 )
732 .with_retryable(true),
733 )?;
734 if snapshot_ref.snapshot_id() != *snapshot_id {
735 return Err(Error::new(
736 ErrorKind::CatalogCommitConflicts,
737 format!(
738 "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
739 ),
740 )
741 .with_context("expected", snapshot_id.to_string())
742 .with_context("found", snapshot_ref.snapshot_id().to_string())
743 .with_retryable(true));
744 }
745 } else if snapshot_ref.is_some() {
746 return Err(Error::new(
748 ErrorKind::CatalogCommitConflicts,
749 format!("Requirement failed: Branch or tag `{ref}` already exists"),
750 )
751 .with_retryable(true));
752 }
753 }
754 TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
755 if metadata.default_partition_spec_id() != *default_spec_id {
757 return Err(Error::new(
758 ErrorKind::CatalogCommitConflicts,
759 "Requirement failed: Default partition spec id does not match",
760 )
761 .with_context("expected", default_spec_id.to_string())
762 .with_context("found", metadata.default_partition_spec_id().to_string())
763 .with_retryable(true));
764 }
765 }
766 TableRequirement::LastAssignedPartitionIdMatch {
767 last_assigned_partition_id,
768 } => {
769 if metadata.last_partition_id != *last_assigned_partition_id {
770 return Err(Error::new(
771 ErrorKind::CatalogCommitConflicts,
772 "Requirement failed: Last assigned partition id does not match",
773 )
774 .with_context("expected", last_assigned_partition_id.to_string())
775 .with_context("found", metadata.last_partition_id.to_string())
776 .with_retryable(true));
777 }
778 }
779 TableRequirement::LastAssignedFieldIdMatch {
780 last_assigned_field_id,
781 } => {
782 if &metadata.last_column_id != last_assigned_field_id {
783 return Err(Error::new(
784 ErrorKind::CatalogCommitConflicts,
785 "Requirement failed: Last assigned field id does not match",
786 )
787 .with_context("expected", last_assigned_field_id.to_string())
788 .with_context("found", metadata.last_column_id.to_string())
789 .with_retryable(true));
790 }
791 }
792 };
793 } else {
794 match self {
795 TableRequirement::NotExist => {}
796 _ => {
797 return Err(Error::new(
798 ErrorKind::TableNotFound,
799 "Requirement failed: Table does not exist",
800 ));
801 }
802 }
803 }
804
805 Ok(())
806 }
807}
808
809pub(super) mod _serde {
810 use serde::{Deserialize as _, Deserializer, Serialize as _};
811
812 use super::*;
813 use crate::spec::{SchemaId, Summary};
814
815 pub(super) fn deserialize_snapshot<'de, D>(
816 deserializer: D,
817 ) -> std::result::Result<Snapshot, D::Error>
818 where D: Deserializer<'de> {
819 let buf = CatalogSnapshot::deserialize(deserializer)?;
820 Ok(buf.into())
821 }
822
823 pub(super) fn serialize_snapshot<S>(
824 snapshot: &Snapshot,
825 serializer: S,
826 ) -> std::result::Result<S::Ok, S::Error>
827 where
828 S: serde::Serializer,
829 {
830 let buf: CatalogSnapshot = snapshot.clone().into();
831 buf.serialize(serializer)
832 }
833
834 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
835 #[serde(rename_all = "kebab-case")]
836 struct CatalogSnapshot {
840 snapshot_id: i64,
841 #[serde(skip_serializing_if = "Option::is_none")]
842 parent_snapshot_id: Option<i64>,
843 #[serde(default)]
844 sequence_number: i64,
845 timestamp_ms: i64,
846 manifest_list: String,
847 summary: Summary,
848 #[serde(skip_serializing_if = "Option::is_none")]
849 schema_id: Option<SchemaId>,
850 #[serde(skip_serializing_if = "Option::is_none")]
851 first_row_id: Option<u64>,
852 #[serde(skip_serializing_if = "Option::is_none")]
853 added_rows: Option<u64>,
854 #[serde(skip_serializing_if = "Option::is_none")]
855 key_id: Option<String>,
856 }
857
858 impl From<CatalogSnapshot> for Snapshot {
859 fn from(snapshot: CatalogSnapshot) -> Self {
860 let CatalogSnapshot {
861 snapshot_id,
862 parent_snapshot_id,
863 sequence_number,
864 timestamp_ms,
865 manifest_list,
866 schema_id,
867 summary,
868 first_row_id,
869 added_rows,
870 key_id,
871 } = snapshot;
872 let builder = Snapshot::builder()
873 .with_snapshot_id(snapshot_id)
874 .with_parent_snapshot_id(parent_snapshot_id)
875 .with_sequence_number(sequence_number)
876 .with_timestamp_ms(timestamp_ms)
877 .with_manifest_list(manifest_list)
878 .with_summary(summary)
879 .with_encryption_key_id(key_id);
880 let row_range = first_row_id.zip(added_rows);
881 match (schema_id, row_range) {
882 (None, None) => builder.build(),
883 (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
884 (None, Some((first_row_id, last_row_id))) => {
885 builder.with_row_range(first_row_id, last_row_id).build()
886 }
887 (Some(schema_id), Some((first_row_id, last_row_id))) => builder
888 .with_schema_id(schema_id)
889 .with_row_range(first_row_id, last_row_id)
890 .build(),
891 }
892 }
893 }
894
895 impl From<Snapshot> for CatalogSnapshot {
896 fn from(snapshot: Snapshot) -> Self {
897 let first_row_id = snapshot.first_row_id();
898 let added_rows = snapshot.added_rows_count();
899 let Snapshot {
900 snapshot_id,
901 parent_snapshot_id,
902 sequence_number,
903 timestamp_ms,
904 manifest_list,
905 summary,
906 schema_id,
907 row_range: _,
908 encryption_key_id: key_id,
909 } = snapshot;
910 CatalogSnapshot {
911 snapshot_id,
912 parent_snapshot_id,
913 sequence_number,
914 timestamp_ms,
915 manifest_list,
916 summary,
917 schema_id,
918 first_row_id,
919 added_rows,
920 key_id,
921 }
922 }
923 }
924}
925
/// Settings for creating a view, assembled through the builder generated by
/// `TypedBuilder`.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// Name of the view to create.
    pub name: String,
    /// Location of the view.
    pub location: String,
    /// Representations of the view.
    pub representations: ViewRepresentations,
    /// Schema of the view.
    pub schema: Schema,
    /// View properties; empty unless set on the builder.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// Default namespace of the view.
    /// NOTE(review): presumably used to resolve unqualified references in
    /// the view definition — confirm against the view spec.
    pub default_namespace: NamespaceIdent,
    /// Optional default catalog; `None` unless set on the builder.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// Summary key/value pairs; empty unless set on the builder.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
950
/// A single change to view metadata.
///
/// Serialized as an internally tagged enum: the `action` field carries the
/// kebab-case variant name, and variant fields use kebab-case names.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assigns `uuid` to the view.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Upgrades the view to `format_version`.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target view format version.
        format_version: ViewFormatVersion,
    },
    /// Adds `schema` to the view.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
        /// Optional last column id accompanying the schema.
        /// NOTE(review): how this value is used is not visible here.
        last_column_id: Option<i32>,
    },
    /// Sets the view location.
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New view location.
        location: String,
    },
    /// Sets view properties.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Removes view properties.
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Adds a view version.
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// View version to add.
        view_version: ViewVersion,
    },
    /// Sets the current view version.
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// Id of the view version to make current.
        view_version_id: i32,
    },
}
1009
1010mod _serde_set_statistics {
1011 use serde::{Deserialize, Deserializer, Serialize, Serializer};
1014
1015 use super::*;
1016
1017 #[derive(Debug, Serialize, Deserialize)]
1018 #[serde(rename_all = "kebab-case")]
1019 struct SetStatistics {
1020 snapshot_id: Option<i64>,
1021 statistics: StatisticsFile,
1022 }
1023
1024 pub fn serialize<S>(
1025 value: &StatisticsFile,
1026 serializer: S,
1027 ) -> std::result::Result<S::Ok, S::Error>
1028 where
1029 S: Serializer,
1030 {
1031 SetStatistics {
1032 snapshot_id: Some(value.snapshot_id),
1033 statistics: value.clone(),
1034 }
1035 .serialize(serializer)
1036 }
1037
1038 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1039 where D: Deserializer<'de> {
1040 let SetStatistics {
1041 snapshot_id,
1042 statistics,
1043 } = SetStatistics::deserialize(deserializer)?;
1044 if let Some(snapshot_id) = snapshot_id
1045 && snapshot_id != statistics.snapshot_id
1046 {
1047 return Err(serde::de::Error::custom(format!(
1048 "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1049 statistics.snapshot_id
1050 )));
1051 }
1052
1053 Ok(statistics)
1054 }
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059 use std::collections::HashMap;
1060 use std::fmt::Debug;
1061 use std::fs::File;
1062 use std::io::BufReader;
1063
1064 use base64::Engine as _;
1065 use serde::Serialize;
1066 use serde::de::DeserializeOwned;
1067 use uuid::uuid;
1068
1069 use super::ViewUpdate;
1070 use crate::io::FileIO;
1071 use crate::spec::{
1072 BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1073 PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1074 SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1075 StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1076 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1077 ViewVersion,
1078 };
1079 use crate::table::Table;
1080 use crate::{
1081 NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1082 };
1083
1084 #[test]
1085 fn test_parent_namespace() {
1086 let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1087 let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1088 let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1089
1090 assert_eq!(ns1.parent(), None);
1091 assert_eq!(ns2.parent(), Some(ns1.clone()));
1092 assert_eq!(ns3.parent(), Some(ns2.clone()));
1093 }
1094
1095 #[test]
1096 fn test_create_table_id() {
1097 let table_id = TableIdent {
1098 namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1099 name: "t1".to_string(),
1100 };
1101
1102 assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1103 }
1104
1105 #[test]
1106 fn test_table_creation_iterator_properties() {
1107 let builder = TableCreation::builder()
1108 .name("table".to_string())
1109 .schema(Schema::builder().build().unwrap());
1110
1111 fn s(k: &str, v: &str) -> (String, String) {
1112 (k.to_string(), v.to_string())
1113 }
1114
1115 let table_creation = builder
1116 .properties([s("key", "value"), s("foo", "bar")])
1117 .build();
1118
1119 assert_eq!(
1120 HashMap::from([s("key", "value"), s("foo", "bar")]),
1121 table_creation.properties
1122 );
1123 }
1124
1125 fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1126 json: impl ToString,
1127 expected: T,
1128 ) {
1129 let json_str = json.to_string();
1130 let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1131 assert_eq!(actual, expected, "Parsed value is not equal to expected");
1132
1133 let restored: T = serde_json::from_str(
1134 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1135 )
1136 .expect("Failed to parse from serialized json");
1137
1138 assert_eq!(
1139 restored, expected,
1140 "Parsed restored value is not equal to expected"
1141 );
1142 }
1143
1144 fn metadata() -> TableMetadata {
1145 let tbl_creation = TableCreation::builder()
1146 .name("table".to_string())
1147 .location("/path/to/table".to_string())
1148 .schema(Schema::builder().build().unwrap())
1149 .build();
1150
1151 TableMetadataBuilder::from_table_creation(tbl_creation)
1152 .unwrap()
1153 .assign_uuid(uuid::Uuid::nil())
1154 .build()
1155 .unwrap()
1156 .metadata
1157 }
1158
1159 #[test]
1160 fn test_check_requirement_not_exist() {
1161 let metadata = metadata();
1162 let requirement = TableRequirement::NotExist;
1163
1164 assert!(requirement.check(Some(&metadata)).is_err());
1165 assert!(requirement.check(None).is_ok());
1166 }
1167
1168 #[test]
1169 fn test_check_table_uuid() {
1170 let metadata = metadata();
1171
1172 let requirement = TableRequirement::UuidMatch {
1173 uuid: uuid::Uuid::now_v7(),
1174 };
1175 assert!(requirement.check(Some(&metadata)).is_err());
1176
1177 let requirement = TableRequirement::UuidMatch {
1178 uuid: uuid::Uuid::nil(),
1179 };
1180 assert!(requirement.check(Some(&metadata)).is_ok());
1181 }
1182
1183 #[test]
1184 fn test_check_ref_snapshot_id() {
1185 let metadata = metadata();
1186
1187 let requirement = TableRequirement::RefSnapshotIdMatch {
1189 r#ref: "my_branch".to_string(),
1190 snapshot_id: Some(1),
1191 };
1192 assert!(requirement.check(Some(&metadata)).is_err());
1193
1194 let requirement = TableRequirement::RefSnapshotIdMatch {
1196 r#ref: "my_branch".to_string(),
1197 snapshot_id: None,
1198 };
1199 assert!(requirement.check(Some(&metadata)).is_ok());
1200
1201 let snapshot = Snapshot::builder()
1203 .with_snapshot_id(3051729675574597004)
1204 .with_sequence_number(10)
1205 .with_timestamp_ms(9992191116217)
1206 .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1207 .with_schema_id(0)
1208 .with_summary(Summary {
1209 operation: Operation::Append,
1210 additional_properties: HashMap::new(),
1211 })
1212 .build();
1213
1214 let builder = metadata.into_builder(None);
1215 let builder = TableUpdate::AddSnapshot {
1216 snapshot: snapshot.clone(),
1217 }
1218 .apply(builder)
1219 .unwrap();
1220 let metadata = TableUpdate::SetSnapshotRef {
1221 ref_name: MAIN_BRANCH.to_string(),
1222 reference: SnapshotReference {
1223 snapshot_id: snapshot.snapshot_id(),
1224 retention: SnapshotRetention::Branch {
1225 min_snapshots_to_keep: Some(10),
1226 max_snapshot_age_ms: None,
1227 max_ref_age_ms: None,
1228 },
1229 },
1230 }
1231 .apply(builder)
1232 .unwrap()
1233 .build()
1234 .unwrap()
1235 .metadata;
1236
1237 let requirement = TableRequirement::RefSnapshotIdMatch {
1239 r#ref: "main".to_string(),
1240 snapshot_id: Some(3051729675574597004),
1241 };
1242 assert!(requirement.check(Some(&metadata)).is_ok());
1243
1244 let requirement = TableRequirement::RefSnapshotIdMatch {
1246 r#ref: "main".to_string(),
1247 snapshot_id: Some(1),
1248 };
1249 assert!(requirement.check(Some(&metadata)).is_err());
1250 }
1251
1252 #[test]
1253 fn test_check_last_assigned_field_id() {
1254 let metadata = metadata();
1255
1256 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1257 last_assigned_field_id: 1,
1258 };
1259 assert!(requirement.check(Some(&metadata)).is_err());
1260
1261 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1262 last_assigned_field_id: 0,
1263 };
1264 assert!(requirement.check(Some(&metadata)).is_ok());
1265 }
1266
1267 #[test]
1268 fn test_check_current_schema_id() {
1269 let metadata = metadata();
1270
1271 let requirement = TableRequirement::CurrentSchemaIdMatch {
1272 current_schema_id: 1,
1273 };
1274 assert!(requirement.check(Some(&metadata)).is_err());
1275
1276 let requirement = TableRequirement::CurrentSchemaIdMatch {
1277 current_schema_id: 0,
1278 };
1279 assert!(requirement.check(Some(&metadata)).is_ok());
1280 }
1281
1282 #[test]
1283 fn test_check_last_assigned_partition_id() {
1284 let metadata = metadata();
1285 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1286 last_assigned_partition_id: 0,
1287 };
1288 assert!(requirement.check(Some(&metadata)).is_err());
1289
1290 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1291 last_assigned_partition_id: 999,
1292 };
1293 assert!(requirement.check(Some(&metadata)).is_ok());
1294 }
1295
1296 #[test]
1297 fn test_check_default_spec_id() {
1298 let metadata = metadata();
1299
1300 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1301 assert!(requirement.check(Some(&metadata)).is_err());
1302
1303 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1304 assert!(requirement.check(Some(&metadata)).is_ok());
1305 }
1306
1307 #[test]
1308 fn test_check_default_sort_order_id() {
1309 let metadata = metadata();
1310
1311 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1312 default_sort_order_id: 1,
1313 };
1314 assert!(requirement.check(Some(&metadata)).is_err());
1315
1316 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1317 default_sort_order_id: 0,
1318 };
1319 assert!(requirement.check(Some(&metadata)).is_ok());
1320 }
1321
/// Passes the `assert-table-uuid` JSON and the matching
/// `TableRequirement::UuidMatch` value to `test_serde_json`.
#[test]
fn test_table_uuid() {
    let payload = r#"
{
    "type": "assert-table-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#;
    let expected = TableRequirement::UuidMatch {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(payload, expected);
}
1336
/// Passes the `assert-create` JSON and `TableRequirement::NotExist` to
/// `test_serde_json`.
#[test]
fn test_assert_table_not_exists() {
    let payload = r#"
{
    "type": "assert-create"
}
        "#;

    test_serde_json(payload, TableRequirement::NotExist);
}
1348
/// Exercises `assert-ref-snapshot-id` through `test_serde_json` twice: once
/// with a `null` snapshot id and once with snapshot id 1.
#[test]
fn test_assert_ref_snapshot_id() {
    // Case 1: "snapshot-id" is null, expected to map to `None`.
    let payload_without_id = r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": null
}
        "#;
    let expected_without_id = TableRequirement::RefSnapshotIdMatch {
        r#ref: "snapshot-name".to_string(),
        snapshot_id: None,
    };
    test_serde_json(payload_without_id, expected_without_id);

    // Case 2: "snapshot-id" is 1, expected to map to `Some(1)`.
    let payload_with_id = r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": 1
}
        "#;
    let expected_with_id = TableRequirement::RefSnapshotIdMatch {
        r#ref: "snapshot-name".to_string(),
        snapshot_id: Some(1),
    };
    test_serde_json(payload_with_id, expected_with_id);
}
1379
/// Passes the `assert-last-assigned-field-id` JSON (id 12) and the matching
/// `TableRequirement::LastAssignedFieldIdMatch` to `test_serde_json`.
#[test]
fn test_assert_last_assigned_field_id() {
    let payload = r#"
{
    "type": "assert-last-assigned-field-id",
    "last-assigned-field-id": 12
}
        "#;
    let expected = TableRequirement::LastAssignedFieldIdMatch {
        last_assigned_field_id: 12,
    };

    test_serde_json(payload, expected);
}
1394
/// Passes the `assert-current-schema-id` JSON (id 4) and the matching
/// `TableRequirement::CurrentSchemaIdMatch` to `test_serde_json`.
#[test]
fn test_assert_current_schema_id() {
    let payload = r#"
{
    "type": "assert-current-schema-id",
    "current-schema-id": 4
}
        "#;
    let expected = TableRequirement::CurrentSchemaIdMatch {
        current_schema_id: 4,
    };

    test_serde_json(payload, expected);
}
1409
/// Passes the `assert-last-assigned-partition-id` JSON (id 1004) and the
/// matching `TableRequirement::LastAssignedPartitionIdMatch` to
/// `test_serde_json`.
#[test]
fn test_assert_last_assigned_partition_id() {
    let payload = r#"
{
    "type": "assert-last-assigned-partition-id",
    "last-assigned-partition-id": 1004
}
        "#;
    let expected = TableRequirement::LastAssignedPartitionIdMatch {
        last_assigned_partition_id: 1004,
    };

    test_serde_json(payload, expected);
}
1424
/// Passes the `assert-default-spec-id` JSON (id 5) and the matching
/// `TableRequirement::DefaultSpecIdMatch` to `test_serde_json`.
#[test]
fn test_assert_default_spec_id() {
    let payload = r#"
{
    "type": "assert-default-spec-id",
    "default-spec-id": 5
}
        "#;
    let expected = TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 };

    test_serde_json(payload, expected);
}
1437
/// Passes the `assert-default-sort-order-id` JSON (id 10) and the matching
/// `TableRequirement::DefaultSortOrderIdMatch` to `test_serde_json`.
#[test]
fn test_assert_default_sort_order() {
    test_serde_json(
        r#"
{
    "type": "assert-default-sort-order-id",
    "default-sort-order-id": 10
}
        "#,
        TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: 10,
        },
    );
}
1453
/// A requirement object that lacks the `type` key must not deserialize into a
/// `TableRequirement`.
#[test]
fn test_parse_assert_invalid() {
    let untyped_payload = r#"
{
    "default-sort-order-id": 10
}
"#;

    let parsed = serde_json::from_str::<TableRequirement>(untyped_payload);

    assert!(
        parsed.is_err(),
        "Table requirements should not be parsed without type."
    );
}
1468
/// Passes the `assign-uuid` action JSON and the matching
/// `TableUpdate::AssignUuid` to `test_serde_json`.
#[test]
fn test_assign_uuid() {
    let payload = r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#;
    let expected = TableUpdate::AssignUuid {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(payload, expected);
}
1483
/// Passes the `upgrade-format-version` action JSON (version 2) and
/// `TableUpdate::UpgradeFormatVersion` with `FormatVersion::V2` to
/// `test_serde_json`.
#[test]
fn test_upgrade_format_version() {
    let payload = r#"
{
    "action": "upgrade-format-version",
    "format-version": 2
}
        "#;
    let expected = TableUpdate::UpgradeFormatVersion {
        format_version: FormatVersion::V2,
    };

    test_serde_json(payload, expected);
}
1498
/// Checks the `add-schema` table update against two JSON payloads that share
/// the same three-field schema: one that carries an extra top-level
/// `last-column-id` key and one that does not. Both are expected to correspond
/// to the same `TableUpdate::AddSchema` value.
#[test]
fn test_add_schema() {
    let test_schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    // Payload with the additional "last-column-id" key; the expected value has
    // no field for it.
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
        TableUpdate::AddSchema {
            // Cloned because the schema is needed again for the second payload.
            schema: test_schema.clone(),
        },
    );

    // Payload without "last-column-id".
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    }
}
        "#,
        TableUpdate::AddSchema {
            // Last use: move the schema instead of cloning it.
            schema: test_schema,
        },
    );
}
1588
/// Passes the `set-current-schema` action JSON (schema id 23) and the matching
/// `TableUpdate::SetCurrentSchema` to `test_serde_json`.
#[test]
fn test_set_current_schema() {
    let payload = r#"
{
    "action": "set-current-schema",
    "schema-id": 23
}
        "#;
    let expected = TableUpdate::SetCurrentSchema { schema_id: 23 };

    test_serde_json(payload, expected);
}
1601
/// Passes the `add-spec` action JSON with three partition fields (day, bucket
/// and truncate transforms) and the equivalent `UnboundPartitionSpec` to
/// `test_serde_json`.
#[test]
fn test_add_spec() {
    let payload = r#"
{
    "action": "add-spec",
    "spec": {
        "fields": [
            {
                "source-id": 4,
                "name": "ts_day",
                "transform": "day"
            },
            {
                "source-id": 1,
                "name": "id_bucket",
                "transform": "bucket[16]"
            },
            {
                "source-id": 2,
                "name": "id_truncate",
                "transform": "truncate[4]"
            }
        ]
    }
}
        "#;

    // Same three fields, in the same order as in the payload.
    let expected_spec = UnboundPartitionSpec::builder()
        .add_partition_field(4, "ts_day".to_string(), Transform::Day)
        .unwrap()
        .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
        .unwrap()
        .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
        .unwrap()
        .build();

    test_serde_json(payload, TableUpdate::AddSpec {
        spec: expected_spec,
    });
}
1641
/// Passes the `set-default-spec` action JSON (spec id 1) and the matching
/// `TableUpdate::SetDefaultSpec` to `test_serde_json`.
#[test]
fn test_set_default_spec() {
    let payload = r#"
{
    "action": "set-default-spec",
    "spec-id": 1
}
        "#;
    let expected = TableUpdate::SetDefaultSpec { spec_id: 1 };

    test_serde_json(payload, expected)
}
1654
/// Passes the `add-sort-order` action JSON with two sort fields and the
/// equivalent unbound `SortOrder` (order id 1) to `test_serde_json`.
#[test]
fn test_add_sort_order() {
    let payload = r#"
{
    "action": "add-sort-order",
    "sort-order": {
        "order-id": 1,
        "fields": [
            {
                "transform": "identity",
                "source-id": 2,
                "direction": "asc",
                "null-order": "nulls-first"
            },
            {
                "transform": "bucket[4]",
                "source-id": 3,
                "direction": "desc",
                "null-order": "nulls-last"
            }
        ]
    }
}
        "#;

    // First field: identity on source 2, ascending, nulls first.
    let identity_field = SortField::builder()
        .source_id(2)
        .direction(SortDirection::Ascending)
        .null_order(NullOrder::First)
        .transform(Transform::Identity)
        .build();
    // Second field: bucket[4] on source 3, descending, nulls last.
    let bucket_field = SortField::builder()
        .source_id(3)
        .direction(SortDirection::Descending)
        .null_order(NullOrder::Last)
        .transform(Transform::Bucket(4))
        .build();

    let expected_order = SortOrder::builder()
        .with_order_id(1)
        .with_sort_field(identity_field)
        .with_sort_field(bucket_field)
        .build_unbound()
        .unwrap();

    test_serde_json(payload, TableUpdate::AddSortOrder {
        sort_order: expected_order,
    });
}
1705
/// Passes the `set-default-sort-order` action JSON (sort order id 2) and the
/// matching `TableUpdate::SetDefaultSortOrder` to `test_serde_json`.
#[test]
fn test_set_default_order() {
    test_serde_json(
        r#"
{
    "action": "set-default-sort-order",
    "sort-order-id": 2
}
        "#,
        TableUpdate::SetDefaultSortOrder { sort_order_id: 2 },
    );
}
1718
/// Passes an `add-snapshot` action JSON that includes `sequence-number` and
/// `schema-id`, together with the equivalent `Snapshot`, to `test_serde_json`.
#[test]
fn test_add_snapshot() {
    let payload = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "sequence-number": 1,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro",
        "schema-id": 1
    }
}
        "#;

    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let expected_snapshot = Snapshot::builder()
        .with_snapshot_id(3055729675574597000)
        .with_parent_snapshot_id(Some(3051729675574597000))
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_schema_id(1)
        .with_summary(append_summary)
        .build();

    test_serde_json(payload, TableUpdate::AddSnapshot {
        snapshot: expected_snapshot,
    });
}
1755
/// Deserializes an `add-snapshot` action whose snapshot has no
/// `sequence-number` or `schema-id` key and compares it with a `Snapshot`
/// built with sequence number 0. Only the parsing direction is checked here.
#[test]
fn test_add_snapshot_v1() {
    let payload = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro"
    }
}
    "#;

    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let expected = TableUpdate::AddSnapshot {
        snapshot: Snapshot::builder()
            .with_snapshot_id(3055729675574597000)
            .with_parent_snapshot_id(Some(3051729675574597000))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/2.avro")
            .with_summary(append_summary)
            .build(),
    };

    let parsed: TableUpdate = serde_json::from_str(payload).expect("Failed to parse from json");
    assert_eq!(parsed, expected, "Parsed value is not equal to expected");
}
1790
/// Parses an `add-snapshot` action carrying `first-row-id`, `added-rows` and
/// `key-id`, compares it with the equivalent `Snapshot`, then serializes the
/// parsed value and checks that parsing the result yields the same update.
#[test]
fn test_add_snapshot_v3() {
    let payload = serde_json::json!(
    {
        "action": "add-snapshot",
        "snapshot": {
            "snapshot-id": 3055729675574597000i64,
            "parent-snapshot-id": 3051729675574597000i64,
            "timestamp-ms": 1555100955770i64,
            "first-row-id":0,
            "added-rows":2,
            "key-id":"key123",
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://a/b/2.avro"
        }
    });

    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let expected = TableUpdate::AddSnapshot {
        snapshot: Snapshot::builder()
            .with_snapshot_id(3055729675574597000)
            .with_parent_snapshot_id(Some(3051729675574597000))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/2.avro")
            .with_row_range(0, 2)
            .with_encryption_key_id(Some("key123".to_string()))
            .with_summary(append_summary)
            .build(),
    };

    // JSON value -> TableUpdate.
    let parsed: TableUpdate = serde_json::from_value(payload).expect("Failed to parse from json");
    assert_eq!(parsed, expected, "Parsed value is not equal to expected");

    // TableUpdate -> JSON text -> TableUpdate.
    let serialized = serde_json::to_string(&parsed).expect("Failed to serialize to json");
    let reparsed: TableUpdate =
        serde_json::from_str(&serialized).expect("Failed to parse from serialized json");
    assert_eq!(reparsed, expected);
}
1834
/// Passes the `remove-snapshots` action JSON (ids 1 and 2) and the matching
/// `TableUpdate::RemoveSnapshots` to `test_serde_json`.
#[test]
fn test_remove_snapshots() {
    test_serde_json(
        r#"
{
    "action": "remove-snapshots",
    "snapshot-ids": [
        1,
        2
    ]
}
        "#,
        TableUpdate::RemoveSnapshots {
            snapshot_ids: vec![1, 2],
        },
    );
}
1852
/// Passes the `remove-snapshot-ref` action JSON and the matching
/// `TableUpdate::RemoveSnapshotRef` to `test_serde_json`.
#[test]
fn test_remove_snapshot_ref() {
    test_serde_json(
        r#"
{
    "action": "remove-snapshot-ref",
    "ref-name": "snapshot-ref"
}
        "#,
        TableUpdate::RemoveSnapshotRef {
            ref_name: "snapshot-ref".to_string(),
        },
    );
}
1867
/// Passes a `set-snapshot-ref` action JSON of type `tag` and the matching
/// `TableUpdate::SetSnapshotRef` with `SnapshotRetention::Tag` to
/// `test_serde_json`.
#[test]
fn test_set_snapshot_ref_tag() {
    let payload = r#"
{
    "action": "set-snapshot-ref",
    "type": "tag",
    "ref-name": "hank",
    "snapshot-id": 1,
    "max-ref-age-ms": 1
}
        "#;

    let tag_retention = SnapshotRetention::Tag {
        max_ref_age_ms: Some(1),
    };
    let tag_reference = SnapshotReference {
        snapshot_id: 1,
        retention: tag_retention,
    };
    let expected = TableUpdate::SetSnapshotRef {
        ref_name: "hank".to_string(),
        reference: tag_reference,
    };

    test_serde_json(payload, expected);
}
1892
/// Passes a `set-snapshot-ref` action JSON of type `branch` and the matching
/// `TableUpdate::SetSnapshotRef` with `SnapshotRetention::Branch` to
/// `test_serde_json`.
#[test]
fn test_set_snapshot_ref_branch() {
    let payload = r#"
{
    "action": "set-snapshot-ref",
    "type": "branch",
    "ref-name": "hank",
    "snapshot-id": 1,
    "min-snapshots-to-keep": 2,
    "max-snapshot-age-ms": 3,
    "max-ref-age-ms": 4
}
        "#;

    let branch_retention = SnapshotRetention::Branch {
        min_snapshots_to_keep: Some(2),
        max_snapshot_age_ms: Some(3),
        max_ref_age_ms: Some(4),
    };
    let branch_reference = SnapshotReference {
        snapshot_id: 1,
        retention: branch_retention,
    };
    let expected = TableUpdate::SetSnapshotRef {
        ref_name: "hank".to_string(),
        reference: branch_reference,
    };

    test_serde_json(payload, expected);
}
1921
/// Passes the `set-properties` action JSON with two key/value pairs and the
/// matching `TableUpdate::SetProperties` to `test_serde_json`.
#[test]
fn test_set_properties() {
    let payload = r#"
{
    "action": "set-properties",
    "updates": {
        "prop1": "v1",
        "prop2": "v2"
    }
}
        "#;

    // Collected into whatever map type the `updates` field declares.
    let expected_updates = vec![("prop1", "v1"), ("prop2", "v2")]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    test_serde_json(payload, TableUpdate::SetProperties {
        updates: expected_updates,
    });
}
1945
/// Passes the `remove-properties` action JSON with two keys and the matching
/// `TableUpdate::RemoveProperties` to `test_serde_json`.
#[test]
fn test_remove_properties() {
    test_serde_json(
        r#"
{
    "action": "remove-properties",
    "removals": [
        "prop1",
        "prop2"
    ]
}
        "#,
        TableUpdate::RemoveProperties {
            removals: vec!["prop1".to_string(), "prop2".to_string()],
        },
    );
}
1964
/// Passes the `set-location` action JSON and the matching
/// `TableUpdate::SetLocation` to `test_serde_json`.
#[test]
fn test_set_location() {
    test_serde_json(
        r#"
{
    "action": "set-location",
    "location": "s3://bucket/warehouse/tbl_location"
}
        "#,
        TableUpdate::SetLocation {
            location: "s3://bucket/warehouse/tbl_location".to_string(),
        },
    );
}
1980
/// Applies `TableUpdate::AssignUuid` to a builder derived from freshly created
/// table metadata and checks that the rebuilt metadata reports the new UUID.
#[test]
fn test_table_update_apply() {
    // Start from metadata produced by a minimal table creation.
    let creation = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(Schema::builder().build().unwrap())
        .build();
    let initial_metadata = TableMetadataBuilder::from_table_creation(creation)
        .unwrap()
        .build()
        .unwrap()
        .metadata;
    let builder = TableMetadataBuilder::new_from_metadata(
        initial_metadata,
        Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
    );

    // Assign a random UUID and rebuild.
    let new_uuid = uuid::Uuid::new_v4();
    let applied = TableUpdate::AssignUuid { uuid: new_uuid }
        .apply(builder)
        .unwrap();
    let rebuilt_metadata = applied.build().unwrap().metadata;

    assert_eq!(rebuilt_metadata.uuid(), new_uuid);
}
2009
/// Passes the `assign-uuid` action JSON and the matching
/// `ViewUpdate::AssignUuid` to `test_serde_json`.
#[test]
fn test_view_assign_uuid() {
    let payload = r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#;
    let expected = ViewUpdate::AssignUuid {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(payload, expected);
}
2024
/// Passes the `upgrade-format-version` action JSON (version 1) and
/// `ViewUpdate::UpgradeFormatVersion` with `ViewFormatVersion::V1` to
/// `test_serde_json`.
#[test]
fn test_view_upgrade_format_version() {
    let payload = r#"
{
    "action": "upgrade-format-version",
    "format-version": 1
}
        "#;
    let expected = ViewUpdate::UpgradeFormatVersion {
        format_version: ViewFormatVersion::V1,
    };

    test_serde_json(payload, expected);
}
2039
/// Passes the view `add-schema` action JSON (three-field schema plus
/// `last-column-id` 3) and the matching `ViewUpdate::AddSchema` to
/// `test_serde_json`.
#[test]
fn test_view_add_schema() {
    let test_schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
        ViewUpdate::AddSchema {
            // The schema is used only here, so it is moved rather than cloned.
            schema: test_schema,
            last_column_id: Some(3),
        },
    );
}
2092
/// Passes the `set-location` action JSON and the matching
/// `ViewUpdate::SetLocation` to `test_serde_json`.
#[test]
fn test_view_set_location() {
    let payload = r#"
{
    "action": "set-location",
    "location": "s3://db/view"
}
        "#;
    let expected = ViewUpdate::SetLocation {
        location: "s3://db/view".to_string(),
    };

    test_serde_json(payload, expected);
}
2107
/// Passes the `set-properties` action JSON with two key/value pairs and the
/// matching `ViewUpdate::SetProperties` to `test_serde_json`.
#[test]
fn test_view_set_properties() {
    let payload = r#"
{
    "action": "set-properties",
    "updates": {
        "prop1": "v1",
        "prop2": "v2"
    }
}
        "#;

    // Collected into whatever map type the `updates` field declares.
    let expected_updates = vec![("prop1", "v1"), ("prop2", "v2")]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    test_serde_json(payload, ViewUpdate::SetProperties {
        updates: expected_updates,
    });
}
2130
/// Passes the `remove-properties` action JSON with two keys and the matching
/// `ViewUpdate::RemoveProperties` to `test_serde_json`.
#[test]
fn test_view_remove_properties() {
    let payload = r#"
{
    "action": "remove-properties",
    "removals": [
        "prop1",
        "prop2"
    ]
}
        "#;
    let expected = ViewUpdate::RemoveProperties {
        removals: vec!["prop1".to_string(), "prop2".to_string()],
    };

    test_serde_json(payload, expected);
}
2148
/// Passes the `add-view-version` action JSON (one SQL representation in the
/// `spark` dialect) and the equivalent `ViewVersion` to `test_serde_json`.
#[test]
fn test_view_add_view_version() {
    let payload = r#"
{
    "action": "add-view-version",
    "view-version": {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
              "engine-name" : "Spark"
            },
            "representations" : [ {
              "type" : "sql",
              "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
              "dialect" : "spark"
            } ]
    }
}
        "#;

    // The single SQL representation listed in the payload.
    let sql_representation = ViewRepresentation::Sql(SqlViewRepresentation {
        sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
        dialect: "spark".to_string(),
    });
    let expected_version = ViewVersion::builder()
        .with_version_id(1)
        .with_timestamp_ms(1573518431292)
        .with_schema_id(1)
        .with_default_catalog(Some("prod".to_string()))
        .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
        .with_summary(
            vec![("engine-name".to_string(), "Spark".to_string())]
                .into_iter()
                .collect(),
        )
        .with_representations(ViewRepresentations(vec![sql_representation]))
        .build();

    test_serde_json(payload, ViewUpdate::AddViewVersion {
        view_version: expected_version,
    });
}
2192
/// Passes the `set-current-view-version` action JSON (version id 1) and the
/// matching `ViewUpdate::SetCurrentViewVersion` to `test_serde_json`.
#[test]
fn test_view_set_current_view_version() {
    let payload = r#"
{
    "action": "set-current-view-version",
    "view-version-id": 1
}
        "#;
    let expected = ViewUpdate::SetCurrentViewVersion { view_version_id: 1 };

    test_serde_json(payload, expected);
}
2205
/// Passes the `remove-partition-specs` action JSON (spec ids 1 and 2) and the
/// matching `TableUpdate::RemovePartitionSpecs` to `test_serde_json`.
#[test]
fn test_remove_partition_specs_update() {
    let payload = r#"
{
    "action": "remove-partition-specs",
    "spec-ids": [1, 2]
}
        "#;
    let expected = TableUpdate::RemovePartitionSpecs {
        spec_ids: vec![1, 2],
    };

    test_serde_json(payload, expected);
}
2220
/// Passes the `set-statistics` action JSON (one statistics file with a single
/// blob entry) and the equivalent `TableUpdate::SetStatistics` to
/// `test_serde_json`.
#[test]
fn test_set_statistics_file() {
    let payload = r#"
        {
                "action": "set-statistics",
                "snapshot-id": 1940541653261589030,
                "statistics": {
                        "snapshot-id": 1940541653261589030,
                        "statistics-path": "s3://bucket/warehouse/stats.puffin",
                        "file-size-in-bytes": 124,
                        "file-footer-size-in-bytes": 27,
                        "blob-metadata": [
                                {
                                        "type": "boring-type",
                                        "snapshot-id": 1940541653261589030,
                                        "sequence-number": 2,
                                        "fields": [
                                                1
                                        ],
                                        "properties": {
                                                "prop-key": "prop-value"
                                        }
                                }
                        ]
                }
        }
        "#;

    // The one blob listed under "blob-metadata".
    let blob = BlobMetadata {
        r#type: "boring-type".to_string(),
        snapshot_id: 1940541653261589030,
        sequence_number: 2,
        fields: vec![1],
        properties: vec![("prop-key".to_string(), "prop-value".to_string())]
            .into_iter()
            .collect(),
    };
    let expected_file = StatisticsFile {
        snapshot_id: 1940541653261589030,
        statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
        file_size_in_bytes: 124,
        file_footer_size_in_bytes: 27,
        key_metadata: None,
        blob_metadata: vec![blob],
    };

    test_serde_json(payload, TableUpdate::SetStatistics {
        statistics: expected_file,
    });
}
2269
/// Passes the `remove-statistics` action JSON and the matching
/// `TableUpdate::RemoveStatistics` to `test_serde_json`.
#[test]
fn test_remove_statistics_file() {
    let payload = r#"
        {
                "action": "remove-statistics",
                "snapshot-id": 1940541653261589030
        }
        "#;
    let expected = TableUpdate::RemoveStatistics {
        snapshot_id: 1940541653261589030,
    };

    test_serde_json(payload, expected);
}
2284
/// Passes the `set-partition-statistics` action JSON and the equivalent
/// `TableUpdate::SetPartitionStatistics` to `test_serde_json`.
#[test]
fn test_set_partition_statistics_file() {
    let payload = r#"
            {
                "action": "set-partition-statistics",
                "partition-statistics": {
                    "snapshot-id": 1940541653261589030,
                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
                    "file-size-in-bytes": 43
                }
            }
            "#;
    let expected_file = PartitionStatisticsFile {
        snapshot_id: 1940541653261589030,
        statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
        file_size_in_bytes: 43,
    };

    test_serde_json(payload, TableUpdate::SetPartitionStatistics {
        partition_statistics: expected_file,
    })
}
2307
/// Passes the `remove-partition-statistics` action JSON and the matching
/// `TableUpdate::RemovePartitionStatistics` to `test_serde_json`.
#[test]
fn test_remove_partition_statistics_file() {
    let payload = r#"
            {
                "action": "remove-partition-statistics",
                "snapshot-id": 1940541653261589030
            }
            "#;
    let expected = TableUpdate::RemovePartitionStatistics {
        snapshot_id: 1940541653261589030,
    };

    test_serde_json(payload, expected)
}
2322
/// Passes the `remove-schemas` action JSON (schema ids 1 and 2) and the
/// matching `TableUpdate::RemoveSchemas` to `test_serde_json`.
#[test]
fn test_remove_schema_update() {
    let payload = r#"
                {
                    "action": "remove-schemas",
                    "schema-ids": [1, 2]
                }
            "#;
    let expected = TableUpdate::RemoveSchemas {
        schema_ids: vec![1, 2],
    };

    test_serde_json(payload, expected);
}
2337
/// Passes the `add-encryption-key` action JSON, whose key metadata is the
/// standard-base64 encoding of the bytes of "key", and the equivalent
/// `TableUpdate::AddEncryptionKey` to `test_serde_json`.
#[test]
fn test_add_encryption_key() {
    let key_bytes = "key".as_bytes();
    let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);

    // The encoded key is spliced into the payload via the format string.
    let payload = format!(
        r#"
                {{
                    "action": "add-encryption-key",
                    "encryption-key": {{
                        "key-id": "a",
                        "encrypted-key-metadata": "{encoded_key}",
                        "encrypted-by-id": "b"
                    }}
                }}
            "#
    );
    let expected_key = EncryptedKey::builder()
        .key_id("a")
        .encrypted_key_metadata(key_bytes.to_vec())
        .encrypted_by_id("b")
        .build();

    test_serde_json(payload, TableUpdate::AddEncryptionKey {
        encryption_key: expected_key,
    });
}
2364
/// Passes the `remove-encryption-key` action JSON (key id "a") and the
/// matching `TableUpdate::RemoveEncryptionKey` to `test_serde_json`.
#[test]
fn test_remove_encryption_key() {
    let payload = r#"
                {
                    "action": "remove-encryption-key",
                    "key-id": "a"
                }
            "#;
    let expected = TableUpdate::RemoveEncryptionKey {
        key_id: "a".to_string(),
    };

    test_serde_json(payload, expected);
}
2379
/// Builds a table from the `TableMetadataV2Valid.json` fixture, applies a
/// `TableCommit` that sets a new location and two properties (guarded by a
/// UUID requirement), and checks the properties, metadata location prefix and
/// location of the resulting table.
#[test]
fn test_table_commit() {
    // Read the fixture metadata from the crate's testdata directory.
    let fixture_path = format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV2Valid.json"
    );
    let fixture_reader = BufReader::new(File::open(fixture_path).unwrap());
    let fixture_metadata = serde_json::from_reader::<_, TableMetadata>(fixture_reader).unwrap();

    let table = Table::builder()
        .metadata(fixture_metadata)
        .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
        .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
        .file_io(FileIO::new_with_memory())
        .build()
        .unwrap();

    // Two updates: relocate the table and set two properties.
    let relocate = TableUpdate::SetLocation {
        location: "s3://bucket/test/new_location/data".to_string(),
    };
    let set_props = TableUpdate::SetProperties {
        updates: vec![
            ("prop1".to_string(), "v1".to_string()),
            ("prop2".to_string(), "v2".to_string()),
        ]
        .into_iter()
        .collect(),
    };

    // The commit requires the table UUID to match the loaded table's UUID.
    let uuid_guard = TableRequirement::UuidMatch {
        uuid: table.metadata().table_uuid,
    };

    let commit = TableCommit::builder()
        .ident(table.identifier().to_owned())
        .updates(vec![relocate, set_props])
        .requirements(vec![uuid_guard])
        .build();

    let committed = commit.apply(table).unwrap();
    let committed_props = &committed.metadata().properties;

    assert_eq!(committed_props.get("prop1").unwrap(), "v1");
    assert_eq!(committed_props.get("prop2").unwrap(), "v2");

    // The new metadata location must start with the `00001-` file prefix.
    assert!(
        committed
            .metadata_location()
            .unwrap()
            .starts_with("s3://bucket/test/location/metadata/00001-")
    );

    assert_eq!(
        committed.metadata().location,
        "s3://bucket/test/new_location/data",
    );
}
2449}