1pub mod memory;
21mod metadata_location;
22
23use std::collections::HashMap;
24use std::fmt::{Debug, Display};
25use std::future::Future;
26use std::mem::take;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30
31use _serde::{deserialize_snapshot, serialize_snapshot};
32use async_trait::async_trait;
33pub use memory::MemoryCatalog;
34pub use metadata_location::*;
35#[cfg(test)]
36use mockall::automock;
37use serde_derive::{Deserialize, Serialize};
38use typed_builder::TypedBuilder;
39use uuid::Uuid;
40
41use crate::spec::{
42 EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
43 SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
44 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
45};
46use crate::table::Table;
47use crate::{Error, ErrorKind, Result};
48
/// Asynchronous catalog interface covering namespace and table operations.
///
/// Implementors must be `Debug + Sync + Send`. Under `cfg(test)` a mock
/// implementation is generated through `mockall::automock`.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// Returns namespace identifiers. `parent` optionally narrows the listing;
    /// how `None` is interpreted is up to the implementor.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Creates `namespace` with the given `properties` and returns the
    /// resulting [`Namespace`].
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Fetches the [`Namespace`] identified by `namespace`.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Reports whether `namespace` exists.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Updates `namespace` using `properties`.
    ///
    /// NOTE(review): whether the properties are merged into or replace the
    /// existing ones is not fixed by this signature; confirm per implementor.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drops `namespace`.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// Returns the identifiers of the tables in `namespace`.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Creates a table in `namespace` from `creation` and returns it.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Loads the table identified by `table`.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drops the table identified by `table`.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Reports whether the table identified by `table` exists.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Renames the table `src` to `dest`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Registers `table` using `metadata_location` and returns the table.
    ///
    /// NOTE(review): presumably `metadata_location` points at an existing
    /// table metadata file; confirm against implementors.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Submits `commit` to the catalog and returns the resulting table.
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
112
/// Builder abstraction that produces a [`Catalog`] from a name and a set of
/// string properties.
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// Concrete catalog type produced by this builder.
    type C: Catalog;
    /// Consumes the builder and returns a `Send` future that resolves to the
    /// catalog built from `name` and `props`, or to an error.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
124
/// Namespace identifier: an ordered list of name levels stored as a
/// `Vec<String>`.
///
/// The constructors in this file (`new`, `from_vec`, `from_strs`) never yield
/// an empty identifier. NOTE(review): the derived `Deserialize` performs no
/// such validation.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
132
133impl NamespaceIdent {
134 pub fn new(name: String) -> Self {
136 Self(vec![name])
137 }
138
139 pub fn from_vec(names: Vec<String>) -> Result<Self> {
141 if names.is_empty() {
142 return Err(Error::new(
143 ErrorKind::DataInvalid,
144 "Namespace identifier can't be empty!",
145 ));
146 }
147 Ok(Self(names))
148 }
149
150 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
152 Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
153 }
154
155 pub fn to_url_string(&self) -> String {
157 self.as_ref().join("\u{001f}")
158 }
159
160 pub fn inner(self) -> Vec<String> {
162 self.0
163 }
164
165 pub fn parent(&self) -> Option<Self> {
168 self.0.split_last().and_then(|(_, parent)| {
169 if parent.is_empty() {
170 None
171 } else {
172 Some(Self(parent.to_vec()))
173 }
174 })
175 }
176}
177
/// Exposes the underlying vector of levels by reference.
impl AsRef<Vec<String>> for NamespaceIdent {
    /// Borrows the inner `Vec<String>` without copying.
    fn as_ref(&self) -> &Vec<String> {
        &self.0
    }
}
183
184impl Deref for NamespaceIdent {
185 type Target = [String];
186
187 fn deref(&self) -> &Self::Target {
188 &self.0
189 }
190}
191
/// A namespace: its identifier together with its string properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    // Identifier of this namespace.
    name: NamespaceIdent,
    // Key/value properties attached to this namespace.
    properties: HashMap<String, String>,
}
198
199impl Namespace {
200 pub fn new(name: NamespaceIdent) -> Self {
202 Self::with_properties(name, HashMap::default())
203 }
204
205 pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
207 Self { name, properties }
208 }
209
210 pub fn name(&self) -> &NamespaceIdent {
212 &self.name
213 }
214
215 pub fn properties(&self) -> &HashMap<String, String> {
217 &self.properties
218 }
219}
220
221impl Display for NamespaceIdent {
222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 write!(f, "{}", self.0.join("."))
224 }
225}
226
/// Table identifier: the namespace a table belongs to plus the table name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace containing the table.
    pub namespace: NamespaceIdent,
    /// Name of the table within `namespace`.
    pub name: String,
}
235
236impl TableIdent {
237 pub fn new(namespace: NamespaceIdent, name: String) -> Self {
239 Self { namespace, name }
240 }
241
242 pub fn namespace(&self) -> &NamespaceIdent {
244 &self.namespace
245 }
246
247 pub fn name(&self) -> &str {
249 &self.name
250 }
251
252 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
254 let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
255 let table_name = vec.pop().ok_or_else(|| {
256 Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
257 })?;
258 let namespace_ident = NamespaceIdent::from_vec(vec)?;
259
260 Ok(Self {
261 namespace: namespace_ident,
262 name: table_name,
263 })
264 }
265}
266
267impl Display for TableIdent {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 write!(f, "{}.{}", self.namespace, self.name)
270 }
271}
272
/// Parameters for creating a table, assembled through the generated
/// `TableCreation::builder()`.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// Name of the table to create.
    pub name: String,
    /// Optional table location. The `location` setter takes a bare `String`;
    /// the fallback setter `location_opt` takes the `Option` directly.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// Schema of the table.
    pub schema: Schema,
    /// Optional partition spec. The setter accepts anything convertible
    /// `into` an `UnboundPartitionSpec`; `partition_spec_opt` is the
    /// `Option`-taking fallback.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order; `sort_order_opt` is the `Option`-taking fallback.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// Table properties. The setter accepts any iterator of
    /// `(String, String)` pairs and collects it into the map; defaults to an
    /// empty map.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Table format version; defaults to `FormatVersion::V2`.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
