iceberg/transaction/
update_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use typed_builder::TypedBuilder;
23
24use crate::spec::{
25    ListType, Literal, MapType, NestedField, NestedFieldRef, SCHEMA_NAME_DELIMITER, Schema,
26    StructType, Type,
27};
28use crate::table::Table;
29use crate::transaction::action::{ActionCommit, TransactionAction};
30use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
31
/// Placeholder ID carried by a newly added column until [`UpdateSchemaAction`] replaces it
/// with a fresh ID at commit time.
const DEFAULT_FIELD_ID: i32 = 0;
34
35/// Declarative specification for adding a column in [`UpdateSchemaAction`].
36///
37/// Use helper constructors such as [`AddColumn::optional`] and [`AddColumn::required`],
38/// optionally combined with [`AddColumn::with_parent`] and [`AddColumn::with_doc`], then pass
39/// the value to
40/// [`UpdateSchemaAction::add_column`].
41#[derive(TypedBuilder)]
42pub struct AddColumn {
43    #[builder(default = None, setter(strip_option, into))]
44    parent: Option<String>,
45    #[builder(setter(into))]
46    name: String,
47    #[builder(default = false)]
48    required: bool,
49    field_type: Type,
50    #[builder(default = None, setter(strip_option, into))]
51    doc: Option<String>,
52    #[builder(default = None, setter(strip_option))]
53    initial_default: Option<Literal>,
54    #[builder(default = None, setter(strip_option))]
55    write_default: Option<Literal>,
56}
57
58impl AddColumn {
59    /// Create a root-level optional column specification.
60    pub fn optional(name: impl ToString, field_type: Type) -> Self {
61        Self::builder()
62            .name(name.to_string())
63            .field_type(field_type)
64            .required(false)
65            .build()
66    }
67
68    /// Create a root-level required column specification.
69    pub fn required(name: impl ToString, field_type: Type, initial_default: Literal) -> Self {
70        Self::builder()
71            .name(name.to_string())
72            .field_type(field_type)
73            .required(true)
74            .initial_default(initial_default.clone())
75            .write_default(initial_default)
76            .build()
77    }
78
79    fn to_nested_field(&self) -> NestedFieldRef {
80        let mut field = NestedField::new(
81            DEFAULT_FIELD_ID,
82            self.name.clone(),
83            self.field_type.clone(),
84            self.required,
85        );
86
87        field.doc = self.doc.clone();
88        field.initial_default = self.initial_default.clone();
89        field.write_default = self.write_default.clone();
90        Arc::new(field)
91    }
92}
93
94/// Schema evolution API modeled after the Java `SchemaUpdate` implementation.
95///
96/// This action accumulates schema modifications (column additions and deletions)
97/// via builder methods. At commit time, it validates all operations against the
98/// current table schema, auto-assigns field IDs from `table.metadata().last_column_id()`,
99/// builds a new schema, and emits `AddSchema` + `SetCurrentSchema` updates with a
100/// `CurrentSchemaIdMatch` requirement.
101///
102/// # Example
103///
104/// ```ignore
105/// let tx = Transaction::new(&table);
106/// let action = tx.update_schema()
107///     .add_column(AddColumn::optional("new_col", Type::Primitive(PrimitiveType::Int)))
108///     .add_column(
109///         AddColumn::optional("email", Type::Primitive(PrimitiveType::String))
110///             .with_parent("person")
111///     )
112///     .delete_column("old_col");
113/// let tx = action.apply(tx).unwrap();
114/// let table = tx.commit(&catalog).await.unwrap();
115/// ```
116pub struct UpdateSchemaAction {
117    additions: Vec<AddColumn>,
118    deletes: Vec<String>,
119}
120
121impl UpdateSchemaAction {
122    /// Creates a new empty `UpdateSchemaAction`.
123    pub(crate) fn new() -> Self {
124        Self {
125            additions: Vec::new(),
126            deletes: Vec::new(),
127        }
128    }
129
130    // --- Root-level additions ---
131
132    /// Add a column to the table schema.
133    ///
134    /// To add a root-level column, leave `AddColumn::parent` as `None`.
135    /// For nested additions, set a parent path (for example via [`AddColumn::with_parent`]).
136    /// If the parent resolves to a map/list, the column is added to map value/list element.
137    pub fn add_column(mut self, add_column: AddColumn) -> Self {
138        self.additions.push(add_column);
139        self
140    }
141
142    // --- Other builder methods ---
143
144    /// Record a column deletion by name.
145    ///
146    /// At commit time, the column must exist in the current schema.
147    pub fn delete_column(mut self, name: impl ToString) -> Self {
148        self.deletes.push(name.to_string());
149        self
150    }
151}
152
153// ---------------------------------------------------------------------------
154// ID assignment helpers
155// ---------------------------------------------------------------------------
156
157/// Recursively assign fresh field IDs to a `NestedField` and all its nested sub-fields.
158///
159/// This follows the same recursive pattern as `ReassignFieldIds::reassign_ids_visit_type`
160/// from `crate::spec::schema::id_reassigner`, but operates on new fields with placeholder
161/// IDs rather than reassigning an existing schema. `ReassignFieldIds` cannot be used
162/// directly here because it rejects duplicate old IDs (all new fields share placeholder
163/// ID `DEFAULT_FIELD_ID`).
164fn assign_fresh_ids(field: &NestedField, next_id: &mut i32) -> NestedFieldRef {
165    *next_id += 1;
166    let new_id = *next_id;
167    let new_type = assign_fresh_ids_to_type(&field.field_type, next_id);
168
169    Arc::new(NestedField {
170        id: new_id,
171        name: field.name.clone(),
172        required: field.required,
173        field_type: Box::new(new_type),
174        doc: field.doc.clone(),
175        initial_default: field.initial_default.clone(),
176        write_default: field.write_default.clone(),
177    })
178}
179
180/// Recursively assign fresh field IDs to all nested fields within a `Type`.
181fn assign_fresh_ids_to_type(field_type: &Type, next_id: &mut i32) -> Type {
182    match field_type {
183        Type::Primitive(_) => field_type.clone(),
184        Type::Struct(struct_type) => {
185            let new_fields: Vec<NestedFieldRef> = struct_type
186                .fields()
187                .iter()
188                .map(|f| assign_fresh_ids(f, next_id))
189                .collect();
190            Type::Struct(StructType::new(new_fields))
191        }
192        Type::List(list_type) => {
193            let new_element = assign_fresh_ids(&list_type.element_field, next_id);
194            Type::List(ListType {
195                element_field: new_element,
196            })
197        }
198        Type::Map(map_type) => {
199            let new_key = assign_fresh_ids(&map_type.key_field, next_id);
200            let new_value = assign_fresh_ids(&map_type.value_field, next_id);
201            Type::Map(MapType {
202                key_field: new_key,
203                value_field: new_value,
204            })
205        }
206    }
207}
208
209// ---------------------------------------------------------------------------
210// Parent path resolution
211// ---------------------------------------------------------------------------
212
213/// Resolve a parent path to the target struct's parent field ID and a reference
214/// to its `StructType`.
215///
216/// If the parent is a map, navigates to the value field. If a list, navigates to
217/// the element field. The final target must be a struct type.
218fn resolve_parent_target<'a>(
219    base_schema: &'a Schema,
220    parent: &str,
221) -> Result<(i32, &'a StructType)> {
222    base_schema
223        .field_by_name(parent)
224        .ok_or_else(|| {
225            Error::new(
226                ErrorKind::PreconditionFailed,
227                format!("Cannot add column: parent '{parent}' not found"),
228            )
229        })
230        .and_then(|parent_field| match parent_field.field_type.as_ref() {
231            Type::Struct(s) => Ok((parent_field.id, s)),
232            Type::Map(m) => match m.value_field.field_type.as_ref() {
233                Type::Struct(s) => Ok((m.value_field.id, s)),
234                _ => Err(Error::new(
235                    ErrorKind::PreconditionFailed,
236                    format!("Cannot add column: map value of '{parent}' is not a struct"),
237                )),
238            },
239            Type::List(l) => match l.element_field.field_type.as_ref() {
240                Type::Struct(s) => Ok((l.element_field.id, s)),
241                _ => Err(Error::new(
242                    ErrorKind::PreconditionFailed,
243                    format!("Cannot add column: list element of '{parent}' is not a struct"),
244                )),
245            },
246            _ => Err(Error::new(
247                ErrorKind::PreconditionFailed,
248                format!("Cannot add column: parent '{parent}' is not a struct, map, or list"),
249            )),
250        })
251}
252
253// ---------------------------------------------------------------------------
254// Schema tree rebuild
255// ---------------------------------------------------------------------------
256
257/// Rebuild a slice of fields, applying deletions and additions at every level,
258/// plus any additions keyed by `parent_id` (`None` represents the table root).
259fn rebuild_fields(
260    fields: &[NestedFieldRef],
261    adds: &HashMap<Option<i32>, Vec<NestedFieldRef>>,
262    delete_ids: &HashSet<i32>,
263    parent_id: Option<i32>,
264) -> Vec<NestedFieldRef> {
265    fields
266        .iter()
267        .filter(|f| !delete_ids.contains(&f.id))
268        .map(|f| rebuild_field(f, adds, delete_ids))
269        .chain(adds.get(&parent_id).into_iter().flatten().cloned())
270        .collect()
271}
272
273/// Recursively rebuild a single field. If the field (or any descendant) is a struct
274/// that has pending additions, those additions are appended to the struct's fields.
275/// Fields whose IDs appear in `delete_ids` are filtered out at every struct level.
276fn rebuild_field(
277    field: &NestedFieldRef,
278    adds: &HashMap<Option<i32>, Vec<NestedFieldRef>>,
279    delete_ids: &HashSet<i32>,
280) -> NestedFieldRef {
281    match field.field_type.as_ref() {
282        Type::Primitive(_) => field.clone(),
283        Type::Struct(s) => {
284            let new_fields = rebuild_fields(s.fields(), adds, delete_ids, Some(field.id));
285            Arc::new(NestedField {
286                id: field.id,
287                name: field.name.clone(),
288                required: field.required,
289                field_type: Box::new(Type::Struct(StructType::new(new_fields))),
290                doc: field.doc.clone(),
291                initial_default: field.initial_default.clone(),
292                write_default: field.write_default.clone(),
293            })
294        }
295        Type::List(l) => {
296            let new_element = rebuild_field(&l.element_field, adds, delete_ids);
297            Arc::new(NestedField {
298                id: field.id,
299                name: field.name.clone(),
300                required: field.required,
301                field_type: Box::new(Type::List(ListType {
302                    element_field: new_element,
303                })),
304                doc: field.doc.clone(),
305                initial_default: field.initial_default.clone(),
306                write_default: field.write_default.clone(),
307            })
308        }
309        Type::Map(m) => {
310            let new_key = rebuild_field(&m.key_field, adds, delete_ids);
311            let new_value = rebuild_field(&m.value_field, adds, delete_ids);
312            Arc::new(NestedField {
313                id: field.id,
314                name: field.name.clone(),
315                required: field.required,
316                field_type: Box::new(Type::Map(MapType {
317                    key_field: new_key,
318                    value_field: new_value,
319                })),
320                doc: field.doc.clone(),
321                initial_default: field.initial_default.clone(),
322                write_default: field.write_default.clone(),
323            })
324        }
325    }
326}
327
328// ---------------------------------------------------------------------------
329// TransactionAction implementation
330// ---------------------------------------------------------------------------
331
332#[async_trait]
333impl TransactionAction for UpdateSchemaAction {
334    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
335        let base_schema = table.metadata().current_schema();
336        let mut last_column_id = table.metadata().last_column_id();
337
338        // --- 1. Validate deletes ---
339        let delete_ids = self
340            .deletes
341            .iter()
342            .map(|name: &String| {
343                base_schema
344                    .field_by_name(name)
345                    .ok_or_else(|| {
346                        Error::new(
347                            ErrorKind::PreconditionFailed,
348                            format!("Cannot delete missing column: {name}"),
349                        )
350                    })
351                    .and_then(|field| {
352                        match base_schema
353                            .identifier_field_ids()
354                            .find(|id| *id == field.id)
355                        {
356                            Some(_) => Err(Error::new(
357                                ErrorKind::PreconditionFailed,
358                                format!("Cannot delete identifier field: {name}"),
359                            )),
360                            None => Ok(field.id),
361                        }
362                    })
363            })
364            .collect::<Result<HashSet<i32>>>()?;
365
366        // --- 2. Resolve parents, validate additions, assign IDs, and group by parent ID ---
367        // We assign IDs inline (before grouping) to preserve the caller's insertion order,
368        // since HashMap iteration order is non-deterministic.
369        let mut additions_by_parent: HashMap<Option<i32>, Vec<NestedFieldRef>> = HashMap::new();
370
371        for add in &self.additions {
372            let pending_field = add.to_nested_field();
373
374            // Check that name does not contain `SCHEMA_NAME_DELIMITER`.
375            if pending_field.name.contains(SCHEMA_NAME_DELIMITER) {
376                return Err(Error::new(
377                    ErrorKind::PreconditionFailed,
378                    format!(
379                        "Cannot add column with ambiguous name: {}. Use `AddColumn::with_parent` to add a column to a nested struct.",
380                        pending_field.name
381                    ),
382                ));
383            }
384
385            // Required columns without an initial default need allow_incompatible_changes.
386            if pending_field.required && pending_field.initial_default.is_none() {
387                return Err(Error::new(
388                    ErrorKind::PreconditionFailed,
389                    format!(
390                        "Incompatible change: cannot add required column without an initial default: {}",
391                        pending_field.name
392                    ),
393                ));
394            }
395
396            let parent_id = match &add.parent {
397                None => {
398                    // Root-level: check name conflict against root-level fields.
399                    if let Some(existing) = base_schema.field_by_name(&pending_field.name)
400                        && !delete_ids.contains(&existing.id)
401                    {
402                        return Err(Error::new(
403                            ErrorKind::PreconditionFailed,
404                            format!(
405                                "Cannot add column, name already exists: {}",
406                                pending_field.name
407                            ),
408                        ));
409                    }
410                    None
411                }
412                Some(parent_path) => {
413                    // Nested: resolve parent, check name conflict within parent struct.
414                    let (resolved_parent_id, parent_struct) =
415                        resolve_parent_target(base_schema, parent_path)?;
416
417                    if parent_struct.fields().iter().any(|f| {
418                        f.name == pending_field.name
419                            && !delete_ids.contains(&f.id)
420                            && !delete_ids.contains(&resolved_parent_id)
421                    }) {
422                        return Err(Error::new(
423                            ErrorKind::PreconditionFailed,
424                            format!(
425                                "Cannot add column, name already exists in '{}': {}",
426                                parent_path, pending_field.name
427                            ),
428                        ));
429                    }
430
431                    Some(resolved_parent_id)
432                }
433            };
434
435            // Assign fresh IDs immediately, preserving insertion order.
436            let field = assign_fresh_ids(&pending_field, &mut last_column_id);
437
438            additions_by_parent
439                .entry(parent_id)
440                .or_default()
441                .push(field);
442        }
443
444        // --- 4. Rebuild the schema tree with additions and deletions ---
445        let new_fields = rebuild_fields(
446            base_schema.as_struct().fields(),
447            &additions_by_parent,
448            &delete_ids,
449            None,
450        );
451
452        // --- 5. Build the new schema ---
453        let schema = Schema::builder()
454            .with_fields(new_fields)
455            .with_identifier_field_ids(base_schema.identifier_field_ids())
456            .build()?;
457
458        let updates = vec![
459            TableUpdate::AddSchema { schema },
460            TableUpdate::SetCurrentSchema { schema_id: -1 },
461        ];
462
463        let requirements = vec![TableRequirement::CurrentSchemaIdMatch {
464            current_schema_id: base_schema.schema_id(),
465        }];
466
467        Ok(ActionCommit::new(updates, requirements))
468    }
469}
470
471#[cfg(test)]
472mod tests {
473    use std::io::BufReader;
474    use std::sync::Arc;
475
476    use as_any::Downcast;
477
478    use crate::spec::{
479        DEFAULT_SCHEMA_ID, Literal, NestedField, PrimitiveType, StructType, TableMetadata, Type,
480    };
481    use crate::table::Table;
482    use crate::transaction::Transaction;
483    use crate::transaction::action::{ApplyTransactionAction, TransactionAction};
484    use crate::transaction::tests::make_v2_table;
485    use crate::transaction::update_schema::{AddColumn, DEFAULT_FIELD_ID, UpdateSchemaAction};
486    use crate::{ErrorKind, TableIdent, TableRequirement, TableUpdate};
487
488    // The V2 test table has:
489    //   last_column_id: 3
490    //   current schema (id=1): x(1, req, long), y(2, req, long), z(3, req, long)
491    //   identifier_field_ids: [1, 2]
492
493    /// Build a V2 test table that includes nested types:
494    ///
495    ///   last_column_id: 14
496    ///   current schema (id=0):
497    ///     x(1, req, long)           -- identifier
498    ///     y(2, req, long)           -- identifier
499    ///     z(3, req, long)
500    ///     person(4, opt, struct)
501    ///       name(5, opt, string)
502    ///       age(6, req, int)
503    ///     tags(7, opt, list<struct>)
504    ///       element(8, req, struct)
505    ///         key(9, opt, string)
506    ///         value(10, opt, string)
507    ///     props(11, opt, map<string, struct>)
508    ///       key(12, req, string)
509    ///       value(13, req, struct)
510    ///         data(14, opt, string)
511    fn make_v2_table_with_nested() -> Table {
512        let json = r#"{
513            "format-version": 2,
514            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c2",
515            "location": "s3://bucket/test/location",
516            "last-sequence-number": 0,
517            "last-updated-ms": 1602638573590,
518            "last-column-id": 14,
519            "current-schema-id": 0,
520            "schemas": [
521                {
522                    "type": "struct",
523                    "schema-id": 0,
524                    "identifier-field-ids": [1, 2],
525                    "fields": [
526                        {"id": 1, "name": "x", "required": true, "type": "long"},
527                        {"id": 2, "name": "y", "required": true, "type": "long"},
528                        {"id": 3, "name": "z", "required": true, "type": "long"},
529                        {"id": 4, "name": "person", "required": false, "type": {
530                            "type": "struct",
531                            "fields": [
532                                {"id": 5, "name": "name", "required": false, "type": "string"},
533                                {"id": 6, "name": "age", "required": true, "type": "int"}
534                            ]
535                        }},
536                        {"id": 7, "name": "tags", "required": false, "type": {
537                            "type": "list",
538                            "element-id": 8,
539                            "element": {
540                                "type": "struct",
541                                "fields": [
542                                    {"id": 9, "name": "key", "required": false, "type": "string"},
543                                    {"id": 10, "name": "value", "required": false, "type": "string"}
544                                ]
545                            },
546                            "element-required": true
547                        }},
548                        {"id": 11, "name": "props", "required": false, "type": {
549                            "type": "map",
550                            "key-id": 12,
551                            "key": "string",
552                            "value-id": 13,
553                            "value": {
554                                "type": "struct",
555                                "fields": [
556                                    {"id": 14, "name": "data", "required": false, "type": "string"}
557                                ]
558                            },
559                            "value-required": true
560                        }}
561                    ]
562                }
563            ],
564            "default-spec-id": 0,
565            "partition-specs": [
566                {"spec-id": 0, "fields": []}
567            ],
568            "last-partition-id": 999,
569            "default-sort-order-id": 0,
570            "sort-orders": [
571                {"order-id": 0, "fields": []}
572            ],
573            "properties": {},
574            "current-snapshot-id": -1,
575            "snapshots": []
576        }"#;
577
578        let reader = BufReader::new(json.as_bytes());
579        let metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
580
581        Table::builder()
582            .metadata(metadata)
583            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
584            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
585            .file_io(crate::io::FileIO::new_with_memory())
586            .runtime(crate::test_utils::test_runtime())
587            .build()
588            .unwrap()
589    }
590
591    // -----------------------------------------------------------------------
592    // Existing root-level tests
593    // -----------------------------------------------------------------------
594
595    #[tokio::test]
596    async fn test_add_column() {
597        let table = make_v2_table();
598        let tx = Transaction::new(&table);
599
600        let action = tx.update_schema().add_column(AddColumn::optional(
601            "new_col",
602            Type::Primitive(PrimitiveType::Int),
603        ));
604
605        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
606        let updates = action_commit.take_updates();
607        let requirements = action_commit.take_requirements();
608
609        assert_eq!(updates.len(), 2);
610
611        // Extract the new schema from the AddSchema update.
612        let new_schema = match &updates[0] {
613            TableUpdate::AddSchema { schema } => schema,
614            other => panic!("expected AddSchema, got {other:?}"),
615        };
616
617        let expected_schema = table
618            .metadata()
619            .current_schema()
620            .as_ref()
621            .clone()
622            .into_builder()
623            .with_schema_id(DEFAULT_SCHEMA_ID)
624            .with_fields([
625                NestedField::optional(4, "new_col", Type::Primitive(PrimitiveType::Int)).into(),
626            ])
627            .build()
628            .unwrap();
629        assert_eq!(new_schema, &expected_schema);
630
631        assert_eq!(updates[1], TableUpdate::SetCurrentSchema { schema_id: -1 });
632
633        // Verify requirement.
634        assert_eq!(requirements.len(), 1);
635        assert_eq!(requirements[0], TableRequirement::CurrentSchemaIdMatch {
636            current_schema_id: table.metadata().current_schema().schema_id()
637        });
638    }
639
640    #[tokio::test]
641    async fn test_add_column_with_doc() {
642        let table = make_v2_table();
643        let tx = Transaction::new(&table);
644
645        let action = tx.update_schema().add_column(
646            AddColumn::builder()
647                .name("documented_col")
648                .field_type(Type::Primitive(PrimitiveType::String))
649                .doc("A documented column")
650                .build(),
651        );
652
653        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
654        let updates = action_commit.take_updates();
655
656        let new_schema = match &updates[0] {
657            TableUpdate::AddSchema { schema } => schema,
658            other => panic!("expected AddSchema, got {other:?}"),
659        };
660
661        let field = new_schema
662            .field_by_name("documented_col")
663            .expect("documented_col should exist");
664        assert_eq!(field.id, 4);
665        assert!(!field.required);
666        assert_eq!(field.doc.as_deref(), Some("A documented column"));
667    }
668
669    #[tokio::test]
670    async fn test_add_required_column_with_initial_default() {
671        let table = make_v2_table();
672        let tx = Transaction::new(&table);
673
674        let action = tx.update_schema().add_column(AddColumn::required(
675            "req_col",
676            Type::Primitive(PrimitiveType::Int),
677            Literal::int(0),
678        ));
679
680        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
681        let updates = action_commit.take_updates();
682
683        let new_schema = match &updates[0] {
684            TableUpdate::AddSchema { schema } => schema,
685            other => panic!("expected AddSchema, got {other:?}"),
686        };
687
688        let field = new_schema
689            .field_by_name("req_col")
690            .expect("req_col should exist");
691        assert_eq!(field.id, 4);
692        assert!(field.required);
693        assert_eq!(field.initial_default, Some(Literal::int(0)));
694        assert_eq!(field.write_default, Some(Literal::int(0)));
695    }
696
697    #[tokio::test]
698    async fn test_add_column_name_conflict_fails() {
699        let table = make_v2_table();
700        let tx = Transaction::new(&table);
701
702        // "x" already exists in the V2 test schema.
703        let action = tx.update_schema().add_column(AddColumn::optional(
704            "x",
705            Type::Primitive(PrimitiveType::Int),
706        ));
707
708        let result = Arc::new(action).commit(&table).await;
709        let err = match result {
710            Err(e) => e,
711            Ok(_) => panic!("should reject adding a column with an existing name"),
712        };
713        assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
714        assert!(
715            err.message().contains("already exists"),
716            "error should mention name conflict, got: {}",
717            err.message()
718        );
719    }
720
721    #[tokio::test]
722    async fn test_delete_column() {
723        let table = make_v2_table();
724        let tx = Transaction::new(&table);
725
726        // z is not an identifier field, so we can delete it.
727        let action = tx.update_schema().delete_column("z");
728
729        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
730        let updates = action_commit.take_updates();
731
732        let new_schema = match &updates[0] {
733            TableUpdate::AddSchema { schema } => schema,
734            other => panic!("expected AddSchema, got {other:?}"),
735        };
736
737        assert!(
738            new_schema.field_by_name("z").is_none(),
739            "z should be deleted"
740        );
741        assert!(new_schema.field_by_name("x").is_some());
742        assert!(new_schema.field_by_name("y").is_some());
743    }
744
745    #[tokio::test]
746    async fn test_delete_missing_column_fails() {
747        let table = make_v2_table();
748        let tx = Transaction::new(&table);
749
750        let action = tx.update_schema().delete_column("nonexistent");
751
752        let result = Arc::new(action).commit(&table).await;
753        let err = match result {
754            Err(e) => e,
755            Ok(_) => panic!("should reject deleting a non-existent column"),
756        };
757        assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
758        assert!(
759            err.message().contains("nonexistent"),
760            "error should mention the missing column, got: {}",
761            err.message()
762        );
763    }
764
765    #[tokio::test]
766    async fn test_add_and_delete_combined() {
767        let table = make_v2_table();
768        let tx = Transaction::new(&table);
769
770        // Delete z, add a new column.
771        let action = tx
772            .update_schema()
773            .delete_column("z")
774            .add_column(AddColumn::optional(
775                "w",
776                Type::Primitive(PrimitiveType::Boolean),
777            ));
778
779        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
780        let updates = action_commit.take_updates();
781
782        let new_schema = match &updates[0] {
783            TableUpdate::AddSchema { schema } => schema,
784            other => panic!("expected AddSchema, got {other:?}"),
785        };
786
787        assert!(
788            new_schema.field_by_name("z").is_none(),
789            "z should be deleted"
790        );
791        let w = new_schema.field_by_name("w").expect("w should exist");
792        assert_eq!(w.id, 4);
793        assert!(!w.required);
794    }
795
796    #[tokio::test]
797    async fn test_delete_and_readd_same_name() {
798        let table = make_v2_table();
799        let tx = Transaction::new(&table);
800
801        // Delete z, then add a new column named z -- should succeed.
802        let action = tx
803            .update_schema()
804            .delete_column("z")
805            .add_column(AddColumn::optional(
806                "z",
807                Type::Primitive(PrimitiveType::Boolean),
808            ));
809
810        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
811        let updates = action_commit.take_updates();
812
813        let new_schema = match &updates[0] {
814            TableUpdate::AddSchema { schema } => schema,
815            other => panic!("expected AddSchema, got {other:?}"),
816        };
817
818        let z = new_schema
819            .field_by_name("z")
820            .expect("z should exist with new type");
821        assert_eq!(z.id, 4); // new ID, not the old 3
822        assert_eq!(*z.field_type, Type::Primitive(PrimitiveType::Boolean));
823    }
824
825    #[test]
826    fn test_apply() {
827        let table = make_v2_table();
828        let tx = Transaction::new(&table);
829
830        let tx = tx
831            .update_schema()
832            .add_column(AddColumn::optional(
833                "new_col",
834                Type::Primitive(PrimitiveType::Int),
835            ))
836            .apply(tx)
837            .unwrap();
838
839        assert_eq!(tx.actions.len(), 1);
840        (*tx.actions[0])
841            .downcast_ref::<UpdateSchemaAction>()
842            .expect("UpdateSchemaAction was not applied to Transaction!");
843    }
844
845    // -----------------------------------------------------------------------
846    // Nested add tests
847    // -----------------------------------------------------------------------
848
849    #[tokio::test]
850    async fn test_add_column_to_struct() {
851        let table = make_v2_table_with_nested();
852        let tx = Transaction::new(&table);
853
854        // Add "email" to the "person" struct.
855        let action = tx.update_schema().add_column(
856            AddColumn::builder()
857                .name("email")
858                .field_type(Type::Primitive(PrimitiveType::String))
859                .parent("person")
860                .build(),
861        );
862
863        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
864        let updates = action_commit.take_updates();
865
866        let new_schema = match &updates[0] {
867            TableUpdate::AddSchema { schema } => schema,
868            other => panic!("expected AddSchema, got {other:?}"),
869        };
870
871        // "email" should be nested under "person" with ID = last_column_id + 1 = 15.
872        let email = new_schema
873            .field_by_name("person.email")
874            .expect("person.email should exist");
875        assert_eq!(email.id, 15);
876        assert!(!email.required);
877        assert_eq!(*email.field_type, Type::Primitive(PrimitiveType::String));
878
879        // Original nested fields should still be there.
880        assert!(new_schema.field_by_name("person.name").is_some());
881        assert!(new_schema.field_by_name("person.age").is_some());
882    }
883
884    #[tokio::test]
885    async fn test_add_column_to_struct_with_doc() {
886        let table = make_v2_table_with_nested();
887        let tx = Transaction::new(&table);
888
889        let action = tx.update_schema().add_column(
890            AddColumn::builder()
891                .name("phone")
892                .field_type(Type::Primitive(PrimitiveType::String))
893                .parent("person")
894                .doc("Phone number")
895                .build(),
896        );
897
898        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
899        let updates = action_commit.take_updates();
900
901        let new_schema = match &updates[0] {
902            TableUpdate::AddSchema { schema } => schema,
903            other => panic!("expected AddSchema, got {other:?}"),
904        };
905
906        let phone = new_schema
907            .field_by_name("person.phone")
908            .expect("person.phone should exist");
909        assert_eq!(phone.id, 15);
910        assert_eq!(phone.doc.as_deref(), Some("Phone number"));
911    }
912
913    #[tokio::test]
914    async fn test_add_column_to_list_element_struct() {
915        let table = make_v2_table_with_nested();
916        let tx = Transaction::new(&table);
917
918        // "tags" is a list<struct{key, value}>. Adding to the list navigates to its
919        // element struct automatically.
920        let action = tx.update_schema().add_column(
921            AddColumn::builder()
922                .name("score")
923                .field_type(Type::Primitive(PrimitiveType::Double))
924                .parent("tags")
925                .build(),
926        );
927
928        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
929        let updates = action_commit.take_updates();
930
931        let new_schema = match &updates[0] {
932            TableUpdate::AddSchema { schema } => schema,
933            other => panic!("expected AddSchema, got {other:?}"),
934        };
935
936        // The list element struct should now contain "score".
937        let score = new_schema
938            .field_by_name("tags.element.score")
939            .expect("tags.element.score should exist");
940        assert_eq!(score.id, 15);
941        assert!(!score.required);
942
943        // Existing fields preserved.
944        assert!(new_schema.field_by_name("tags.element.key").is_some());
945        assert!(new_schema.field_by_name("tags.element.value").is_some());
946    }
947
948    #[tokio::test]
949    async fn test_add_column_to_map_value_struct() {
950        let table = make_v2_table_with_nested();
951        let tx = Transaction::new(&table);
952
953        // "props" is a map<string, struct{data}>. Adding to the map navigates to its
954        // value struct automatically.
955        let action = tx.update_schema().add_column(
956            AddColumn::builder()
957                .name("version")
958                .field_type(Type::Primitive(PrimitiveType::Int))
959                .parent("props")
960                .build(),
961        );
962
963        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
964        let updates = action_commit.take_updates();
965
966        let new_schema = match &updates[0] {
967            TableUpdate::AddSchema { schema } => schema,
968            other => panic!("expected AddSchema, got {other:?}"),
969        };
970
971        let version = new_schema
972            .field_by_name("props.value.version")
973            .expect("props.value.version should exist");
974        assert_eq!(version.id, 15);
975
976        // Existing map value fields preserved.
977        assert!(new_schema.field_by_name("props.value.data").is_some());
978    }
979
980    #[tokio::test]
981    async fn test_add_column_to_nonexistent_parent_fails() {
982        let table = make_v2_table_with_nested();
983        let tx = Transaction::new(&table);
984
985        let action = tx.update_schema().add_column(
986            AddColumn::builder()
987                .name("col")
988                .field_type(Type::Primitive(PrimitiveType::Int))
989                .parent("nonexistent")
990                .build(),
991        );
992
993        let err = match Arc::new(action).commit(&table).await {
994            Err(e) => e,
995            Ok(_) => panic!("should reject adding to a nonexistent parent"),
996        };
997        assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
998        assert!(
999            err.message().contains("nonexistent"),
1000            "error should mention the missing parent, got: {}",
1001            err.message()
1002        );
1003    }
1004
1005    #[tokio::test]
1006    async fn test_add_column_to_primitive_parent_fails() {
1007        let table = make_v2_table_with_nested();
1008        let tx = Transaction::new(&table);
1009
1010        // "x" is a primitive (long), not a struct.
1011        let action = tx.update_schema().add_column(
1012            AddColumn::builder()
1013                .name("col")
1014                .field_type(Type::Primitive(PrimitiveType::Int))
1015                .parent("x")
1016                .build(),
1017        );
1018
1019        let err = match Arc::new(action).commit(&table).await {
1020            Err(e) => e,
1021            Ok(_) => panic!("should reject adding to a primitive parent"),
1022        };
1023        assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
1024        assert!(
1025            err.message().contains("not a struct"),
1026            "error should mention type mismatch, got: {}",
1027            err.message()
1028        );
1029    }
1030
1031    #[tokio::test]
1032    async fn test_add_column_to_nested_name_conflict_fails() {
1033        let table = make_v2_table_with_nested();
1034        let tx = Transaction::new(&table);
1035
1036        // "name" already exists in the "person" struct.
1037        let action = tx.update_schema().add_column(
1038            AddColumn::builder()
1039                .name("name")
1040                .field_type(Type::Primitive(PrimitiveType::String))
1041                .parent("person")
1042                .build(),
1043        );
1044
1045        let err = match Arc::new(action).commit(&table).await {
1046            Err(e) => e,
1047            Ok(_) => panic!("should reject adding a column with conflicting name"),
1048        };
1049        assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
1050        assert!(
1051            err.message().contains("already exists"),
1052            "error should mention name conflict, got: {}",
1053            err.message()
1054        );
1055    }
1056
1057    #[tokio::test]
1058    async fn test_root_and_nested_add_combined() {
1059        let table = make_v2_table_with_nested();
1060        let tx = Transaction::new(&table);
1061
1062        // Add a root column and a nested column in the same action.
1063        let action = tx
1064            .update_schema()
1065            .add_column(AddColumn::optional(
1066                "root_col",
1067                Type::Primitive(PrimitiveType::Boolean),
1068            ))
1069            .add_column(
1070                AddColumn::builder()
1071                    .name("email")
1072                    .field_type(Type::Primitive(PrimitiveType::String))
1073                    .parent("person")
1074                    .build(),
1075            );
1076
1077        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
1078        let updates = action_commit.take_updates();
1079
1080        let new_schema = match &updates[0] {
1081            TableUpdate::AddSchema { schema } => schema,
1082            other => panic!("expected AddSchema, got {other:?}"),
1083        };
1084
1085        // Root column gets the first fresh ID.
1086        let root_col = new_schema
1087            .field_by_name("root_col")
1088            .expect("root_col should exist");
1089        assert_eq!(root_col.id, 15);
1090
1091        // Nested column gets the next ID.
1092        let email = new_schema
1093            .field_by_name("person.email")
1094            .expect("person.email should exist");
1095        assert_eq!(email.id, 16);
1096    }
1097
1098    #[tokio::test]
1099    async fn test_add_nested_struct_type_with_fresh_ids() {
1100        // Adding a new column whose TYPE contains nested fields (e.g. a struct column). All sub-fields must receive
1101        // fresh IDs, not placeholder `DEFAULT_FIELD_ID`.
1102        let table = make_v2_table();
1103        let tx = Transaction::new(&table);
1104
1105        let action = tx.update_schema().add_column(AddColumn::optional(
1106            "address",
1107            Type::Struct(StructType::new(vec![
1108                NestedField::optional(
1109                    DEFAULT_FIELD_ID,
1110                    "street",
1111                    Type::Primitive(PrimitiveType::String),
1112                )
1113                .into(),
1114                NestedField::optional(
1115                    DEFAULT_FIELD_ID,
1116                    "city",
1117                    Type::Primitive(PrimitiveType::String),
1118                )
1119                .into(),
1120            ])),
1121        ));
1122
1123        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
1124        let updates = action_commit.take_updates();
1125
1126        let new_schema = match &updates[0] {
1127            TableUpdate::AddSchema { schema } => schema,
1128            other => panic!("expected AddSchema, got {other:?}"),
1129        };
1130
1131        // "address" gets ID 4 (last_column_id=3, +1).
1132        let address = new_schema
1133            .field_by_name("address")
1134            .expect("address should exist");
1135        assert_eq!(address.id, 4);
1136
1137        // Sub-fields get IDs 5 and 6.
1138        let street = new_schema
1139            .field_by_name("address.street")
1140            .expect("address.street should exist");
1141        assert_eq!(street.id, 5);
1142
1143        let city = new_schema
1144            .field_by_name("address.city")
1145            .expect("address.city should exist");
1146        assert_eq!(city.id, 6);
1147    }
1148}