1pub mod memory;
21mod metadata_location;
22
23use std::collections::HashMap;
24use std::fmt::{Debug, Display};
25use std::future::Future;
26use std::mem::take;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30
31use _serde::{deserialize_snapshot, serialize_snapshot};
32use async_trait::async_trait;
33pub use memory::MemoryCatalog;
34pub use metadata_location::*;
35#[cfg(test)]
36use mockall::automock;
37use serde_derive::{Deserialize, Serialize};
38use typed_builder::TypedBuilder;
39use uuid::Uuid;
40
41use crate::io::StorageFactory;
42use crate::spec::{
43 EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
44 SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
45 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
46};
47use crate::table::Table;
48use crate::{Error, ErrorKind, Result};
49
#[async_trait]
/// The catalog API for Iceberg Rust.
///
/// A catalog manages namespaces and the tables inside them. Every method is `async`
/// (expanded by `async_trait`) and returns the crate's [`Result`]. In test builds,
/// `mockall::automock` generates a mock implementation of this trait.
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// Lists the namespaces under `parent`.
    ///
    /// NOTE(review): presumably `None` lists top-level namespaces — confirm against the
    /// implementations.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Creates `namespace` with the given `properties` and returns the created [`Namespace`].
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Fetches the namespace identified by `namespace`, including its properties.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Returns whether `namespace` exists in the catalog.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Updates the properties of `namespace`.
    ///
    /// NOTE(review): whether `properties` replaces the full property set or is merged into
    /// it is left to the implementation here — confirm the intended contract.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drops `namespace` from the catalog.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// Lists the identifiers of the tables inside `namespace`.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Creates a table inside `namespace` as described by `creation` and returns it.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Loads the table identified by `table`.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drops the table identified by `table`.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Returns whether the table identified by `table` exists.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Renames the table `src` to `dest`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Registers a table under `table` whose metadata lives at `metadata_location`,
    /// returning the registered table.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Commits `commit` (its requirements and updates) to the table it targets and returns
    /// the updated table.
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
113
/// Builder that configures and creates a [`Catalog`] implementation.
///
/// The `Default` bound lets callers start from an empty builder before configuring it.
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type produced by [`CatalogBuilder::load`].
    type C: Catalog;

    /// Sets the [`StorageFactory`] handed to the catalog being built and returns the builder.
    ///
    /// NOTE(review): how the factory is used is up to each implementation — confirm.
    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;

    /// Consumes the builder and creates a catalog named `name`, configured from `props`.
    ///
    /// Returns a `Send` future resolving to the catalog. The accepted property keys are
    /// implementation-specific.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
153
/// Identifier of a namespace: an ordered list of name levels (e.g. `["a", "b"]`, displayed
/// as `a.b`).
///
/// `new`, `from_vec` and `from_strs` never produce an empty list. `Deserialize` is derived
/// on the bare `Vec`, however, so deserialization does not enforce non-emptiness.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
161
162impl NamespaceIdent {
163 pub fn new(name: String) -> Self {
165 Self(vec![name])
166 }
167
168 pub fn from_vec(names: Vec<String>) -> Result<Self> {
170 if names.is_empty() {
171 return Err(Error::new(
172 ErrorKind::DataInvalid,
173 "Namespace identifier can't be empty!",
174 ));
175 }
176 Ok(Self(names))
177 }
178
179 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
181 Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
182 }
183
184 pub fn to_url_string(&self) -> String {
186 self.as_ref().join("\u{001f}")
187 }
188
189 pub fn inner(self) -> Vec<String> {
191 self.0
192 }
193
194 pub fn parent(&self) -> Option<Self> {
197 self.0.split_last().and_then(|(_, parent)| {
198 if parent.is_empty() {
199 None
200 } else {
201 Some(Self(parent.to_vec()))
202 }
203 })
204 }
205}
206
207impl AsRef<Vec<String>> for NamespaceIdent {
208 fn as_ref(&self) -> &Vec<String> {
209 &self.0
210 }
211}
212
213impl Deref for NamespaceIdent {
214 type Target = [String];
215
216 fn deref(&self) -> &Self::Target {
217 &self.0
218 }
219}
220
/// A namespace together with its properties, as returned by the [`Catalog`] namespace
/// methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    // Identifier of the namespace.
    name: NamespaceIdent,
    // Key/value properties attached to the namespace.
    properties: HashMap<String, String>,
}
227
228impl Namespace {
229 pub fn new(name: NamespaceIdent) -> Self {
231 Self::with_properties(name, HashMap::default())
232 }
233
234 pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
236 Self { name, properties }
237 }
238
239 pub fn name(&self) -> &NamespaceIdent {
241 &self.name
242 }
243
244 pub fn properties(&self) -> &HashMap<String, String> {
246 &self.properties
247 }
248}
249
250impl Display for NamespaceIdent {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 write!(f, "{}", self.0.join("."))
253 }
254}
255
/// Fully qualified identifier of a table: its namespace plus the table name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace containing the table.
    pub namespace: NamespaceIdent,
    /// Table name within `namespace`.
    pub name: String,
}
264
265impl TableIdent {
266 pub fn new(namespace: NamespaceIdent, name: String) -> Self {
268 Self { namespace, name }
269 }
270
271 pub fn namespace(&self) -> &NamespaceIdent {
273 &self.namespace
274 }
275
276 pub fn name(&self) -> &str {
278 &self.name
279 }
280
281 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
283 let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
284 let table_name = vec.pop().ok_or_else(|| {
285 Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
286 })?;
287 let namespace_ident = NamespaceIdent::from_vec(vec)?;
288
289 Ok(Self {
290 namespace: namespace_ident,
291 name: table_name,
292 })
293 }
294}
295
296impl Display for TableIdent {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 write!(f, "{}.{}", self.namespace, self.name)
299 }
300}
301
/// Parameters for [`Catalog::create_table`].
///
/// Build it with the `typed_builder`-generated `TableCreation::builder()`. `name` and
/// `schema` must be set; every other field has a builder default.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// Name of the table to create.
    pub name: String,
    /// Optional table location; defaults to `None`.
    ///
    /// The `location` setter takes a plain `String`; `location_opt` takes an `Option`.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// Schema of the new table.
    pub schema: Schema,
    /// Optional partition spec; defaults to `None`.
    ///
    /// The setter accepts anything convertible `into` an [`UnboundPartitionSpec`];
    /// `partition_spec_opt` takes an `Option`.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order; defaults to `None` (`sort_order_opt` takes an `Option`).
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// Table properties; default to empty.
    ///
    /// The setter accepts any iterable of `(String, String)` pairs and collects it.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Table format version; defaults to [`FormatVersion::V2`].
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
327
/// A commit against a single table.
///
/// It holds requirements that must hold on the table's current metadata, plus the metadata
/// updates to apply (see [`TableCommit::apply`]). The builder's `build` method is
/// `pub(crate)`, so commits can only be built inside this crate.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// Identifier of the table being committed to.
    ident: TableIdent,
    /// Requirements checked against the current metadata before any update is applied.
    requirements: Vec<TableRequirement>,
    /// Metadata updates, applied in order.
    updates: Vec<TableUpdate>,
}
345
346impl TableCommit {
347 pub fn identifier(&self) -> &TableIdent {
349 &self.ident
350 }
351
352 pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
354 take(&mut self.requirements)
355 }
356
357 pub fn take_updates(&mut self) -> Vec<TableUpdate> {
359 take(&mut self.updates)
360 }
361
362 pub fn apply(self, table: Table) -> Result<Table> {
368 for requirement in self.requirements {
370 requirement.check(Some(table.metadata()))?;
371 }
372
373 let current_metadata_location = table.metadata_location_result()?;
375
376 let mut metadata_builder = table
378 .metadata()
379 .clone()
380 .into_builder(Some(current_metadata_location.to_string()));
381 for update in self.updates {
382 metadata_builder = update.apply(metadata_builder)?;
383 }
384
385 let new_metadata = metadata_builder.build()?.metadata;
387
388 let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
389 .with_next_version()
390 .with_new_metadata(&new_metadata)
391 .to_string();
392
393 Ok(table
394 .with_metadata(Arc::new(new_metadata))
395 .with_metadata_location(new_metadata_location))
396 }
397}
398
/// A precondition on a table's current metadata, checked by [`TableRequirement::check`]
/// before a commit's updates are applied.
///
/// Serialized as an internally tagged object whose `type` field names the requirement
/// (e.g. `{"type": "assert-create"}`). The remaining fields use the kebab-case names given
/// by the `rename` attributes below.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not exist yet.
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table's UUID must equal `uuid`.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Expected table UUID.
        uuid: Uuid,
    },
    /// The branch or tag `ref` must point at `snapshot_id`.
    ///
    /// When `snapshot_id` is `None`, the reference must not exist.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// Name of the branch or tag (serialized as `ref`).
        r#ref: String,
        /// Expected snapshot id, or `None` to require that the reference does not exist.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last assigned column id (`last_column_id`) must equal the given value.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// Expected last assigned field id.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must equal the given value.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Expected current schema id.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last assigned partition field id (`last_partition_id`) must equal the
    /// given value.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Expected last assigned partition id.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default partition spec id must equal the given value.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Expected default partition spec id.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The id of the table's default sort order must equal the given value.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Expected default sort order id.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