298
/// A commit against a single table: the requirements that must hold and the
/// updates to apply.
///
/// The generated builder's `build` method is `pub(crate)`, so a commit can
/// only be constructed inside this crate.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    // Identifier of the table this commit targets.
    ident: TableIdent,
    // Preconditions checked by `apply` before any update is applied.
    requirements: Vec<TableRequirement>,
    // Updates applied, in order, to the table metadata builder.
    updates: Vec<TableUpdate>,
}
316
317impl TableCommit {
318 pub fn identifier(&self) -> &TableIdent {
320 &self.ident
321 }
322
323 pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
325 take(&mut self.requirements)
326 }
327
328 pub fn take_updates(&mut self) -> Vec<TableUpdate> {
330 take(&mut self.updates)
331 }
332
333 pub fn apply(self, table: Table) -> Result<Table> {
339 for requirement in self.requirements {
341 requirement.check(Some(table.metadata()))?;
342 }
343
344 let current_metadata_location = table.metadata_location_result()?;
346
347 let mut metadata_builder = table
349 .metadata()
350 .clone()
351 .into_builder(Some(current_metadata_location.to_string()));
352 for update in self.updates {
353 metadata_builder = update.apply(metadata_builder)?;
354 }
355
356 let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
358 .with_next_version()
359 .to_string();
360
361 Ok(table
362 .with_metadata(Arc::new(metadata_builder.build()?.metadata))
363 .with_metadata_location(new_metadata_location))
364 }
365}
366
/// Precondition evaluated against table metadata by
/// [`TableRequirement::check`].
///
/// Serialized as an internally tagged object: the `type` field carries the
/// `assert-*` name given on each variant.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not exist (`assert-create`).
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must equal `uuid` (`assert-table-uuid`).
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Expected table UUID.
        uuid: Uuid,
    },
    /// The named ref must point at `snapshot_id`; when `snapshot_id` is
    /// `None`, the ref must not exist (`assert-ref-snapshot-id`).
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// Name of the branch or tag to inspect.
        r#ref: String,
        /// Expected snapshot id of the ref, or `None` to require its absence.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's `last_column_id` must equal the given id
    /// (`assert-last-assigned-field-id`).
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// Expected last assigned field id.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must equal the given id
    /// (`assert-current-schema-id`).
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Expected current schema id.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's `last_partition_id` must equal the given id
    /// (`assert-last-assigned-partition-id`).
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Expected last assigned partition id.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default partition spec id must equal the given id
    /// (`assert-default-spec-id`).
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Expected default partition spec id.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must equal the given id
    /// (`assert-default-sort-order-id`).
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Expected default sort order id.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
428
/// A single change to table metadata, applied to a `TableMetadataBuilder` by
/// [`TableUpdate::apply`].
///
/// Serialized as an internally tagged object: the `action` field carries the
/// kebab-case variant name, and fields of variants marked
/// `rename_all = "kebab-case"` are kebab-cased as well.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrades the table to `format_version`.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format version.
        format_version: FormatVersion,
    },
    /// Assigns `uuid` to the table.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Adds `schema` to the table.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
    },
    /// Sets the current schema to `schema_id`.
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Id of the schema to make current.
        schema_id: i32,
    },
    /// Adds the partition spec `spec`.
    AddSpec {
        /// Unbound partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Sets the default partition spec to `spec_id`.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Id of the partition spec to make default.
        spec_id: i32,
    },
    /// Adds the sort order `sort_order`.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Sets the default sort order to `sort_order_id`.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Id of the sort order to make default.
        sort_order_id: i64,
    },
    /// Adds `snapshot`; (de)serialized through the catalog-specific snapshot
    /// shape defined in the `_serde` module.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Sets the ref `ref_name` to `reference`.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of the ref to set.
        ref_name: String,
        /// Reference payload; its fields are flattened into this object when
        /// serialized.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Removes the snapshots listed in `snapshot_ids`.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Ids of the snapshots to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Removes the ref `ref_name`.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of the ref to remove.
        ref_name: String,
    },
    /// Sets the table location.
    SetLocation {
        /// New location.
        location: String,
    },
    /// Sets the given table properties.
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Removes the given table properties.
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Removes the partition specs listed in `spec_ids`.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Ids of the partition specs to remove.
        spec_ids: Vec<i32>,
    },
    /// Sets a statistics file; (de)serialized through `_serde_set_statistics`,
    /// which adds and validates a redundant `snapshot-id` field.
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// Statistics file to set.
        statistics: StatisticsFile,
    },
    /// Removes the statistics for `snapshot_id`.
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot whose statistics are removed.
        snapshot_id: i64,
    },
    /// Sets a partition statistics file.
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// Partition statistics file to set.
        partition_statistics: PartitionStatisticsFile,
    },
    /// Removes the partition statistics for `snapshot_id`.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot whose partition statistics are removed.
        snapshot_id: i64,
    },
    /// Removes the schemas listed in `schema_ids`.
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Ids of the schemas to remove.
        schema_ids: Vec<i32>,
    },
    /// Adds the encryption key `encryption_key`.
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// Key to add.
        encryption_key: EncryptedKey,
    },
    /// Removes the encryption key identified by `key_id`.
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// Id of the key to remove.
        key_id: String,
    },
}
576
577impl TableUpdate {
578 pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
580 match self {
581 TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
582 TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
583 TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
584 TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
585 TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
586 TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
587 TableUpdate::SetDefaultSortOrder { sort_order_id } => {
588 builder.set_default_sort_order(sort_order_id)
589 }
590 TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
591 TableUpdate::SetSnapshotRef {
592 ref_name,
593 reference,
594 } => builder.set_ref(&ref_name, reference),
595 TableUpdate::RemoveSnapshots { snapshot_ids } => {
596 Ok(builder.remove_snapshots(&snapshot_ids))
597 }
598 TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
599 TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
600 TableUpdate::SetProperties { updates } => builder.set_properties(updates),
601 TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
602 TableUpdate::UpgradeFormatVersion { format_version } => {
603 builder.upgrade_format_version(format_version)
604 }
605 TableUpdate::RemovePartitionSpecs { spec_ids } => {
606 builder.remove_partition_specs(&spec_ids)
607 }
608 TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
609 TableUpdate::RemoveStatistics { snapshot_id } => {
610 Ok(builder.remove_statistics(snapshot_id))
611 }
612 TableUpdate::SetPartitionStatistics {
613 partition_statistics,
614 } => Ok(builder.set_partition_statistics(partition_statistics)),
615 TableUpdate::RemovePartitionStatistics { snapshot_id } => {
616 Ok(builder.remove_partition_statistics(snapshot_id))
617 }
618 TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
619 TableUpdate::AddEncryptionKey { encryption_key } => {
620 Ok(builder.add_encryption_key(encryption_key))
621 }
622 TableUpdate::RemoveEncryptionKey { key_id } => {
623 Ok(builder.remove_encryption_key(&key_id))
624 }
625 }
626 }
627}
628
629impl TableRequirement {
630 pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
635 if let Some(metadata) = metadata {
636 match self {
637 TableRequirement::NotExist => {
638 return Err(Error::new(
639 ErrorKind::CatalogCommitConflicts,
640 format!(
641 "Requirement failed: Table with id {} already exists",
642 metadata.uuid()
643 ),
644 )
645 .with_retryable(true));
646 }
647 TableRequirement::UuidMatch { uuid } => {
648 if &metadata.uuid() != uuid {
649 return Err(Error::new(
650 ErrorKind::CatalogCommitConflicts,
651 "Requirement failed: Table UUID does not match",
652 )
653 .with_context("expected", *uuid)
654 .with_context("found", metadata.uuid())
655 .with_retryable(true));
656 }
657 }
658 TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
659 if metadata.current_schema_id != *current_schema_id {
661 return Err(Error::new(
662 ErrorKind::CatalogCommitConflicts,
663 "Requirement failed: Current schema id does not match",
664 )
665 .with_context("expected", current_schema_id.to_string())
666 .with_context("found", metadata.current_schema_id.to_string())
667 .with_retryable(true));
668 }
669 }
670 TableRequirement::DefaultSortOrderIdMatch {
671 default_sort_order_id,
672 } => {
673 if metadata.default_sort_order().order_id != *default_sort_order_id {
674 return Err(Error::new(
675 ErrorKind::CatalogCommitConflicts,
676 "Requirement failed: Default sort order id does not match",
677 )
678 .with_context("expected", default_sort_order_id.to_string())
679 .with_context("found", metadata.default_sort_order().order_id.to_string())
680 .with_retryable(true));
681 }
682 }
683 TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
684 let snapshot_ref = metadata.snapshot_for_ref(r#ref);
685 if let Some(snapshot_id) = snapshot_id {
686 let snapshot_ref = snapshot_ref.ok_or(
687 Error::new(
688 ErrorKind::CatalogCommitConflicts,
689 format!("Requirement failed: Branch or tag `{ref}` not found"),
690 )
691 .with_retryable(true),
692 )?;
693 if snapshot_ref.snapshot_id() != *snapshot_id {
694 return Err(Error::new(
695 ErrorKind::CatalogCommitConflicts,
696 format!(
697 "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
698 ),
699 )
700 .with_context("expected", snapshot_id.to_string())
701 .with_context("found", snapshot_ref.snapshot_id().to_string())
702 .with_retryable(true));
703 }
704 } else if snapshot_ref.is_some() {
705 return Err(Error::new(
707 ErrorKind::CatalogCommitConflicts,
708 format!("Requirement failed: Branch or tag `{ref}` already exists"),
709 )
710 .with_retryable(true));
711 }
712 }
713 TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
714 if metadata.default_partition_spec_id() != *default_spec_id {
716 return Err(Error::new(
717 ErrorKind::CatalogCommitConflicts,
718 "Requirement failed: Default partition spec id does not match",
719 )
720 .with_context("expected", default_spec_id.to_string())
721 .with_context("found", metadata.default_partition_spec_id().to_string())
722 .with_retryable(true));
723 }
724 }
725 TableRequirement::LastAssignedPartitionIdMatch {
726 last_assigned_partition_id,
727 } => {
728 if metadata.last_partition_id != *last_assigned_partition_id {
729 return Err(Error::new(
730 ErrorKind::CatalogCommitConflicts,
731 "Requirement failed: Last assigned partition id does not match",
732 )
733 .with_context("expected", last_assigned_partition_id.to_string())
734 .with_context("found", metadata.last_partition_id.to_string())
735 .with_retryable(true));
736 }
737 }
738 TableRequirement::LastAssignedFieldIdMatch {
739 last_assigned_field_id,
740 } => {
741 if &metadata.last_column_id != last_assigned_field_id {
742 return Err(Error::new(
743 ErrorKind::CatalogCommitConflicts,
744 "Requirement failed: Last assigned field id does not match",
745 )
746 .with_context("expected", last_assigned_field_id.to_string())
747 .with_context("found", metadata.last_column_id.to_string())
748 .with_retryable(true));
749 }
750 }
751 };
752 } else {
753 match self {
754 TableRequirement::NotExist => {}
755 _ => {
756 return Err(Error::new(
757 ErrorKind::TableNotFound,
758 "Requirement failed: Table does not exist",
759 ));
760 }
761 }
762 }
763
764 Ok(())
765 }
766}
767
768pub(super) mod _serde {
769 use serde::{Deserialize as _, Deserializer, Serialize as _};
770
771 use super::*;
772 use crate::spec::{SchemaId, Summary};
773
774 pub(super) fn deserialize_snapshot<'de, D>(
775 deserializer: D,
776 ) -> std::result::Result<Snapshot, D::Error>
777 where D: Deserializer<'de> {
778 let buf = CatalogSnapshot::deserialize(deserializer)?;
779 Ok(buf.into())
780 }
781
782 pub(super) fn serialize_snapshot<S>(
783 snapshot: &Snapshot,
784 serializer: S,
785 ) -> std::result::Result<S::Ok, S::Error>
786 where
787 S: serde::Serializer,
788 {
789 let buf: CatalogSnapshot = snapshot.clone().into();
790 buf.serialize(serializer)
791 }
792
793 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
794 #[serde(rename_all = "kebab-case")]
795 struct CatalogSnapshot {
799 snapshot_id: i64,
800 #[serde(skip_serializing_if = "Option::is_none")]
801 parent_snapshot_id: Option<i64>,
802 #[serde(default)]
803 sequence_number: i64,
804 timestamp_ms: i64,
805 manifest_list: String,
806 summary: Summary,
807 #[serde(skip_serializing_if = "Option::is_none")]
808 schema_id: Option<SchemaId>,
809 #[serde(skip_serializing_if = "Option::is_none")]
810 first_row_id: Option<u64>,
811 #[serde(skip_serializing_if = "Option::is_none")]
812 added_rows: Option<u64>,
813 #[serde(skip_serializing_if = "Option::is_none")]
814 key_id: Option<String>,
815 }
816
817 impl From<CatalogSnapshot> for Snapshot {
818 fn from(snapshot: CatalogSnapshot) -> Self {
819 let CatalogSnapshot {
820 snapshot_id,
821 parent_snapshot_id,
822 sequence_number,
823 timestamp_ms,
824 manifest_list,
825 schema_id,
826 summary,
827 first_row_id,
828 added_rows,
829 key_id,
830 } = snapshot;
831 let builder = Snapshot::builder()
832 .with_snapshot_id(snapshot_id)
833 .with_parent_snapshot_id(parent_snapshot_id)
834 .with_sequence_number(sequence_number)
835 .with_timestamp_ms(timestamp_ms)
836 .with_manifest_list(manifest_list)
837 .with_summary(summary)
838 .with_encryption_key_id(key_id);
839 let row_range = first_row_id.zip(added_rows);
840 match (schema_id, row_range) {
841 (None, None) => builder.build(),
842 (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
843 (None, Some((first_row_id, last_row_id))) => {
844 builder.with_row_range(first_row_id, last_row_id).build()
845 }
846 (Some(schema_id), Some((first_row_id, last_row_id))) => builder
847 .with_schema_id(schema_id)
848 .with_row_range(first_row_id, last_row_id)
849 .build(),
850 }
851 }
852 }
853
854 impl From<Snapshot> for CatalogSnapshot {
855 fn from(snapshot: Snapshot) -> Self {
856 let first_row_id = snapshot.first_row_id();
857 let added_rows = snapshot.added_rows_count();
858 let Snapshot {
859 snapshot_id,
860 parent_snapshot_id,
861 sequence_number,
862 timestamp_ms,
863 manifest_list,
864 summary,
865 schema_id,
866 row_range: _,
867 encryption_key_id: key_id,
868 } = snapshot;
869 CatalogSnapshot {
870 snapshot_id,
871 parent_snapshot_id,
872 sequence_number,
873 timestamp_ms,
874 manifest_list,
875 summary,
876 schema_id,
877 first_row_id,
878 added_rows,
879 key_id,
880 }
881 }
882 }
883}
884
/// Parameters for creating a view, assembled through the generated
/// `ViewCreation::builder()`.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// Name of the view.
    pub name: String,
    /// Location of the view.
    pub location: String,
    /// Representations of the view.
    pub representations: ViewRepresentations,
    /// Schema of the view.
    pub schema: Schema,
    /// View properties; defaults to an empty map in the builder.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// Default namespace associated with the view.
    /// NOTE(review): presumably used to resolve unqualified names in the view
    /// definition; confirm against the consumer.
    pub default_namespace: NamespaceIdent,
    /// Optional default catalog; defaults to `None` in the builder.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// Summary key/value pairs; defaults to an empty map in the builder.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
