1use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use typed_builder::TypedBuilder;
23
24use crate::spec::{
25 ListType, Literal, MapType, NestedField, NestedFieldRef, SCHEMA_NAME_DELIMITER, Schema,
26 StructType, Type,
27};
28use crate::table::Table;
29use crate::transaction::action::{ActionCommit, TransactionAction};
30use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
31
32const DEFAULT_FIELD_ID: i32 = 0;
34
35#[derive(TypedBuilder)]
42pub struct AddColumn {
43 #[builder(default = None, setter(strip_option, into))]
44 parent: Option<String>,
45 #[builder(setter(into))]
46 name: String,
47 #[builder(default = false)]
48 required: bool,
49 field_type: Type,
50 #[builder(default = None, setter(strip_option, into))]
51 doc: Option<String>,
52 #[builder(default = None, setter(strip_option))]
53 initial_default: Option<Literal>,
54 #[builder(default = None, setter(strip_option))]
55 write_default: Option<Literal>,
56}
57
58impl AddColumn {
59 pub fn optional(name: impl ToString, field_type: Type) -> Self {
61 Self::builder()
62 .name(name.to_string())
63 .field_type(field_type)
64 .required(false)
65 .build()
66 }
67
68 pub fn required(name: impl ToString, field_type: Type, initial_default: Literal) -> Self {
70 Self::builder()
71 .name(name.to_string())
72 .field_type(field_type)
73 .required(true)
74 .initial_default(initial_default.clone())
75 .write_default(initial_default)
76 .build()
77 }
78
79 fn to_nested_field(&self) -> NestedFieldRef {
80 let mut field = NestedField::new(
81 DEFAULT_FIELD_ID,
82 self.name.clone(),
83 self.field_type.clone(),
84 self.required,
85 );
86
87 field.doc = self.doc.clone();
88 field.initial_default = self.initial_default.clone();
89 field.write_default = self.write_default.clone();
90 Arc::new(field)
91 }
92}
93
94pub struct UpdateSchemaAction {
117 additions: Vec<AddColumn>,
118 deletes: Vec<String>,
119}
120
121impl UpdateSchemaAction {
122 pub(crate) fn new() -> Self {
124 Self {
125 additions: Vec::new(),
126 deletes: Vec::new(),
127 }
128 }
129
130 pub fn add_column(mut self, add_column: AddColumn) -> Self {
138 self.additions.push(add_column);
139 self
140 }
141
142 pub fn delete_column(mut self, name: impl ToString) -> Self {
148 self.deletes.push(name.to_string());
149 self
150 }
151}
152
153fn assign_fresh_ids(field: &NestedField, next_id: &mut i32) -> NestedFieldRef {
165 *next_id += 1;
166 let new_id = *next_id;
167 let new_type = assign_fresh_ids_to_type(&field.field_type, next_id);
168
169 Arc::new(NestedField {
170 id: new_id,
171 name: field.name.clone(),
172 required: field.required,
173 field_type: Box::new(new_type),
174 doc: field.doc.clone(),
175 initial_default: field.initial_default.clone(),
176 write_default: field.write_default.clone(),
177 })
178}
179
180fn assign_fresh_ids_to_type(field_type: &Type, next_id: &mut i32) -> Type {
182 match field_type {
183 Type::Primitive(_) => field_type.clone(),
184 Type::Struct(struct_type) => {
185 let new_fields: Vec<NestedFieldRef> = struct_type
186 .fields()
187 .iter()
188 .map(|f| assign_fresh_ids(f, next_id))
189 .collect();
190 Type::Struct(StructType::new(new_fields))
191 }
192 Type::List(list_type) => {
193 let new_element = assign_fresh_ids(&list_type.element_field, next_id);
194 Type::List(ListType {
195 element_field: new_element,
196 })
197 }
198 Type::Map(map_type) => {
199 let new_key = assign_fresh_ids(&map_type.key_field, next_id);
200 let new_value = assign_fresh_ids(&map_type.value_field, next_id);
201 Type::Map(MapType {
202 key_field: new_key,
203 value_field: new_value,
204 })
205 }
206 }
207}
208
209fn resolve_parent_target<'a>(
219 base_schema: &'a Schema,
220 parent: &str,
221) -> Result<(i32, &'a StructType)> {
222 base_schema
223 .field_by_name(parent)
224 .ok_or_else(|| {
225 Error::new(
226 ErrorKind::PreconditionFailed,
227 format!("Cannot add column: parent '{parent}' not found"),
228 )
229 })
230 .and_then(|parent_field| match parent_field.field_type.as_ref() {
231 Type::Struct(s) => Ok((parent_field.id, s)),
232 Type::Map(m) => match m.value_field.field_type.as_ref() {
233 Type::Struct(s) => Ok((m.value_field.id, s)),
234 _ => Err(Error::new(
235 ErrorKind::PreconditionFailed,
236 format!("Cannot add column: map value of '{parent}' is not a struct"),
237 )),
238 },
239 Type::List(l) => match l.element_field.field_type.as_ref() {
240 Type::Struct(s) => Ok((l.element_field.id, s)),
241 _ => Err(Error::new(
242 ErrorKind::PreconditionFailed,
243 format!("Cannot add column: list element of '{parent}' is not a struct"),
244 )),
245 },
246 _ => Err(Error::new(
247 ErrorKind::PreconditionFailed,
248 format!("Cannot add column: parent '{parent}' is not a struct, map, or list"),
249 )),
250 })
251}
252
253fn rebuild_fields(
260 fields: &[NestedFieldRef],
261 adds: &HashMap<Option<i32>, Vec<NestedFieldRef>>,
262 delete_ids: &HashSet<i32>,
263 parent_id: Option<i32>,
264) -> Vec<NestedFieldRef> {
265 fields
266 .iter()
267 .filter(|f| !delete_ids.contains(&f.id))
268 .map(|f| rebuild_field(f, adds, delete_ids))
269 .chain(adds.get(&parent_id).into_iter().flatten().cloned())
270 .collect()
271}
272
273fn rebuild_field(
277 field: &NestedFieldRef,
278 adds: &HashMap<Option<i32>, Vec<NestedFieldRef>>,
279 delete_ids: &HashSet<i32>,
280) -> NestedFieldRef {
281 match field.field_type.as_ref() {
282 Type::Primitive(_) => field.clone(),
283 Type::Struct(s) => {
284 let new_fields = rebuild_fields(s.fields(), adds, delete_ids, Some(field.id));
285 Arc::new(NestedField {
286 id: field.id,
287 name: field.name.clone(),
288 required: field.required,
289 field_type: Box::new(Type::Struct(StructType::new(new_fields))),
290 doc: field.doc.clone(),
291 initial_default: field.initial_default.clone(),
292 write_default: field.write_default.clone(),
293 })
294 }
295 Type::List(l) => {
296 let new_element = rebuild_field(&l.element_field, adds, delete_ids);
297 Arc::new(NestedField {
298 id: field.id,
299 name: field.name.clone(),
300 required: field.required,
301 field_type: Box::new(Type::List(ListType {
302 element_field: new_element,
303 })),
304 doc: field.doc.clone(),
305 initial_default: field.initial_default.clone(),
306 write_default: field.write_default.clone(),
307 })
308 }
309 Type::Map(m) => {
310 let new_key = rebuild_field(&m.key_field, adds, delete_ids);
311 let new_value = rebuild_field(&m.value_field, adds, delete_ids);
312 Arc::new(NestedField {
313 id: field.id,
314 name: field.name.clone(),
315 required: field.required,
316 field_type: Box::new(Type::Map(MapType {
317 key_field: new_key,
318 value_field: new_value,
319 })),
320 doc: field.doc.clone(),
321 initial_default: field.initial_default.clone(),
322 write_default: field.write_default.clone(),
323 })
324 }
325 }
326}
327
328#[async_trait]
333impl TransactionAction for UpdateSchemaAction {
334 async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
335 let base_schema = table.metadata().current_schema();
336 let mut last_column_id = table.metadata().last_column_id();
337
338 let delete_ids = self
340 .deletes
341 .iter()
342 .map(|name: &String| {
343 base_schema
344 .field_by_name(name)
345 .ok_or_else(|| {
346 Error::new(
347 ErrorKind::PreconditionFailed,
348 format!("Cannot delete missing column: {name}"),
349 )
350 })
351 .and_then(|field| {
352 match base_schema
353 .identifier_field_ids()
354 .find(|id| *id == field.id)
355 {
356 Some(_) => Err(Error::new(
357 ErrorKind::PreconditionFailed,
358 format!("Cannot delete identifier field: {name}"),
359 )),
360 None => Ok(field.id),
361 }
362 })
363 })
364 .collect::<Result<HashSet<i32>>>()?;
365
366 let mut additions_by_parent: HashMap<Option<i32>, Vec<NestedFieldRef>> = HashMap::new();
370
371 for add in &self.additions {
372 let pending_field = add.to_nested_field();
373
374 if pending_field.name.contains(SCHEMA_NAME_DELIMITER) {
376 return Err(Error::new(
377 ErrorKind::PreconditionFailed,
378 format!(
379 "Cannot add column with ambiguous name: {}. Use `AddColumn::with_parent` to add a column to a nested struct.",
380 pending_field.name
381 ),
382 ));
383 }
384
385 if pending_field.required && pending_field.initial_default.is_none() {
387 return Err(Error::new(
388 ErrorKind::PreconditionFailed,
389 format!(
390 "Incompatible change: cannot add required column without an initial default: {}",
391 pending_field.name
392 ),
393 ));
394 }
395
396 let parent_id = match &add.parent {
397 None => {
398 if let Some(existing) = base_schema.field_by_name(&pending_field.name)
400 && !delete_ids.contains(&existing.id)
401 {
402 return Err(Error::new(
403 ErrorKind::PreconditionFailed,
404 format!(
405 "Cannot add column, name already exists: {}",
406 pending_field.name
407 ),
408 ));
409 }
410 None
411 }
412 Some(parent_path) => {
413 let (resolved_parent_id, parent_struct) =
415 resolve_parent_target(base_schema, parent_path)?;
416
417 if parent_struct.fields().iter().any(|f| {
418 f.name == pending_field.name
419 && !delete_ids.contains(&f.id)
420 && !delete_ids.contains(&resolved_parent_id)
421 }) {
422 return Err(Error::new(
423 ErrorKind::PreconditionFailed,
424 format!(
425 "Cannot add column, name already exists in '{}': {}",
426 parent_path, pending_field.name
427 ),
428 ));
429 }
430
431 Some(resolved_parent_id)
432 }
433 };
434
435 let field = assign_fresh_ids(&pending_field, &mut last_column_id);
437
438 additions_by_parent
439 .entry(parent_id)
440 .or_default()
441 .push(field);
442 }
443
444 let new_fields = rebuild_fields(
446 base_schema.as_struct().fields(),
447 &additions_by_parent,
448 &delete_ids,
449 None,
450 );
451
452 let schema = Schema::builder()
454 .with_fields(new_fields)
455 .with_identifier_field_ids(base_schema.identifier_field_ids())
456 .build()?;
457
458 let updates = vec![
459 TableUpdate::AddSchema { schema },
460 TableUpdate::SetCurrentSchema { schema_id: -1 },
461 ];
462
463 let requirements = vec![TableRequirement::CurrentSchemaIdMatch {
464 current_schema_id: base_schema.schema_id(),
465 }];
466
467 Ok(ActionCommit::new(updates, requirements))
468 }
469}
470
471#[cfg(test)]
472mod tests {
473 use std::io::BufReader;
474 use std::sync::Arc;
475
476 use as_any::Downcast;
477
478 use crate::spec::{
479 DEFAULT_SCHEMA_ID, Literal, NestedField, PrimitiveType, StructType, TableMetadata, Type,
480 };
481 use crate::table::Table;
482 use crate::transaction::Transaction;
483 use crate::transaction::action::{ApplyTransactionAction, TransactionAction};
484 use crate::transaction::tests::make_v2_table;
485 use crate::transaction::update_schema::{AddColumn, DEFAULT_FIELD_ID, UpdateSchemaAction};
486 use crate::{ErrorKind, TableIdent, TableRequirement, TableUpdate};
487
488 fn make_v2_table_with_nested() -> Table {
512 let json = r#"{
513 "format-version": 2,
514 "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c2",
515 "location": "s3://bucket/test/location",
516 "last-sequence-number": 0,
517 "last-updated-ms": 1602638573590,
518 "last-column-id": 14,
519 "current-schema-id": 0,
520 "schemas": [
521 {
522 "type": "struct",
523 "schema-id": 0,
524 "identifier-field-ids": [1, 2],
525 "fields": [
526 {"id": 1, "name": "x", "required": true, "type": "long"},
527 {"id": 2, "name": "y", "required": true, "type": "long"},
528 {"id": 3, "name": "z", "required": true, "type": "long"},
529 {"id": 4, "name": "person", "required": false, "type": {
530 "type": "struct",
531 "fields": [
532 {"id": 5, "name": "name", "required": false, "type": "string"},
533 {"id": 6, "name": "age", "required": true, "type": "int"}
534 ]
535 }},
536 {"id": 7, "name": "tags", "required": false, "type": {
537 "type": "list",
538 "element-id": 8,
539 "element": {
540 "type": "struct",
541 "fields": [
542 {"id": 9, "name": "key", "required": false, "type": "string"},
543 {"id": 10, "name": "value", "required": false, "type": "string"}
544 ]
545 },
546 "element-required": true
547 }},
548 {"id": 11, "name": "props", "required": false, "type": {
549 "type": "map",
550 "key-id": 12,
551 "key": "string",
552 "value-id": 13,
553 "value": {
554 "type": "struct",
555 "fields": [
556 {"id": 14, "name": "data", "required": false, "type": "string"}
557 ]
558 },
559 "value-required": true
560 }}
561 ]
562 }
563 ],
564 "default-spec-id": 0,
565 "partition-specs": [
566 {"spec-id": 0, "fields": []}
567 ],
568 "last-partition-id": 999,
569 "default-sort-order-id": 0,
570 "sort-orders": [
571 {"order-id": 0, "fields": []}
572 ],
573 "properties": {},
574 "current-snapshot-id": -1,
575 "snapshots": []
576 }"#;
577
578 let reader = BufReader::new(json.as_bytes());
579 let metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
580
581 Table::builder()
582 .metadata(metadata)
583 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
584 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
585 .file_io(crate::io::FileIO::new_with_memory())
586 .runtime(crate::test_utils::test_runtime())
587 .build()
588 .unwrap()
589 }
590
591 #[tokio::test]
596 async fn test_add_column() {
597 let table = make_v2_table();
598 let tx = Transaction::new(&table);
599
600 let action = tx.update_schema().add_column(AddColumn::optional(
601 "new_col",
602 Type::Primitive(PrimitiveType::Int),
603 ));
604
605 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
606 let updates = action_commit.take_updates();
607 let requirements = action_commit.take_requirements();
608
609 assert_eq!(updates.len(), 2);
610
611 let new_schema = match &updates[0] {
613 TableUpdate::AddSchema { schema } => schema,
614 other => panic!("expected AddSchema, got {other:?}"),
615 };
616
617 let expected_schema = table
618 .metadata()
619 .current_schema()
620 .as_ref()
621 .clone()
622 .into_builder()
623 .with_schema_id(DEFAULT_SCHEMA_ID)
624 .with_fields([
625 NestedField::optional(4, "new_col", Type::Primitive(PrimitiveType::Int)).into(),
626 ])
627 .build()
628 .unwrap();
629 assert_eq!(new_schema, &expected_schema);
630
631 assert_eq!(updates[1], TableUpdate::SetCurrentSchema { schema_id: -1 });
632
633 assert_eq!(requirements.len(), 1);
635 assert_eq!(requirements[0], TableRequirement::CurrentSchemaIdMatch {
636 current_schema_id: table.metadata().current_schema().schema_id()
637 });
638 }
639
640 #[tokio::test]
641 async fn test_add_column_with_doc() {
642 let table = make_v2_table();
643 let tx = Transaction::new(&table);
644
645 let action = tx.update_schema().add_column(
646 AddColumn::builder()
647 .name("documented_col")
648 .field_type(Type::Primitive(PrimitiveType::String))
649 .doc("A documented column")
650 .build(),
651 );
652
653 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
654 let updates = action_commit.take_updates();
655
656 let new_schema = match &updates[0] {
657 TableUpdate::AddSchema { schema } => schema,
658 other => panic!("expected AddSchema, got {other:?}"),
659 };
660
661 let field = new_schema
662 .field_by_name("documented_col")
663 .expect("documented_col should exist");
664 assert_eq!(field.id, 4);
665 assert!(!field.required);
666 assert_eq!(field.doc.as_deref(), Some("A documented column"));
667 }
668
669 #[tokio::test]
670 async fn test_add_required_column_with_initial_default() {
671 let table = make_v2_table();
672 let tx = Transaction::new(&table);
673
674 let action = tx.update_schema().add_column(AddColumn::required(
675 "req_col",
676 Type::Primitive(PrimitiveType::Int),
677 Literal::int(0),
678 ));
679
680 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
681 let updates = action_commit.take_updates();
682
683 let new_schema = match &updates[0] {
684 TableUpdate::AddSchema { schema } => schema,
685 other => panic!("expected AddSchema, got {other:?}"),
686 };
687
688 let field = new_schema
689 .field_by_name("req_col")
690 .expect("req_col should exist");
691 assert_eq!(field.id, 4);
692 assert!(field.required);
693 assert_eq!(field.initial_default, Some(Literal::int(0)));
694 assert_eq!(field.write_default, Some(Literal::int(0)));
695 }
696
697 #[tokio::test]
698 async fn test_add_column_name_conflict_fails() {
699 let table = make_v2_table();
700 let tx = Transaction::new(&table);
701
702 let action = tx.update_schema().add_column(AddColumn::optional(
704 "x",
705 Type::Primitive(PrimitiveType::Int),
706 ));
707
708 let result = Arc::new(action).commit(&table).await;
709 let err = match result {
710 Err(e) => e,
711 Ok(_) => panic!("should reject adding a column with an existing name"),
712 };
713 assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
714 assert!(
715 err.message().contains("already exists"),
716 "error should mention name conflict, got: {}",
717 err.message()
718 );
719 }
720
721 #[tokio::test]
722 async fn test_delete_column() {
723 let table = make_v2_table();
724 let tx = Transaction::new(&table);
725
726 let action = tx.update_schema().delete_column("z");
728
729 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
730 let updates = action_commit.take_updates();
731
732 let new_schema = match &updates[0] {
733 TableUpdate::AddSchema { schema } => schema,
734 other => panic!("expected AddSchema, got {other:?}"),
735 };
736
737 assert!(
738 new_schema.field_by_name("z").is_none(),
739 "z should be deleted"
740 );
741 assert!(new_schema.field_by_name("x").is_some());
742 assert!(new_schema.field_by_name("y").is_some());
743 }
744
745 #[tokio::test]
746 async fn test_delete_missing_column_fails() {
747 let table = make_v2_table();
748 let tx = Transaction::new(&table);
749
750 let action = tx.update_schema().delete_column("nonexistent");
751
752 let result = Arc::new(action).commit(&table).await;
753 let err = match result {
754 Err(e) => e,
755 Ok(_) => panic!("should reject deleting a non-existent column"),
756 };
757 assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
758 assert!(
759 err.message().contains("nonexistent"),
760 "error should mention the missing column, got: {}",
761 err.message()
762 );
763 }
764
765 #[tokio::test]
766 async fn test_add_and_delete_combined() {
767 let table = make_v2_table();
768 let tx = Transaction::new(&table);
769
770 let action = tx
772 .update_schema()
773 .delete_column("z")
774 .add_column(AddColumn::optional(
775 "w",
776 Type::Primitive(PrimitiveType::Boolean),
777 ));
778
779 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
780 let updates = action_commit.take_updates();
781
782 let new_schema = match &updates[0] {
783 TableUpdate::AddSchema { schema } => schema,
784 other => panic!("expected AddSchema, got {other:?}"),
785 };
786
787 assert!(
788 new_schema.field_by_name("z").is_none(),
789 "z should be deleted"
790 );
791 let w = new_schema.field_by_name("w").expect("w should exist");
792 assert_eq!(w.id, 4);
793 assert!(!w.required);
794 }
795
796 #[tokio::test]
797 async fn test_delete_and_readd_same_name() {
798 let table = make_v2_table();
799 let tx = Transaction::new(&table);
800
801 let action = tx
803 .update_schema()
804 .delete_column("z")
805 .add_column(AddColumn::optional(
806 "z",
807 Type::Primitive(PrimitiveType::Boolean),
808 ));
809
810 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
811 let updates = action_commit.take_updates();
812
813 let new_schema = match &updates[0] {
814 TableUpdate::AddSchema { schema } => schema,
815 other => panic!("expected AddSchema, got {other:?}"),
816 };
817
818 let z = new_schema
819 .field_by_name("z")
820 .expect("z should exist with new type");
821 assert_eq!(z.id, 4); assert_eq!(*z.field_type, Type::Primitive(PrimitiveType::Boolean));
823 }
824
825 #[test]
826 fn test_apply() {
827 let table = make_v2_table();
828 let tx = Transaction::new(&table);
829
830 let tx = tx
831 .update_schema()
832 .add_column(AddColumn::optional(
833 "new_col",
834 Type::Primitive(PrimitiveType::Int),
835 ))
836 .apply(tx)
837 .unwrap();
838
839 assert_eq!(tx.actions.len(), 1);
840 (*tx.actions[0])
841 .downcast_ref::<UpdateSchemaAction>()
842 .expect("UpdateSchemaAction was not applied to Transaction!");
843 }
844
845 #[tokio::test]
850 async fn test_add_column_to_struct() {
851 let table = make_v2_table_with_nested();
852 let tx = Transaction::new(&table);
853
854 let action = tx.update_schema().add_column(
856 AddColumn::builder()
857 .name("email")
858 .field_type(Type::Primitive(PrimitiveType::String))
859 .parent("person")
860 .build(),
861 );
862
863 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
864 let updates = action_commit.take_updates();
865
866 let new_schema = match &updates[0] {
867 TableUpdate::AddSchema { schema } => schema,
868 other => panic!("expected AddSchema, got {other:?}"),
869 };
870
871 let email = new_schema
873 .field_by_name("person.email")
874 .expect("person.email should exist");
875 assert_eq!(email.id, 15);
876 assert!(!email.required);
877 assert_eq!(*email.field_type, Type::Primitive(PrimitiveType::String));
878
879 assert!(new_schema.field_by_name("person.name").is_some());
881 assert!(new_schema.field_by_name("person.age").is_some());
882 }
883
884 #[tokio::test]
885 async fn test_add_column_to_struct_with_doc() {
886 let table = make_v2_table_with_nested();
887 let tx = Transaction::new(&table);
888
889 let action = tx.update_schema().add_column(
890 AddColumn::builder()
891 .name("phone")
892 .field_type(Type::Primitive(PrimitiveType::String))
893 .parent("person")
894 .doc("Phone number")
895 .build(),
896 );
897
898 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
899 let updates = action_commit.take_updates();
900
901 let new_schema = match &updates[0] {
902 TableUpdate::AddSchema { schema } => schema,
903 other => panic!("expected AddSchema, got {other:?}"),
904 };
905
906 let phone = new_schema
907 .field_by_name("person.phone")
908 .expect("person.phone should exist");
909 assert_eq!(phone.id, 15);
910 assert_eq!(phone.doc.as_deref(), Some("Phone number"));
911 }
912
913 #[tokio::test]
914 async fn test_add_column_to_list_element_struct() {
915 let table = make_v2_table_with_nested();
916 let tx = Transaction::new(&table);
917
918 let action = tx.update_schema().add_column(
921 AddColumn::builder()
922 .name("score")
923 .field_type(Type::Primitive(PrimitiveType::Double))
924 .parent("tags")
925 .build(),
926 );
927
928 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
929 let updates = action_commit.take_updates();
930
931 let new_schema = match &updates[0] {
932 TableUpdate::AddSchema { schema } => schema,
933 other => panic!("expected AddSchema, got {other:?}"),
934 };
935
936 let score = new_schema
938 .field_by_name("tags.element.score")
939 .expect("tags.element.score should exist");
940 assert_eq!(score.id, 15);
941 assert!(!score.required);
942
943 assert!(new_schema.field_by_name("tags.element.key").is_some());
945 assert!(new_schema.field_by_name("tags.element.value").is_some());
946 }
947
948 #[tokio::test]
949 async fn test_add_column_to_map_value_struct() {
950 let table = make_v2_table_with_nested();
951 let tx = Transaction::new(&table);
952
953 let action = tx.update_schema().add_column(
956 AddColumn::builder()
957 .name("version")
958 .field_type(Type::Primitive(PrimitiveType::Int))
959 .parent("props")
960 .build(),
961 );
962
963 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
964 let updates = action_commit.take_updates();
965
966 let new_schema = match &updates[0] {
967 TableUpdate::AddSchema { schema } => schema,
968 other => panic!("expected AddSchema, got {other:?}"),
969 };
970
971 let version = new_schema
972 .field_by_name("props.value.version")
973 .expect("props.value.version should exist");
974 assert_eq!(version.id, 15);
975
976 assert!(new_schema.field_by_name("props.value.data").is_some());
978 }
979
980 #[tokio::test]
981 async fn test_add_column_to_nonexistent_parent_fails() {
982 let table = make_v2_table_with_nested();
983 let tx = Transaction::new(&table);
984
985 let action = tx.update_schema().add_column(
986 AddColumn::builder()
987 .name("col")
988 .field_type(Type::Primitive(PrimitiveType::Int))
989 .parent("nonexistent")
990 .build(),
991 );
992
993 let err = match Arc::new(action).commit(&table).await {
994 Err(e) => e,
995 Ok(_) => panic!("should reject adding to a nonexistent parent"),
996 };
997 assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
998 assert!(
999 err.message().contains("nonexistent"),
1000 "error should mention the missing parent, got: {}",
1001 err.message()
1002 );
1003 }
1004
1005 #[tokio::test]
1006 async fn test_add_column_to_primitive_parent_fails() {
1007 let table = make_v2_table_with_nested();
1008 let tx = Transaction::new(&table);
1009
1010 let action = tx.update_schema().add_column(
1012 AddColumn::builder()
1013 .name("col")
1014 .field_type(Type::Primitive(PrimitiveType::Int))
1015 .parent("x")
1016 .build(),
1017 );
1018
1019 let err = match Arc::new(action).commit(&table).await {
1020 Err(e) => e,
1021 Ok(_) => panic!("should reject adding to a primitive parent"),
1022 };
1023 assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
1024 assert!(
1025 err.message().contains("not a struct"),
1026 "error should mention type mismatch, got: {}",
1027 err.message()
1028 );
1029 }
1030
1031 #[tokio::test]
1032 async fn test_add_column_to_nested_name_conflict_fails() {
1033 let table = make_v2_table_with_nested();
1034 let tx = Transaction::new(&table);
1035
1036 let action = tx.update_schema().add_column(
1038 AddColumn::builder()
1039 .name("name")
1040 .field_type(Type::Primitive(PrimitiveType::String))
1041 .parent("person")
1042 .build(),
1043 );
1044
1045 let err = match Arc::new(action).commit(&table).await {
1046 Err(e) => e,
1047 Ok(_) => panic!("should reject adding a column with conflicting name"),
1048 };
1049 assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
1050 assert!(
1051 err.message().contains("already exists"),
1052 "error should mention name conflict, got: {}",
1053 err.message()
1054 );
1055 }
1056
1057 #[tokio::test]
1058 async fn test_root_and_nested_add_combined() {
1059 let table = make_v2_table_with_nested();
1060 let tx = Transaction::new(&table);
1061
1062 let action = tx
1064 .update_schema()
1065 .add_column(AddColumn::optional(
1066 "root_col",
1067 Type::Primitive(PrimitiveType::Boolean),
1068 ))
1069 .add_column(
1070 AddColumn::builder()
1071 .name("email")
1072 .field_type(Type::Primitive(PrimitiveType::String))
1073 .parent("person")
1074 .build(),
1075 );
1076
1077 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
1078 let updates = action_commit.take_updates();
1079
1080 let new_schema = match &updates[0] {
1081 TableUpdate::AddSchema { schema } => schema,
1082 other => panic!("expected AddSchema, got {other:?}"),
1083 };
1084
1085 let root_col = new_schema
1087 .field_by_name("root_col")
1088 .expect("root_col should exist");
1089 assert_eq!(root_col.id, 15);
1090
1091 let email = new_schema
1093 .field_by_name("person.email")
1094 .expect("person.email should exist");
1095 assert_eq!(email.id, 16);
1096 }
1097
1098 #[tokio::test]
1099 async fn test_add_nested_struct_type_with_fresh_ids() {
1100 let table = make_v2_table();
1103 let tx = Transaction::new(&table);
1104
1105 let action = tx.update_schema().add_column(AddColumn::optional(
1106 "address",
1107 Type::Struct(StructType::new(vec![
1108 NestedField::optional(
1109 DEFAULT_FIELD_ID,
1110 "street",
1111 Type::Primitive(PrimitiveType::String),
1112 )
1113 .into(),
1114 NestedField::optional(
1115 DEFAULT_FIELD_ID,
1116 "city",
1117 Type::Primitive(PrimitiveType::String),
1118 )
1119 .into(),
1120 ])),
1121 ));
1122
1123 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
1124 let updates = action_commit.take_updates();
1125
1126 let new_schema = match &updates[0] {
1127 TableUpdate::AddSchema { schema } => schema,
1128 other => panic!("expected AddSchema, got {other:?}"),
1129 };
1130
1131 let address = new_schema
1133 .field_by_name("address")
1134 .expect("address should exist");
1135 assert_eq!(address.id, 4);
1136
1137 let street = new_schema
1139 .field_by_name("address.street")
1140 .expect("address.street should exist");
1141 assert_eq!(street.id, 5);
1142
1143 let city = new_schema
1144 .field_by_name("address.city")
1145 .expect("address.city should exist");
1146 assert_eq!(city.id, 6);
1147 }
1148}