460
/// A single change to table metadata, applied to a builder by [`TableUpdate::apply`].
///
/// Serialized as an internally tagged object. The `action` field holds the kebab-case
/// variant name (e.g. `"assign-uuid"`). The variant's fields sit alongside it, also in
/// kebab-case.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
// NOTE(review): some variants (e.g. `AddSchema`, `AddSnapshot`) carry much larger payloads
// than others; the lint is silenced instead of boxing them — confirm this is intentional.
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrades the table format version (`TableMetadataBuilder::upgrade_format_version`).
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format version.
        format_version: FormatVersion,
    },
    /// Assigns the table UUID (`TableMetadataBuilder::assign_uuid`).
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Adds a schema (`TableMetadataBuilder::add_schema`).
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
    },
    /// Sets the current schema by id (`TableMetadataBuilder::set_current_schema`).
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Id of the schema to make current.
        schema_id: i32,
    },
    /// Adds a partition spec (`TableMetadataBuilder::add_partition_spec`).
    AddSpec {
        /// Unbound partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Sets the default partition spec by id
    /// (`TableMetadataBuilder::set_default_partition_spec`).
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Id of the partition spec to make the default.
        spec_id: i32,
    },
    /// Adds a sort order (`TableMetadataBuilder::add_sort_order`).
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Sets the default sort order by id (`TableMetadataBuilder::set_default_sort_order`).
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Id of the sort order to make the default.
        sort_order_id: i64,
    },
    /// Adds a snapshot (`TableMetadataBuilder::add_snapshot`).
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add.
        ///
        /// It is (de)serialized through the `CatalogSnapshot` shape in the `_serde` module:
        /// kebab-case fields, and optional `first-row-id` / `added-rows` / `key-id`.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Creates or moves a branch/tag (`TableMetadataBuilder::set_ref`).
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of the branch or tag.
        ref_name: String,
        /// Reference target and retention.
        ///
        /// Its fields are flattened into the same JSON object as `ref-name`.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Removes snapshots by id (`TableMetadataBuilder::remove_snapshots`).
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Ids of the snapshots to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Removes a branch or tag (`TableMetadataBuilder::remove_ref`).
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of the branch or tag to remove.
        ref_name: String,
    },
    /// Sets the table location (`TableMetadataBuilder::set_location`).
    SetLocation {
        /// New table location.
        location: String,
    },
    /// Sets table properties (`TableMetadataBuilder::set_properties`).
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Removes table properties by key (`TableMetadataBuilder::remove_properties`).
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Removes partition specs by id (`TableMetadataBuilder::remove_partition_specs`).
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Ids of the partition specs to remove.
        spec_ids: Vec<i32>,
    },
    /// Sets a statistics file (`TableMetadataBuilder::set_statistics`).
    ///
    /// (De)serialized by `_serde_set_statistics`. The JSON carries an optional `snapshot-id`
    /// next to `statistics`; on read, a `snapshot-id` that differs from the statistics file's
    /// own snapshot id is rejected.
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// Statistics file to set.
        statistics: StatisticsFile,
    },
    /// Removes the statistics file of a snapshot (`TableMetadataBuilder::remove_statistics`).
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot whose statistics file is removed.
        snapshot_id: i64,
    },
    /// Sets a partition statistics file (`TableMetadataBuilder::set_partition_statistics`).
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// Partition statistics file to set.
        partition_statistics: PartitionStatisticsFile,
    },
    /// Removes the partition statistics file of a snapshot
    /// (`TableMetadataBuilder::remove_partition_statistics`).
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot whose partition statistics file is removed.
        snapshot_id: i64,
    },
    /// Removes schemas by id (`TableMetadataBuilder::remove_schemas`).
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Ids of the schemas to remove.
        schema_ids: Vec<i32>,
    },
    /// Adds an encryption key (`TableMetadataBuilder::add_encryption_key`).
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// Encryption key to add.
        encryption_key: EncryptedKey,
    },
    /// Removes an encryption key by id (`TableMetadataBuilder::remove_encryption_key`).
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// Id of the key to remove.
        key_id: String,
    },
}
608
609impl TableUpdate {
610 pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
612 match self {
613 TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
614 TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
615 TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
616 TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
617 TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
618 TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
619 TableUpdate::SetDefaultSortOrder { sort_order_id } => {
620 builder.set_default_sort_order(sort_order_id)
621 }
622 TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
623 TableUpdate::SetSnapshotRef {
624 ref_name,
625 reference,
626 } => builder.set_ref(&ref_name, reference),
627 TableUpdate::RemoveSnapshots { snapshot_ids } => {
628 Ok(builder.remove_snapshots(&snapshot_ids))
629 }
630 TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
631 TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
632 TableUpdate::SetProperties { updates } => builder.set_properties(updates),
633 TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
634 TableUpdate::UpgradeFormatVersion { format_version } => {
635 builder.upgrade_format_version(format_version)
636 }
637 TableUpdate::RemovePartitionSpecs { spec_ids } => {
638 builder.remove_partition_specs(&spec_ids)
639 }
640 TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
641 TableUpdate::RemoveStatistics { snapshot_id } => {
642 Ok(builder.remove_statistics(snapshot_id))
643 }
644 TableUpdate::SetPartitionStatistics {
645 partition_statistics,
646 } => Ok(builder.set_partition_statistics(partition_statistics)),
647 TableUpdate::RemovePartitionStatistics { snapshot_id } => {
648 Ok(builder.remove_partition_statistics(snapshot_id))
649 }
650 TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
651 TableUpdate::AddEncryptionKey { encryption_key } => {
652 Ok(builder.add_encryption_key(encryption_key))
653 }
654 TableUpdate::RemoveEncryptionKey { key_id } => {
655 Ok(builder.remove_encryption_key(&key_id))
656 }
657 }
658 }
659}
660
661impl TableRequirement {
662 pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
667 if let Some(metadata) = metadata {
668 match self {
669 TableRequirement::NotExist => {
670 return Err(Error::new(
671 ErrorKind::CatalogCommitConflicts,
672 format!(
673 "Requirement failed: Table with id {} already exists",
674 metadata.uuid()
675 ),
676 )
677 .with_retryable(true));
678 }
679 TableRequirement::UuidMatch { uuid } => {
680 if &metadata.uuid() != uuid {
681 return Err(Error::new(
682 ErrorKind::CatalogCommitConflicts,
683 "Requirement failed: Table UUID does not match",
684 )
685 .with_context("expected", *uuid)
686 .with_context("found", metadata.uuid())
687 .with_retryable(true));
688 }
689 }
690 TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
691 if metadata.current_schema_id != *current_schema_id {
693 return Err(Error::new(
694 ErrorKind::CatalogCommitConflicts,
695 "Requirement failed: Current schema id does not match",
696 )
697 .with_context("expected", current_schema_id.to_string())
698 .with_context("found", metadata.current_schema_id.to_string())
699 .with_retryable(true));
700 }
701 }
702 TableRequirement::DefaultSortOrderIdMatch {
703 default_sort_order_id,
704 } => {
705 if metadata.default_sort_order().order_id != *default_sort_order_id {
706 return Err(Error::new(
707 ErrorKind::CatalogCommitConflicts,
708 "Requirement failed: Default sort order id does not match",
709 )
710 .with_context("expected", default_sort_order_id.to_string())
711 .with_context("found", metadata.default_sort_order().order_id.to_string())
712 .with_retryable(true));
713 }
714 }
715 TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
716 let snapshot_ref = metadata.snapshot_for_ref(r#ref);
717 if let Some(snapshot_id) = snapshot_id {
718 let snapshot_ref = snapshot_ref.ok_or(
719 Error::new(
720 ErrorKind::CatalogCommitConflicts,
721 format!("Requirement failed: Branch or tag `{ref}` not found"),
722 )
723 .with_retryable(true),
724 )?;
725 if snapshot_ref.snapshot_id() != *snapshot_id {
726 return Err(Error::new(
727 ErrorKind::CatalogCommitConflicts,
728 format!(
729 "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
730 ),
731 )
732 .with_context("expected", snapshot_id.to_string())
733 .with_context("found", snapshot_ref.snapshot_id().to_string())
734 .with_retryable(true));
735 }
736 } else if snapshot_ref.is_some() {
737 return Err(Error::new(
739 ErrorKind::CatalogCommitConflicts,
740 format!("Requirement failed: Branch or tag `{ref}` already exists"),
741 )
742 .with_retryable(true));
743 }
744 }
745 TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
746 if metadata.default_partition_spec_id() != *default_spec_id {
748 return Err(Error::new(
749 ErrorKind::CatalogCommitConflicts,
750 "Requirement failed: Default partition spec id does not match",
751 )
752 .with_context("expected", default_spec_id.to_string())
753 .with_context("found", metadata.default_partition_spec_id().to_string())
754 .with_retryable(true));
755 }
756 }
757 TableRequirement::LastAssignedPartitionIdMatch {
758 last_assigned_partition_id,
759 } => {
760 if metadata.last_partition_id != *last_assigned_partition_id {
761 return Err(Error::new(
762 ErrorKind::CatalogCommitConflicts,
763 "Requirement failed: Last assigned partition id does not match",
764 )
765 .with_context("expected", last_assigned_partition_id.to_string())
766 .with_context("found", metadata.last_partition_id.to_string())
767 .with_retryable(true));
768 }
769 }
770 TableRequirement::LastAssignedFieldIdMatch {
771 last_assigned_field_id,
772 } => {
773 if &metadata.last_column_id != last_assigned_field_id {
774 return Err(Error::new(
775 ErrorKind::CatalogCommitConflicts,
776 "Requirement failed: Last assigned field id does not match",
777 )
778 .with_context("expected", last_assigned_field_id.to_string())
779 .with_context("found", metadata.last_column_id.to_string())
780 .with_retryable(true));
781 }
782 }
783 };
784 } else {
785 match self {
786 TableRequirement::NotExist => {}
787 _ => {
788 return Err(Error::new(
789 ErrorKind::TableNotFound,
790 "Requirement failed: Table does not exist",
791 ));
792 }
793 }
794 }
795
796 Ok(())
797 }
798}
799
800pub(super) mod _serde {
801 use serde::{Deserialize as _, Deserializer, Serialize as _};
802
803 use super::*;
804 use crate::spec::{SchemaId, Summary};
805
806 pub(super) fn deserialize_snapshot<'de, D>(
807 deserializer: D,
808 ) -> std::result::Result<Snapshot, D::Error>
809 where D: Deserializer<'de> {
810 let buf = CatalogSnapshot::deserialize(deserializer)?;
811 Ok(buf.into())
812 }
813
814 pub(super) fn serialize_snapshot<S>(
815 snapshot: &Snapshot,
816 serializer: S,
817 ) -> std::result::Result<S::Ok, S::Error>
818 where
819 S: serde::Serializer,
820 {
821 let buf: CatalogSnapshot = snapshot.clone().into();
822 buf.serialize(serializer)
823 }
824
825 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
826 #[serde(rename_all = "kebab-case")]
827 struct CatalogSnapshot {
831 snapshot_id: i64,
832 #[serde(skip_serializing_if = "Option::is_none")]
833 parent_snapshot_id: Option<i64>,
834 #[serde(default)]
835 sequence_number: i64,
836 timestamp_ms: i64,
837 manifest_list: String,
838 summary: Summary,
839 #[serde(skip_serializing_if = "Option::is_none")]
840 schema_id: Option<SchemaId>,
841 #[serde(skip_serializing_if = "Option::is_none")]
842 first_row_id: Option<u64>,
843 #[serde(skip_serializing_if = "Option::is_none")]
844 added_rows: Option<u64>,
845 #[serde(skip_serializing_if = "Option::is_none")]
846 key_id: Option<String>,
847 }
848
849 impl From<CatalogSnapshot> for Snapshot {
850 fn from(snapshot: CatalogSnapshot) -> Self {
851 let CatalogSnapshot {
852 snapshot_id,
853 parent_snapshot_id,
854 sequence_number,
855 timestamp_ms,
856 manifest_list,
857 schema_id,
858 summary,
859 first_row_id,
860 added_rows,
861 key_id,
862 } = snapshot;
863 let builder = Snapshot::builder()
864 .with_snapshot_id(snapshot_id)
865 .with_parent_snapshot_id(parent_snapshot_id)
866 .with_sequence_number(sequence_number)
867 .with_timestamp_ms(timestamp_ms)
868 .with_manifest_list(manifest_list)
869 .with_summary(summary)
870 .with_encryption_key_id(key_id);
871 let row_range = first_row_id.zip(added_rows);
872 match (schema_id, row_range) {
873 (None, None) => builder.build(),
874 (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
875 (None, Some((first_row_id, last_row_id))) => {
876 builder.with_row_range(first_row_id, last_row_id).build()
877 }
878 (Some(schema_id), Some((first_row_id, last_row_id))) => builder
879 .with_schema_id(schema_id)
880 .with_row_range(first_row_id, last_row_id)
881 .build(),
882 }
883 }
884 }
885
886 impl From<Snapshot> for CatalogSnapshot {
887 fn from(snapshot: Snapshot) -> Self {
888 let first_row_id = snapshot.first_row_id();
889 let added_rows = snapshot.added_rows_count();
890 let Snapshot {
891 snapshot_id,
892 parent_snapshot_id,
893 sequence_number,
894 timestamp_ms,
895 manifest_list,
896 summary,
897 schema_id,
898 row_range: _,
899 encryption_key_id: key_id,
900 } = snapshot;
901 CatalogSnapshot {
902 snapshot_id,
903 parent_snapshot_id,
904 sequence_number,
905 timestamp_ms,
906 manifest_list,
907 summary,
908 schema_id,
909 first_row_id,
910 added_rows,
911 key_id,
912 }
913 }
914 }
915}
916
/// Parameters for creating a view.
///
/// Build it with the `typed_builder`-generated `ViewCreation::builder()`. `properties`,
/// `default_catalog` and `summary` have builder defaults; every other field is required.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// Name of the view.
    pub name: String,
    /// Location of the view.
    pub location: String,
    /// Representations of the view (e.g. SQL representations).
    pub representations: ViewRepresentations,
    /// Schema of the view.
    pub schema: Schema,
    /// View properties; default to empty.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// Default namespace recorded with the view.
    ///
    /// NOTE(review): presumably used to resolve unqualified identifiers in the view's
    /// definition — confirm.
    pub default_namespace: NamespaceIdent,
    /// Optional default catalog recorded with the view; defaults to `None`.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// Summary properties; default to empty.
    ///
    /// NOTE(review): presumably stored on the created view version — confirm.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
