iceberg/arrow/record_batch_transformer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use arrow_array::{
22    Array as ArrowArray, ArrayRef, Int32Array, RecordBatch, RecordBatchOptions, RunArray,
23};
24use arrow_cast::cast;
25use arrow_schema::{
26    DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
27};
28use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
29
30use crate::arrow::value::{create_primitive_array_repeated, create_primitive_array_single_element};
31use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema};
32use crate::metadata_columns::get_metadata_field;
33use crate::spec::{
34    Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform,
35};
36use crate::{Error, ErrorKind, Result};
37
38/// Build a map of field ID to constant value (as Datum) for identity-partitioned fields.
39///
40/// Implements Iceberg spec "Column Projection" rule #1: use partition metadata constants
41/// only for identity-transformed fields. Non-identity transforms (bucket, truncate, year, etc.)
42/// store derived values in partition metadata, so source columns must be read from data files.
43///
44/// Example: For `bucket(4, id)`, partition metadata has `id_bucket = 2` (bucket number),
45/// but the actual `id` values (100, 200, 300) are only in the data file.
46///
47/// Matches Java's `PartitionUtil.constantsMap()` which filters `if (field.transform().isIdentity())`.
48///
49/// # References
50/// - Spec: https://iceberg.apache.org/spec/#column-projection
51/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:constantsMap()
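/// # Example (illustrative sketch)
///
/// For a hypothetical spec `[identity(dept), bucket(4, id)]` with partition data
/// `{dept: "hr", id_bucket: 2}`, only the identity-transformed field yields a constant;
/// the field ids below are placeholders:
///
/// ```ignore
/// let constants = constants_map(&partition_spec, &partition_data, &schema)?;
/// assert_eq!(constants.get(&dept_field_id), Some(&Datum::string("hr")));
/// assert!(!constants.contains_key(&id_field_id)); // bucket(4, id): read `id` from the data file
/// ```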
52fn constants_map(
53    partition_spec: &PartitionSpec,
54    partition_data: &Struct,
55    schema: &IcebergSchema,
56) -> Result<HashMap<i32, Datum>> {
57    let mut constants = HashMap::new();
58
59    for (pos, field) in partition_spec.fields().iter().enumerate() {
60        // Only identity transforms should use constant values from partition metadata
61        if matches!(field.transform, Transform::Identity) {
62            // Get the field from schema to extract its type
63            let iceberg_field = schema.field_by_id(field.source_id).ok_or(Error::new(
64                ErrorKind::Unexpected,
65                format!("Field {} not found in schema", field.source_id),
66            ))?;
67
68            // Ensure the field type is primitive
69            let prim_type = match &*iceberg_field.field_type {
70                crate::spec::Type::Primitive(prim_type) => prim_type,
71                _ => {
72                    return Err(Error::new(
73                        ErrorKind::Unexpected,
74                        format!(
75                            "Partition field {} has non-primitive type {:?}",
76                            field.source_id, iceberg_field.field_type
77                        ),
78                    ));
79                }
80            };
81
82            // Get the partition value for this field
83            // Handle both None (null) and Some(Literal::Primitive) cases
84            match &partition_data[pos] {
85                None => {
86                    // Skip null partition values - they will be resolved as null per Iceberg spec rule #4.
87                    // When a partition value is null, we don't add it to the constants map,
88                    // allowing downstream column resolution to handle it correctly.
89                    continue;
90                }
91                Some(Literal::Primitive(value)) => {
92                    // Create a Datum from the primitive type and value
93                    let datum = Datum::new(prim_type.clone(), value.clone());
94                    constants.insert(field.source_id, datum);
95                }
96                Some(literal) => {
97                    return Err(Error::new(
98                        ErrorKind::Unexpected,
99                        format!(
100                            "Partition field {} has non-primitive value: {:?}",
101                            field.source_id, literal
102                        ),
103                    ));
104                }
105            }
106        }
107    }
108
109    Ok(constants)
110}
111
112/// Indicates how a particular column in a processed RecordBatch should
113/// be sourced.
114#[derive(Debug)]
115pub(crate) enum ColumnSource {
116    // signifies that a column should be passed through unmodified
117    // from the file's RecordBatch
118    PassThrough {
119        source_index: usize,
120    },
121
122    // signifies that a column from the file's RecordBatch has undergone
123    // type promotion so the source column with the given index needs
124    // to be promoted to the specified type
125    Promote {
126        target_type: DataType,
127        source_index: usize,
128    },
129
    // Signifies that a new column must be added to the output batch. The
    // column does not exist in the source RecordBatch, so it is materialized
    // from `value`: repeated constant data when `value` is Some, or nulls
    // when it is None.
139    Add {
140        target_type: DataType,
141        value: Option<PrimitiveLiteral>,
142    },
    // The Iceberg spec refers to other permissible schema evolution actions
144    // (see https://iceberg.apache.org/spec/#schema-evolution):
145    // renaming fields, deleting fields and reordering fields.
146    // Renames only affect the schema of the RecordBatch rather than the
147    // columns themselves, so a single updated cached schema can
148    // be re-used and no per-column actions are required.
149    // Deletion and Reorder can be achieved without needing this
150    // post-processing step by using the projection mask.
151}
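
// Illustrative sketch (field names, ids and types are made up): reading a file
// containing `a: Int32 [id=1]` and `b: Utf8 [id=2]` against a snapshot schema of
// `a: long [id=1]`, `b: string [id=2]`, `c: date [id=3]` would be described by:
//
//     [
//         ColumnSource::Promote { target_type: DataType::Int64, source_index: 0 },
//         ColumnSource::PassThrough { source_index: 1 },
//         ColumnSource::Add { target_type: DataType::Date32, value: None }, // filled with nulls
//     ]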
152
153#[derive(Debug)]
154enum BatchTransform {
155    // Indicates that no changes need to be performed to the RecordBatches
156    // coming in from the stream and that they can be passed through
157    // unmodified
158    PassThrough,
159
160    Modify {
161        // Every transformed RecordBatch will have the same schema. We create the
162        // target just once and cache it here. Helpfully, Arc<Schema> is needed in
163        // the constructor for RecordBatch, so we don't need an expensive copy
164        // each time we build a new RecordBatch
165        target_schema: Arc<ArrowSchema>,
166
167        // Indicates how each column in the target schema is derived.
168        operations: Vec<ColumnSource>,
169    },
170
    // Sometimes only the schema needs modifying, for example when the column
    // names have changed vs the file but not the column types. We can avoid a
    // heap allocation per RecordBatch in this case by retaining the existing
    // column Vec.
175    ModifySchema {
176        target_schema: Arc<ArrowSchema>,
177    },
178}
179
180#[derive(Debug)]
181enum SchemaComparison {
182    Equivalent,
183    NameChangesOnly,
184    Different,
185}
186
187/// Builder for RecordBatchTransformer to improve ergonomics when constructing with optional parameters.
188///
189/// Constant fields are pre-computed for both virtual/metadata fields (like _file) and
190/// identity-partitioned fields to avoid duplicate work during batch processing.
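///
/// # Example (illustrative; the schema, spec, partition data and field ids are assumed)
///
/// ```ignore
/// let transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &[1, 2])
///     // constant for a metadata column such as `_file`
///     .with_constant(file_field_id, Datum::string("s3://bucket/data/00000.parquet"))
///     // identity-partition constants are derived from the spec and partition data
///     .with_partition(partition_spec, partition_data)?
///     .build();
/// ```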
191#[derive(Debug)]
192pub(crate) struct RecordBatchTransformerBuilder {
193    snapshot_schema: Arc<IcebergSchema>,
194    projected_iceberg_field_ids: Vec<i32>,
195    constant_fields: HashMap<i32, Datum>,
196}
197
198impl RecordBatchTransformerBuilder {
199    pub(crate) fn new(
200        snapshot_schema: Arc<IcebergSchema>,
201        projected_iceberg_field_ids: &[i32],
202    ) -> Self {
203        Self {
204            snapshot_schema,
205            projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(),
206            constant_fields: HashMap::new(),
207        }
208    }
209
210    /// Add a constant value for a specific field ID.
211    /// This is used for virtual/metadata fields like _file that have constant values per batch.
212    ///
213    /// # Arguments
214    /// * `field_id` - The field ID to associate with the constant
215    /// * `datum` - The constant value (with type) for this field
216    pub(crate) fn with_constant(mut self, field_id: i32, datum: Datum) -> Self {
217        self.constant_fields.insert(field_id, datum);
218        self
219    }
220
221    /// Set partition spec and data together for identifying identity-transformed partition columns.
222    ///
223    /// Both partition_spec and partition_data must be provided together since the spec defines
224    /// which fields are identity-partitioned, and the data provides their constant values.
225    /// This method computes the partition constants and merges them into constant_fields.
226    pub(crate) fn with_partition(
227        mut self,
228        partition_spec: Arc<PartitionSpec>,
229        partition_data: Struct,
230    ) -> Result<Self> {
231        // Compute partition constants for identity-transformed fields (already returns Datum)
232        let partition_constants =
233            constants_map(&partition_spec, &partition_data, &self.snapshot_schema)?;
234
235        // Add partition constants to constant_fields
236        for (field_id, datum) in partition_constants {
237            self.constant_fields.insert(field_id, datum);
238        }
239
240        Ok(self)
241    }
242
243    pub(crate) fn build(self) -> RecordBatchTransformer {
244        RecordBatchTransformer {
245            snapshot_schema: self.snapshot_schema,
246            projected_iceberg_field_ids: self.projected_iceberg_field_ids,
247            constant_fields: self.constant_fields,
248            batch_transform: None,
249        }
250    }
251}
252
253/// Transforms RecordBatches from Parquet files to match the Iceberg table schema.
254///
255/// Handles schema evolution, column reordering, type promotion, and implements the Iceberg spec's
256/// "Column Projection" rules for resolving field IDs "not present" in data files:
257/// 1. Return the value from partition metadata if an Identity Transform exists
258/// 2. Use schema.name-mapping.default metadata to map field id to columns without field id (applied in ArrowReader)
259/// 3. Return the default value if it has a defined initial-default
260/// 4. Return null in all other cases
261///
262/// # Field ID Resolution
263///
264/// Field ID resolution happens in ArrowReader before data is read (matching Java's ReadConf):
265/// - If file has embedded field IDs: trust them (ParquetSchemaUtil.hasIds() = true)
266/// - If file lacks IDs and name_mapping exists: apply name mapping (ParquetSchemaUtil.applyNameMapping())
267/// - If file lacks IDs and no name_mapping: use position-based fallback (ParquetSchemaUtil.addFallbackIds())
268///
269/// By the time RecordBatchTransformer processes data, all field IDs are trustworthy.
270/// This transformer only handles remaining projection rules (#1, #3, #4) for fields still "not present".
271///
272/// # Partition Spec and Data
273///
/// **Bucket partitioning**: the transformer must distinguish identity transforms (whose
/// values come from partition metadata constants) from non-identity transforms such as
/// bucket (whose source columns must be read from the data file) so that runtime filtering
/// on bucket-partitioned columns stays correct. For example, `bucket(4, id)` stores only the
/// bucket number in partition metadata, so actual `id` values must be read from the data file.
278///
279/// # References
280/// - Spec: https://iceberg.apache.org/spec/#column-projection
281/// - Java: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java (field ID resolution)
282/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java (partition constants)
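///
/// # Usage (illustrative sketch)
///
/// ```ignore
/// let mut transformer =
///     RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids).build();
/// // The transform plan is derived lazily from the first batch's schema and then reused.
/// for batch in parquet_batches {
///     let transformed = transformer.process_record_batch(batch?)?;
///     // ... hand `transformed` downstream
/// }
/// ```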
283#[derive(Debug)]
284pub(crate) struct RecordBatchTransformer {
285    snapshot_schema: Arc<IcebergSchema>,
286    projected_iceberg_field_ids: Vec<i32>,
287    // Pre-computed constant field information: field_id -> Datum
288    // Includes both virtual/metadata fields (like _file) and identity-partitioned fields
289    // Datum holds both the Iceberg type and the value
290    constant_fields: HashMap<i32, Datum>,
291
292    // BatchTransform gets lazily constructed based on the schema of
293    // the first RecordBatch we receive from the file
294    batch_transform: Option<BatchTransform>,
295}
296
297impl RecordBatchTransformer {
298    pub(crate) fn process_record_batch(
299        &mut self,
300        record_batch: RecordBatch,
301    ) -> Result<RecordBatch> {
302        Ok(match &self.batch_transform {
303            Some(BatchTransform::PassThrough) => record_batch,
304            Some(BatchTransform::Modify {
305                target_schema,
306                operations,
307            }) => {
308                let options = RecordBatchOptions::default()
309                    .with_match_field_names(false)
310                    .with_row_count(Some(record_batch.num_rows()));
311                RecordBatch::try_new_with_options(
312                    Arc::clone(target_schema),
313                    self.transform_columns(record_batch.columns(), operations)?,
314                    &options,
315                )?
316            }
317            Some(BatchTransform::ModifySchema { target_schema }) => {
318                let options = RecordBatchOptions::default()
319                    .with_match_field_names(false)
320                    .with_row_count(Some(record_batch.num_rows()));
321                RecordBatch::try_new_with_options(
322                    Arc::clone(target_schema),
323                    record_batch.columns().to_vec(),
324                    &options,
325                )?
326            }
327            None => {
328                self.batch_transform = Some(Self::generate_batch_transform(
329                    record_batch.schema_ref(),
330                    self.snapshot_schema.as_ref(),
331                    &self.projected_iceberg_field_ids,
332                    &self.constant_fields,
333                )?);
334
335                self.process_record_batch(record_batch)?
336            }
337        })
338    }
339
340    // Compare the schema of the incoming RecordBatches to the schema of
341    // the Iceberg snapshot to determine what, if any, transformation
342    // needs to be applied. If the schemas match, we return BatchTransform::PassThrough
343    // to indicate that no changes need to be made. Otherwise, we return a
344    // BatchTransform::Modify containing the target RecordBatch schema and
345    // the list of `ColumnSource`s that indicate how to source each column in
346    // the resulting RecordBatches.
347    fn generate_batch_transform(
348        source_schema: &ArrowSchemaRef,
349        snapshot_schema: &IcebergSchema,
350        projected_iceberg_field_ids: &[i32],
351        constant_fields: &HashMap<i32, Datum>,
352    ) -> Result<BatchTransform> {
353        let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
354        let field_id_to_mapped_schema_map =
355            Self::build_field_id_to_arrow_schema_map(&mapped_unprojected_arrow_schema)?;
356
357        // Create a new arrow schema by selecting fields from mapped_unprojected,
358        // in the order of the field ids in projected_iceberg_field_ids
359        let fields: Result<Vec<_>> = projected_iceberg_field_ids
360            .iter()
361            .map(|field_id| {
362                // Check if this is a constant field
363                if constant_fields.contains_key(field_id) {
364                    // For metadata/virtual fields (like _file), get name from metadata_columns
365                    // For partition fields, get name from schema (they exist in schema)
366                    if let Ok(iceberg_field) = get_metadata_field(*field_id) {
367                        // This is a metadata/virtual field - convert Iceberg field to Arrow
368                        let datum = constant_fields.get(field_id).ok_or(Error::new(
369                            ErrorKind::Unexpected,
370                            "constant field not found",
371                        ))?;
372                        let arrow_type = datum_to_arrow_type_with_ree(datum);
373                        let arrow_field =
374                            Field::new(&iceberg_field.name, arrow_type, !iceberg_field.required)
375                                .with_metadata(HashMap::from([(
376                                    PARQUET_FIELD_ID_META_KEY.to_string(),
377                                    iceberg_field.id.to_string(),
378                                )]));
379                        Ok(Arc::new(arrow_field))
380                    } else {
381                        // This is a partition constant field (exists in schema but uses constant value)
382                        let field = &field_id_to_mapped_schema_map
383                            .get(field_id)
384                            .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
385                            .0;
386                        let datum = constant_fields.get(field_id).ok_or(Error::new(
387                            ErrorKind::Unexpected,
388                            "constant field not found",
389                        ))?;
390                        let arrow_type = datum_to_arrow_type_with_ree(datum);
391                        // Use the type from constant_fields (REE for constants)
392                        let constant_field =
393                            Field::new(field.name(), arrow_type, field.is_nullable())
394                                .with_metadata(field.metadata().clone());
395                        Ok(Arc::new(constant_field))
396                    }
397                } else {
398                    // Regular field - use schema as-is
399                    Ok(field_id_to_mapped_schema_map
400                        .get(field_id)
401                        .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
402                        .0
403                        .clone())
404                }
405            })
406            .collect();
407
408        let target_schema = Arc::new(ArrowSchema::new(fields?));
409
410        match Self::compare_schemas(source_schema, &target_schema) {
411            SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
412            SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }),
413            SchemaComparison::Different => Ok(BatchTransform::Modify {
414                operations: Self::generate_transform_operations(
415                    source_schema,
416                    snapshot_schema,
417                    projected_iceberg_field_ids,
418                    field_id_to_mapped_schema_map,
419                    constant_fields,
420                )?,
421                target_schema,
422            }),
423        }
424    }
425
426    /// Compares the source and target schemas
427    /// Determines if they have changed in any meaningful way:
428    ///  * If they have different numbers of fields, then we need to modify
429    ///    the incoming RecordBatch schema AND columns
430    ///  * If they have the same number of fields, but some of them differ in
431    ///    either data type or nullability, then we need to modify the
432    ///    incoming RecordBatch schema AND columns
433    ///  * If the schemas differ only in the column names, then we need
434    ///    to modify the RecordBatch schema BUT we can keep the
435    ///    original column data unmodified
436    ///  * If the schemas are identical (or differ only in inconsequential
437    ///    ways) then we can pass through the original RecordBatch unmodified
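    ///
    /// A minimal illustration (names and types made up): `(a: Int64, b: Utf8)` vs
    /// `(x: Int64, y: Utf8)` is `NameChangesOnly`; `(a: Int32, b: Utf8)` vs
    /// `(a: Int64, b: Utf8)` is `Different`; identical schemas are `Equivalent`.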
438    fn compare_schemas(
439        source_schema: &ArrowSchemaRef,
440        target_schema: &ArrowSchemaRef,
441    ) -> SchemaComparison {
442        if source_schema.fields().len() != target_schema.fields().len() {
443            return SchemaComparison::Different;
444        }
445
446        let mut names_changed = false;
447
448        for (source_field, target_field) in source_schema
449            .fields()
450            .iter()
451            .zip(target_schema.fields().iter())
452        {
453            if source_field.data_type() != target_field.data_type()
454                || source_field.is_nullable() != target_field.is_nullable()
455            {
456                return SchemaComparison::Different;
457            }
458
459            if source_field.name() != target_field.name() {
460                names_changed = true;
461            }
462        }
463
464        if names_changed {
465            SchemaComparison::NameChangesOnly
466        } else {
467            SchemaComparison::Equivalent
468        }
469    }
470
471    fn generate_transform_operations(
472        source_schema: &ArrowSchemaRef,
473        snapshot_schema: &IcebergSchema,
474        projected_iceberg_field_ids: &[i32],
475        field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
476        constant_fields: &HashMap<i32, Datum>,
477    ) -> Result<Vec<ColumnSource>> {
478        let field_id_to_source_schema_map =
479            Self::build_field_id_to_arrow_schema_map(source_schema)?;
480
481        projected_iceberg_field_ids
482            .iter()
483            .map(|field_id| {
484                // Check if this is a constant field (metadata/virtual or identity-partitioned)
485                // Constant fields always use their pre-computed constant values, regardless of whether
486                // they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata
487                // is authoritative and should be preferred over file data.
488                if let Some(datum) = constant_fields.get(field_id) {
489                    let arrow_type = datum_to_arrow_type_with_ree(datum);
490                    return Ok(ColumnSource::Add {
491                        value: Some(datum.literal().clone()),
492                        target_type: arrow_type,
493                    });
494                }
495
496                let (target_field, _) =
497                    field_id_to_mapped_schema_map
498                        .get(field_id)
499                        .ok_or(Error::new(
500                            ErrorKind::Unexpected,
501                            "could not find field in schema",
502                        ))?;
503                let target_type = target_field.data_type();
504
505                let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or(Error::new(
506                    ErrorKind::Unexpected,
507                    "Field not found in snapshot schema",
508                ))?;
509
510                // Iceberg spec's "Column Projection" rules (https://iceberg.apache.org/spec/#column-projection).
511                // For fields "not present" in data files:
512                // 1. Use partition metadata (identity transforms only)
513                // 2. Use name mapping
514                // 3. Use initial_default
515                // 4. Return null
516                //
517                // Why check partition constants before Parquet field IDs (Java: BaseParquetReaders.java:299):
518                // In add_files scenarios, partition columns may exist in BOTH Parquet AND partition metadata.
519                // Partition metadata is authoritative - it defines which partition this file belongs to.
520
521                // Field ID resolution now happens in ArrowReader via:
522                // 1. Embedded field IDs (ParquetSchemaUtil.hasIds() = true) - trust them
523                // 2. Name mapping (ParquetSchemaUtil.applyNameMapping()) - applied upfront
524                // 3. Position-based fallback (ParquetSchemaUtil.addFallbackIds()) - applied upfront
525                //
526                // At this point, all field IDs in the source schema are trustworthy.
527                // No conflict detection needed - schema resolution happened in reader.rs.
528                let field_by_id = field_id_to_source_schema_map.get(field_id).map(
529                    |(source_field, source_index)| {
530                        if source_field.data_type().equals_datatype(target_type) {
531                            ColumnSource::PassThrough {
532                                source_index: *source_index,
533                            }
534                        } else {
535                            ColumnSource::Promote {
536                                target_type: target_type.clone(),
537                                source_index: *source_index,
538                            }
539                        }
540                    },
541                );
542
543                // Apply spec's fallback steps for "not present" fields.
544                // Rule #1 (constants) is handled at the beginning of this function
545                let column_source = if let Some(source) = field_by_id {
546                    source
547                } else {
548                    // Rules #2, #3 and #4:
549                    // Rule #2 (name mapping) was already applied in reader.rs if needed.
550                    // If field_id is still not found, the column doesn't exist in the Parquet file.
551                    // Fall through to rule #3 (initial_default) or rule #4 (null).
552                    let default_value = iceberg_field.initial_default.as_ref().and_then(|lit| {
553                        if let Literal::Primitive(prim) = lit {
554                            Some(prim.clone())
555                        } else {
556                            None
557                        }
558                    });
559
560                    ColumnSource::Add {
561                        value: default_value,
562                        target_type: target_type.clone(),
563                    }
564                };
565
566                Ok(column_source)
567            })
568            .collect()
569    }
570
571    fn build_field_id_to_arrow_schema_map(
572        source_schema: &SchemaRef,
573    ) -> Result<HashMap<i32, (FieldRef, usize)>> {
574        let mut field_id_to_source_schema = HashMap::new();
575        for (source_field_idx, source_field) in source_schema.fields.iter().enumerate() {
576            // Check if field has a field ID in metadata
577            if let Some(field_id_str) = source_field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
578                let this_field_id = field_id_str.parse().map_err(|e| {
579                    Error::new(
580                        ErrorKind::DataInvalid,
581                        format!("field id not parseable as an i32: {e}"),
582                    )
583                })?;
584
585                field_id_to_source_schema
586                    .insert(this_field_id, (source_field.clone(), source_field_idx));
587            }
588            // If field doesn't have a field ID, skip it - name mapping will handle it
589        }
590
591        Ok(field_id_to_source_schema)
592    }
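
    // For reference, a source field only enters the map above when it carries the
    // Parquet field-id metadata; an illustrative field (name and id made up):
    //
    //     Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
    //         PARQUET_FIELD_ID_META_KEY.to_string(),
    //         "1".to_string(),
    //     )]))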
593
594    fn transform_columns(
595        &self,
596        columns: &[Arc<dyn ArrowArray>],
597        operations: &[ColumnSource],
598    ) -> Result<Vec<Arc<dyn ArrowArray>>> {
599        if columns.is_empty() {
600            return Ok(columns.to_vec());
601        }
602        let num_rows = columns[0].len();
603
604        operations
605            .iter()
606            .map(|op| {
607                Ok(match op {
608                    ColumnSource::PassThrough { source_index } => columns[*source_index].clone(),
609
610                    ColumnSource::Promote {
611                        target_type,
612                        source_index,
613                    } => cast(&*columns[*source_index], target_type)?,
614
615                    ColumnSource::Add { target_type, value } => {
616                        Self::create_column(target_type, value, num_rows)?
617                    }
618                })
619            })
620            .collect()
621    }
622
623    fn create_column(
624        target_type: &DataType,
625        prim_lit: &Option<PrimitiveLiteral>,
626        num_rows: usize,
627    ) -> Result<ArrayRef> {
628        // Check if this is a RunEndEncoded type (for constant fields)
629        if let DataType::RunEndEncoded(_, values_field) = target_type {
630            // Helper to create a Run-End Encoded array
631            let create_ree_array = |values_array: ArrayRef| -> Result<ArrayRef> {
632                let run_ends = if num_rows == 0 {
633                    Int32Array::from(Vec::<i32>::new())
634                } else {
635                    Int32Array::from(vec![num_rows as i32])
636                };
637                Ok(Arc::new(
638                    RunArray::try_new(&run_ends, &values_array).map_err(|e| {
639                        Error::new(
640                            ErrorKind::Unexpected,
641                            "Failed to create RunArray for constant value",
642                        )
643                        .with_source(e)
644                    })?,
645                ))
646            };
647
648            // Create the values array using the helper function
649            let values_array =
650                create_primitive_array_single_element(values_field.data_type(), prim_lit)?;
651
652            // Wrap in Run-End Encoding
653            create_ree_array(values_array)
654        } else {
655            // Non-REE type (simple arrays for non-constant fields)
656            create_primitive_array_repeated(target_type, prim_lit, num_rows)
657        }
658    }
659}
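
// Illustrative sketch of the run-end-encoded layout built above for constant columns
// (arrow-rs `RunArray`; the values are made up): one stored value plus a single run end
// equal to the row count represents a whole batch of identical values without
// materializing them row by row.
//
//     let values = StringArray::from(vec!["s3://bucket/data/00000.parquet"]);
//     let run_ends = Int32Array::from(vec![1024_i32]); // batch of 1024 rows
//     let constant: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &values)?);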
660
661#[cfg(test)]
662mod test {
663    use std::collections::HashMap;
664    use std::sync::Arc;
665
666    use arrow_array::{
667        Array, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
668        StringArray,
669    };
670    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
671    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
672
673    use crate::arrow::record_batch_transformer::{
674        RecordBatchTransformer, RecordBatchTransformerBuilder,
675    };
676    use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type};
677
678    /// Helper to extract string values from either StringArray or RunEndEncoded<StringArray>
679    /// Returns empty string for null values
680    fn get_string_value(array: &dyn Array, index: usize) -> String {
681        if let Some(string_array) = array.as_any().downcast_ref::<StringArray>() {
682            if string_array.is_null(index) {
683                String::new()
684            } else {
685                string_array.value(index).to_string()
686            }
687        } else if let Some(run_array) = array
688            .as_any()
689            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
690        {
691            let values = run_array.values();
692            let string_values = values
693                .as_any()
694                .downcast_ref::<StringArray>()
695                .expect("REE values should be StringArray");
696            // For REE, all rows have the same value (index 0 in the values array)
697            if string_values.is_null(0) {
698                String::new()
699            } else {
700                string_values.value(0).to_string()
701            }
702        } else {
703            panic!("Expected StringArray or RunEndEncoded<StringArray>");
704        }
705    }
706
707    /// Helper to extract int values from either Int32Array or RunEndEncoded<Int32Array>
708    fn get_int_value(array: &dyn Array, index: usize) -> i32 {
709        if let Some(int_array) = array.as_any().downcast_ref::<Int32Array>() {
710            int_array.value(index)
711        } else if let Some(run_array) = array
712            .as_any()
713            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
714        {
715            let values = run_array.values();
716            let int_values = values
717                .as_any()
718                .downcast_ref::<Int32Array>()
719                .expect("REE values should be Int32Array");
720            int_values.value(0)
721        } else {
722            panic!("Expected Int32Array or RunEndEncoded<Int32Array>");
723        }
724    }
725
726    #[test]
727    fn build_field_id_to_source_schema_map_works() {
728        let arrow_schema = arrow_schema_already_same_as_target();
729
730        let result =
731            RecordBatchTransformer::build_field_id_to_arrow_schema_map(&arrow_schema).unwrap();
732
733        let expected = HashMap::from_iter([
734            (10, (arrow_schema.fields()[0].clone(), 0)),
735            (11, (arrow_schema.fields()[1].clone(), 1)),
736            (12, (arrow_schema.fields()[2].clone(), 2)),
737            (14, (arrow_schema.fields()[3].clone(), 3)),
738            (15, (arrow_schema.fields()[4].clone(), 4)),
739        ]);
740
741        assert!(result.eq(&expected));
742    }
743
744    #[test]
745    fn processor_returns_properly_shaped_record_batch_when_no_schema_migration_required() {
746        let snapshot_schema = Arc::new(iceberg_table_schema());
747        let projected_iceberg_field_ids = [13, 14];
748
749        let mut inst =
750            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids)
751                .build();
752
753        let result = inst
754            .process_record_batch(source_record_batch_no_migration_required())
755            .unwrap();
756
757        let expected = source_record_batch_no_migration_required();
758
759        assert_eq!(result, expected);
760    }
761
762    #[test]
763    fn processor_returns_properly_shaped_record_batch_when_schema_migration_required() {
764        let snapshot_schema = Arc::new(iceberg_table_schema());
765        let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, f
766
767        let mut inst =
768            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids)
769                .build();
770
771        let result = inst.process_record_batch(source_record_batch()).unwrap();
772
773        let expected = expected_record_batch_migration_required();
774
775        assert_eq!(result, expected);
776    }
777
778    #[test]
779    fn schema_evolution_adds_date_column_with_nulls() {
780        // Reproduces TestSelect.readAndWriteWithBranchAfterSchemaChange from iceberg-spark.
781        // When reading old snapshots after adding a DATE column, the transformer must
782        // populate the new column with NULL values since old files lack this field.
783        let snapshot_schema = Arc::new(
784            Schema::builder()
785                .with_schema_id(1)
786                .with_fields(vec![
787                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
788                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
789                    NestedField::optional(3, "date_col", Type::Primitive(PrimitiveType::Date))
790                        .into(),
791                ])
792                .build()
793                .unwrap(),
794        );
795        let projected_iceberg_field_ids = [1, 2, 3];
796
797        let mut transformer =
798            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids)
799                .build();
800
801        let file_schema = Arc::new(ArrowSchema::new(vec![
802            simple_field("id", DataType::Int32, false, "1"),
803            simple_field("name", DataType::Utf8, true, "2"),
804        ]));
805
806        let file_batch = RecordBatch::try_new(file_schema, vec![
807            Arc::new(Int32Array::from(vec![1, 2, 3])),
808            Arc::new(StringArray::from(vec![
809                Some("Alice"),
810                Some("Bob"),
811                Some("Charlie"),
812            ])),
813        ])
814        .unwrap();
815
816        let result = transformer.process_record_batch(file_batch).unwrap();
817
818        assert_eq!(result.num_columns(), 3);
819        assert_eq!(result.num_rows(), 3);
820
821        let id_column = result
822            .column(0)
823            .as_any()
824            .downcast_ref::<Int32Array>()
825            .unwrap();
826        assert_eq!(id_column.values(), &[1, 2, 3]);
827
828        let name_column = result
829            .column(1)
830            .as_any()
831            .downcast_ref::<StringArray>()
832            .unwrap();
833        assert_eq!(name_column.value(0), "Alice");
834        assert_eq!(name_column.value(1), "Bob");
835        assert_eq!(name_column.value(2), "Charlie");
836
837        let date_column = result
838            .column(2)
839            .as_any()
840            .downcast_ref::<Date32Array>()
841            .unwrap();
842        assert!(date_column.is_null(0));
843        assert!(date_column.is_null(1));
844        assert!(date_column.is_null(2));
845    }
846
847    #[test]
848    fn schema_evolution_adds_struct_column_with_nulls() {
849        // Test that when a struct column is added after data files are written,
850        // the transformer can materialize the missing struct column with null values.
851        // This reproduces the scenario from Iceberg 1.10.0 TestSparkReaderDeletes tests
852        // where binaryData and structData columns were added to the schema.
853        let snapshot_schema = Arc::new(
854            Schema::builder()
855                .with_schema_id(1)
856                .with_fields(vec![
857                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
858                    NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
859                    NestedField::optional(
860                        3,
861                        "struct_col",
862                        Type::Struct(crate::spec::StructType::new(vec![
863                            NestedField::optional(
864                                100,
865                                "inner_field",
866                                Type::Primitive(PrimitiveType::String),
867                            )
868                            .into(),
869                        ])),
870                    )
871                    .into(),
872                ])
873                .build()
874                .unwrap(),
875        );
876        let projected_iceberg_field_ids = [1, 2, 3];
877
878        let mut transformer =
879            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids)
880                .build();
881
882        let file_schema = Arc::new(ArrowSchema::new(vec![
883            simple_field("id", DataType::Int32, false, "1"),
884            simple_field("data", DataType::Utf8, false, "2"),
885        ]));
886
887        let file_batch = RecordBatch::try_new(file_schema, vec![
888            Arc::new(Int32Array::from(vec![1, 2, 3])),
889            Arc::new(StringArray::from(vec!["a", "b", "c"])),
890        ])
891        .unwrap();
892
893        let result = transformer.process_record_batch(file_batch).unwrap();
894
895        assert_eq!(result.num_columns(), 3);
896        assert_eq!(result.num_rows(), 3);
897
898        let id_column = result
899            .column(0)
900            .as_any()
901            .downcast_ref::<Int32Array>()
902            .unwrap();
903        assert_eq!(id_column.values(), &[1, 2, 3]);
904
905        let data_column = result
906            .column(1)
907            .as_any()
908            .downcast_ref::<StringArray>()
909            .unwrap();
910        assert_eq!(data_column.value(0), "a");
911        assert_eq!(data_column.value(1), "b");
912        assert_eq!(data_column.value(2), "c");
913
914        let struct_column = result
915            .column(2)
916            .as_any()
917            .downcast_ref::<arrow_array::StructArray>()
918            .unwrap();
919        assert!(struct_column.is_null(0));
920        assert!(struct_column.is_null(1));
921        assert!(struct_column.is_null(2));
922    }
923
924    pub fn source_record_batch() -> RecordBatch {
925        RecordBatch::try_new(
926            arrow_schema_promotion_addition_and_renaming_required(),
927            vec![
928                Arc::new(Int32Array::from(vec![Some(1001), Some(1002), Some(1003)])), // b
929                Arc::new(Float32Array::from(vec![
930                    Some(12.125),
931                    Some(23.375),
932                    Some(34.875),
933                ])), // c
934                Arc::new(Int32Array::from(vec![Some(2001), Some(2002), Some(2003)])), // d
935                Arc::new(StringArray::from(vec![
936                    Some("Apache"),
937                    Some("Iceberg"),
938                    Some("Rocks"),
939                ])), // e
940            ],
941        )
942        .unwrap()
943    }
944
945    pub fn source_record_batch_no_migration_required() -> RecordBatch {
946        RecordBatch::try_new(
947            arrow_schema_no_promotion_addition_or_renaming_required(),
948            vec![
949                Arc::new(Int32Array::from(vec![Some(2001), Some(2002), Some(2003)])), // d
950                Arc::new(StringArray::from(vec![
951                    Some("Apache"),
952                    Some("Iceberg"),
953                    Some("Rocks"),
954                ])), // e
955            ],
956        )
957        .unwrap()
958    }
959
960    pub fn expected_record_batch_migration_required() -> RecordBatch {
961        RecordBatch::try_new(arrow_schema_already_same_as_target(), vec![
962            Arc::new(StringArray::from(Vec::<Option<String>>::from([
963                None, None, None,
964            ]))), // a
965            Arc::new(Int64Array::from(vec![Some(1001), Some(1002), Some(1003)])), // b
966            Arc::new(Float64Array::from(vec![
967                Some(12.125),
968                Some(23.375),
969                Some(34.875),
970            ])), // c
971            Arc::new(StringArray::from(vec![
972                Some("Apache"),
973                Some("Iceberg"),
974                Some("Rocks"),
975            ])), // e (d skipped by projection)
976            Arc::new(StringArray::from(vec![
977                Some("(╯°□°)╯"),
978                Some("(╯°□°)╯"),
979                Some("(╯°□°)╯"),
980            ])), // f
981        ])
982        .unwrap()
983    }
984
985    pub fn iceberg_table_schema() -> Schema {
986        Schema::builder()
987            .with_schema_id(2)
988            .with_fields(vec![
989                NestedField::optional(10, "a", Type::Primitive(PrimitiveType::String)).into(),
990                NestedField::required(11, "b", Type::Primitive(PrimitiveType::Long)).into(),
991                NestedField::required(12, "c", Type::Primitive(PrimitiveType::Double)).into(),
992                NestedField::required(13, "d", Type::Primitive(PrimitiveType::Int)).into(),
993                NestedField::optional(14, "e", Type::Primitive(PrimitiveType::String)).into(),
994                NestedField::required(15, "f", Type::Primitive(PrimitiveType::String))
995                    .with_initial_default(Literal::string("(╯°□°)╯"))
996                    .into(),
997            ])
998            .build()
999            .unwrap()
1000    }
1001
1002    fn arrow_schema_already_same_as_target() -> Arc<ArrowSchema> {
1003        Arc::new(ArrowSchema::new(vec![
1004            simple_field("a", DataType::Utf8, true, "10"),
1005            simple_field("b", DataType::Int64, false, "11"),
1006            simple_field("c", DataType::Float64, false, "12"),
1007            simple_field("e", DataType::Utf8, true, "14"),
1008            simple_field("f", DataType::Utf8, false, "15"),
1009        ]))
1010    }
1011
1012    fn arrow_schema_promotion_addition_and_renaming_required() -> Arc<ArrowSchema> {
1013        Arc::new(ArrowSchema::new(vec![
1014            simple_field("b", DataType::Int32, false, "11"),
1015            simple_field("c", DataType::Float32, false, "12"),
1016            simple_field("d", DataType::Int32, false, "13"),
1017            simple_field("e_old", DataType::Utf8, true, "14"),
1018        ]))
1019    }
1020
1021    fn arrow_schema_no_promotion_addition_or_renaming_required() -> Arc<ArrowSchema> {
1022        Arc::new(ArrowSchema::new(vec![
1023            simple_field("d", DataType::Int32, false, "13"),
1024            simple_field("e", DataType::Utf8, true, "14"),
1025        ]))
1026    }
1027
1028    /// Create a simple arrow field with metadata.
1029    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
1030        Field::new(name, ty, nullable).with_metadata(HashMap::from([(
1031            PARQUET_FIELD_ID_META_KEY.to_string(),
1032            value.to_string(),
1033        )]))
1034    }
1035
1036    /// Test for add_files with Parquet files that have NO field IDs (Hive tables).
1037    ///
1038    /// This reproduces the scenario from Iceberg spec where:
1039    /// - Hive-style partitioned Parquet files are imported via add_files procedure
1040    /// - Parquet files originally DO NOT have field IDs (typical for Hive tables)
1041    /// - ArrowReader applies name mapping to assign correct Iceberg field IDs
1042    /// - Iceberg schema assigns field IDs: id (1), name (2), dept (3), subdept (4)
1043    /// - Partition columns (id, dept) have initial_default values
1044    ///
1045    /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection),
1046    /// this scenario requires `schema.name-mapping.default` from table metadata
1047    /// to correctly map Parquet columns by name to Iceberg field IDs.
1048    /// This mapping is now applied in ArrowReader before data is processed.
1049    ///
1050    /// Expected behavior:
1051    /// 1. id=1 (from initial_default) - spec rule #3
1052    /// 2. name="John Doe" (from Parquet with field_id=2 assigned by reader) - found by field ID
1053    /// 3. dept="hr" (from initial_default) - spec rule #3
1054    /// 4. subdept="communications" (from Parquet with field_id=4 assigned by reader) - found by field ID
1055    #[test]
1056    fn add_files_with_name_mapping_applied_in_reader() {
1057        // Iceberg schema after add_files: id (partition), name, dept (partition), subdept
1058        let snapshot_schema = Arc::new(
1059            Schema::builder()
1060                .with_schema_id(0)
1061                .with_fields(vec![
1062                    NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int))
1063                        .with_initial_default(Literal::int(1))
1064                        .into(),
1065                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1066                    NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String))
1067                        .with_initial_default(Literal::string("hr"))
1068                        .into(),
1069                    NestedField::optional(4, "subdept", Type::Primitive(PrimitiveType::String))
1070                        .into(),
1071                ])
1072                .build()
1073                .unwrap(),
1074        );
1075
1076        // Simulate ArrowReader having applied name mapping:
1077        // Original Parquet: name, subdept (NO field IDs)
1078        // After reader.rs applies name mapping: name (field_id=2), subdept (field_id=4)
1079        //
1080        // Note: Partition columns (id, dept) are NOT in the Parquet file - they're in directory paths
1081        use std::collections::HashMap;
1082        let parquet_schema = Arc::new(ArrowSchema::new(vec![
1083            Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
1084                "PARQUET:field_id".to_string(),
1085                "2".to_string(),
1086            )])),
1087            Field::new("subdept", DataType::Utf8, true).with_metadata(HashMap::from([(
1088                "PARQUET:field_id".to_string(),
1089                "4".to_string(),
1090            )])),
1091        ]));
1092
1093        let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept
1094
1095        let mut transformer =
1096            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids).build();
1097
1098        // Create a Parquet RecordBatch with data for: name="John Doe", subdept="communications"
1099        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1100            Arc::new(StringArray::from(vec!["John Doe"])),
1101            Arc::new(StringArray::from(vec!["communications"])),
1102        ])
1103        .unwrap();
1104
1105        let result = transformer.process_record_batch(parquet_batch).unwrap();
1106
1107        // Verify the transformed RecordBatch has:
1108        // - id=1 (from initial_default, not from Parquet)
1109        // - name="John Doe" (from Parquet with correct field_id=2)
1110        // - dept="hr" (from initial_default, not from Parquet)
1111        // - subdept="communications" (from Parquet with correct field_id=4)
1112        assert_eq!(result.num_columns(), 4);
1113        assert_eq!(result.num_rows(), 1);
1114
1115        let id_column = result
1116            .column(0)
1117            .as_any()
1118            .downcast_ref::<Int32Array>()
1119            .unwrap();
1120        assert_eq!(id_column.value(0), 1);
1121
1122        let name_column = result
1123            .column(1)
1124            .as_any()
1125            .downcast_ref::<StringArray>()
1126            .unwrap();
1127        assert_eq!(name_column.value(0), "John Doe");
1128
1129        let dept_column = result
1130            .column(2)
1131            .as_any()
1132            .downcast_ref::<StringArray>()
1133            .unwrap();
1134        assert_eq!(dept_column.value(0), "hr");
1135
1136        let subdept_column = result
1137            .column(3)
1138            .as_any()
1139            .downcast_ref::<StringArray>()
1140            .unwrap();
1141        assert_eq!(subdept_column.value(0), "communications");
1142    }
1143
1144    /// Test for bucket partitioning where source columns must be read from data files.
1145    ///
1146    /// This test verifies correct implementation of the Iceberg spec's "Column Projection" rules:
1147    /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
1148    ///
1149    /// # Why this test is critical
1150    ///
1151    /// The key insight is that partition metadata stores TRANSFORMED values, not source values:
1152    /// - For `bucket(4, id)`, partition metadata has `id_bucket = 2` (the bucket number)
1153    /// - The actual `id` column values (100, 200, 300) are ONLY in the data file
1154    ///
1155    /// If iceberg-rust incorrectly treated bucket-partitioned fields as constants, it would:
1156    /// 1. Replace all `id` values with the constant `2` from partition metadata
1157    /// 2. Break runtime filtering (e.g., `WHERE id = 100` would match no rows)
1158    /// 3. Return incorrect query results
1159    ///
1160    /// # What this test verifies
1161    ///
1162    /// - Bucket-partitioned fields (e.g., `bucket(4, id)`) are read from the data file
1163    /// - The source column `id` contains actual values (100, 200, 300), not constants
1164    /// - Java's `PartitionUtil.constantsMap()` behavior is correctly replicated:
1165    ///   ```java
1166    ///   if (field.transform().isIdentity()) {  // FALSE for bucket transforms
1167    ///       idToConstant.put(field.sourceId(), converted);
1168    ///   }
1169    ///   ```
1170    ///
1171    /// # Real-world impact
1172    ///
1173    /// This reproduces the failure scenario from Iceberg Java's TestRuntimeFiltering:
1174    /// - Tables partitioned by `bucket(N, col)` are common for load balancing
1175    /// - Queries filter on the source column: `SELECT * FROM tbl WHERE col = value`
1176    /// - Runtime filtering pushes predicates down to Iceberg file scans
1177    /// - Without this fix, the filter would match against constant partition values instead of data
1178    ///
1179    /// # References
1180    /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
1181    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
1182    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
1183    #[test]
1184    fn bucket_partitioning_reads_source_column_from_file() {
1185        use crate::spec::{Struct, Transform};
1186
1187        // Table schema: id (data column), name (data column), id_bucket (partition column)
1188        let snapshot_schema = Arc::new(
1189            Schema::builder()
1190                .with_schema_id(0)
1191                .with_fields(vec![
1192                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1193                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1194                ])
1195                .build()
1196                .unwrap(),
1197        );
1198
1199        // Partition spec: bucket(4, id) - the id field is bucketed
1200        let partition_spec = Arc::new(
1201            crate::spec::PartitionSpec::builder(snapshot_schema.clone())
1202                .with_spec_id(0)
1203                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
1204                .unwrap()
1205                .build()
1206                .unwrap(),
1207        );
1208
1209        // Partition data: bucket value is 2
1210        // In Iceberg, partition data is a Struct where each field corresponds to a partition field
1211        let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);
1212
1213        // Parquet file contains both id and name columns
1214        let parquet_schema = Arc::new(ArrowSchema::new(vec![
1215            simple_field("id", DataType::Int32, false, "1"),
1216            simple_field("name", DataType::Utf8, true, "2"),
1217        ]));
1218
1219        let projected_field_ids = [1, 2]; // id, name
1220
1221        let mut transformer =
1222            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
1223                .with_partition(partition_spec, partition_data)
1224                .expect("Failed to add partition constants")
1225                .build();
1226
1227        // Create a Parquet RecordBatch with actual data
1228        // The id column MUST be read from here, not treated as a constant
1229        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1230            Arc::new(Int32Array::from(vec![100, 200, 300])),
1231            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
1232        ])
1233        .unwrap();
1234
1235        let result = transformer.process_record_batch(parquet_batch).unwrap();
1236
1237        // Verify the transformed RecordBatch correctly reads id from the file
1238        // (NOT as a constant from partition metadata)
1239        assert_eq!(result.num_columns(), 2);
1240        assert_eq!(result.num_rows(), 3);
1241
1242        let id_column = result
1243            .column(0)
1244            .as_any()
1245            .downcast_ref::<Int32Array>()
1246            .unwrap();
1247        // These values MUST come from the Parquet file, not be replaced by constants
1248        assert_eq!(id_column.value(0), 100);
1249        assert_eq!(id_column.value(1), 200);
1250        assert_eq!(id_column.value(2), 300);
1251
1252        let name_column = result
1253            .column(1)
1254            .as_any()
1255            .downcast_ref::<StringArray>()
1256            .unwrap();
1257        assert_eq!(name_column.value(0), "Alice");
1258        assert_eq!(name_column.value(1), "Bob");
1259        assert_eq!(name_column.value(2), "Charlie");
1260    }
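
    /// Hedged companion sketch for the test above: exercise `constants_map()` directly and
    /// check that a bucket-transformed source field contributes no constant.
    ///
    /// Minimal sketch, assuming the private `constants_map` helper is visible to this test
    /// module via `use super::*;`; the builder calls mirror the test above.
    #[test]
    fn constants_map_excludes_bucket_transform_sketch() {
        use crate::spec::{Struct, Transform};

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );
        let partition_spec = crate::spec::PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("id", "id_bucket", Transform::Bucket(4))
            .unwrap()
            .build()
            .unwrap();
        // Partition metadata only carries the bucket number, never the source values.
        let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);

        let constants = constants_map(&partition_spec, &partition_data, &schema).unwrap();
        // Non-identity transforms are filtered out, so no field is treated as a constant.
        assert!(constants.is_empty());
    }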
1261
1262    /// Test that identity-transformed partition fields ARE treated as constants.
1263    ///
1264    /// This is the complement to `bucket_partitioning_reads_source_column_from_file`,
1265    /// verifying that constants_map() correctly identifies identity-transformed
1266    /// partition fields per the Iceberg spec.
1267    ///
1268    /// # Spec requirement (format/spec.md "Column Projection")
1269    ///
1270    /// > "Return the value from partition metadata if an Identity Transform exists for the field
1271    /// >  and the partition value is present in the `partition` struct on `data_file` object
1272    /// >  in the manifest. This allows for metadata only migrations of Hive tables."
1273    ///
1274    /// # Why identity transforms use constants
1275    ///
1276    /// Unlike bucket/truncate/year/etc., identity transforms don't modify the value:
1277    /// - `identity(dept)` stores the actual `dept` value in partition metadata
1278    /// - Partition metadata has `dept = "engineering"` (the real value, not a hash/bucket)
1279    /// - This value can be used directly without reading the data file
1280    ///
1281    /// # Performance benefit
1282    ///
1283    /// For Hive migrations where partition columns aren't in data files:
1284    /// - Partition metadata provides the column values
1285    /// - No need to read from data files (metadata-only query optimization)
1286    /// - Common pattern: `dept=engineering/subdept=backend/file.parquet`
1287    ///   - `dept` and `subdept` are in directory structure, not in `file.parquet`
1288    ///   - Iceberg populates these from partition metadata as constants
1289    ///
1290    /// # What this test verifies
1291    ///
1292    /// - Identity-partitioned fields use constants from partition metadata
1293    /// - The `dept` column is populated with `"engineering"` (not read from file)
1294    /// - Java's `PartitionUtil.constantsMap()` behavior is matched:
1295    ///   ```java
1296    ///   if (field.transform().isIdentity()) {  // TRUE for identity
1297    ///       idToConstant.put(field.sourceId(), converted);
1298    ///   }
1299    ///   ```
1300    ///
1301    /// # References
1302    /// - Iceberg spec: format/spec.md "Column Projection"
1303    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
1304    #[test]
1305    fn identity_partition_uses_constant_from_metadata() {
1306        use crate::spec::{Struct, Transform};
1307
1308        // Table schema: id (data column), dept (partition column), name (data column)
1309        let snapshot_schema = Arc::new(
1310            Schema::builder()
1311                .with_schema_id(0)
1312                .with_fields(vec![
1313                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1314                    NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(),
1315                    NestedField::optional(3, "name", Type::Primitive(PrimitiveType::String)).into(),
1316                ])
1317                .build()
1318                .unwrap(),
1319        );
1320
1321        // Partition spec: identity(dept) - the dept field uses identity transform
1322        let partition_spec = Arc::new(
1323            crate::spec::PartitionSpec::builder(snapshot_schema.clone())
1324                .with_spec_id(0)
1325                .add_partition_field("dept", "dept", Transform::Identity)
1326                .unwrap()
1327                .build()
1328                .unwrap(),
1329        );
1330
1331        // Partition data: dept="engineering"
1332        let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]);
1333
1334        // Parquet file contains only id and name (dept is in partition path)
1335        let parquet_schema = Arc::new(ArrowSchema::new(vec![
1336            simple_field("id", DataType::Int32, false, "1"),
1337            simple_field("name", DataType::Utf8, true, "3"),
1338        ]));
1339
1340        let projected_field_ids = [1, 2, 3]; // id, dept, name
1341
1342        let mut transformer =
1343            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
1344                .with_partition(partition_spec, partition_data)
1345                .expect("Failed to add partition constants")
1346                .build();
1347
1348        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1349            Arc::new(Int32Array::from(vec![100, 200])),
1350            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
1351        ])
1352        .unwrap();
1353
1354        let result = transformer.process_record_batch(parquet_batch).unwrap();
1355
1356        // Verify the dept column is populated with the constant from partition metadata
1357        assert_eq!(result.num_columns(), 3);
1358        assert_eq!(result.num_rows(), 2);
1359
1360        // Use helpers to handle both simple and REE arrays
1361        assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100);
1362        assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200);
1363
1364        // dept column comes from partition metadata (constant) - will be REE
1365        assert_eq!(
1366            get_string_value(result.column(1).as_ref(), 0),
1367            "engineering"
1368        );
1369        assert_eq!(
1370            get_string_value(result.column(1).as_ref(), 1),
1371            "engineering"
1372        );
1373
1374        // name column comes from file
1375        assert_eq!(get_string_value(result.column(2).as_ref(), 0), "Alice");
1376        assert_eq!(get_string_value(result.column(2).as_ref(), 1), "Bob");
1377    }
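
    /// Hedged companion sketch for the test above: call `constants_map()` directly and check
    /// that an identity-transformed field *is* exposed as a constant, keyed by its source
    /// field id.
    ///
    /// Minimal sketch, assuming `constants_map` and `Datum` are in scope via `use super::*;`.
    #[test]
    fn constants_map_includes_identity_transform_sketch() {
        use crate::spec::{Struct, Transform};

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );
        let partition_spec = crate::spec::PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("dept", "dept", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]);

        let constants = constants_map(&partition_spec, &partition_data, &schema).unwrap();
        // The identity-partitioned field (source id 2) maps to the partition value.
        assert_eq!(constants.len(), 1);
        assert_eq!(constants.get(&2), Some(&Datum::string("engineering")));
    }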
1378
1379    /// Test bucket partitioning with renamed source column.
1380    ///
1381    /// Mirrors TestRuntimeFiltering.testRenamedSourceColumnTable() from Iceberg Java.
1382    /// When a source column is renamed after partitioning is established, field-ID-based mapping
1383    /// must still correctly identify the column in Parquet files.
1384    ///
1385    /// # Scenario
1386    ///
1387    /// 1. Table created with `bucket(4, id)` partitioning
1388    /// 2. Data written to Parquet files (field_id=1, name="id")
1389    /// 3. Column renamed: `ALTER TABLE ... RENAME COLUMN id TO row_id`
1390    /// 4. Iceberg schema now has: field_id=1, name="row_id"
1391    /// 5. Parquet files still have: field_id=1, name="id"
1392    ///
1393    /// # Expected Behavior Per Iceberg Spec
1394    ///
1395    /// Per the Iceberg spec "Column Projection" section and Java's PartitionUtil.constantsMap():
1396    /// - Bucket transforms are NON-identity, so partition metadata stores bucket numbers (0-3), not source values
1397    /// - Source columns for non-identity transforms MUST be read from data files
1398    /// - Field-ID-based mapping should find the column by field_id=1 (ignoring name mismatch)
1399    /// - Runtime filtering on `row_id` should work correctly
1400    ///
1401    /// # What This Tests
1402    ///
1403    /// This test ensures that when FileScanTask provides partition_spec and partition_data:
1404    /// - constants_map() correctly identifies that bucket(4, row_id) is NOT an identity transform
1405    /// - The source column (field_id=1) is NOT added to constants_map
1406    /// - Field-ID-based mapping reads actual values from the Parquet file
1407    /// - Values [100, 200, 300] are read, not replaced with bucket constant 2
1408    ///
1409    /// # References
1410    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java::testRenamedSourceColumnTable
1411    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java::constantsMap()
1412    /// - Iceberg spec: format/spec.md "Column Projection" section
1413    #[test]
1414    fn test_bucket_partitioning_with_renamed_source_column() {
1415        use crate::spec::{Struct, Transform};
1416
1417        // Iceberg schema after rename: row_id (was id), name
1418        let snapshot_schema = Arc::new(
1419            Schema::builder()
1420                .with_schema_id(0)
1421                .with_fields(vec![
1422                    NestedField::required(1, "row_id", Type::Primitive(PrimitiveType::Int)).into(),
1423                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1424                ])
1425                .build()
1426                .unwrap(),
1427        );
1428
1429        // Partition spec: bucket(4, row_id) - but source_id still points to field_id=1
1430        let partition_spec = Arc::new(
1431            crate::spec::PartitionSpec::builder(snapshot_schema.clone())
1432                .with_spec_id(0)
1433                .add_partition_field("row_id", "row_id_bucket", Transform::Bucket(4))
1434                .unwrap()
1435                .build()
1436                .unwrap(),
1437        );
1438
1439        // Partition data: bucket value is 2
1440        let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);
1441
1442        // Parquet file has OLD column name "id" but SAME field_id=1
1443        // Field-ID-based mapping should find this despite name mismatch
1444        let parquet_schema = Arc::new(ArrowSchema::new(vec![
1445            simple_field("id", DataType::Int32, false, "1"),
1446            simple_field("name", DataType::Utf8, true, "2"),
1447        ]));
1448
1449        let projected_field_ids = [1, 2]; // row_id (field_id=1), name (field_id=2)
1450
1451        let mut transformer =
1452            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
1453                .with_partition(partition_spec, partition_data)
1454                .expect("Failed to add partition constants")
1455                .build();
1456
1457        // Create a Parquet RecordBatch with actual data
1458        // Despite column rename, data should be read via field_id=1
1459        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1460            Arc::new(Int32Array::from(vec![100, 200, 300])),
1461            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
1462        ])
1463        .unwrap();
1464
1465        let result = transformer.process_record_batch(parquet_batch).unwrap();
1466
1467        // Verify the transformed RecordBatch correctly reads data despite name mismatch
1468        assert_eq!(result.num_columns(), 2);
1469        assert_eq!(result.num_rows(), 3);
1470
1471        let row_id_column = result
1472            .column(0)
1473            .as_any()
1474            .downcast_ref::<Int32Array>()
1475            .unwrap();
1476        // These values MUST come from the Parquet file via field_id=1,
1477        // not be replaced by the bucket constant (2)
1478        assert_eq!(row_id_column.value(0), 100);
1479        assert_eq!(row_id_column.value(1), 200);
1480        assert_eq!(row_id_column.value(2), 300);
1481
1482        let name_column = result
1483            .column(1)
1484            .as_any()
1485            .downcast_ref::<StringArray>()
1486            .unwrap();
1487        assert_eq!(name_column.value(0), "Alice");
1488        assert_eq!(name_column.value(1), "Bob");
1489        assert_eq!(name_column.value(2), "Charlie");
1490    }
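
    /// Hedged sketch of the field-ID lookup that makes the rename above work: resolve an
    /// Arrow column by its `PARQUET_FIELD_ID_META_KEY` metadata instead of by name.
    ///
    /// Minimal sketch, assuming the module's `simple_field` helper attaches the Iceberg field
    /// id under `PARQUET_FIELD_ID_META_KEY`, as the tests above rely on.
    #[test]
    fn field_id_metadata_lookup_sketch() {
        use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

        // Arrow schema as written before the rename: column name "id", field id 1.
        let parquet_schema = ArrowSchema::new(vec![
            simple_field("id", DataType::Int32, false, "1"),
            simple_field("name", DataType::Utf8, true, "2"),
        ]);

        // Resolve the column by field id rather than by name.
        let resolved = parquet_schema
            .fields()
            .iter()
            .find(|f| {
                f.metadata().get(PARQUET_FIELD_ID_META_KEY).map(String::as_str) == Some("1")
            })
            .expect("field id 1 should be present in the Parquet schema");

        // The lookup succeeds even though the Iceberg schema now calls this column "row_id".
        assert_eq!(resolved.name(), "id");
    }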
1491
1492    /// Comprehensive integration test that verifies all 4 Iceberg spec rules work correctly.
1493    ///
1494    /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection),
1495    /// "Values for field ids which are not present in a data file must be resolved
1496    /// according to the following rules:"
1497    ///
1498    /// This test creates a scenario where each rule is exercised:
1499    /// - Rule #1: dept (identity-partitioned) -> constant from partition metadata
1500    /// - Rule #2: data (via name mapping) -> read from Parquet file by name
1501    /// - Rule #3: category (initial_default) -> use default value
1502    /// - Rule #4: notes (no default) -> return null
1503    ///
1504    /// # References
1505    /// - Iceberg spec: format/spec.md "Column Projection" section
1506    #[test]
1507    fn test_all_four_spec_rules() {
1508        use crate::spec::Transform;
1509
1510        // Iceberg schema with columns designed to exercise each spec rule
1511        let snapshot_schema = Arc::new(
1512            Schema::builder()
1513                .with_schema_id(0)
1514                .with_fields(vec![
1515                    // Field in Parquet by field ID (normal case)
1516                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1517                    // Rule #1: Identity-partitioned field - should use partition metadata
1518                    NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(),
1519                    // Rule #2: Field resolved by name mapping (already applied by the ArrowReader)
1520                    NestedField::required(3, "data", Type::Primitive(PrimitiveType::String)).into(),
1521                    // Rule #3: Field with initial_default
1522                    NestedField::optional(4, "category", Type::Primitive(PrimitiveType::String))
1523                        .with_initial_default(Literal::string("default_category"))
1524                        .into(),
1525                    // Rule #4: Field with no default - should be null
1526                    NestedField::optional(5, "notes", Type::Primitive(PrimitiveType::String))
1527                        .into(),
1528                ])
1529                .build()
1530                .unwrap(),
1531        );
1532
1533        // Partition spec: identity transform on dept
1534        let partition_spec = Arc::new(
1535            crate::spec::PartitionSpec::builder(snapshot_schema.clone())
1536                .with_spec_id(0)
1537                .add_partition_field("dept", "dept", Transform::Identity)
1538                .unwrap()
1539                .build()
1540                .unwrap(),
1541        );
1542
1543        // Partition data: dept="engineering"
1544        let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]);
1545
1546        // Parquet schema: simulates the post-ArrowReader state where name mapping has already been applied
1547        // Has id (field_id=1) and data (field_id=3, assigned by ArrowReader via name mapping)
1548        // Missing: dept (in partition), category (has default), notes (no default)
1549        let parquet_schema = Arc::new(ArrowSchema::new(vec![
1550            simple_field("id", DataType::Int32, false, "1"),
1551            simple_field("data", DataType::Utf8, false, "3"),
1552        ]));
1553
1554        let projected_field_ids = [1, 2, 3, 4, 5]; // id, dept, data, category, notes
1555
1556        let mut transformer =
1557            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
1558                .with_partition(partition_spec, partition_data)
1559                .expect("Failed to add partition constants")
1560                .build();
1561
1562        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1563            Arc::new(Int32Array::from(vec![100, 200])),
1564            Arc::new(StringArray::from(vec!["value1", "value2"])),
1565        ])
1566        .unwrap();
1567
1568        let result = transformer.process_record_batch(parquet_batch).unwrap();
1569
1570        assert_eq!(result.num_columns(), 5);
1571        assert_eq!(result.num_rows(), 2);
1572
1573        // Verify each column demonstrates the correct spec rule:
1574
1575        // Normal case: id from Parquet by field ID
1576        // Use helpers to handle both simple and REE arrays
1577        assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100);
1578        assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200);
1579
1580        // Rule #1: dept from partition metadata (identity transform) - will be REE
1581        assert_eq!(
1582            get_string_value(result.column(1).as_ref(), 0),
1583            "engineering"
1584        );
1585        assert_eq!(
1586            get_string_value(result.column(1).as_ref(), 1),
1587            "engineering"
1588        );
1589
1590        // Rule #2: data from Parquet via name mapping - will be a regular array
1591        assert_eq!(get_string_value(result.column(2).as_ref(), 0), "value1");
1592        assert_eq!(get_string_value(result.column(2).as_ref(), 1), "value2");
1593
1594        // Rule #3: category from initial_default - will be REE
1595        assert_eq!(
1596            get_string_value(result.column(3).as_ref(), 0),
1597            "default_category"
1598        );
1599        assert_eq!(
1600            get_string_value(result.column(3).as_ref(), 1),
1601            "default_category"
1602        );
1603
1604        // Rule #4: notes is null (no default, not in Parquet, not in partition) - will be REE with null
1605        // The string helper returns "" for null entries, so compare against the empty string
1606        assert_eq!(get_string_value(result.column(4).as_ref(), 0), "");
1607        assert_eq!(get_string_value(result.column(4).as_ref(), 1), "");
1608    }
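
    /// Hedged companion sketch for Rule #4: assert null-ness of the missing column through
    /// Arrow's logical null API instead of the string helper's empty-string convention.
    ///
    /// Minimal sketch, assuming the builder also supports unpartitioned reads (no
    /// `with_partition` call); the column layout is a trimmed-down version of the test above.
    #[test]
    fn missing_column_without_default_is_null_sketch() {
        let snapshot_schema = Arc::new(
            Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(5, "notes", Type::Primitive(PrimitiveType::String))
                        .into(),
                ])
                .build()
                .unwrap(),
        );
        let parquet_schema = Arc::new(ArrowSchema::new(vec![simple_field(
            "id",
            DataType::Int32,
            false,
            "1",
        )]));

        let mut transformer =
            RecordBatchTransformerBuilder::new(snapshot_schema, &[1, 5]).build();
        let batch =
            RecordBatch::try_new(parquet_schema, vec![Arc::new(Int32Array::from(vec![7]))])
                .unwrap();
        let result = transformer.process_record_batch(batch).unwrap();

        // Rule #4: no data, no default, no partition constant -> every slot is logically null.
        let nulls = result
            .column(1)
            .logical_nulls()
            .expect("missing column should be null-filled");
        assert!(nulls.is_null(0));
    }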
1609
1610    /// Test handling of null values in identity-partitioned columns.
1611    ///
1612    /// Reproduces TestPartitionValues.testNullPartitionValue() from iceberg-java, which
1613    /// writes records where the partition column has null values. Before the fix in #1922,
1614    /// this would error with "Partition field X has null value for identity transform".
1615    #[test]
1616    fn null_identity_partition_value() {
1617        use crate::spec::{Struct, Transform};
1618
1619        let schema = Arc::new(
1620            Schema::builder()
1621                .with_schema_id(0)
1622                .with_fields(vec![
1623                    NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1624                    NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String)).into(),
1625                ])
1626                .build()
1627                .unwrap(),
1628        );
1629
1630        let partition_spec = Arc::new(
1631            crate::spec::PartitionSpec::builder(schema.clone())
1632                .with_spec_id(0)
1633                .add_partition_field("data", "data", Transform::Identity)
1634                .unwrap()
1635                .build()
1636                .unwrap(),
1637        );
1638
1639        // Partition has null value for the data column
1640        let partition_data = Struct::from_iter(vec![None]);
1641
1642        let file_schema = Arc::new(ArrowSchema::new(vec![simple_field(
1643            "id",
1644            DataType::Int32,
1645            true,
1646            "1",
1647        )]));
1648
1649        let projected_field_ids = [1, 2];
1650
1651        let mut transformer = RecordBatchTransformerBuilder::new(schema, &projected_field_ids)
1652            .with_partition(partition_spec, partition_data)
1653            .expect("Should handle null partition values")
1654            .build();
1655
1656        let file_batch =
1657            RecordBatch::try_new(file_schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
1658                .unwrap();
1659
1660        let result = transformer.process_record_batch(file_batch).unwrap();
1661
1662        assert_eq!(result.num_columns(), 2);
1663        assert_eq!(result.num_rows(), 3);
1664
1665        let id_col = result
1666            .column(0)
1667            .as_any()
1668            .downcast_ref::<Int32Array>()
1669            .unwrap();
1670        assert_eq!(id_col.values(), &[1, 2, 3]);
1671
1672        // Partition column with null value should produce nulls
1673        let data_col = result.column(1);
1674        assert!(data_col.is_null(0));
1675        assert!(data_col.is_null(1));
1676        assert!(data_col.is_null(2));
1677    }
1678}