909
/// A single change to a view.
///
/// Serialized as an internally tagged object: the `action` field carries the
/// kebab-case variant name and variant fields are kebab-cased. No consumer of
/// these variants is visible in this part of the file, so the per-variant
/// notes below only describe the payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Carries the UUID to assign.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Carries the target view format version.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format version.
        format_version: ViewFormatVersion,
    },
    /// Carries a schema to add.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
        /// Optional last column id accompanying the schema.
        last_column_id: Option<i32>,
    },
    /// Carries the new location.
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New location.
        location: String,
    },
    /// Carries properties to set.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Carries property keys to remove.
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Carries a view version to add.
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// View version to add.
        view_version: ViewVersion,
    },
    /// Carries the id of the view version to make current.
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// Id of the view version to make current.
        view_version_id: i32,
    },
}
968
969mod _serde_set_statistics {
970 use serde::{Deserialize, Deserializer, Serialize, Serializer};
973
974 use super::*;
975
976 #[derive(Debug, Serialize, Deserialize)]
977 #[serde(rename_all = "kebab-case")]
978 struct SetStatistics {
979 snapshot_id: Option<i64>,
980 statistics: StatisticsFile,
981 }
982
983 pub fn serialize<S>(
984 value: &StatisticsFile,
985 serializer: S,
986 ) -> std::result::Result<S::Ok, S::Error>
987 where
988 S: Serializer,
989 {
990 SetStatistics {
991 snapshot_id: Some(value.snapshot_id),
992 statistics: value.clone(),
993 }
994 .serialize(serializer)
995 }
996
997 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
998 where D: Deserializer<'de> {
999 let SetStatistics {
1000 snapshot_id,
1001 statistics,
1002 } = SetStatistics::deserialize(deserializer)?;
1003 if let Some(snapshot_id) = snapshot_id {
1004 if snapshot_id != statistics.snapshot_id {
1005 return Err(serde::de::Error::custom(format!(
1006 "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1007 statistics.snapshot_id
1008 )));
1009 }
1010 }
1011
1012 Ok(statistics)
1013 }
1014}
1015
1016#[cfg(test)]
1017mod tests {
1018 use std::collections::HashMap;
1019 use std::fmt::Debug;
1020 use std::fs::File;
1021 use std::io::BufReader;
1022
1023 use base64::Engine as _;
1024 use serde::Serialize;
1025 use serde::de::DeserializeOwned;
1026 use uuid::uuid;
1027
1028 use super::ViewUpdate;
1029 use crate::io::FileIOBuilder;
1030 use crate::spec::{
1031 BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1032 PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1033 SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1034 StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1035 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1036 ViewVersion,
1037 };
1038 use crate::table::Table;
1039 use crate::{
1040 NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1041 };
1042
1043 #[test]
1044 fn test_parent_namespace() {
1045 let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1046 let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1047 let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1048
1049 assert_eq!(ns1.parent(), None);
1050 assert_eq!(ns2.parent(), Some(ns1.clone()));
1051 assert_eq!(ns3.parent(), Some(ns2.clone()));
1052 }
1053
1054 #[test]
1055 fn test_create_table_id() {
1056 let table_id = TableIdent {
1057 namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1058 name: "t1".to_string(),
1059 };
1060
1061 assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1062 }
1063
1064 #[test]
1065 fn test_table_creation_iterator_properties() {
1066 let builder = TableCreation::builder()
1067 .name("table".to_string())
1068 .schema(Schema::builder().build().unwrap());
1069
1070 fn s(k: &str, v: &str) -> (String, String) {
1071 (k.to_string(), v.to_string())
1072 }
1073
1074 let table_creation = builder
1075 .properties([s("key", "value"), s("foo", "bar")])
1076 .build();
1077
1078 assert_eq!(
1079 HashMap::from([s("key", "value"), s("foo", "bar")]),
1080 table_creation.properties
1081 );
1082 }
1083
1084 fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1085 json: impl ToString,
1086 expected: T,
1087 ) {
1088 let json_str = json.to_string();
1089 let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1090 assert_eq!(actual, expected, "Parsed value is not equal to expected");
1091
1092 let restored: T = serde_json::from_str(
1093 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1094 )
1095 .expect("Failed to parse from serialized json");
1096
1097 assert_eq!(
1098 restored, expected,
1099 "Parsed restored value is not equal to expected"
1100 );
1101 }
1102
1103 fn metadata() -> TableMetadata {
1104 let tbl_creation = TableCreation::builder()
1105 .name("table".to_string())
1106 .location("/path/to/table".to_string())
1107 .schema(Schema::builder().build().unwrap())
1108 .build();
1109
1110 TableMetadataBuilder::from_table_creation(tbl_creation)
1111 .unwrap()
1112 .assign_uuid(uuid::Uuid::nil())
1113 .build()
1114 .unwrap()
1115 .metadata
1116 }
1117
1118 #[test]
1119 fn test_check_requirement_not_exist() {
1120 let metadata = metadata();
1121 let requirement = TableRequirement::NotExist;
1122
1123 assert!(requirement.check(Some(&metadata)).is_err());
1124 assert!(requirement.check(None).is_ok());
1125 }
1126
1127 #[test]
1128 fn test_check_table_uuid() {
1129 let metadata = metadata();
1130
1131 let requirement = TableRequirement::UuidMatch {
1132 uuid: uuid::Uuid::now_v7(),
1133 };
1134 assert!(requirement.check(Some(&metadata)).is_err());
1135
1136 let requirement = TableRequirement::UuidMatch {
1137 uuid: uuid::Uuid::nil(),
1138 };
1139 assert!(requirement.check(Some(&metadata)).is_ok());
1140 }
1141
1142 #[test]
1143 fn test_check_ref_snapshot_id() {
1144 let metadata = metadata();
1145
1146 let requirement = TableRequirement::RefSnapshotIdMatch {
1148 r#ref: "my_branch".to_string(),
1149 snapshot_id: Some(1),
1150 };
1151 assert!(requirement.check(Some(&metadata)).is_err());
1152
1153 let requirement = TableRequirement::RefSnapshotIdMatch {
1155 r#ref: "my_branch".to_string(),
1156 snapshot_id: None,
1157 };
1158 assert!(requirement.check(Some(&metadata)).is_ok());
1159
1160 let snapshot = Snapshot::builder()
1162 .with_snapshot_id(3051729675574597004)
1163 .with_sequence_number(10)
1164 .with_timestamp_ms(9992191116217)
1165 .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1166 .with_schema_id(0)
1167 .with_summary(Summary {
1168 operation: Operation::Append,
1169 additional_properties: HashMap::new(),
1170 })
1171 .build();
1172
1173 let builder = metadata.into_builder(None);
1174 let builder = TableUpdate::AddSnapshot {
1175 snapshot: snapshot.clone(),
1176 }
1177 .apply(builder)
1178 .unwrap();
1179 let metadata = TableUpdate::SetSnapshotRef {
1180 ref_name: MAIN_BRANCH.to_string(),
1181 reference: SnapshotReference {
1182 snapshot_id: snapshot.snapshot_id(),
1183 retention: SnapshotRetention::Branch {
1184 min_snapshots_to_keep: Some(10),
1185 max_snapshot_age_ms: None,
1186 max_ref_age_ms: None,
1187 },
1188 },
1189 }
1190 .apply(builder)
1191 .unwrap()
1192 .build()
1193 .unwrap()
1194 .metadata;
1195
1196 let requirement = TableRequirement::RefSnapshotIdMatch {
1198 r#ref: "main".to_string(),
1199 snapshot_id: Some(3051729675574597004),
1200 };
1201 assert!(requirement.check(Some(&metadata)).is_ok());
1202
1203 let requirement = TableRequirement::RefSnapshotIdMatch {
1205 r#ref: "main".to_string(),
1206 snapshot_id: Some(1),
1207 };
1208 assert!(requirement.check(Some(&metadata)).is_err());
1209 }
1210
1211 #[test]
1212 fn test_check_last_assigned_field_id() {
1213 let metadata = metadata();
1214
1215 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1216 last_assigned_field_id: 1,
1217 };
1218 assert!(requirement.check(Some(&metadata)).is_err());
1219
1220 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1221 last_assigned_field_id: 0,
1222 };
1223 assert!(requirement.check(Some(&metadata)).is_ok());
1224 }
1225
1226 #[test]
1227 fn test_check_current_schema_id() {
1228 let metadata = metadata();
1229
1230 let requirement = TableRequirement::CurrentSchemaIdMatch {
1231 current_schema_id: 1,
1232 };
1233 assert!(requirement.check(Some(&metadata)).is_err());
1234
1235 let requirement = TableRequirement::CurrentSchemaIdMatch {
1236 current_schema_id: 0,
1237 };
1238 assert!(requirement.check(Some(&metadata)).is_ok());
1239 }
1240
1241 #[test]
1242 fn test_check_last_assigned_partition_id() {
1243 let metadata = metadata();
1244 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1245 last_assigned_partition_id: 0,
1246 };
1247 assert!(requirement.check(Some(&metadata)).is_err());
1248
1249 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1250 last_assigned_partition_id: 999,
1251 };
1252 assert!(requirement.check(Some(&metadata)).is_ok());
1253 }
1254
1255 #[test]
1256 fn test_check_default_spec_id() {
1257 let metadata = metadata();
1258
1259 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1260 assert!(requirement.check(Some(&metadata)).is_err());
1261
1262 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1263 assert!(requirement.check(Some(&metadata)).is_ok());
1264 }
1265
1266 #[test]
1267 fn test_check_default_sort_order_id() {
1268 let metadata = metadata();
1269
1270 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1271 default_sort_order_id: 1,
1272 };
1273 assert!(requirement.check(Some(&metadata)).is_err());
1274
1275 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1276 default_sort_order_id: 0,
1277 };
1278 assert!(requirement.check(Some(&metadata)).is_ok());
1279 }
1280
    /// JSON round trip for `TableRequirement::UuidMatch` (`assert-table-uuid`).
    #[test]
    fn test_table_uuid() {
        test_serde_json(
            r#"
{
    "type": "assert-table-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#,
            TableRequirement::UuidMatch {
                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
            },
        );
    }
1295
    /// JSON round trip for `TableRequirement::NotExist` (`assert-create`).
    #[test]
    fn test_assert_table_not_exists() {
        test_serde_json(
            r#"
{
    "type": "assert-create"
}
        "#,
            TableRequirement::NotExist,
        );
    }
1307
    /// JSON round trip for `TableRequirement::RefSnapshotIdMatch`, once with a
    /// `null` snapshot id and once with a concrete one.
    #[test]
    fn test_assert_ref_snapshot_id() {
        // `null` maps to `snapshot_id: None`.
        test_serde_json(
            r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": null
}
        "#,
            TableRequirement::RefSnapshotIdMatch {
                r#ref: "snapshot-name".to_string(),
                snapshot_id: None,
            },
        );

        // A number maps to `snapshot_id: Some(..)`.
        test_serde_json(
            r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": 1
}
        "#,
            TableRequirement::RefSnapshotIdMatch {
                r#ref: "snapshot-name".to_string(),
                snapshot_id: Some(1),
            },
        );
    }