941
/// A single change to view metadata.
///
/// Serialized as an internally tagged object. The `action` field holds the kebab-case
/// variant name (e.g. `"set-current-view-version"`). The variant's fields sit alongside it,
/// also in kebab-case.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
// NOTE(review): variants such as `AddSchema` / `AddViewVersion` are much larger than the
// others; the lint is silenced rather than boxing them — confirm this is intentional.
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assigns the view UUID.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Upgrades the view format version.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target view format version.
        format_version: ViewFormatVersion,
    },
    /// Adds a schema to the view.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
        /// Optional last column id (serialized as `last-column-id`).
        last_column_id: Option<i32>,
    },
    /// Sets the view location.
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New view location.
        location: String,
    },
    /// Sets view properties.
    ///
    /// NOTE(review): whether existing keys are overwritten or merged is decided by the code
    /// that applies this update (not shown here) — confirm.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Removes view properties by key.
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Adds a view version.
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// View version to add.
        view_version: ViewVersion,
    },
    /// Sets the current view version by id.
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// Id of the view version to make current.
        view_version_id: i32,
    },
}
1000
1001mod _serde_set_statistics {
1002 use serde::{Deserialize, Deserializer, Serialize, Serializer};
1005
1006 use super::*;
1007
1008 #[derive(Debug, Serialize, Deserialize)]
1009 #[serde(rename_all = "kebab-case")]
1010 struct SetStatistics {
1011 snapshot_id: Option<i64>,
1012 statistics: StatisticsFile,
1013 }
1014
1015 pub fn serialize<S>(
1016 value: &StatisticsFile,
1017 serializer: S,
1018 ) -> std::result::Result<S::Ok, S::Error>
1019 where
1020 S: Serializer,
1021 {
1022 SetStatistics {
1023 snapshot_id: Some(value.snapshot_id),
1024 statistics: value.clone(),
1025 }
1026 .serialize(serializer)
1027 }
1028
1029 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1030 where D: Deserializer<'de> {
1031 let SetStatistics {
1032 snapshot_id,
1033 statistics,
1034 } = SetStatistics::deserialize(deserializer)?;
1035 if let Some(snapshot_id) = snapshot_id
1036 && snapshot_id != statistics.snapshot_id
1037 {
1038 return Err(serde::de::Error::custom(format!(
1039 "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1040 statistics.snapshot_id
1041 )));
1042 }
1043
1044 Ok(statistics)
1045 }
1046}
1047
1048#[cfg(test)]
1049mod tests {
1050 use std::collections::HashMap;
1051 use std::fmt::Debug;
1052 use std::fs::File;
1053 use std::io::BufReader;
1054
1055 use base64::Engine as _;
1056 use serde::Serialize;
1057 use serde::de::DeserializeOwned;
1058 use uuid::uuid;
1059
1060 use super::ViewUpdate;
1061 use crate::io::FileIO;
1062 use crate::spec::{
1063 BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1064 PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1065 SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1066 StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1067 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1068 ViewVersion,
1069 };
1070 use crate::table::Table;
1071 use crate::{
1072 NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1073 };
1074
1075 #[test]
1076 fn test_parent_namespace() {
1077 let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1078 let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1079 let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1080
1081 assert_eq!(ns1.parent(), None);
1082 assert_eq!(ns2.parent(), Some(ns1.clone()));
1083 assert_eq!(ns3.parent(), Some(ns2.clone()));
1084 }
1085
1086 #[test]
1087 fn test_create_table_id() {
1088 let table_id = TableIdent {
1089 namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1090 name: "t1".to_string(),
1091 };
1092
1093 assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1094 }
1095
1096 #[test]
1097 fn test_table_creation_iterator_properties() {
1098 let builder = TableCreation::builder()
1099 .name("table".to_string())
1100 .schema(Schema::builder().build().unwrap());
1101
1102 fn s(k: &str, v: &str) -> (String, String) {
1103 (k.to_string(), v.to_string())
1104 }
1105
1106 let table_creation = builder
1107 .properties([s("key", "value"), s("foo", "bar")])
1108 .build();
1109
1110 assert_eq!(
1111 HashMap::from([s("key", "value"), s("foo", "bar")]),
1112 table_creation.properties
1113 );
1114 }
1115
1116 fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1117 json: impl ToString,
1118 expected: T,
1119 ) {
1120 let json_str = json.to_string();
1121 let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1122 assert_eq!(actual, expected, "Parsed value is not equal to expected");
1123
1124 let restored: T = serde_json::from_str(
1125 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1126 )
1127 .expect("Failed to parse from serialized json");
1128
1129 assert_eq!(
1130 restored, expected,
1131 "Parsed restored value is not equal to expected"
1132 );
1133 }
1134
1135 fn metadata() -> TableMetadata {
1136 let tbl_creation = TableCreation::builder()
1137 .name("table".to_string())
1138 .location("/path/to/table".to_string())
1139 .schema(Schema::builder().build().unwrap())
1140 .build();
1141
1142 TableMetadataBuilder::from_table_creation(tbl_creation)
1143 .unwrap()
1144 .assign_uuid(uuid::Uuid::nil())
1145 .build()
1146 .unwrap()
1147 .metadata
1148 }
1149
1150 #[test]
1151 fn test_check_requirement_not_exist() {
1152 let metadata = metadata();
1153 let requirement = TableRequirement::NotExist;
1154
1155 assert!(requirement.check(Some(&metadata)).is_err());
1156 assert!(requirement.check(None).is_ok());
1157 }
1158
1159 #[test]
1160 fn test_check_table_uuid() {
1161 let metadata = metadata();
1162
1163 let requirement = TableRequirement::UuidMatch {
1164 uuid: uuid::Uuid::now_v7(),
1165 };
1166 assert!(requirement.check(Some(&metadata)).is_err());
1167
1168 let requirement = TableRequirement::UuidMatch {
1169 uuid: uuid::Uuid::nil(),
1170 };
1171 assert!(requirement.check(Some(&metadata)).is_ok());
1172 }
1173
1174 #[test]
1175 fn test_check_ref_snapshot_id() {
1176 let metadata = metadata();
1177
1178 let requirement = TableRequirement::RefSnapshotIdMatch {
1180 r#ref: "my_branch".to_string(),
1181 snapshot_id: Some(1),
1182 };
1183 assert!(requirement.check(Some(&metadata)).is_err());
1184
1185 let requirement = TableRequirement::RefSnapshotIdMatch {
1187 r#ref: "my_branch".to_string(),
1188 snapshot_id: None,
1189 };
1190 assert!(requirement.check(Some(&metadata)).is_ok());
1191
1192 let snapshot = Snapshot::builder()
1194 .with_snapshot_id(3051729675574597004)
1195 .with_sequence_number(10)
1196 .with_timestamp_ms(9992191116217)
1197 .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1198 .with_schema_id(0)
1199 .with_summary(Summary {
1200 operation: Operation::Append,
1201 additional_properties: HashMap::new(),
1202 })
1203 .build();
1204
1205 let builder = metadata.into_builder(None);
1206 let builder = TableUpdate::AddSnapshot {
1207 snapshot: snapshot.clone(),
1208 }
1209 .apply(builder)
1210 .unwrap();
1211 let metadata = TableUpdate::SetSnapshotRef {
1212 ref_name: MAIN_BRANCH.to_string(),
1213 reference: SnapshotReference {
1214 snapshot_id: snapshot.snapshot_id(),
1215 retention: SnapshotRetention::Branch {
1216 min_snapshots_to_keep: Some(10),
1217 max_snapshot_age_ms: None,
1218 max_ref_age_ms: None,
1219 },
1220 },
1221 }
1222 .apply(builder)
1223 .unwrap()
1224 .build()
1225 .unwrap()
1226 .metadata;
1227
1228 let requirement = TableRequirement::RefSnapshotIdMatch {
1230 r#ref: "main".to_string(),
1231 snapshot_id: Some(3051729675574597004),
1232 };
1233 assert!(requirement.check(Some(&metadata)).is_ok());
1234
1235 let requirement = TableRequirement::RefSnapshotIdMatch {
1237 r#ref: "main".to_string(),
1238 snapshot_id: Some(1),
1239 };
1240 assert!(requirement.check(Some(&metadata)).is_err());
1241 }
1242
1243 #[test]
1244 fn test_check_last_assigned_field_id() {
1245 let metadata = metadata();
1246
1247 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1248 last_assigned_field_id: 1,
1249 };
1250 assert!(requirement.check(Some(&metadata)).is_err());
1251
1252 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1253 last_assigned_field_id: 0,
1254 };
1255 assert!(requirement.check(Some(&metadata)).is_ok());
1256 }
1257
1258 #[test]
1259 fn test_check_current_schema_id() {
1260 let metadata = metadata();
1261
1262 let requirement = TableRequirement::CurrentSchemaIdMatch {
1263 current_schema_id: 1,
1264 };
1265 assert!(requirement.check(Some(&metadata)).is_err());
1266
1267 let requirement = TableRequirement::CurrentSchemaIdMatch {
1268 current_schema_id: 0,
1269 };
1270 assert!(requirement.check(Some(&metadata)).is_ok());
1271 }
1272
1273 #[test]
1274 fn test_check_last_assigned_partition_id() {
1275 let metadata = metadata();
1276 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1277 last_assigned_partition_id: 0,
1278 };
1279 assert!(requirement.check(Some(&metadata)).is_err());
1280
1281 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1282 last_assigned_partition_id: 999,
1283 };
1284 assert!(requirement.check(Some(&metadata)).is_ok());
1285 }
1286
1287 #[test]
1288 fn test_check_default_spec_id() {
1289 let metadata = metadata();
1290
1291 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1292 assert!(requirement.check(Some(&metadata)).is_err());
1293
1294 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1295 assert!(requirement.check(Some(&metadata)).is_ok());
1296 }
1297
1298 #[test]
1299 fn test_check_default_sort_order_id() {
1300 let metadata = metadata();
1301
1302 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1303 default_sort_order_id: 1,
1304 };
1305 assert!(requirement.check(Some(&metadata)).is_err());
1306
1307 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1308 default_sort_order_id: 0,
1309 };
1310 assert!(requirement.check(Some(&metadata)).is_ok());
1311 }
1312
1313 #[test]
1314 fn test_table_uuid() {
1315 test_serde_json(
1316 r#"
1317{
1318 "type": "assert-table-uuid",
1319 "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1320}
1321 "#,
1322 TableRequirement::UuidMatch {
1323 uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1324 },
1325 );
1326 }
1327
1328 #[test]
1329 fn test_assert_table_not_exists() {
1330 test_serde_json(
1331 r#"
1332{
1333 "type": "assert-create"
1334}
1335 "#,
1336 TableRequirement::NotExist,
1337 );
1338 }
1339
1340 #[test]
1341 fn test_assert_ref_snapshot_id() {
1342 test_serde_json(
1343 r#"
1344{
1345 "type": "assert-ref-snapshot-id",
1346 "ref": "snapshot-name",
1347 "snapshot-id": null
1348}
1349 "#,
1350 TableRequirement::RefSnapshotIdMatch {
1351 r#ref: "snapshot-name".to_string(),
1352 snapshot_id: None,
1353 },
1354 );
1355
1356 test_serde_json(
1357 r#"
1358{
1359 "type": "assert-ref-snapshot-id",
1360 "ref": "snapshot-name",
1361 "snapshot-id": 1
1362}
1363 "#,
1364 TableRequirement::RefSnapshotIdMatch {
1365 r#ref: "snapshot-name".to_string(),
1366 snapshot_id: Some(1),
1367 },
1368 );
1369 }
1370
1371 #[test]
1372 fn test_assert_last_assigned_field_id() {
1373 test_serde_json(
1374 r#"
1375{
1376 "type": "assert-last-assigned-field-id",
1377 "last-assigned-field-id": 12
1378}
1379 "#,
1380 TableRequirement::LastAssignedFieldIdMatch {
1381 last_assigned_field_id: 12,
1382 },
1383 );
1384 }
1385
1386 #[test]
1387 fn test_assert_current_schema_id() {
1388 test_serde_json(
1389 r#"
1390{
1391 "type": "assert-current-schema-id",
1392 "current-schema-id": 4
1393}
1394 "#,
1395 TableRequirement::CurrentSchemaIdMatch {
1396 current_schema_id: 4,
1397 },
1398 );
1399 }
1400
1401 #[test]
1402 fn test_assert_last_assigned_partition_id() {
1403 test_serde_json(
1404 r#"
1405{
1406 "type": "assert-last-assigned-partition-id",
1407 "last-assigned-partition-id": 1004
1408}
1409 "#,
1410 TableRequirement::LastAssignedPartitionIdMatch {
1411 last_assigned_partition_id: 1004,
1412 },
1413 );
1414 }
1415
1416 #[test]
1417 fn test_assert_default_spec_id() {
1418 test_serde_json(
1419 r#"
1420{
1421 "type": "assert-default-spec-id",
1422 "default-spec-id": 5
1423}
1424 "#,
1425 TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
1426 );
1427 }
1428
1429 #[test]
1430 fn test_assert_default_sort_order() {
1431 let json = r#"
1432{
1433 "type": "assert-default-sort-order-id",
1434 "default-sort-order-id": 10
1435}
1436 "#;
1437
1438 let update = TableRequirement::DefaultSortOrderIdMatch {
1439 default_sort_order_id: 10,
1440 };
1441
1442 test_serde_json(json, update);
1443 }
1444
1445 #[test]
1446 fn test_parse_assert_invalid() {
1447 assert!(
1448 serde_json::from_str::<TableRequirement>(
1449 r#"
1450{
1451 "default-sort-order-id": 10
1452}
1453"#
1454 )
1455 .is_err(),
1456 "Table requirements should not be parsed without type."
1457 );
1458 }
1459
1460 #[test]
1461 fn test_assign_uuid() {
1462 test_serde_json(
1463 r#"
1464{
1465 "action": "assign-uuid",
1466 "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1467}
1468 "#,
1469 TableUpdate::AssignUuid {
1470 uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1471 },
1472 );
1473 }
1474
1475 #[test]
1476 fn test_upgrade_format_version() {
1477 test_serde_json(
1478 r#"
1479{
1480 "action": "upgrade-format-version",
1481 "format-version": 2
1482}
1483 "#,
1484 TableUpdate::UpgradeFormatVersion {
1485 format_version: FormatVersion::V2,
1486 },
1487 );
1488 }
1489
1490 #[test]
1491 fn test_add_schema() {
1492 let test_schema = Schema::builder()
1493 .with_schema_id(1)
1494 .with_identifier_field_ids(vec![2])
1495 .with_fields(vec![
1496 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1497 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1498 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1499 ])
1500 .build()
1501 .unwrap();
1502 test_serde_json(
1503 r#"
1504{
1505 "action": "add-schema",
1506 "schema": {
1507 "type": "struct",
1508 "schema-id": 1,
1509 "fields": [
1510 {
1511 "id": 1,
1512 "name": "foo",
1513 "required": false,
1514 "type": "string"
1515 },
1516 {
1517 "id": 2,
1518 "name": "bar",
1519 "required": true,
1520 "type": "int"
1521 },
1522 {
1523 "id": 3,
1524 "name": "baz",
1525 "required": false,
1526 "type": "boolean"
1527 }
1528 ],
1529 "identifier-field-ids": [
1530 2
1531 ]
1532 },
1533 "last-column-id": 3
1534}
1535 "#,
1536 TableUpdate::AddSchema {
1537 schema: test_schema.clone(),
1538 },
1539 );
1540
1541 test_serde_json(
1542 r#"
1543{
1544 "action": "add-schema",
1545 "schema": {
1546 "type": "struct",
1547 "schema-id": 1,
1548 "fields": [
1549 {
1550 "id": 1,
1551 "name": "foo",
1552 "required": false,
1553 "type": "string"
1554 },
1555 {
1556 "id": 2,
1557 "name": "bar",
1558 "required": true,
1559 "type": "int"
1560 },
1561 {
1562 "id": 3,
1563 "name": "baz",
1564 "required": false,
1565 "type": "boolean"
1566 }
1567 ],
1568 "identifier-field-ids": [
1569 2
1570 ]
1571 }
1572}
1573 "#,
1574 TableUpdate::AddSchema {
1575 schema: test_schema.clone(),
1576 },
1577 );
1578 }
1579
1580 #[test]
1581 fn test_set_current_schema() {
1582 test_serde_json(
1583 r#"
1584{
1585 "action": "set-current-schema",
1586 "schema-id": 23
1587}
1588 "#,
1589 TableUpdate::SetCurrentSchema { schema_id: 23 },
1590 );
1591 }
1592
1593 #[test]
1594 fn test_add_spec() {
1595 test_serde_json(
1596 r#"
1597{
1598 "action": "add-spec",
1599 "spec": {
1600 "fields": [
1601 {
1602 "source-id": 4,
1603 "name": "ts_day",
1604 "transform": "day"
1605 },
1606 {
1607 "source-id": 1,
1608 "name": "id_bucket",
1609 "transform": "bucket[16]"
1610 },
1611 {
1612 "source-id": 2,
1613 "name": "id_truncate",
1614 "transform": "truncate[4]"
1615 }
1616 ]
1617 }
1618}
1619 "#,
1620 TableUpdate::AddSpec {
1621 spec: UnboundPartitionSpec::builder()
1622 .add_partition_field(4, "ts_day".to_string(), Transform::Day)
1623 .unwrap()
1624 .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
1625 .unwrap()
1626 .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
1627 .unwrap()
1628 .build(),
1629 },
1630 );
1631 }
1632
1633 #[test]
1634 fn test_set_default_spec() {
1635 test_serde_json(
1636 r#"
1637{
1638 "action": "set-default-spec",
1639 "spec-id": 1
1640}
1641 "#,
1642 TableUpdate::SetDefaultSpec { spec_id: 1 },
1643 )
1644 }
1645
1646 #[test]
1647 fn test_add_sort_order() {
1648 let json = r#"
1649{
1650 "action": "add-sort-order",
1651 "sort-order": {
1652 "order-id": 1,
1653 "fields": [
1654 {
1655 "transform": "identity",
1656 "source-id": 2,
1657 "direction": "asc",
1658 "null-order": "nulls-first"
1659 },
1660 {
1661 "transform": "bucket[4]",
1662 "source-id": 3,
1663 "direction": "desc",
1664 "null-order": "nulls-last"
1665 }
1666 ]
1667 }
1668}
1669 "#;
1670
1671 let update = TableUpdate::AddSortOrder {
1672 sort_order: SortOrder::builder()
1673 .with_order_id(1)
1674 .with_sort_field(
1675 SortField::builder()
1676 .source_id(2)
1677 .direction(SortDirection::Ascending)
1678 .null_order(NullOrder::First)
1679 .transform(Transform::Identity)
1680 .build(),
1681 )
1682 .with_sort_field(
1683 SortField::builder()
1684 .source_id(3)
1685 .direction(SortDirection::Descending)
1686 .null_order(NullOrder::Last)
1687 .transform(Transform::Bucket(4))
1688 .build(),
1689 )
1690 .build_unbound()
1691 .unwrap(),
1692 };
1693
1694 test_serde_json(json, update);
1695 }
1696
1697 #[test]
1698 fn test_set_default_order() {
1699 let json = r#"
1700{
1701 "action": "set-default-sort-order",
1702 "sort-order-id": 2
1703}
1704 "#;
1705 let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
1706
1707 test_serde_json(json, update);
1708 }
1709
1710 #[test]
1711 fn test_add_snapshot() {
1712 let json = r#"
1713{
1714 "action": "add-snapshot",
1715 "snapshot": {
1716 "snapshot-id": 3055729675574597000,
1717 "parent-snapshot-id": 3051729675574597000,
1718 "timestamp-ms": 1555100955770,
1719 "sequence-number": 1,
1720 "summary": {
1721 "operation": "append"
1722 },
1723 "manifest-list": "s3://a/b/2.avro",
1724 "schema-id": 1
1725 }
1726}
1727 "#;
1728
1729 let update = TableUpdate::AddSnapshot {
1730 snapshot: Snapshot::builder()
1731 .with_snapshot_id(3055729675574597000)
1732 .with_parent_snapshot_id(Some(3051729675574597000))
1733 .with_timestamp_ms(1555100955770)
1734 .with_sequence_number(1)
1735 .with_manifest_list("s3://a/b/2.avro")
1736 .with_schema_id(1)
1737 .with_summary(Summary {
1738 operation: Operation::Append,
1739 additional_properties: HashMap::default(),
1740 })
1741 .build(),
1742 };
1743
1744 test_serde_json(json, update);
1745 }
1746
1747 #[test]
1748 fn test_add_snapshot_v1() {
1749 let json = r#"
1750{
1751 "action": "add-snapshot",
1752 "snapshot": {
1753 "snapshot-id": 3055729675574597000,
1754 "parent-snapshot-id": 3051729675574597000,
1755 "timestamp-ms": 1555100955770,
1756 "summary": {
1757 "operation": "append"
1758 },
1759 "manifest-list": "s3://a/b/2.avro"
1760 }
1761}
1762 "#;
1763
1764 let update = TableUpdate::AddSnapshot {
1765 snapshot: Snapshot::builder()
1766 .with_snapshot_id(3055729675574597000)
1767 .with_parent_snapshot_id(Some(3051729675574597000))
1768 .with_timestamp_ms(1555100955770)
1769 .with_sequence_number(0)
1770 .with_manifest_list("s3://a/b/2.avro")
1771 .with_summary(Summary {
1772 operation: Operation::Append,
1773 additional_properties: HashMap::default(),
1774 })
1775 .build(),
1776 };
1777
1778 let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
1779 assert_eq!(actual, update, "Parsed value is not equal to expected");
1780 }
1781
1782 #[test]
1783 fn test_add_snapshot_v3() {
1784 let json = serde_json::json!(
1785 {
1786 "action": "add-snapshot",
1787 "snapshot": {
1788 "snapshot-id": 3055729675574597000i64,
1789 "parent-snapshot-id": 3051729675574597000i64,
1790 "timestamp-ms": 1555100955770i64,
1791 "first-row-id":0,
1792 "added-rows":2,
1793 "key-id":"key123",
1794 "summary": {
1795 "operation": "append"
1796 },
1797 "manifest-list": "s3://a/b/2.avro"
1798 }
1799 });
1800
1801 let update = TableUpdate::AddSnapshot {
1802 snapshot: Snapshot::builder()
1803 .with_snapshot_id(3055729675574597000)
1804 .with_parent_snapshot_id(Some(3051729675574597000))
1805 .with_timestamp_ms(1555100955770)
1806 .with_sequence_number(0)
1807 .with_manifest_list("s3://a/b/2.avro")
1808 .with_row_range(0, 2)
1809 .with_encryption_key_id(Some("key123".to_string()))
1810 .with_summary(Summary {
1811 operation: Operation::Append,
1812 additional_properties: HashMap::default(),
1813 })
1814 .build(),
1815 };
1816
1817 let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
1818 assert_eq!(actual, update, "Parsed value is not equal to expected");
1819 let restored: TableUpdate = serde_json::from_str(
1820 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1821 )
1822 .expect("Failed to parse from serialized json");
1823 assert_eq!(restored, update);
1824 }
1825
1826 #[test]
1827 fn test_remove_snapshots() {
1828 let json = r#"
1829{
1830 "action": "remove-snapshots",
1831 "snapshot-ids": [
1832 1,
1833 2
1834 ]
1835}
1836 "#;
1837
1838 let update = TableUpdate::RemoveSnapshots {
1839 snapshot_ids: vec![1, 2],
1840 };
1841 test_serde_json(json, update);
1842 }
1843
1844 #[test]
1845 fn test_remove_snapshot_ref() {
1846 let json = r#"
1847{
1848 "action": "remove-snapshot-ref",
1849 "ref-name": "snapshot-ref"
1850}
1851 "#;
1852
1853 let update = TableUpdate::RemoveSnapshotRef {
1854 ref_name: "snapshot-ref".to_string(),
1855 };
1856 test_serde_json(json, update);
1857 }
1858
1859 #[test]
1860 fn test_set_snapshot_ref_tag() {
1861 let json = r#"
1862{
1863 "action": "set-snapshot-ref",
1864 "type": "tag",
1865 "ref-name": "hank",
1866 "snapshot-id": 1,
1867 "max-ref-age-ms": 1
1868}
1869 "#;
1870
1871 let update = TableUpdate::SetSnapshotRef {
1872 ref_name: "hank".to_string(),
1873 reference: SnapshotReference {
1874 snapshot_id: 1,
1875 retention: SnapshotRetention::Tag {
1876 max_ref_age_ms: Some(1),
1877 },
1878 },
1879 };
1880
1881 test_serde_json(json, update);
1882 }
1883
1884 #[test]
1885 fn test_set_snapshot_ref_branch() {
1886 let json = r#"
1887{
1888 "action": "set-snapshot-ref",
1889 "type": "branch",
1890 "ref-name": "hank",
1891 "snapshot-id": 1,
1892 "min-snapshots-to-keep": 2,
1893 "max-snapshot-age-ms": 3,
1894 "max-ref-age-ms": 4
1895}
1896 "#;
1897
1898 let update = TableUpdate::SetSnapshotRef {
1899 ref_name: "hank".to_string(),
1900 reference: SnapshotReference {
1901 snapshot_id: 1,
1902 retention: SnapshotRetention::Branch {
1903 min_snapshots_to_keep: Some(2),
1904 max_snapshot_age_ms: Some(3),
1905 max_ref_age_ms: Some(4),
1906 },
1907 },
1908 };
1909
1910 test_serde_json(json, update);
1911 }
1912
1913 #[test]
1914 fn test_set_properties() {
1915 let json = r#"
1916{
1917 "action": "set-properties",
1918 "updates": {
1919 "prop1": "v1",
1920 "prop2": "v2"
1921 }
1922}
1923 "#;
1924
1925 let update = TableUpdate::SetProperties {
1926 updates: vec![
1927 ("prop1".to_string(), "v1".to_string()),
1928 ("prop2".to_string(), "v2".to_string()),
1929 ]
1930 .into_iter()
1931 .collect(),
1932 };
1933
1934 test_serde_json(json, update);
1935 }
1936
1937 #[test]
1938 fn test_remove_properties() {
1939 let json = r#"
1940{
1941 "action": "remove-properties",
1942 "removals": [
1943 "prop1",
1944 "prop2"
1945 ]
1946}
1947 "#;
1948
1949 let update = TableUpdate::RemoveProperties {
1950 removals: vec!["prop1".to_string(), "prop2".to_string()],
1951 };
1952
1953 test_serde_json(json, update);
1954 }
1955
1956 #[test]
1957 fn test_set_location() {
1958 let json = r#"
1959{
1960 "action": "set-location",
1961 "location": "s3://bucket/warehouse/tbl_location"
1962}
1963 "#;
1964
1965 let update = TableUpdate::SetLocation {
1966 location: "s3://bucket/warehouse/tbl_location".to_string(),
1967 };
1968
1969 test_serde_json(json, update);
1970 }
1971
1972 #[test]
1973 fn test_table_update_apply() {
1974 let table_creation = TableCreation::builder()
1975 .location("s3://db/table".to_string())
1976 .name("table".to_string())
1977 .properties(HashMap::new())
1978 .schema(Schema::builder().build().unwrap())
1979 .build();
1980 let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
1981 .unwrap()
1982 .build()
1983 .unwrap()
1984 .metadata;
1985 let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
1986 table_metadata,
1987 Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
1988 );
1989
1990 let uuid = uuid::Uuid::new_v4();
1991 let update = TableUpdate::AssignUuid { uuid };
1992 let updated_metadata = update
1993 .apply(table_metadata_builder)
1994 .unwrap()
1995 .build()
1996 .unwrap()
1997 .metadata;
1998 assert_eq!(updated_metadata.uuid(), uuid);
1999 }
2000
2001 #[test]
2002 fn test_view_assign_uuid() {
2003 test_serde_json(
2004 r#"
2005{
2006 "action": "assign-uuid",
2007 "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
2008}
2009 "#,
2010 ViewUpdate::AssignUuid {
2011 uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
2012 },
2013 );
2014 }
2015
2016 #[test]
2017 fn test_view_upgrade_format_version() {
2018 test_serde_json(
2019 r#"
2020{
2021 "action": "upgrade-format-version",
2022 "format-version": 1
2023}
2024 "#,
2025 ViewUpdate::UpgradeFormatVersion {
2026 format_version: ViewFormatVersion::V1,
2027 },
2028 );
2029 }
2030
2031 #[test]
2032 fn test_view_add_schema() {
2033 let test_schema = Schema::builder()
2034 .with_schema_id(1)
2035 .with_identifier_field_ids(vec![2])
2036 .with_fields(vec![
2037 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
2038 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2039 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
2040 ])
2041 .build()
2042 .unwrap();
2043 test_serde_json(
2044 r#"
2045{
2046 "action": "add-schema",
2047 "schema": {
2048 "type": "struct",
2049 "schema-id": 1,
2050 "fields": [
2051 {
2052 "id": 1,
2053 "name": "foo",
2054 "required": false,
2055 "type": "string"
2056 },
2057 {
2058 "id": 2,
2059 "name": "bar",
2060 "required": true,
2061 "type": "int"
2062 },
2063 {
2064 "id": 3,
2065 "name": "baz",
2066 "required": false,
2067 "type": "boolean"
2068 }
2069 ],
2070 "identifier-field-ids": [
2071 2
2072 ]
2073 },
2074 "last-column-id": 3
2075}
2076 "#,
2077 ViewUpdate::AddSchema {
2078 schema: test_schema.clone(),
2079 last_column_id: Some(3),
2080 },
2081 );
2082 }
2083
2084 #[test]
2085 fn test_view_set_location() {
2086 test_serde_json(
2087 r#"
2088{
2089 "action": "set-location",
2090 "location": "s3://db/view"
2091}
2092 "#,
2093 ViewUpdate::SetLocation {
2094 location: "s3://db/view".to_string(),
2095 },
2096 );
2097 }
2098
2099 #[test]
2100 fn test_view_set_properties() {
2101 test_serde_json(
2102 r#"
2103{
2104 "action": "set-properties",
2105 "updates": {
2106 "prop1": "v1",
2107 "prop2": "v2"
2108 }
2109}
2110 "#,
2111 ViewUpdate::SetProperties {
2112 updates: vec![
2113 ("prop1".to_string(), "v1".to_string()),
2114 ("prop2".to_string(), "v2".to_string()),
2115 ]
2116 .into_iter()
2117 .collect(),
2118 },
2119 );
2120 }
2121
2122 #[test]
2123 fn test_view_remove_properties() {
2124 test_serde_json(
2125 r#"
2126{
2127 "action": "remove-properties",
2128 "removals": [
2129 "prop1",
2130 "prop2"
2131 ]
2132}
2133 "#,
2134 ViewUpdate::RemoveProperties {
2135 removals: vec!["prop1".to_string(), "prop2".to_string()],
2136 },
2137 );
2138 }
2139
2140 #[test]
2141 fn test_view_add_view_version() {
2142 test_serde_json(
2143 r#"
2144{
2145 "action": "add-view-version",
2146 "view-version": {
2147 "version-id" : 1,
2148 "timestamp-ms" : 1573518431292,
2149 "schema-id" : 1,
2150 "default-catalog" : "prod",
2151 "default-namespace" : [ "default" ],
2152 "summary" : {
2153 "engine-name" : "Spark"
2154 },
2155 "representations" : [ {
2156 "type" : "sql",
2157 "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
2158 "dialect" : "spark"
2159 } ]
2160 }
2161}
2162 "#,
2163 ViewUpdate::AddViewVersion {
2164 view_version: ViewVersion::builder()
2165 .with_version_id(1)
2166 .with_timestamp_ms(1573518431292)
2167 .with_schema_id(1)
2168 .with_default_catalog(Some("prod".to_string()))
2169 .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
2170 .with_summary(
2171 vec![("engine-name".to_string(), "Spark".to_string())]
2172 .into_iter()
2173 .collect(),
2174 )
2175 .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
2176 sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
2177 dialect: "spark".to_string(),
2178 })]))
2179 .build(),
2180 },
2181 );
2182 }
2183
2184 #[test]
2185 fn test_view_set_current_view_version() {
2186 test_serde_json(
2187 r#"
2188{
2189 "action": "set-current-view-version",
2190 "view-version-id": 1
2191}
2192 "#,
2193 ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
2194 );
2195 }
2196
2197 #[test]
2198 fn test_remove_partition_specs_update() {
2199 test_serde_json(
2200 r#"
2201{
2202 "action": "remove-partition-specs",
2203 "spec-ids": [1, 2]
2204}
2205 "#,
2206 TableUpdate::RemovePartitionSpecs {
2207 spec_ids: vec![1, 2],
2208 },
2209 );
2210 }
2211
2212 #[test]
2213 fn test_set_statistics_file() {
2214 test_serde_json(
2215 r#"
2216 {
2217 "action": "set-statistics",
2218 "snapshot-id": 1940541653261589030,
2219 "statistics": {
2220 "snapshot-id": 1940541653261589030,
2221 "statistics-path": "s3://bucket/warehouse/stats.puffin",
2222 "file-size-in-bytes": 124,
2223 "file-footer-size-in-bytes": 27,
2224 "blob-metadata": [
2225 {
2226 "type": "boring-type",
2227 "snapshot-id": 1940541653261589030,
2228 "sequence-number": 2,
2229 "fields": [
2230 1
2231 ],
2232 "properties": {
2233 "prop-key": "prop-value"
2234 }
2235 }
2236 ]
2237 }
2238 }
2239 "#,
2240 TableUpdate::SetStatistics {
2241 statistics: StatisticsFile {
2242 snapshot_id: 1940541653261589030,
2243 statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
2244 file_size_in_bytes: 124,
2245 file_footer_size_in_bytes: 27,
2246 key_metadata: None,
2247 blob_metadata: vec![BlobMetadata {
2248 r#type: "boring-type".to_string(),
2249 snapshot_id: 1940541653261589030,
2250 sequence_number: 2,
2251 fields: vec![1],
2252 properties: vec![("prop-key".to_string(), "prop-value".to_string())]
2253 .into_iter()
2254 .collect(),
2255 }],
2256 },
2257 },
2258 );
2259 }
2260
2261 #[test]
2262 fn test_remove_statistics_file() {
2263 test_serde_json(
2264 r#"
2265 {
2266 "action": "remove-statistics",
2267 "snapshot-id": 1940541653261589030
2268 }
2269 "#,
2270 TableUpdate::RemoveStatistics {
2271 snapshot_id: 1940541653261589030,
2272 },
2273 );
2274 }
2275
2276 #[test]
2277 fn test_set_partition_statistics_file() {
2278 test_serde_json(
2279 r#"
2280 {
2281 "action": "set-partition-statistics",
2282 "partition-statistics": {
2283 "snapshot-id": 1940541653261589030,
2284 "statistics-path": "s3://bucket/warehouse/stats1.parquet",
2285 "file-size-in-bytes": 43
2286 }
2287 }
2288 "#,
2289 TableUpdate::SetPartitionStatistics {
2290 partition_statistics: PartitionStatisticsFile {
2291 snapshot_id: 1940541653261589030,
2292 statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
2293 file_size_in_bytes: 43,
2294 },
2295 },
2296 )
2297 }
2298
2299 #[test]
2300 fn test_remove_partition_statistics_file() {
2301 test_serde_json(
2302 r#"
2303 {
2304 "action": "remove-partition-statistics",
2305 "snapshot-id": 1940541653261589030
2306 }
2307 "#,
2308 TableUpdate::RemovePartitionStatistics {
2309 snapshot_id: 1940541653261589030,
2310 },
2311 )
2312 }
2313
2314 #[test]
2315 fn test_remove_schema_update() {
2316 test_serde_json(
2317 r#"
2318 {
2319 "action": "remove-schemas",
2320 "schema-ids": [1, 2]
2321 }
2322 "#,
2323 TableUpdate::RemoveSchemas {
2324 schema_ids: vec![1, 2],
2325 },
2326 );
2327 }
2328
2329 #[test]
2330 fn test_add_encryption_key() {
2331 let key_bytes = "key".as_bytes();
2332 let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
2333 test_serde_json(
2334 format!(
2335 r#"
2336 {{
2337 "action": "add-encryption-key",
2338 "encryption-key": {{
2339 "key-id": "a",
2340 "encrypted-key-metadata": "{encoded_key}",
2341 "encrypted-by-id": "b"
2342 }}
2343 }}
2344 "#
2345 ),
2346 TableUpdate::AddEncryptionKey {
2347 encryption_key: EncryptedKey::builder()
2348 .key_id("a")
2349 .encrypted_key_metadata(key_bytes.to_vec())
2350 .encrypted_by_id("b")
2351 .build(),
2352 },
2353 );
2354 }
2355
2356 #[test]
2357 fn test_remove_encryption_key() {
2358 test_serde_json(
2359 r#"
2360 {
2361 "action": "remove-encryption-key",
2362 "key-id": "a"
2363 }
2364 "#,
2365 TableUpdate::RemoveEncryptionKey {
2366 key_id: "a".to_string(),
2367 },
2368 );
2369 }
2370
2371 #[test]
2372 fn test_table_commit() {
2373 let table = {
2374 let file = File::open(format!(
2375 "{}/testdata/table_metadata/{}",
2376 env!("CARGO_MANIFEST_DIR"),
2377 "TableMetadataV2Valid.json"
2378 ))
2379 .unwrap();
2380 let reader = BufReader::new(file);
2381 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2382
2383 Table::builder()
2384 .metadata(resp)
2385 .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
2386 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2387 .file_io(FileIO::new_with_memory())
2388 .build()
2389 .unwrap()
2390 };
2391
2392 let updates = vec![
2393 TableUpdate::SetLocation {
2394 location: "s3://bucket/test/new_location/data".to_string(),
2395 },
2396 TableUpdate::SetProperties {
2397 updates: vec![
2398 ("prop1".to_string(), "v1".to_string()),
2399 ("prop2".to_string(), "v2".to_string()),
2400 ]
2401 .into_iter()
2402 .collect(),
2403 },
2404 ];
2405
2406 let requirements = vec![TableRequirement::UuidMatch {
2407 uuid: table.metadata().table_uuid,
2408 }];
2409
2410 let table_commit = TableCommit::builder()
2411 .ident(table.identifier().to_owned())
2412 .updates(updates)
2413 .requirements(requirements)
2414 .build();
2415
2416 let updated_table = table_commit.apply(table).unwrap();
2417
2418 assert_eq!(
2419 updated_table.metadata().properties.get("prop1").unwrap(),
2420 "v1"
2421 );
2422 assert_eq!(
2423 updated_table.metadata().properties.get("prop2").unwrap(),
2424 "v2"
2425 );
2426
2427 assert!(
2429 updated_table
2430 .metadata_location()
2431 .unwrap()
2432 .starts_with("s3://bucket/test/location/metadata/00001-")
2433 );
2434
2435 assert_eq!(
2436 updated_table.metadata().location,
2437 "s3://bucket/test/new_location/data",
2438 );
2439 }
2440}