iceberg/arrow/record_batch_transformer.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use arrow_array::{
22 Array as ArrowArray, ArrayRef, Int32Array, RecordBatch, RecordBatchOptions, RunArray,
23};
24use arrow_cast::cast;
25use arrow_schema::{
26 DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
27};
28use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
29
30use crate::arrow::value::{create_primitive_array_repeated, create_primitive_array_single_element};
31use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema};
32use crate::metadata_columns::get_metadata_field;
33use crate::spec::{
34 Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform,
35};
36use crate::{Error, ErrorKind, Result};
37
38/// Build a map of field ID to constant value (as Datum) for identity-partitioned fields.
39///
40/// Implements Iceberg spec "Column Projection" rule #1: use partition metadata constants
41/// only for identity-transformed fields. Non-identity transforms (bucket, truncate, year, etc.)
42/// store derived values in partition metadata, so source columns must be read from data files.
43///
44/// Example: For `bucket(4, id)`, partition metadata has `id_bucket = 2` (bucket number),
45/// but the actual `id` values (100, 200, 300) are only in the data file.
46///
47/// Matches Java's `PartitionUtil.constantsMap()` which filters `if (field.transform().isIdentity())`.
48///
49/// # References
50/// - Spec: https://iceberg.apache.org/spec/#column-projection
51/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:constantsMap()
52fn constants_map(
53 partition_spec: &PartitionSpec,
54 partition_data: &Struct,
55 schema: &IcebergSchema,
56) -> Result<HashMap<i32, Datum>> {
57 let mut constants = HashMap::new();
58
59 for (pos, field) in partition_spec.fields().iter().enumerate() {
60 // Only identity transforms should use constant values from partition metadata
61 if matches!(field.transform, Transform::Identity) {
62 // Get the field from schema to extract its type
63 let iceberg_field = schema.field_by_id(field.source_id).ok_or(Error::new(
64 ErrorKind::Unexpected,
65 format!("Field {} not found in schema", field.source_id),
66 ))?;
67
68 // Ensure the field type is primitive
69 let prim_type = match &*iceberg_field.field_type {
70 crate::spec::Type::Primitive(prim_type) => prim_type,
71 _ => {
72 return Err(Error::new(
73 ErrorKind::Unexpected,
74 format!(
75 "Partition field {} has non-primitive type {:?}",
76 field.source_id, iceberg_field.field_type
77 ),
78 ));
79 }
80 };
81
82 // Get the partition value for this field
83 // Handle both None (null) and Some(Literal::Primitive) cases
84 match &partition_data[pos] {
85 None => {
86 // Skip null partition values - they will be resolved as null per Iceberg spec rule #4.
87 // When a partition value is null, we don't add it to the constants map,
88 // allowing downstream column resolution to handle it correctly.
89 continue;
90 }
91 Some(Literal::Primitive(value)) => {
92 // Create a Datum from the primitive type and value
93 let datum = Datum::new(prim_type.clone(), value.clone());
94 constants.insert(field.source_id, datum);
95 }
96 Some(literal) => {
97 return Err(Error::new(
98 ErrorKind::Unexpected,
99 format!(
100 "Partition field {} has non-primitive value: {:?}",
101 field.source_id, literal
102 ),
103 ));
104 }
105 }
106 }
107 }
108
109 Ok(constants)
110}
111
112/// Indicates how a particular column in a processed RecordBatch should
113/// be sourced.
114#[derive(Debug)]
115pub(crate) enum ColumnSource {
116 // signifies that a column should be passed through unmodified
117 // from the file's RecordBatch
118 PassThrough {
119 source_index: usize,
120 },
121
122 // signifies that a column from the file's RecordBatch has undergone
123 // type promotion so the source column with the given index needs
124 // to be promoted to the specified type
125 Promote {
126 target_type: DataType,
127 source_index: usize,
128 },
129
130 // Signifies that a new column has been inserted before the column
131 // with index `index`. (we choose "before" rather than "after" so
132 // that we can use usize; if we insert after, then we need to
133 // be able to store -1 here to signify that a new
134 // column is to be added at the front of the column list).
135 // If multiple columns need to be inserted at a given
136 // location, they should all be given the same index, as the index
137 // here refers to the original RecordBatch, not the interim state after
138 // a preceding operation.
139 Add {
140 target_type: DataType,
141 value: Option<PrimitiveLiteral>,
142 },
143 // The iceberg spec refers to other permissible schema evolution actions
144 // (see https://iceberg.apache.org/spec/#schema-evolution):
145 // renaming fields, deleting fields and reordering fields.
146 // Renames only affect the schema of the RecordBatch rather than the
147 // columns themselves, so a single updated cached schema can
148 // be re-used and no per-column actions are required.
149 // Deletion and Reorder can be achieved without needing this
150 // post-processing step by using the projection mask.
151}
152
153#[derive(Debug)]
154enum BatchTransform {
155 // Indicates that no changes need to be performed to the RecordBatches
156 // coming in from the stream and that they can be passed through
157 // unmodified
158 PassThrough,
159
160 Modify {
161 // Every transformed RecordBatch will have the same schema. We create the
162 // target just once and cache it here. Helpfully, Arc<Schema> is needed in
163 // the constructor for RecordBatch, so we don't need an expensive copy
164 // each time we build a new RecordBatch
165 target_schema: Arc<ArrowSchema>,
166
167 // Indicates how each column in the target schema is derived.
168 operations: Vec<ColumnSource>,
169 },
170
171 // Sometimes only the schema will need modifying, for example when
172 // the column names have changed vs the file, but not the column types.
173 // we can avoid a heap allocation per RecordBach in this case by retaining
174 // the existing column Vec.
175 ModifySchema {
176 target_schema: Arc<ArrowSchema>,
177 },
178}
179
/// Result of comparing a file batch's Arrow schema with the target Arrow schema.
#[derive(Debug)]
enum SchemaComparison {
    /// Same fields, types, nullability and names: batches can pass through untouched.
    Equivalent,
    /// Same fields, types and nullability, but at least one name differs: only the schema
    /// needs replacing.
    NameChangesOnly,
    /// Field count, a type or a nullability differs: columns must be transformed.
    Different,
}
186
187/// Builder for RecordBatchTransformer to improve ergonomics when constructing with optional parameters.
188///
189/// Constant fields are pre-computed for both virtual/metadata fields (like _file) and
190/// identity-partitioned fields to avoid duplicate work during batch processing.
191#[derive(Debug)]
192pub(crate) struct RecordBatchTransformerBuilder {
193 snapshot_schema: Arc<IcebergSchema>,
194 projected_iceberg_field_ids: Vec<i32>,
195 constant_fields: HashMap<i32, Datum>,
196}
197
198impl RecordBatchTransformerBuilder {
199 pub(crate) fn new(
200 snapshot_schema: Arc<IcebergSchema>,
201 projected_iceberg_field_ids: &[i32],
202 ) -> Self {
203 Self {
204 snapshot_schema,
205 projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(),
206 constant_fields: HashMap::new(),
207 }
208 }
209
210 /// Add a constant value for a specific field ID.
211 /// This is used for virtual/metadata fields like _file that have constant values per batch.
212 ///
213 /// # Arguments
214 /// * `field_id` - The field ID to associate with the constant
215 /// * `datum` - The constant value (with type) for this field
216 pub(crate) fn with_constant(mut self, field_id: i32, datum: Datum) -> Self {
217 self.constant_fields.insert(field_id, datum);
218 self
219 }
220
221 /// Set partition spec and data together for identifying identity-transformed partition columns.
222 ///
223 /// Both partition_spec and partition_data must be provided together since the spec defines
224 /// which fields are identity-partitioned, and the data provides their constant values.
225 /// This method computes the partition constants and merges them into constant_fields.
226 pub(crate) fn with_partition(
227 mut self,
228 partition_spec: Arc<PartitionSpec>,
229 partition_data: Struct,
230 ) -> Result<Self> {
231 // Compute partition constants for identity-transformed fields (already returns Datum)
232 let partition_constants =
233 constants_map(&partition_spec, &partition_data, &self.snapshot_schema)?;
234
235 // Add partition constants to constant_fields
236 for (field_id, datum) in partition_constants {
237 self.constant_fields.insert(field_id, datum);
238 }
239
240 Ok(self)
241 }
242
243 pub(crate) fn build(self) -> RecordBatchTransformer {
244 RecordBatchTransformer {
245 snapshot_schema: self.snapshot_schema,
246 projected_iceberg_field_ids: self.projected_iceberg_field_ids,
247 constant_fields: self.constant_fields,
248 batch_transform: None,
249 }
250 }
251}
252
253/// Transforms RecordBatches from Parquet files to match the Iceberg table schema.
254///
255/// Handles schema evolution, column reordering, type promotion, and implements the Iceberg spec's
256/// "Column Projection" rules for resolving field IDs "not present" in data files:
257/// 1. Return the value from partition metadata if an Identity Transform exists
258/// 2. Use schema.name-mapping.default metadata to map field id to columns without field id (applied in ArrowReader)
259/// 3. Return the default value if it has a defined initial-default
260/// 4. Return null in all other cases
261///
262/// # Field ID Resolution
263///
264/// Field ID resolution happens in ArrowReader before data is read (matching Java's ReadConf):
265/// - If file has embedded field IDs: trust them (ParquetSchemaUtil.hasIds() = true)
266/// - If file lacks IDs and name_mapping exists: apply name mapping (ParquetSchemaUtil.applyNameMapping())
267/// - If file lacks IDs and no name_mapping: use position-based fallback (ParquetSchemaUtil.addFallbackIds())
268///
269/// By the time RecordBatchTransformer processes data, all field IDs are trustworthy.
270/// This transformer only handles remaining projection rules (#1, #3, #4) for fields still "not present".
271///
272/// # Partition Spec and Data
273///
274/// **Bucket partitioning**: Distinguish identity transforms (use partition metadata constants)
275/// from non-identity transforms like bucket (read from data file) to enable runtime filtering on
276/// bucket-partitioned columns. For example, `bucket(4, id)` stores only the bucket number in
277/// partition metadata, so actual `id` values must be read from the data file.
278///
279/// # References
280/// - Spec: https://iceberg.apache.org/spec/#column-projection
281/// - Java: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java (field ID resolution)
282/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java (partition constants)
283#[derive(Debug)]
284pub(crate) struct RecordBatchTransformer {
285 snapshot_schema: Arc<IcebergSchema>,
286 projected_iceberg_field_ids: Vec<i32>,
287 // Pre-computed constant field information: field_id -> Datum
288 // Includes both virtual/metadata fields (like _file) and identity-partitioned fields
289 // Datum holds both the Iceberg type and the value
290 constant_fields: HashMap<i32, Datum>,
291
292 // BatchTransform gets lazily constructed based on the schema of
293 // the first RecordBatch we receive from the file
294 batch_transform: Option<BatchTransform>,
295}
296
297impl RecordBatchTransformer {
298 pub(crate) fn process_record_batch(
299 &mut self,
300 record_batch: RecordBatch,
301 ) -> Result<RecordBatch> {
302 Ok(match &self.batch_transform {
303 Some(BatchTransform::PassThrough) => record_batch,
304 Some(BatchTransform::Modify {
305 target_schema,
306 operations,
307 }) => {
308 let options = RecordBatchOptions::default()
309 .with_match_field_names(false)
310 .with_row_count(Some(record_batch.num_rows()));
311 RecordBatch::try_new_with_options(
312 Arc::clone(target_schema),
313 self.transform_columns(record_batch.columns(), operations)?,
314 &options,
315 )?
316 }
317 Some(BatchTransform::ModifySchema { target_schema }) => {
318 let options = RecordBatchOptions::default()
319 .with_match_field_names(false)
320 .with_row_count(Some(record_batch.num_rows()));
321 RecordBatch::try_new_with_options(
322 Arc::clone(target_schema),
323 record_batch.columns().to_vec(),
324 &options,
325 )?
326 }
327 None => {
328 self.batch_transform = Some(Self::generate_batch_transform(
329 record_batch.schema_ref(),
330 self.snapshot_schema.as_ref(),
331 &self.projected_iceberg_field_ids,
332 &self.constant_fields,
333 )?);
334
335 self.process_record_batch(record_batch)?
336 }
337 })
338 }
339
340 // Compare the schema of the incoming RecordBatches to the schema of
341 // the Iceberg snapshot to determine what, if any, transformation
342 // needs to be applied. If the schemas match, we return BatchTransform::PassThrough
343 // to indicate that no changes need to be made. Otherwise, we return a
344 // BatchTransform::Modify containing the target RecordBatch schema and
345 // the list of `ColumnSource`s that indicate how to source each column in
346 // the resulting RecordBatches.
347 fn generate_batch_transform(
348 source_schema: &ArrowSchemaRef,
349 snapshot_schema: &IcebergSchema,
350 projected_iceberg_field_ids: &[i32],
351 constant_fields: &HashMap<i32, Datum>,
352 ) -> Result<BatchTransform> {
353 let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
354 let field_id_to_mapped_schema_map =
355 Self::build_field_id_to_arrow_schema_map(&mapped_unprojected_arrow_schema)?;
356
357 // Create a new arrow schema by selecting fields from mapped_unprojected,
358 // in the order of the field ids in projected_iceberg_field_ids
359 let fields: Result<Vec<_>> = projected_iceberg_field_ids
360 .iter()
361 .map(|field_id| {
362 // Check if this is a constant field
363 if constant_fields.contains_key(field_id) {
364 // For metadata/virtual fields (like _file), get name from metadata_columns
365 // For partition fields, get name from schema (they exist in schema)
366 if let Ok(iceberg_field) = get_metadata_field(*field_id) {
367 // This is a metadata/virtual field - convert Iceberg field to Arrow
368 let datum = constant_fields.get(field_id).ok_or(Error::new(
369 ErrorKind::Unexpected,
370 "constant field not found",
371 ))?;
372 let arrow_type = datum_to_arrow_type_with_ree(datum);
373 let arrow_field =
374 Field::new(&iceberg_field.name, arrow_type, !iceberg_field.required)
375 .with_metadata(HashMap::from([(
376 PARQUET_FIELD_ID_META_KEY.to_string(),
377 iceberg_field.id.to_string(),
378 )]));
379 Ok(Arc::new(arrow_field))
380 } else {
381 // This is a partition constant field (exists in schema but uses constant value)
382 let field = &field_id_to_mapped_schema_map
383 .get(field_id)
384 .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
385 .0;
386 let datum = constant_fields.get(field_id).ok_or(Error::new(
387 ErrorKind::Unexpected,
388 "constant field not found",
389 ))?;
390 let arrow_type = datum_to_arrow_type_with_ree(datum);
391 // Use the type from constant_fields (REE for constants)
392 let constant_field =
393 Field::new(field.name(), arrow_type, field.is_nullable())
394 .with_metadata(field.metadata().clone());
395 Ok(Arc::new(constant_field))
396 }
397 } else {
398 // Regular field - use schema as-is
399 Ok(field_id_to_mapped_schema_map
400 .get(field_id)
401 .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
402 .0
403 .clone())
404 }
405 })
406 .collect();
407
408 let target_schema = Arc::new(ArrowSchema::new(fields?));
409
410 match Self::compare_schemas(source_schema, &target_schema) {
411 SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
412 SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }),
413 SchemaComparison::Different => Ok(BatchTransform::Modify {
414 operations: Self::generate_transform_operations(
415 source_schema,
416 snapshot_schema,
417 projected_iceberg_field_ids,
418 field_id_to_mapped_schema_map,
419 constant_fields,
420 )?,
421 target_schema,
422 }),
423 }
424 }
425
426 /// Compares the source and target schemas
427 /// Determines if they have changed in any meaningful way:
428 /// * If they have different numbers of fields, then we need to modify
429 /// the incoming RecordBatch schema AND columns
430 /// * If they have the same number of fields, but some of them differ in
431 /// either data type or nullability, then we need to modify the
432 /// incoming RecordBatch schema AND columns
433 /// * If the schemas differ only in the column names, then we need
434 /// to modify the RecordBatch schema BUT we can keep the
435 /// original column data unmodified
436 /// * If the schemas are identical (or differ only in inconsequential
437 /// ways) then we can pass through the original RecordBatch unmodified
438 fn compare_schemas(
439 source_schema: &ArrowSchemaRef,
440 target_schema: &ArrowSchemaRef,
441 ) -> SchemaComparison {
442 if source_schema.fields().len() != target_schema.fields().len() {
443 return SchemaComparison::Different;
444 }
445
446 let mut names_changed = false;
447
448 for (source_field, target_field) in source_schema
449 .fields()
450 .iter()
451 .zip(target_schema.fields().iter())
452 {
453 if source_field.data_type() != target_field.data_type()
454 || source_field.is_nullable() != target_field.is_nullable()
455 {
456 return SchemaComparison::Different;
457 }
458
459 if source_field.name() != target_field.name() {
460 names_changed = true;
461 }
462 }
463
464 if names_changed {
465 SchemaComparison::NameChangesOnly
466 } else {
467 SchemaComparison::Equivalent
468 }
469 }
470
471 fn generate_transform_operations(
472 source_schema: &ArrowSchemaRef,
473 snapshot_schema: &IcebergSchema,
474 projected_iceberg_field_ids: &[i32],
475 field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
476 constant_fields: &HashMap<i32, Datum>,
477 ) -> Result<Vec<ColumnSource>> {
478 let field_id_to_source_schema_map =
479 Self::build_field_id_to_arrow_schema_map(source_schema)?;
480
481 projected_iceberg_field_ids
482 .iter()
483 .map(|field_id| {
484 // Check if this is a constant field (metadata/virtual or identity-partitioned)
485 // Constant fields always use their pre-computed constant values, regardless of whether
486 // they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata
487 // is authoritative and should be preferred over file data.
488 if let Some(datum) = constant_fields.get(field_id) {
489 let arrow_type = datum_to_arrow_type_with_ree(datum);
490 return Ok(ColumnSource::Add {
491 value: Some(datum.literal().clone()),
492 target_type: arrow_type,
493 });
494 }
495
496 let (target_field, _) =
497 field_id_to_mapped_schema_map
498 .get(field_id)
499 .ok_or(Error::new(
500 ErrorKind::Unexpected,
501 "could not find field in schema",
502 ))?;
503 let target_type = target_field.data_type();
504
505 let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or(Error::new(
506 ErrorKind::Unexpected,
507 "Field not found in snapshot schema",
508 ))?;
509
510 // Iceberg spec's "Column Projection" rules (https://iceberg.apache.org/spec/#column-projection).
511 // For fields "not present" in data files:
512 // 1. Use partition metadata (identity transforms only)
513 // 2. Use name mapping
514 // 3. Use initial_default
515 // 4. Return null
516 //
517 // Why check partition constants before Parquet field IDs (Java: BaseParquetReaders.java:299):
518 // In add_files scenarios, partition columns may exist in BOTH Parquet AND partition metadata.
519 // Partition metadata is authoritative - it defines which partition this file belongs to.
520
521 // Field ID resolution now happens in ArrowReader via:
522 // 1. Embedded field IDs (ParquetSchemaUtil.hasIds() = true) - trust them
523 // 2. Name mapping (ParquetSchemaUtil.applyNameMapping()) - applied upfront
524 // 3. Position-based fallback (ParquetSchemaUtil.addFallbackIds()) - applied upfront
525 //
526 // At this point, all field IDs in the source schema are trustworthy.
527 // No conflict detection needed - schema resolution happened in reader.rs.
528 let field_by_id = field_id_to_source_schema_map.get(field_id).map(
529 |(source_field, source_index)| {
530 if source_field.data_type().equals_datatype(target_type) {
531 ColumnSource::PassThrough {
532 source_index: *source_index,
533 }
534 } else {
535 ColumnSource::Promote {
536 target_type: target_type.clone(),
537 source_index: *source_index,
538 }
539 }
540 },
541 );
542
543 // Apply spec's fallback steps for "not present" fields.
544 // Rule #1 (constants) is handled at the beginning of this function
545 let column_source = if let Some(source) = field_by_id {
546 source
547 } else {
548 // Rules #2, #3 and #4:
549 // Rule #2 (name mapping) was already applied in reader.rs if needed.
550 // If field_id is still not found, the column doesn't exist in the Parquet file.
551 // Fall through to rule #3 (initial_default) or rule #4 (null).
552 let default_value = iceberg_field.initial_default.as_ref().and_then(|lit| {
553 if let Literal::Primitive(prim) = lit {
554 Some(prim.clone())
555 } else {
556 None
557 }
558 });
559
560 ColumnSource::Add {
561 value: default_value,
562 target_type: target_type.clone(),
563 }
564 };
565
566 Ok(column_source)
567 })
568 .collect()
569 }
570
571 fn build_field_id_to_arrow_schema_map(
572 source_schema: &SchemaRef,
573 ) -> Result<HashMap<i32, (FieldRef, usize)>> {
574 let mut field_id_to_source_schema = HashMap::new();
575 for (source_field_idx, source_field) in source_schema.fields.iter().enumerate() {
576 // Check if field has a field ID in metadata
577 if let Some(field_id_str) = source_field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
578 let this_field_id = field_id_str.parse().map_err(|e| {
579 Error::new(
580 ErrorKind::DataInvalid,
581 format!("field id not parseable as an i32: {e}"),
582 )
583 })?;
584
585 field_id_to_source_schema
586 .insert(this_field_id, (source_field.clone(), source_field_idx));
587 }
588 // If field doesn't have a field ID, skip it - name mapping will handle it
589 }
590
591 Ok(field_id_to_source_schema)
592 }
593
594 fn transform_columns(
595 &self,
596 columns: &[Arc<dyn ArrowArray>],
597 operations: &[ColumnSource],
598 ) -> Result<Vec<Arc<dyn ArrowArray>>> {
599 if columns.is_empty() {
600 return Ok(columns.to_vec());
601 }
602 let num_rows = columns[0].len();
603
604 operations
605 .iter()
606 .map(|op| {
607 Ok(match op {
608 ColumnSource::PassThrough { source_index } => columns[*source_index].clone(),
609
610 ColumnSource::Promote {
611 target_type,
612 source_index,
613 } => cast(&*columns[*source_index], target_type)?,
614
615 ColumnSource::Add { target_type, value } => {
616 Self::create_column(target_type, value, num_rows)?
617 }
618 })
619 })
620 .collect()
621 }
622
623 fn create_column(
624 target_type: &DataType,
625 prim_lit: &Option<PrimitiveLiteral>,
626 num_rows: usize,
627 ) -> Result<ArrayRef> {
628 // Check if this is a RunEndEncoded type (for constant fields)
629 if let DataType::RunEndEncoded(_, values_field) = target_type {
630 // Helper to create a Run-End Encoded array
631 let create_ree_array = |values_array: ArrayRef| -> Result<ArrayRef> {
632 let run_ends = if num_rows == 0 {
633 Int32Array::from(Vec::<i32>::new())
634 } else {
635 Int32Array::from(vec![num_rows as i32])
636 };
637 Ok(Arc::new(
638 RunArray::try_new(&run_ends, &values_array).map_err(|e| {
639 Error::new(
640 ErrorKind::Unexpected,
641 "Failed to create RunArray for constant value",
642 )
643 .with_source(e)
644 })?,
645 ))
646 };
647
648 // Create the values array using the helper function
649 let values_array =
650 create_primitive_array_single_element(values_field.data_type(), prim_lit)?;
651
652 // Wrap in Run-End Encoding
653 create_ree_array(values_array)
654 } else {
655 // Non-REE type (simple arrays for non-constant fields)
656 create_primitive_array_repeated(target_type, prim_lit, num_rows)
657 }
658 }
659}
660
661#[cfg(test)]
662mod test {
663 use std::collections::HashMap;
664 use std::sync::Arc;
665
666 use arrow_array::{
667 Array, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
668 StringArray,
669 };
670 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
671 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
672
673 use crate::arrow::record_batch_transformer::{
674 RecordBatchTransformer, RecordBatchTransformerBuilder,
675 };
676 use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type};
677
678 /// Helper to extract string values from either StringArray or RunEndEncoded<StringArray>
679 /// Returns empty string for null values
680 fn get_string_value(array: &dyn Array, index: usize) -> String {
681 if let Some(string_array) = array.as_any().downcast_ref::<StringArray>() {
682 if string_array.is_null(index) {
683 String::new()
684 } else {
685 string_array.value(index).to_string()
686 }
687 } else if let Some(run_array) = array
688 .as_any()
689 .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
690 {
691 let values = run_array.values();
692 let string_values = values
693 .as_any()
694 .downcast_ref::<StringArray>()
695 .expect("REE values should be StringArray");
696 // For REE, all rows have the same value (index 0 in the values array)
697 if string_values.is_null(0) {
698 String::new()
699 } else {
700 string_values.value(0).to_string()
701 }
702 } else {
703 panic!("Expected StringArray or RunEndEncoded<StringArray>");
704 }
705 }
706
707 /// Helper to extract int values from either Int32Array or RunEndEncoded<Int32Array>
708 fn get_int_value(array: &dyn Array, index: usize) -> i32 {
709 if let Some(int_array) = array.as_any().downcast_ref::<Int32Array>() {
710 int_array.value(index)
711 } else if let Some(run_array) = array
712 .as_any()
713 .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
714 {
715 let values = run_array.values();
716 let int_values = values
717 .as_any()
718 .downcast_ref::<Int32Array>()
719 .expect("REE values should be Int32Array");
720 int_values.value(0)
721 } else {
722 panic!("Expected Int32Array or RunEndEncoded<Int32Array>");
723 }
724 }
725
726 #[test]
727 fn build_field_id_to_source_schema_map_works() {
728 let arrow_schema = arrow_schema_already_same_as_target();
729
730 let result =
731 RecordBatchTransformer::build_field_id_to_arrow_schema_map(&arrow_schema).unwrap();
732
733 let expected = HashMap::from_iter([
734 (10, (arrow_schema.fields()[0].clone(), 0)),
735 (11, (arrow_schema.fields()[1].clone(), 1)),
736 (12, (arrow_schema.fields()[2].clone(), 2)),
737 (14, (arrow_schema.fields()[3].clone(), 3)),
738 (15, (arrow_schema.fields()[4].clone(), 4)),
739 ]);
740
741 assert!(result.eq(&expected));
742 }
743
744 #[test]
745 fn processor_returns_properly_shaped_record_batch_when_no_schema_migration_required() {
746 let snapshot_schema = Arc::new(iceberg_table_schema());
747 let projected_iceberg_field_ids = [13, 14];
748
749 let mut inst =
750 RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids)
751 .build();
752
753 let result = inst
754 .process_record_batch(source_record_batch_no_migration_required())
755 .unwrap();
756
757 let expected = source_record_batch_no_migration_required();
758
759 assert_eq!(result, expected);
760 }
761
762 #[test]
763 fn processor_returns_properly_shaped_record_batch_when_schema_migration_required() {
764 let snapshot_schema = Arc::new(iceberg_table_schema());
765 let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, f
766
767 let mut inst =
768 RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids)
769 .build();
770
771 let result = inst.process_record_batch(source_record_batch()).unwrap();
772
773 let expected = expected_record_batch_migration_required();
774
775 assert_eq!(result, expected);
776 }
777
778 #[test]
779 fn schema_evolution_adds_date_column_with_nulls() {
780 // Reproduces TestSelect.readAndWriteWithBranchAfterSchemaChange from iceberg-spark.
781 // When reading old snapshots after adding a DATE column, the transformer must
782 // populate the new column with NULL values since old files lack this field.
783 let snapshot_schema = Arc::new(
784 Schema::builder()
785 .with_schema_id(1)
786 .with_fields(vec![
787 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
788 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
789 NestedField::optional(3, "date_col", Type::Primitive(PrimitiveType::Date))
790 .into(),
791 ])
792 .build()
793 .unwrap(),
794 );
795 let projected_iceberg_field_ids = [1, 2, 3];
796
797 let mut transformer =
798 RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids)
799 .build();
800
801 let file_schema = Arc::new(ArrowSchema::new(vec![
802 simple_field("id", DataType::Int32, false, "1"),
803 simple_field("name", DataType::Utf8, true, "2"),
804 ]));
805
806 let file_batch = RecordBatch::try_new(file_schema, vec![
807 Arc::new(Int32Array::from(vec![1, 2, 3])),
808 Arc::new(StringArray::from(vec![
809 Some("Alice"),
810 Some("Bob"),
811 Some("Charlie"),
812 ])),
813 ])
814 .unwrap();
815
816 let result = transformer.process_record_batch(file_batch).unwrap();
817
818 assert_eq!(result.num_columns(), 3);
819 assert_eq!(result.num_rows(), 3);
820
821 let id_column = result
822 .column(0)
823 .as_any()
824 .downcast_ref::<Int32Array>()
825 .unwrap();
826 assert_eq!(id_column.values(), &[1, 2, 3]);
827
828 let name_column = result
829 .column(1)
830 .as_any()
831 .downcast_ref::<StringArray>()
832 .unwrap();
833 assert_eq!(name_column.value(0), "Alice");
834 assert_eq!(name_column.value(1), "Bob");
835 assert_eq!(name_column.value(2), "Charlie");
836
837 let date_column = result
838 .column(2)
839 .as_any()
840 .downcast_ref::<Date32Array>()
841 .unwrap();
842 assert!(date_column.is_null(0));
843 assert!(date_column.is_null(1));
844 assert!(date_column.is_null(2));
845 }
846
847 #[test]
848 fn schema_evolution_adds_struct_column_with_nulls() {
849 // Test that when a struct column is added after data files are written,
850 // the transformer can materialize the missing struct column with null values.
851 // This reproduces the scenario from Iceberg 1.10.0 TestSparkReaderDeletes tests
852 // where binaryData and structData columns were added to the schema.
853 let snapshot_schema = Arc::new(
854 Schema::builder()
855 .with_schema_id(1)
856 .with_fields(vec![
857 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
858 NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
859 NestedField::optional(
860 3,
861 "struct_col",
862 Type::Struct(crate::spec::StructType::new(vec![
863 NestedField::optional(
864 100,
865 "inner_field",
866 Type::Primitive(PrimitiveType::String),
867 )
868 .into(),
869 ])),
870 )
871 .into(),
872 ])
873 .build()
874 .unwrap(),
875 );
876 let projected_iceberg_field_ids = [1, 2, 3];
877
878 let mut transformer =
879 RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids)
880 .build();
881
882 let file_schema = Arc::new(ArrowSchema::new(vec![
883 simple_field("id", DataType::Int32, false, "1"),
884 simple_field("data", DataType::Utf8, false, "2"),
885 ]));
886
887 let file_batch = RecordBatch::try_new(file_schema, vec![
888 Arc::new(Int32Array::from(vec![1, 2, 3])),
889 Arc::new(StringArray::from(vec!["a", "b", "c"])),
890 ])
891 .unwrap();
892
893 let result = transformer.process_record_batch(file_batch).unwrap();
894
895 assert_eq!(result.num_columns(), 3);
896 assert_eq!(result.num_rows(), 3);
897
898 let id_column = result
899 .column(0)
900 .as_any()
901 .downcast_ref::<Int32Array>()
902 .unwrap();
903 assert_eq!(id_column.values(), &[1, 2, 3]);
904
905 let data_column = result
906 .column(1)
907 .as_any()
908 .downcast_ref::<StringArray>()
909 .unwrap();
910 assert_eq!(data_column.value(0), "a");
911 assert_eq!(data_column.value(1), "b");
912 assert_eq!(data_column.value(2), "c");
913
914 let struct_column = result
915 .column(2)
916 .as_any()
917 .downcast_ref::<arrow_array::StructArray>()
918 .unwrap();
919 assert!(struct_column.is_null(0));
920 assert!(struct_column.is_null(1));
921 assert!(struct_column.is_null(2));
922 }
923
924 pub fn source_record_batch() -> RecordBatch {
925 RecordBatch::try_new(
926 arrow_schema_promotion_addition_and_renaming_required(),
927 vec![
928 Arc::new(Int32Array::from(vec![Some(1001), Some(1002), Some(1003)])), // b
929 Arc::new(Float32Array::from(vec![
930 Some(12.125),
931 Some(23.375),
932 Some(34.875),
933 ])), // c
934 Arc::new(Int32Array::from(vec![Some(2001), Some(2002), Some(2003)])), // d
935 Arc::new(StringArray::from(vec![
936 Some("Apache"),
937 Some("Iceberg"),
938 Some("Rocks"),
939 ])), // e
940 ],
941 )
942 .unwrap()
943 }
944
945 pub fn source_record_batch_no_migration_required() -> RecordBatch {
946 RecordBatch::try_new(
947 arrow_schema_no_promotion_addition_or_renaming_required(),
948 vec![
949 Arc::new(Int32Array::from(vec![Some(2001), Some(2002), Some(2003)])), // d
950 Arc::new(StringArray::from(vec![
951 Some("Apache"),
952 Some("Iceberg"),
953 Some("Rocks"),
954 ])), // e
955 ],
956 )
957 .unwrap()
958 }
959
960 pub fn expected_record_batch_migration_required() -> RecordBatch {
961 RecordBatch::try_new(arrow_schema_already_same_as_target(), vec![
962 Arc::new(StringArray::from(Vec::<Option<String>>::from([
963 None, None, None,
964 ]))), // a
965 Arc::new(Int64Array::from(vec![Some(1001), Some(1002), Some(1003)])), // b
966 Arc::new(Float64Array::from(vec![
967 Some(12.125),
968 Some(23.375),
969 Some(34.875),
970 ])), // c
971 Arc::new(StringArray::from(vec![
972 Some("Apache"),
973 Some("Iceberg"),
974 Some("Rocks"),
975 ])), // e (d skipped by projection)
976 Arc::new(StringArray::from(vec![
977 Some("(╯°□°)╯"),
978 Some("(╯°□°)╯"),
979 Some("(╯°□°)╯"),
980 ])), // f
981 ])
982 .unwrap()
983 }
984
985 pub fn iceberg_table_schema() -> Schema {
986 Schema::builder()
987 .with_schema_id(2)
988 .with_fields(vec![
989 NestedField::optional(10, "a", Type::Primitive(PrimitiveType::String)).into(),
990 NestedField::required(11, "b", Type::Primitive(PrimitiveType::Long)).into(),
991 NestedField::required(12, "c", Type::Primitive(PrimitiveType::Double)).into(),
992 NestedField::required(13, "d", Type::Primitive(PrimitiveType::Int)).into(),
993 NestedField::optional(14, "e", Type::Primitive(PrimitiveType::String)).into(),
994 NestedField::required(15, "f", Type::Primitive(PrimitiveType::String))
995 .with_initial_default(Literal::string("(╯°□°)╯"))
996 .into(),
997 ])
998 .build()
999 .unwrap()
1000 }
1001
1002 fn arrow_schema_already_same_as_target() -> Arc<ArrowSchema> {
1003 Arc::new(ArrowSchema::new(vec![
1004 simple_field("a", DataType::Utf8, true, "10"),
1005 simple_field("b", DataType::Int64, false, "11"),
1006 simple_field("c", DataType::Float64, false, "12"),
1007 simple_field("e", DataType::Utf8, true, "14"),
1008 simple_field("f", DataType::Utf8, false, "15"),
1009 ]))
1010 }
1011
1012 fn arrow_schema_promotion_addition_and_renaming_required() -> Arc<ArrowSchema> {
1013 Arc::new(ArrowSchema::new(vec![
1014 simple_field("b", DataType::Int32, false, "11"),
1015 simple_field("c", DataType::Float32, false, "12"),
1016 simple_field("d", DataType::Int32, false, "13"),
1017 simple_field("e_old", DataType::Utf8, true, "14"),
1018 ]))
1019 }
1020
1021 fn arrow_schema_no_promotion_addition_or_renaming_required() -> Arc<ArrowSchema> {
1022 Arc::new(ArrowSchema::new(vec![
1023 simple_field("d", DataType::Int32, false, "13"),
1024 simple_field("e", DataType::Utf8, true, "14"),
1025 ]))
1026 }
1027
1028 /// Create a simple arrow field with metadata.
1029 fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
1030 Field::new(name, ty, nullable).with_metadata(HashMap::from([(
1031 PARQUET_FIELD_ID_META_KEY.to_string(),
1032 value.to_string(),
1033 )]))
1034 }
1035
1036 /// Test for add_files with Parquet files that have NO field IDs (Hive tables).
1037 ///
1038 /// This reproduces the scenario from Iceberg spec where:
1039 /// - Hive-style partitioned Parquet files are imported via add_files procedure
1040 /// - Parquet files originally DO NOT have field IDs (typical for Hive tables)
1041 /// - ArrowReader applies name mapping to assign correct Iceberg field IDs
1042 /// - Iceberg schema assigns field IDs: id (1), name (2), dept (3), subdept (4)
1043 /// - Partition columns (id, dept) have initial_default values
1044 ///
1045 /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection),
1046 /// this scenario requires `schema.name-mapping.default` from table metadata
1047 /// to correctly map Parquet columns by name to Iceberg field IDs.
1048 /// This mapping is now applied in ArrowReader before data is processed.
1049 ///
1050 /// Expected behavior:
1051 /// 1. id=1 (from initial_default) - spec rule #3
1052 /// 2. name="John Doe" (from Parquet with field_id=2 assigned by reader) - found by field ID
1053 /// 3. dept="hr" (from initial_default) - spec rule #3
1054 /// 4. subdept="communications" (from Parquet with field_id=4 assigned by reader) - found by field ID
1055 #[test]
1056 fn add_files_with_name_mapping_applied_in_reader() {
1057 // Iceberg schema after add_files: id (partition), name, dept (partition), subdept
1058 let snapshot_schema = Arc::new(
1059 Schema::builder()
1060 .with_schema_id(0)
1061 .with_fields(vec![
1062 NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int))
1063 .with_initial_default(Literal::int(1))
1064 .into(),
1065 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1066 NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String))
1067 .with_initial_default(Literal::string("hr"))
1068 .into(),
1069 NestedField::optional(4, "subdept", Type::Primitive(PrimitiveType::String))
1070 .into(),
1071 ])
1072 .build()
1073 .unwrap(),
1074 );
1075
1076 // Simulate ArrowReader having applied name mapping:
1077 // Original Parquet: name, subdept (NO field IDs)
1078 // After reader.rs applies name mapping: name (field_id=2), subdept (field_id=4)
1079 //
1080 // Note: Partition columns (id, dept) are NOT in the Parquet file - they're in directory paths
1081 use std::collections::HashMap;
1082 let parquet_schema = Arc::new(ArrowSchema::new(vec![
1083 Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
1084 "PARQUET:field_id".to_string(),
1085 "2".to_string(),
1086 )])),
1087 Field::new("subdept", DataType::Utf8, true).with_metadata(HashMap::from([(
1088 "PARQUET:field_id".to_string(),
1089 "4".to_string(),
1090 )])),
1091 ]));
1092
1093 let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept
1094
1095 let mut transformer =
1096 RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids).build();
1097
1098 // Create a Parquet RecordBatch with data for: name="John Doe", subdept="communications"
1099 let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1100 Arc::new(StringArray::from(vec!["John Doe"])),
1101 Arc::new(StringArray::from(vec!["communications"])),
1102 ])
1103 .unwrap();
1104
1105 let result = transformer.process_record_batch(parquet_batch).unwrap();
1106
1107 // Verify the transformed RecordBatch has:
1108 // - id=1 (from initial_default, not from Parquet)
1109 // - name="John Doe" (from Parquet with correct field_id=2)
1110 // - dept="hr" (from initial_default, not from Parquet)
1111 // - subdept="communications" (from Parquet with correct field_id=4)
1112 assert_eq!(result.num_columns(), 4);
1113 assert_eq!(result.num_rows(), 1);
1114
1115 let id_column = result
1116 .column(0)
1117 .as_any()
1118 .downcast_ref::<Int32Array>()
1119 .unwrap();
1120 assert_eq!(id_column.value(0), 1);
1121
1122 let name_column = result
1123 .column(1)
1124 .as_any()
1125 .downcast_ref::<StringArray>()
1126 .unwrap();
1127 assert_eq!(name_column.value(0), "John Doe");
1128
1129 let dept_column = result
1130 .column(2)
1131 .as_any()
1132 .downcast_ref::<StringArray>()
1133 .unwrap();
1134 assert_eq!(dept_column.value(0), "hr");
1135
1136 let subdept_column = result
1137 .column(3)
1138 .as_any()
1139 .downcast_ref::<StringArray>()
1140 .unwrap();
1141 assert_eq!(subdept_column.value(0), "communications");
1142 }
1143
1144 /// Test for bucket partitioning where source columns must be read from data files.
1145 ///
1146 /// This test verifies correct implementation of the Iceberg spec's "Column Projection" rules:
1147 /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
1148 ///
1149 /// # Why this test is critical
1150 ///
1151 /// The key insight is that partition metadata stores TRANSFORMED values, not source values:
1152 /// - For `bucket(4, id)`, partition metadata has `id_bucket = 2` (the bucket number)
1153 /// - The actual `id` column values (100, 200, 300) are ONLY in the data file
1154 ///
1155 /// If iceberg-rust incorrectly treated bucket-partitioned fields as constants, it would:
1156 /// 1. Replace all `id` values with the constant `2` from partition metadata
1157 /// 2. Break runtime filtering (e.g., `WHERE id = 100` would match no rows)
1158 /// 3. Return incorrect query results
1159 ///
1160 /// # What this test verifies
1161 ///
1162 /// - Bucket-partitioned fields (e.g., `bucket(4, id)`) are read from the data file
1163 /// - The source column `id` contains actual values (100, 200, 300), not constants
1164 /// - Java's `PartitionUtil.constantsMap()` behavior is correctly replicated:
1165 /// ```java
1166 /// if (field.transform().isIdentity()) { // FALSE for bucket transforms
1167 /// idToConstant.put(field.sourceId(), converted);
1168 /// }
1169 /// ```
1170 ///
1171 /// # Real-world impact
1172 ///
1173 /// This reproduces the failure scenario from Iceberg Java's TestRuntimeFiltering:
1174 /// - Tables partitioned by `bucket(N, col)` are common for load balancing
1175 /// - Queries filter on the source column: `SELECT * FROM tbl WHERE col = value`
1176 /// - Runtime filtering pushes predicates down to Iceberg file scans
1177 /// - Without this fix, the filter would match against constant partition values instead of data
1178 ///
1179 /// # References
1180 /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
1181 /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
1182 /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
1183 #[test]
1184 fn bucket_partitioning_reads_source_column_from_file() {
1185 use crate::spec::{Struct, Transform};
1186
1187 // Table schema: id (data column), name (data column), id_bucket (partition column)
1188 let snapshot_schema = Arc::new(
1189 Schema::builder()
1190 .with_schema_id(0)
1191 .with_fields(vec![
1192 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1193 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1194 ])
1195 .build()
1196 .unwrap(),
1197 );
1198
1199 // Partition spec: bucket(4, id) - the id field is bucketed
1200 let partition_spec = Arc::new(
1201 crate::spec::PartitionSpec::builder(snapshot_schema.clone())
1202 .with_spec_id(0)
1203 .add_partition_field("id", "id_bucket", Transform::Bucket(4))
1204 .unwrap()
1205 .build()
1206 .unwrap(),
1207 );
1208
1209 // Partition data: bucket value is 2
1210 // In Iceberg, partition data is a Struct where each field corresponds to a partition field
1211 let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);
1212
1213 // Parquet file contains both id and name columns
1214 let parquet_schema = Arc::new(ArrowSchema::new(vec![
1215 simple_field("id", DataType::Int32, false, "1"),
1216 simple_field("name", DataType::Utf8, true, "2"),
1217 ]));
1218
1219 let projected_field_ids = [1, 2]; // id, name
1220
1221 let mut transformer =
1222 RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
1223 .with_partition(partition_spec, partition_data)
1224 .expect("Failed to add partition constants")
1225 .build();
1226
1227 // Create a Parquet RecordBatch with actual data
1228 // The id column MUST be read from here, not treated as a constant
1229 let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1230 Arc::new(Int32Array::from(vec![100, 200, 300])),
1231 Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
1232 ])
1233 .unwrap();
1234
1235 let result = transformer.process_record_batch(parquet_batch).unwrap();
1236
1237 // Verify the transformed RecordBatch correctly reads id from the file
1238 // (NOT as a constant from partition metadata)
1239 assert_eq!(result.num_columns(), 2);
1240 assert_eq!(result.num_rows(), 3);
1241
1242 let id_column = result
1243 .column(0)
1244 .as_any()
1245 .downcast_ref::<Int32Array>()
1246 .unwrap();
1247 // These values MUST come from the Parquet file, not be replaced by constants
1248 assert_eq!(id_column.value(0), 100);
1249 assert_eq!(id_column.value(1), 200);
1250 assert_eq!(id_column.value(2), 300);
1251
1252 let name_column = result
1253 .column(1)
1254 .as_any()
1255 .downcast_ref::<StringArray>()
1256 .unwrap();
1257 assert_eq!(name_column.value(0), "Alice");
1258 assert_eq!(name_column.value(1), "Bob");
1259 assert_eq!(name_column.value(2), "Charlie");
1260 }
1261
1262 /// Test that identity-transformed partition fields ARE treated as constants.
1263 ///
1264 /// This is the complement to `bucket_partitioning_reads_source_column_from_file`,
1265 /// verifying that constants_map() correctly identifies identity-transformed
1266 /// partition fields per the Iceberg spec.
1267 ///
1268 /// # Spec requirement (format/spec.md "Column Projection")
1269 ///
1270 /// > "Return the value from partition metadata if an Identity Transform exists for the field
1271 /// > and the partition value is present in the `partition` struct on `data_file` object
1272 /// > in the manifest. This allows for metadata only migrations of Hive tables."
1273 ///
1274 /// # Why identity transforms use constants
1275 ///
1276 /// Unlike bucket/truncate/year/etc., identity transforms don't modify the value:
1277 /// - `identity(dept)` stores the actual `dept` value in partition metadata
1278 /// - Partition metadata has `dept = "engineering"` (the real value, not a hash/bucket)
1279 /// - This value can be used directly without reading the data file
1280 ///
1281 /// # Performance benefit
1282 ///
1283 /// For Hive migrations where partition columns aren't in data files:
1284 /// - Partition metadata provides the column values
1285 /// - No need to read from data files (metadata-only query optimization)
1286 /// - Common pattern: `dept=engineering/subdept=backend/file.parquet`
1287 /// - `dept` and `subdept` are in directory structure, not in `file.parquet`
1288 /// - Iceberg populates these from partition metadata as constants
1289 ///
1290 /// # What this test verifies
1291 ///
1292 /// - Identity-partitioned fields use constants from partition metadata
1293 /// - The `dept` column is populated with `"engineering"` (not read from file)
1294 /// - Java's `PartitionUtil.constantsMap()` behavior is matched:
1295 /// ```java
1296 /// if (field.transform().isIdentity()) { // TRUE for identity
1297 /// idToConstant.put(field.sourceId(), converted);
1298 /// }
1299 /// ```
1300 ///
1301 /// # References
1302 /// - Iceberg spec: format/spec.md "Column Projection"
1303 /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
1304 #[test]
1305 fn identity_partition_uses_constant_from_metadata() {
1306 use crate::spec::{Struct, Transform};
1307
1308 // Table schema: id (data column), dept (partition column), name (data column)
1309 let snapshot_schema = Arc::new(
1310 Schema::builder()
1311 .with_schema_id(0)
1312 .with_fields(vec![
1313 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1314 NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(),
1315 NestedField::optional(3, "name", Type::Primitive(PrimitiveType::String)).into(),
1316 ])
1317 .build()
1318 .unwrap(),
1319 );
1320
1321 // Partition spec: identity(dept) - the dept field uses identity transform
1322 let partition_spec = Arc::new(
1323 crate::spec::PartitionSpec::builder(snapshot_schema.clone())
1324 .with_spec_id(0)
1325 .add_partition_field("dept", "dept", Transform::Identity)
1326 .unwrap()
1327 .build()
1328 .unwrap(),
1329 );
1330
1331 // Partition data: dept="engineering"
1332 let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]);
1333
1334 // Parquet file contains only id and name (dept is in partition path)
1335 let parquet_schema = Arc::new(ArrowSchema::new(vec![
1336 simple_field("id", DataType::Int32, false, "1"),
1337 simple_field("name", DataType::Utf8, true, "3"),
1338 ]));
1339
1340 let projected_field_ids = [1, 2, 3]; // id, dept, name
1341
1342 let mut transformer =
1343 RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
1344 .with_partition(partition_spec, partition_data)
1345 .expect("Failed to add partition constants")
1346 .build();
1347
1348 let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1349 Arc::new(Int32Array::from(vec![100, 200])),
1350 Arc::new(StringArray::from(vec!["Alice", "Bob"])),
1351 ])
1352 .unwrap();
1353
1354 let result = transformer.process_record_batch(parquet_batch).unwrap();
1355
1356 // Verify the dept column is populated with the constant from partition metadata
1357 assert_eq!(result.num_columns(), 3);
1358 assert_eq!(result.num_rows(), 2);
1359
1360 // Use helpers to handle both simple and REE arrays
1361 assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100);
1362 assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200);
1363
1364 // dept column comes from partition metadata (constant) - will be REE
1365 assert_eq!(
1366 get_string_value(result.column(1).as_ref(), 0),
1367 "engineering"
1368 );
1369 assert_eq!(
1370 get_string_value(result.column(1).as_ref(), 1),
1371 "engineering"
1372 );
1373
1374 // name column comes from file
1375 assert_eq!(get_string_value(result.column(2).as_ref(), 0), "Alice");
1376 assert_eq!(get_string_value(result.column(2).as_ref(), 1), "Bob");
1377 }
1378
1379 /// Test bucket partitioning with renamed source column.
1380 ///
1381 /// This verifies correct behavior for TestRuntimeFiltering.testRenamedSourceColumnTable() in Iceberg Java.
1382 /// When a source column is renamed after partitioning is established, field-ID-based mapping
1383 /// must still correctly identify the column in Parquet files.
1384 ///
1385 /// # Scenario
1386 ///
1387 /// 1. Table created with `bucket(4, id)` partitioning
1388 /// 2. Data written to Parquet files (field_id=1, name="id")
1389 /// 3. Column renamed: `ALTER TABLE ... RENAME COLUMN id TO row_id`
1390 /// 4. Iceberg schema now has: field_id=1, name="row_id"
1391 /// 5. Parquet files still have: field_id=1, name="id"
1392 ///
1393 /// # Expected Behavior Per Iceberg Spec
1394 ///
1395 /// Per the Iceberg spec "Column Projection" section and Java's PartitionUtil.constantsMap():
1396 /// - Bucket transforms are NON-identity, so partition metadata stores bucket numbers (0-3), not source values
1397 /// - Source columns for non-identity transforms MUST be read from data files
1398 /// - Field-ID-based mapping should find the column by field_id=1 (ignoring name mismatch)
1399 /// - Runtime filtering on `row_id` should work correctly
1400 ///
1401 /// # What This Tests
1402 ///
1403 /// This test ensures that when FileScanTask provides partition_spec and partition_data:
1404 /// - constants_map() correctly identifies that bucket(4, row_id) is NOT an identity transform
1405 /// - The source column (field_id=1) is NOT added to constants_map
1406 /// - Field-ID-based mapping reads actual values from the Parquet file
1407 /// - Values [100, 200, 300] are read, not replaced with bucket constant 2
1408 ///
1409 /// # References
1410 /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java::testRenamedSourceColumnTable
1411 /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java::constantsMap()
1412 /// - Iceberg spec: format/spec.md "Column Projection" section
1413 #[test]
1414 fn test_bucket_partitioning_with_renamed_source_column() {
1415 use crate::spec::{Struct, Transform};
1416
1417 // Iceberg schema after rename: row_id (was id), name
1418 let snapshot_schema = Arc::new(
1419 Schema::builder()
1420 .with_schema_id(0)
1421 .with_fields(vec![
1422 NestedField::required(1, "row_id", Type::Primitive(PrimitiveType::Int)).into(),
1423 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1424 ])
1425 .build()
1426 .unwrap(),
1427 );
1428
1429 // Partition spec: bucket(4, row_id) - but source_id still points to field_id=1
1430 let partition_spec = Arc::new(
1431 crate::spec::PartitionSpec::builder(snapshot_schema.clone())
1432 .with_spec_id(0)
1433 .add_partition_field("row_id", "row_id_bucket", Transform::Bucket(4))
1434 .unwrap()
1435 .build()
1436 .unwrap(),
1437 );
1438
1439 // Partition data: bucket value is 2
1440 let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);
1441
1442 // Parquet file has OLD column name "id" but SAME field_id=1
1443 // Field-ID-based mapping should find this despite name mismatch
1444 let parquet_schema = Arc::new(ArrowSchema::new(vec![
1445 simple_field("id", DataType::Int32, false, "1"),
1446 simple_field("name", DataType::Utf8, true, "2"),
1447 ]));
1448
1449 let projected_field_ids = [1, 2]; // row_id (field_id=1), name (field_id=2)
1450
1451 let mut transformer =
1452 RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
1453 .with_partition(partition_spec, partition_data)
1454 .expect("Failed to add partition constants")
1455 .build();
1456
1457 // Create a Parquet RecordBatch with actual data
1458 // Despite column rename, data should be read via field_id=1
1459 let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1460 Arc::new(Int32Array::from(vec![100, 200, 300])),
1461 Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
1462 ])
1463 .unwrap();
1464
1465 let result = transformer.process_record_batch(parquet_batch).unwrap();
1466
1467 // Verify the transformed RecordBatch correctly reads data despite name mismatch
1468 assert_eq!(result.num_columns(), 2);
1469 assert_eq!(result.num_rows(), 3);
1470
1471 let row_id_column = result
1472 .column(0)
1473 .as_any()
1474 .downcast_ref::<Int32Array>()
1475 .unwrap();
1476 // These values MUST come from the Parquet file via field_id=1,
1477 // not be replaced by the bucket constant (2)
1478 assert_eq!(row_id_column.value(0), 100);
1479 assert_eq!(row_id_column.value(1), 200);
1480 assert_eq!(row_id_column.value(2), 300);
1481
1482 let name_column = result
1483 .column(1)
1484 .as_any()
1485 .downcast_ref::<StringArray>()
1486 .unwrap();
1487 assert_eq!(name_column.value(0), "Alice");
1488 assert_eq!(name_column.value(1), "Bob");
1489 assert_eq!(name_column.value(2), "Charlie");
1490 }
1491
1492 /// Comprehensive integration test that verifies all 4 Iceberg spec rules work correctly.
1493 ///
1494 /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection),
1495 /// "Values for field ids which are not present in a data file must be resolved
1496 /// according the following rules:"
1497 ///
1498 /// This test creates a scenario where each rule is exercised:
1499 /// - Rule #1: dept (identity-partitioned) -> constant from partition metadata
1500 /// - Rule #2: data (via name mapping) -> read from Parquet file by name
1501 /// - Rule #3: category (initial_default) -> use default value
1502 /// - Rule #4: notes (no default) -> return null
1503 ///
1504 /// # References
1505 /// - Iceberg spec: format/spec.md "Column Projection" section
1506 #[test]
1507 fn test_all_four_spec_rules() {
1508 use crate::spec::Transform;
1509
1510 // Iceberg schema with columns designed to exercise each spec rule
1511 let snapshot_schema = Arc::new(
1512 Schema::builder()
1513 .with_schema_id(0)
1514 .with_fields(vec![
1515 // Field in Parquet by field ID (normal case)
1516 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1517 // Rule #1: Identity-partitioned field - should use partition metadata
1518 NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(),
1519 // Rule #2: Field resolved by name mapping (ArrowReader already applied)
1520 NestedField::required(3, "data", Type::Primitive(PrimitiveType::String)).into(),
1521 // Rule #3: Field with initial_default
1522 NestedField::optional(4, "category", Type::Primitive(PrimitiveType::String))
1523 .with_initial_default(Literal::string("default_category"))
1524 .into(),
1525 // Rule #4: Field with no default - should be null
1526 NestedField::optional(5, "notes", Type::Primitive(PrimitiveType::String))
1527 .into(),
1528 ])
1529 .build()
1530 .unwrap(),
1531 );
1532
1533 // Partition spec: identity transform on dept
1534 let partition_spec = Arc::new(
1535 crate::spec::PartitionSpec::builder(snapshot_schema.clone())
1536 .with_spec_id(0)
1537 .add_partition_field("dept", "dept", Transform::Identity)
1538 .unwrap()
1539 .build()
1540 .unwrap(),
1541 );
1542
1543 // Partition data: dept="engineering"
1544 let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]);
1545
1546 // Parquet schema: simulates post-ArrowReader state where name mapping already applied
1547 // Has id (field_id=1) and data (field_id=3, assigned by ArrowReader via name mapping)
1548 // Missing: dept (in partition), category (has default), notes (no default)
1549 let parquet_schema = Arc::new(ArrowSchema::new(vec![
1550 simple_field("id", DataType::Int32, false, "1"),
1551 simple_field("data", DataType::Utf8, false, "3"),
1552 ]));
1553
1554 let projected_field_ids = [1, 2, 3, 4, 5]; // id, dept, data, category, notes
1555
1556 let mut transformer =
1557 RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
1558 .with_partition(partition_spec, partition_data)
1559 .expect("Failed to add partition constants")
1560 .build();
1561
1562 let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1563 Arc::new(Int32Array::from(vec![100, 200])),
1564 Arc::new(StringArray::from(vec!["value1", "value2"])),
1565 ])
1566 .unwrap();
1567
1568 let result = transformer.process_record_batch(parquet_batch).unwrap();
1569
1570 assert_eq!(result.num_columns(), 5);
1571 assert_eq!(result.num_rows(), 2);
1572
1573 // Verify each column demonstrates the correct spec rule:
1574
1575 // Normal case: id from Parquet by field ID
1576 // Use helpers to handle both simple and REE arrays
1577 assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100);
1578 assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200);
1579
1580 // Rule #1: dept from partition metadata (identity transform) - will be REE
1581 assert_eq!(
1582 get_string_value(result.column(1).as_ref(), 0),
1583 "engineering"
1584 );
1585 assert_eq!(
1586 get_string_value(result.column(1).as_ref(), 1),
1587 "engineering"
1588 );
1589
1590 // Rule #2: data from Parquet via name mapping - will be regular array
1591 assert_eq!(get_string_value(result.column(2).as_ref(), 0), "value1");
1592 assert_eq!(get_string_value(result.column(2).as_ref(), 1), "value2");
1593
1594 // Rule #3: category from initial_default - will be REE
1595 assert_eq!(
1596 get_string_value(result.column(3).as_ref(), 0),
1597 "default_category"
1598 );
1599 assert_eq!(
1600 get_string_value(result.column(3).as_ref(), 1),
1601 "default_category"
1602 );
1603
1604 // Rule #4: notes is null (no default, not in Parquet, not in partition) - will be REE with null
1605 // For null REE arrays, we still use the helper which handles extraction
1606 assert_eq!(get_string_value(result.column(4).as_ref(), 0), "");
1607 assert_eq!(get_string_value(result.column(4).as_ref(), 1), "");
1608 }
1609
1610 /// Test handling of null values in identity-partitioned columns.
1611 ///
1612 /// Reproduces TestPartitionValues.testNullPartitionValue() from iceberg-java, which
1613 /// writes records where the partition column has null values. Before the fix in #1922,
1614 /// this would error with "Partition field X has null value for identity transform".
1615 #[test]
1616 fn null_identity_partition_value() {
1617 use crate::spec::{Struct, Transform};
1618
1619 let schema = Arc::new(
1620 Schema::builder()
1621 .with_schema_id(0)
1622 .with_fields(vec![
1623 NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1624 NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String)).into(),
1625 ])
1626 .build()
1627 .unwrap(),
1628 );
1629
1630 let partition_spec = Arc::new(
1631 crate::spec::PartitionSpec::builder(schema.clone())
1632 .with_spec_id(0)
1633 .add_partition_field("data", "data", Transform::Identity)
1634 .unwrap()
1635 .build()
1636 .unwrap(),
1637 );
1638
1639 // Partition has null value for the data column
1640 let partition_data = Struct::from_iter(vec![None]);
1641
1642 let file_schema = Arc::new(ArrowSchema::new(vec![simple_field(
1643 "id",
1644 DataType::Int32,
1645 true,
1646 "1",
1647 )]));
1648
1649 let projected_field_ids = [1, 2];
1650
1651 let mut transformer = RecordBatchTransformerBuilder::new(schema, &projected_field_ids)
1652 .with_partition(partition_spec, partition_data)
1653 .expect("Should handle null partition values")
1654 .build();
1655
1656 let file_batch =
1657 RecordBatch::try_new(file_schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
1658 .unwrap();
1659
1660 let result = transformer.process_record_batch(file_batch).unwrap();
1661
1662 assert_eq!(result.num_columns(), 2);
1663 assert_eq!(result.num_rows(), 3);
1664
1665 let id_col = result
1666 .column(0)
1667 .as_any()
1668 .downcast_ref::<Int32Array>()
1669 .unwrap();
1670 assert_eq!(id_col.values(), &[1, 2, 3]);
1671
1672 // Partition column with null value should produce nulls
1673 let data_col = result.column(1);
1674 assert!(data_col.is_null(0));
1675 assert!(data_col.is_null(1));
1676 assert!(data_col.is_null(2));
1677 }
1678}