1338
1339 #[test]
1340 fn test_assert_last_assigned_field_id() {
1341 test_serde_json(
1342 r#"
1343{
1344 "type": "assert-last-assigned-field-id",
1345 "last-assigned-field-id": 12
1346}
1347 "#,
1348 TableRequirement::LastAssignedFieldIdMatch {
1349 last_assigned_field_id: 12,
1350 },
1351 );
1352 }
1353
1354 #[test]
1355 fn test_assert_current_schema_id() {
1356 test_serde_json(
1357 r#"
1358{
1359 "type": "assert-current-schema-id",
1360 "current-schema-id": 4
1361}
1362 "#,
1363 TableRequirement::CurrentSchemaIdMatch {
1364 current_schema_id: 4,
1365 },
1366 );
1367 }
1368
1369 #[test]
1370 fn test_assert_last_assigned_partition_id() {
1371 test_serde_json(
1372 r#"
1373{
1374 "type": "assert-last-assigned-partition-id",
1375 "last-assigned-partition-id": 1004
1376}
1377 "#,
1378 TableRequirement::LastAssignedPartitionIdMatch {
1379 last_assigned_partition_id: 1004,
1380 },
1381 );
1382 }
1383
1384 #[test]
1385 fn test_assert_default_spec_id() {
1386 test_serde_json(
1387 r#"
1388{
1389 "type": "assert-default-spec-id",
1390 "default-spec-id": 5
1391}
1392 "#,
1393 TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
1394 );
1395 }
1396
1397 #[test]
1398 fn test_assert_default_sort_order() {
1399 let json = r#"
1400{
1401 "type": "assert-default-sort-order-id",
1402 "default-sort-order-id": 10
1403}
1404 "#;
1405
1406 let update = TableRequirement::DefaultSortOrderIdMatch {
1407 default_sort_order_id: 10,
1408 };
1409
1410 test_serde_json(json, update);
1411 }
1412
1413 #[test]
1414 fn test_parse_assert_invalid() {
1415 assert!(
1416 serde_json::from_str::<TableRequirement>(
1417 r#"
1418{
1419 "default-sort-order-id": 10
1420}
1421"#
1422 )
1423 .is_err(),
1424 "Table requirements should not be parsed without type."
1425 );
1426 }
1427
1428 #[test]
1429 fn test_assign_uuid() {
1430 test_serde_json(
1431 r#"
1432{
1433 "action": "assign-uuid",
1434 "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1435}
1436 "#,
1437 TableUpdate::AssignUuid {
1438 uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1439 },
1440 );
1441 }
1442
1443 #[test]
1444 fn test_upgrade_format_version() {
1445 test_serde_json(
1446 r#"
1447{
1448 "action": "upgrade-format-version",
1449 "format-version": 2
1450}
1451 "#,
1452 TableUpdate::UpgradeFormatVersion {
1453 format_version: FormatVersion::V2,
1454 },
1455 );
1456 }
1457
1458 #[test]
1459 fn test_add_schema() {
1460 let test_schema = Schema::builder()
1461 .with_schema_id(1)
1462 .with_identifier_field_ids(vec![2])
1463 .with_fields(vec![
1464 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1465 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1466 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1467 ])
1468 .build()
1469 .unwrap();
1470 test_serde_json(
1471 r#"
1472{
1473 "action": "add-schema",
1474 "schema": {
1475 "type": "struct",
1476 "schema-id": 1,
1477 "fields": [
1478 {
1479 "id": 1,
1480 "name": "foo",
1481 "required": false,
1482 "type": "string"
1483 },
1484 {
1485 "id": 2,
1486 "name": "bar",
1487 "required": true,
1488 "type": "int"
1489 },
1490 {
1491 "id": 3,
1492 "name": "baz",
1493 "required": false,
1494 "type": "boolean"
1495 }
1496 ],
1497 "identifier-field-ids": [
1498 2
1499 ]
1500 },
1501 "last-column-id": 3
1502}
1503 "#,
1504 TableUpdate::AddSchema {
1505 schema: test_schema.clone(),
1506 },
1507 );
1508
1509 test_serde_json(
1510 r#"
1511{
1512 "action": "add-schema",
1513 "schema": {
1514 "type": "struct",
1515 "schema-id": 1,
1516 "fields": [
1517 {
1518 "id": 1,
1519 "name": "foo",
1520 "required": false,
1521 "type": "string"
1522 },
1523 {
1524 "id": 2,
1525 "name": "bar",
1526 "required": true,
1527 "type": "int"
1528 },
1529 {
1530 "id": 3,
1531 "name": "baz",
1532 "required": false,
1533 "type": "boolean"
1534 }
1535 ],
1536 "identifier-field-ids": [
1537 2
1538 ]
1539 }
1540}
1541 "#,
1542 TableUpdate::AddSchema {
1543 schema: test_schema.clone(),
1544 },
1545 );
1546 }
1547
1548 #[test]
1549 fn test_set_current_schema() {
1550 test_serde_json(
1551 r#"
1552{
1553 "action": "set-current-schema",
1554 "schema-id": 23
1555}
1556 "#,
1557 TableUpdate::SetCurrentSchema { schema_id: 23 },
1558 );
1559 }
1560
1561 #[test]
1562 fn test_add_spec() {
1563 test_serde_json(
1564 r#"
1565{
1566 "action": "add-spec",
1567 "spec": {
1568 "fields": [
1569 {
1570 "source-id": 4,
1571 "name": "ts_day",
1572 "transform": "day"
1573 },
1574 {
1575 "source-id": 1,
1576 "name": "id_bucket",
1577 "transform": "bucket[16]"
1578 },
1579 {
1580 "source-id": 2,
1581 "name": "id_truncate",
1582 "transform": "truncate[4]"
1583 }
1584 ]
1585 }
1586}
1587 "#,
1588 TableUpdate::AddSpec {
1589 spec: UnboundPartitionSpec::builder()
1590 .add_partition_field(4, "ts_day".to_string(), Transform::Day)
1591 .unwrap()
1592 .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
1593 .unwrap()
1594 .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
1595 .unwrap()
1596 .build(),
1597 },
1598 );
1599 }
1600
1601 #[test]
1602 fn test_set_default_spec() {
1603 test_serde_json(
1604 r#"
1605{
1606 "action": "set-default-spec",
1607 "spec-id": 1
1608}
1609 "#,
1610 TableUpdate::SetDefaultSpec { spec_id: 1 },
1611 )
1612 }
1613
1614 #[test]
1615 fn test_add_sort_order() {
1616 let json = r#"
1617{
1618 "action": "add-sort-order",
1619 "sort-order": {
1620 "order-id": 1,
1621 "fields": [
1622 {
1623 "transform": "identity",
1624 "source-id": 2,
1625 "direction": "asc",
1626 "null-order": "nulls-first"
1627 },
1628 {
1629 "transform": "bucket[4]",
1630 "source-id": 3,
1631 "direction": "desc",
1632 "null-order": "nulls-last"
1633 }
1634 ]
1635 }
1636}
1637 "#;
1638
1639 let update = TableUpdate::AddSortOrder {
1640 sort_order: SortOrder::builder()
1641 .with_order_id(1)
1642 .with_sort_field(
1643 SortField::builder()
1644 .source_id(2)
1645 .direction(SortDirection::Ascending)
1646 .null_order(NullOrder::First)
1647 .transform(Transform::Identity)
1648 .build(),
1649 )
1650 .with_sort_field(
1651 SortField::builder()
1652 .source_id(3)
1653 .direction(SortDirection::Descending)
1654 .null_order(NullOrder::Last)
1655 .transform(Transform::Bucket(4))
1656 .build(),
1657 )
1658 .build_unbound()
1659 .unwrap(),
1660 };
1661
1662 test_serde_json(json, update);
1663 }
1664
1665 #[test]
1666 fn test_set_default_order() {
1667 let json = r#"
1668{
1669 "action": "set-default-sort-order",
1670 "sort-order-id": 2
1671}
1672 "#;
1673 let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
1674
1675 test_serde_json(json, update);
1676 }
1677
1678 #[test]
1679 fn test_add_snapshot() {
1680 let json = r#"
1681{
1682 "action": "add-snapshot",
1683 "snapshot": {
1684 "snapshot-id": 3055729675574597000,
1685 "parent-snapshot-id": 3051729675574597000,
1686 "timestamp-ms": 1555100955770,
1687 "sequence-number": 1,
1688 "summary": {
1689 "operation": "append"
1690 },
1691 "manifest-list": "s3://a/b/2.avro",
1692 "schema-id": 1
1693 }
1694}
1695 "#;
1696
1697 let update = TableUpdate::AddSnapshot {
1698 snapshot: Snapshot::builder()
1699 .with_snapshot_id(3055729675574597000)
1700 .with_parent_snapshot_id(Some(3051729675574597000))
1701 .with_timestamp_ms(1555100955770)
1702 .with_sequence_number(1)
1703 .with_manifest_list("s3://a/b/2.avro")
1704 .with_schema_id(1)
1705 .with_summary(Summary {
1706 operation: Operation::Append,
1707 additional_properties: HashMap::default(),
1708 })
1709 .build(),
1710 };
1711
1712 test_serde_json(json, update);
1713 }
1714
1715 #[test]
1716 fn test_add_snapshot_v1() {
1717 let json = r#"
1718{
1719 "action": "add-snapshot",
1720 "snapshot": {
1721 "snapshot-id": 3055729675574597000,
1722 "parent-snapshot-id": 3051729675574597000,
1723 "timestamp-ms": 1555100955770,
1724 "summary": {
1725 "operation": "append"
1726 },
1727 "manifest-list": "s3://a/b/2.avro"
1728 }
1729}
1730 "#;
1731
1732 let update = TableUpdate::AddSnapshot {
1733 snapshot: Snapshot::builder()
1734 .with_snapshot_id(3055729675574597000)
1735 .with_parent_snapshot_id(Some(3051729675574597000))
1736 .with_timestamp_ms(1555100955770)
1737 .with_sequence_number(0)
1738 .with_manifest_list("s3://a/b/2.avro")
1739 .with_summary(Summary {
1740 operation: Operation::Append,
1741 additional_properties: HashMap::default(),
1742 })
1743 .build(),
1744 };
1745
1746 let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
1747 assert_eq!(actual, update, "Parsed value is not equal to expected");
1748 }
1749
1750 #[test]
1751 fn test_add_snapshot_v3() {
1752 let json = serde_json::json!(
1753 {
1754 "action": "add-snapshot",
1755 "snapshot": {
1756 "snapshot-id": 3055729675574597000i64,
1757 "parent-snapshot-id": 3051729675574597000i64,
1758 "timestamp-ms": 1555100955770i64,
1759 "first-row-id":0,
1760 "added-rows":2,
1761 "key-id":"key123",
1762 "summary": {
1763 "operation": "append"
1764 },
1765 "manifest-list": "s3://a/b/2.avro"
1766 }
1767 });
1768
1769 let update = TableUpdate::AddSnapshot {
1770 snapshot: Snapshot::builder()
1771 .with_snapshot_id(3055729675574597000)
1772 .with_parent_snapshot_id(Some(3051729675574597000))
1773 .with_timestamp_ms(1555100955770)
1774 .with_sequence_number(0)
1775 .with_manifest_list("s3://a/b/2.avro")
1776 .with_row_range(0, 2)
1777 .with_encryption_key_id(Some("key123".to_string()))
1778 .with_summary(Summary {
1779 operation: Operation::Append,
1780 additional_properties: HashMap::default(),
1781 })
1782 .build(),
1783 };
1784
1785 let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
1786 assert_eq!(actual, update, "Parsed value is not equal to expected");
1787 let restored: TableUpdate = serde_json::from_str(
1788 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1789 )
1790 .expect("Failed to parse from serialized json");
1791 assert_eq!(restored, update);
1792 }
1793
1794 #[test]
1795 fn test_remove_snapshots() {
1796 let json = r#"
1797{
1798 "action": "remove-snapshots",
1799 "snapshot-ids": [
1800 1,
1801 2
1802 ]
1803}
1804 "#;
1805
1806 let update = TableUpdate::RemoveSnapshots {
1807 snapshot_ids: vec![1, 2],
1808 };
1809 test_serde_json(json, update);
1810 }
1811
1812 #[test]
1813 fn test_remove_snapshot_ref() {
1814 let json = r#"
1815{
1816 "action": "remove-snapshot-ref",
1817 "ref-name": "snapshot-ref"
1818}
1819 "#;
1820
1821 let update = TableUpdate::RemoveSnapshotRef {
1822 ref_name: "snapshot-ref".to_string(),
1823 };
1824 test_serde_json(json, update);
1825 }
1826
1827 #[test]
1828 fn test_set_snapshot_ref_tag() {
1829 let json = r#"
1830{
1831 "action": "set-snapshot-ref",
1832 "type": "tag",
1833 "ref-name": "hank",
1834 "snapshot-id": 1,
1835 "max-ref-age-ms": 1
1836}
1837 "#;
1838
1839 let update = TableUpdate::SetSnapshotRef {
1840 ref_name: "hank".to_string(),
1841 reference: SnapshotReference {
1842 snapshot_id: 1,
1843 retention: SnapshotRetention::Tag {
1844 max_ref_age_ms: Some(1),
1845 },
1846 },
1847 };
1848
1849 test_serde_json(json, update);
1850 }
1851
1852 #[test]
1853 fn test_set_snapshot_ref_branch() {
1854 let json = r#"
1855{
1856 "action": "set-snapshot-ref",
1857 "type": "branch",
1858 "ref-name": "hank",
1859 "snapshot-id": 1,
1860 "min-snapshots-to-keep": 2,
1861 "max-snapshot-age-ms": 3,
1862 "max-ref-age-ms": 4
1863}
1864 "#;
1865
1866 let update = TableUpdate::SetSnapshotRef {
1867 ref_name: "hank".to_string(),
1868 reference: SnapshotReference {
1869 snapshot_id: 1,
1870 retention: SnapshotRetention::Branch {
1871 min_snapshots_to_keep: Some(2),
1872 max_snapshot_age_ms: Some(3),
1873 max_ref_age_ms: Some(4),
1874 },
1875 },
1876 };
1877
1878 test_serde_json(json, update);
1879 }
1880
1881 #[test]
1882 fn test_set_properties() {
1883 let json = r#"
1884{
1885 "action": "set-properties",
1886 "updates": {
1887 "prop1": "v1",
1888 "prop2": "v2"
1889 }
1890}
1891 "#;
1892
1893 let update = TableUpdate::SetProperties {
1894 updates: vec![
1895 ("prop1".to_string(), "v1".to_string()),
1896 ("prop2".to_string(), "v2".to_string()),
1897 ]
1898 .into_iter()
1899 .collect(),
1900 };
1901
1902 test_serde_json(json, update);
1903 }
1904
1905 #[test]
1906 fn test_remove_properties() {
1907 let json = r#"
1908{
1909 "action": "remove-properties",
1910 "removals": [
1911 "prop1",
1912 "prop2"
1913 ]
1914}
1915 "#;
1916
1917 let update = TableUpdate::RemoveProperties {
1918 removals: vec!["prop1".to_string(), "prop2".to_string()],
1919 };
1920
1921 test_serde_json(json, update);
1922 }
1923
1924 #[test]
1925 fn test_set_location() {
1926 let json = r#"
1927{
1928 "action": "set-location",
1929 "location": "s3://bucket/warehouse/tbl_location"
1930}
1931 "#;
1932
1933 let update = TableUpdate::SetLocation {
1934 location: "s3://bucket/warehouse/tbl_location".to_string(),
1935 };
1936
1937 test_serde_json(json, update);
1938 }
1939
1940 #[test]
1941 fn test_table_update_apply() {
1942 let table_creation = TableCreation::builder()
1943 .location("s3://db/table".to_string())
1944 .name("table".to_string())
1945 .properties(HashMap::new())
1946 .schema(Schema::builder().build().unwrap())
1947 .build();
1948 let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
1949 .unwrap()
1950 .build()
1951 .unwrap()
1952 .metadata;
1953 let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
1954 table_metadata,
1955 Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
1956 );
1957
1958 let uuid = uuid::Uuid::new_v4();
1959 let update = TableUpdate::AssignUuid { uuid };
1960 let updated_metadata = update
1961 .apply(table_metadata_builder)
1962 .unwrap()
1963 .build()
1964 .unwrap()
1965 .metadata;
1966 assert_eq!(updated_metadata.uuid(), uuid);
1967 }
1968
1969 #[test]
1970 fn test_view_assign_uuid() {
1971 test_serde_json(
1972 r#"
1973{
1974 "action": "assign-uuid",
1975 "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1976}
1977 "#,
1978 ViewUpdate::AssignUuid {
1979 uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1980 },
1981 );
1982 }
1983
1984 #[test]
1985 fn test_view_upgrade_format_version() {
1986 test_serde_json(
1987 r#"
1988{
1989 "action": "upgrade-format-version",
1990 "format-version": 1
1991}
1992 "#,
1993 ViewUpdate::UpgradeFormatVersion {
1994 format_version: ViewFormatVersion::V1,
1995 },
1996 );
1997 }
1998
1999 #[test]
2000 fn test_view_add_schema() {
2001 let test_schema = Schema::builder()
2002 .with_schema_id(1)
2003 .with_identifier_field_ids(vec![2])
2004 .with_fields(vec![
2005 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
2006 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2007 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
2008 ])
2009 .build()
2010 .unwrap();
2011 test_serde_json(
2012 r#"
2013{
2014 "action": "add-schema",
2015 "schema": {
2016 "type": "struct",
2017 "schema-id": 1,
2018 "fields": [
2019 {
2020 "id": 1,
2021 "name": "foo",
2022 "required": false,
2023 "type": "string"
2024 },
2025 {
2026 "id": 2,
2027 "name": "bar",
2028 "required": true,
2029 "type": "int"
2030 },
2031 {
2032 "id": 3,
2033 "name": "baz",
2034 "required": false,
2035 "type": "boolean"
2036 }
2037 ],
2038 "identifier-field-ids": [
2039 2
2040 ]
2041 },
2042 "last-column-id": 3
2043}
2044 "#,
2045 ViewUpdate::AddSchema {
2046 schema: test_schema.clone(),
2047 last_column_id: Some(3),
2048 },
2049 );
2050 }
2051
2052 #[test]
2053 fn test_view_set_location() {
2054 test_serde_json(
2055 r#"
2056{
2057 "action": "set-location",
2058 "location": "s3://db/view"
2059}
2060 "#,
2061 ViewUpdate::SetLocation {
2062 location: "s3://db/view".to_string(),
2063 },
2064 );
2065 }
2066
2067 #[test]
2068 fn test_view_set_properties() {
2069 test_serde_json(
2070 r#"
2071{
2072 "action": "set-properties",
2073 "updates": {
2074 "prop1": "v1",
2075 "prop2": "v2"
2076 }
2077}
2078 "#,
2079 ViewUpdate::SetProperties {
2080 updates: vec![
2081 ("prop1".to_string(), "v1".to_string()),
2082 ("prop2".to_string(), "v2".to_string()),
2083 ]
2084 .into_iter()
2085 .collect(),
2086 },
2087 );
2088 }
2089
2090 #[test]
2091 fn test_view_remove_properties() {
2092 test_serde_json(
2093 r#"
2094{
2095 "action": "remove-properties",
2096 "removals": [
2097 "prop1",
2098 "prop2"
2099 ]
2100}
2101 "#,
2102 ViewUpdate::RemoveProperties {
2103 removals: vec!["prop1".to_string(), "prop2".to_string()],
2104 },
2105 );
2106 }
2107
2108 #[test]
2109 fn test_view_add_view_version() {
2110 test_serde_json(
2111 r#"
2112{
2113 "action": "add-view-version",
2114 "view-version": {
2115 "version-id" : 1,
2116 "timestamp-ms" : 1573518431292,
2117 "schema-id" : 1,
2118 "default-catalog" : "prod",
2119 "default-namespace" : [ "default" ],
2120 "summary" : {
2121 "engine-name" : "Spark"
2122 },
2123 "representations" : [ {
2124 "type" : "sql",
2125 "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
2126 "dialect" : "spark"
2127 } ]
2128 }
2129}
2130 "#,
2131 ViewUpdate::AddViewVersion {
2132 view_version: ViewVersion::builder()
2133 .with_version_id(1)
2134 .with_timestamp_ms(1573518431292)
2135 .with_schema_id(1)
2136 .with_default_catalog(Some("prod".to_string()))
2137 .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
2138 .with_summary(
2139 vec![("engine-name".to_string(), "Spark".to_string())]
2140 .into_iter()
2141 .collect(),
2142 )
2143 .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
2144 sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
2145 dialect: "spark".to_string(),
2146 })]))
2147 .build(),
2148 },
2149 );
2150 }
2151
2152 #[test]
2153 fn test_view_set_current_view_version() {
2154 test_serde_json(
2155 r#"
2156{
2157 "action": "set-current-view-version",
2158 "view-version-id": 1
2159}
2160 "#,
2161 ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
2162 );
2163 }
2164
2165 #[test]
2166 fn test_remove_partition_specs_update() {
2167 test_serde_json(
2168 r#"
2169{
2170 "action": "remove-partition-specs",
2171 "spec-ids": [1, 2]
2172}
2173 "#,
2174 TableUpdate::RemovePartitionSpecs {
2175 spec_ids: vec![1, 2],
2176 },
2177 );
2178 }
2179
2180 #[test]
2181 fn test_set_statistics_file() {
2182 test_serde_json(
2183 r#"
2184 {
2185 "action": "set-statistics",
2186 "snapshot-id": 1940541653261589030,
2187 "statistics": {
2188 "snapshot-id": 1940541653261589030,
2189 "statistics-path": "s3://bucket/warehouse/stats.puffin",
2190 "file-size-in-bytes": 124,
2191 "file-footer-size-in-bytes": 27,
2192 "blob-metadata": [
2193 {
2194 "type": "boring-type",
2195 "snapshot-id": 1940541653261589030,
2196 "sequence-number": 2,
2197 "fields": [
2198 1
2199 ],
2200 "properties": {
2201 "prop-key": "prop-value"
2202 }
2203 }
2204 ]
2205 }
2206 }
2207 "#,
2208 TableUpdate::SetStatistics {
2209 statistics: StatisticsFile {
2210 snapshot_id: 1940541653261589030,
2211 statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
2212 file_size_in_bytes: 124,
2213 file_footer_size_in_bytes: 27,
2214 key_metadata: None,
2215 blob_metadata: vec![BlobMetadata {
2216 r#type: "boring-type".to_string(),
2217 snapshot_id: 1940541653261589030,
2218 sequence_number: 2,
2219 fields: vec![1],
2220 properties: vec![("prop-key".to_string(), "prop-value".to_string())]
2221 .into_iter()
2222 .collect(),
2223 }],
2224 },
2225 },
2226 );
2227 }
2228
2229 #[test]
2230 fn test_remove_statistics_file() {
2231 test_serde_json(
2232 r#"
2233 {
2234 "action": "remove-statistics",
2235 "snapshot-id": 1940541653261589030
2236 }
2237 "#,
2238 TableUpdate::RemoveStatistics {
2239 snapshot_id: 1940541653261589030,
2240 },
2241 );
2242 }
2243
2244 #[test]
2245 fn test_set_partition_statistics_file() {
2246 test_serde_json(
2247 r#"
2248 {
2249 "action": "set-partition-statistics",
2250 "partition-statistics": {
2251 "snapshot-id": 1940541653261589030,
2252 "statistics-path": "s3://bucket/warehouse/stats1.parquet",
2253 "file-size-in-bytes": 43
2254 }
2255 }
2256 "#,
2257 TableUpdate::SetPartitionStatistics {
2258 partition_statistics: PartitionStatisticsFile {
2259 snapshot_id: 1940541653261589030,
2260 statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
2261 file_size_in_bytes: 43,
2262 },
2263 },
2264 )
2265 }
2266
2267 #[test]
2268 fn test_remove_partition_statistics_file() {
2269 test_serde_json(
2270 r#"
2271 {
2272 "action": "remove-partition-statistics",
2273 "snapshot-id": 1940541653261589030
2274 }
2275 "#,
2276 TableUpdate::RemovePartitionStatistics {
2277 snapshot_id: 1940541653261589030,
2278 },
2279 )
2280 }
2281
2282 #[test]
2283 fn test_remove_schema_update() {
2284 test_serde_json(
2285 r#"
2286 {
2287 "action": "remove-schemas",
2288 "schema-ids": [1, 2]
2289 }
2290 "#,
2291 TableUpdate::RemoveSchemas {
2292 schema_ids: vec![1, 2],
2293 },
2294 );
2295 }
2296
2297 #[test]
2298 fn test_add_encryption_key() {
2299 let key_bytes = "key".as_bytes();
2300 let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
2301 test_serde_json(
2302 format!(
2303 r#"
2304 {{
2305 "action": "add-encryption-key",
2306 "encryption-key": {{
2307 "key-id": "a",
2308 "encrypted-key-metadata": "{encoded_key}",
2309 "encrypted-by-id": "b"
2310 }}
2311 }}
2312 "#
2313 ),
2314 TableUpdate::AddEncryptionKey {
2315 encryption_key: EncryptedKey::builder()
2316 .key_id("a")
2317 .encrypted_key_metadata(key_bytes.to_vec())
2318 .encrypted_by_id("b")
2319 .build(),
2320 },
2321 );
2322 }
2323
2324 #[test]
2325 fn test_remove_encryption_key() {
2326 test_serde_json(
2327 r#"
2328 {
2329 "action": "remove-encryption-key",
2330 "key-id": "a"
2331 }
2332 "#,
2333 TableUpdate::RemoveEncryptionKey {
2334 key_id: "a".to_string(),
2335 },
2336 );
2337 }
2338
2339 #[test]
2340 fn test_table_commit() {
2341 let table = {
2342 let file = File::open(format!(
2343 "{}/testdata/table_metadata/{}",
2344 env!("CARGO_MANIFEST_DIR"),
2345 "TableMetadataV2Valid.json"
2346 ))
2347 .unwrap();
2348 let reader = BufReader::new(file);
2349 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2350
2351 Table::builder()
2352 .metadata(resp)
2353 .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
2354 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2355 .file_io(FileIOBuilder::new("memory").build().unwrap())
2356 .build()
2357 .unwrap()
2358 };
2359
2360 let updates = vec![
2361 TableUpdate::SetLocation {
2362 location: "s3://bucket/test/new_location/data".to_string(),
2363 },
2364 TableUpdate::SetProperties {
2365 updates: vec![
2366 ("prop1".to_string(), "v1".to_string()),
2367 ("prop2".to_string(), "v2".to_string()),
2368 ]
2369 .into_iter()
2370 .collect(),
2371 },
2372 ];
2373
2374 let requirements = vec![TableRequirement::UuidMatch {
2375 uuid: table.metadata().table_uuid,
2376 }];
2377
2378 let table_commit = TableCommit::builder()
2379 .ident(table.identifier().to_owned())
2380 .updates(updates)
2381 .requirements(requirements)
2382 .build();
2383
2384 let updated_table = table_commit.apply(table).unwrap();
2385
2386 assert_eq!(
2387 updated_table.metadata().properties.get("prop1").unwrap(),
2388 "v1"
2389 );
2390 assert_eq!(
2391 updated_table.metadata().properties.get("prop2").unwrap(),
2392 "v2"
2393 );
2394
2395 assert!(
2397 updated_table
2398 .metadata_location()
2399 .unwrap()
2400 .starts_with("s3://bucket/test/location/metadata/00001-")
2401 );
2402
2403 assert_eq!(
2404 updated_table.metadata().location,
2405 "s3://bucket/test/new_location/data",
2406 );
2407 }
2408}