use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use futures::{StreamExt, TryStreamExt};
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};

use super::{
    ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
    apply_name_mapping_to_arrow_schema,
};
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::int96::coerce_int96_timestamps;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
use crate::error::Result;
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::Datum;
use crate::{Error, ErrorKind};

impl ArrowReader {
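    /// Reads the given stream of [`FileScanTask`]s and returns a [`ScanResult`]
    /// bundling the resulting Arrow record batch stream with scan metrics.
    ///
    /// Data files are processed with up to `concurrency_limit_data_files` tasks in
    /// flight; with a limit of one, tasks are processed sequentially and batches are
    /// returned in task order.
    ///
    /// A minimal usage sketch, mirroring the tests in this module (construction of
    /// the `tasks` stream is elided):
    ///
    /// ```ignore
    /// let reader = ArrowReaderBuilder::new(file_io).build();
    /// let batches: Vec<RecordBatch> = reader
    ///     .read(tasks)? // tasks: FileScanTaskStream
    ///     .stream()
    ///     .try_collect()
    ///     .await?;
    /// ```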
    pub fn read(self, tasks: FileScanTaskStream) -> Result<ScanResult> {
        let concurrency_limit_data_files = self.concurrency_limit_data_files;
        let scan_metrics = ScanMetrics::new();

        let task_reader = FileScanTaskReader {
            batch_size: self.batch_size,
            file_io: self.file_io,
            delete_file_loader: self
                .delete_file_loader
                .with_scan_metrics(scan_metrics.clone()),
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
            parquet_read_options: self.parquet_read_options,
            scan_metrics: scan_metrics.clone(),
        };

        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
            // Sequential path: process tasks one at a time so batches are emitted
            // in task order.
            Box::pin(
                tasks
                    .and_then(move |task| task_reader.clone().process(task))
                    .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generation failed")
                            .with_source(err)
                    })
                    .try_flatten(),
            )
        } else {
            // Concurrent path: keep up to `concurrency_limit_data_files` tasks in
            // flight and emit their batches as they become available.
            Box::pin(
                tasks
                    .map_ok(move |task| task_reader.clone().process(task))
                    .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generation failed")
                            .with_source(err)
                    })
                    .try_buffer_unordered(concurrency_limit_data_files)
                    .try_flatten_unordered(concurrency_limit_data_files),
            )
        };

        Ok(ScanResult::new(stream, scan_metrics))
    }
}

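/// Per-task reader state: everything needed to turn a single [`FileScanTask`] into
/// a stream of Arrow record batches. It is cloned for each task so that tasks can
/// be processed independently (and, when the concurrency limit allows, concurrently).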
#[derive(Clone)]
struct FileScanTaskReader {
    batch_size: Option<usize>,
    file_io: FileIO,
    delete_file_loader: CachingDeleteFileLoader,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
    parquet_read_options: ParquetReadOptions,
    scan_metrics: ScanMetrics,
}

impl FileScanTaskReader {
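    /// Builds the Arrow record batch stream for a single task: opens the Parquet
    /// data file, resolves missing field IDs (via name mapping or positional
    /// fallback), coerces INT96 timestamps, applies projection, predicate and
    /// delete filters, row-group and row selection, and finally transforms each
    /// batch to the task's projected Iceberg schema.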
    async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
        // The page index is only needed when a row selection may be built: either
        // row-selection filtering is enabled and a predicate exists, or positional
        // deletes must be applied.
        let should_load_page_index =
            (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
        let mut parquet_read_options = self.parquet_read_options;
        parquet_read_options.preload_page_index = should_load_page_index;

        // Start loading delete files concurrently with opening the data file.
        let delete_filter_rx = self
            .delete_file_loader
            .load_deletes(&task.deletes, Arc::clone(&task.schema));

        let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
            &task.data_file_path,
            &self.file_io,
            task.file_size_in_bytes,
            parquet_read_options,
            self.scan_metrics.bytes_read_counter(),
        )
        .await?;

        // Files written by non-Iceberg writers may lack Iceberg field IDs in their
        // Arrow schema metadata; detect that from the first field.
        let missing_field_ids = arrow_metadata
            .schema()
            .fields()
            .iter()
            .next()
            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());

        let arrow_metadata = if missing_field_ids {
            // Assign field IDs, either from the task's name mapping or by falling
            // back to positionally derived IDs.
            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
                apply_name_mapping_to_arrow_schema(
                    Arc::clone(arrow_metadata.schema()),
                    name_mapping,
                )?
            } else {
                add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
            };

            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
                |e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        "Failed to create ArrowReaderMetadata with field ID schema",
                    )
                    .with_source(e)
                },
            )?
        } else {
            arrow_metadata
        };

        // Coerce INT96 timestamp columns to the precision required by the Iceberg
        // schema, if any are present.
        let arrow_metadata = if let Some(coerced_schema) =
            coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
        {
            let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
                |e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!(
                            "Failed to create ArrowReaderMetadata with INT96-coerced schema: {coerced_schema}"
                        ),
                    )
                    .with_source(e)
                },
            )?
        } else {
            arrow_metadata
        };

        let mut record_batch_stream_builder =
            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);

        // Metadata columns (e.g. `_file`) are not stored in the data file, so they
        // are excluded from the Parquet projection and filled in later.
        let project_field_ids_without_metadata: Vec<i32> = task
            .project_field_ids
            .iter()
            .filter(|&&id| !is_metadata_field(id))
            .copied()
            .collect();

        let projection_mask = ArrowReader::get_arrow_projection_mask(
            &project_field_ids_without_metadata,
            &task.schema,
            record_batch_stream_builder.parquet_schema(),
            record_batch_stream_builder.schema(),
            missing_field_ids,
        )?;

        record_batch_stream_builder =
            record_batch_stream_builder.with_projection(projection_mask.clone());

        let mut record_batch_transformer_builder =
            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());

        // The reserved `_file` metadata column is materialized as a per-task constant.
        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
            let file_datum = Datum::string(task.data_file_path.clone());
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
        }

        if let (Some(partition_spec), Some(partition_data)) =
            (task.partition_spec.clone(), task.partition.clone())
        {
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
        }

        let mut record_batch_transformer = record_batch_transformer_builder.build();

        if let Some(batch_size) = self.batch_size {
            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
        }

        let delete_filter = delete_filter_rx.await.unwrap()?;
        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;

        // Combine the scan predicate with the equality-delete predicate, if any.
        let final_predicate = match (&task.predicate, delete_predicate) {
            (None, None) => None,
            (Some(predicate), None) => Some(predicate.clone()),
            (None, Some(ref predicate)) => Some(predicate.clone()),
            (Some(filter_predicate), Some(delete_predicate)) => {
                Some(filter_predicate.clone().and(delete_predicate))
            }
        };

        let mut selected_row_group_indices = None;
        let mut row_selection = None;

        // When the data file is split across tasks, restrict the scan to the row
        // groups that overlap this task's byte range.
        if task.start != 0 || task.length != 0 {
            let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range(
                record_batch_stream_builder.metadata(),
                task.start,
                task.length,
            )?;
            selected_row_group_indices = Some(byte_range_filtered_row_groups);
        }

        if let Some(predicate) = final_predicate {
            let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
                record_batch_stream_builder.parquet_schema(),
                &predicate,
            )?;

            let row_filter = ArrowReader::get_row_filter(
                &predicate,
                record_batch_stream_builder.parquet_schema(),
                &iceberg_field_ids,
                &field_id_map,
            )?;
            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

            if self.row_group_filtering_enabled {
                let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &field_id_map,
                    &task.schema,
                )?;

                // Keep only row groups selected by both the byte-range filter and
                // the predicate filter.
                selected_row_group_indices = match selected_row_group_indices {
                    Some(byte_range_filtered) => {
                        let intersection: Vec<usize> = byte_range_filtered
                            .into_iter()
                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
                            .collect();
                        Some(intersection)
                    }
                    None => Some(predicate_filtered_row_groups),
                };
            }

            if self.row_selection_enabled {
                row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &selected_row_group_indices,
                    &field_id_map,
                    &task.schema,
                )?);
            }
        }

        let positional_delete_indexes = delete_filter.get_delete_vector(&task);

        if let Some(positional_delete_indexes) = positional_delete_indexes {
            let delete_row_selection = {
                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();

                ArrowReader::build_deletes_row_selection(
                    record_batch_stream_builder.metadata().row_groups(),
                    &selected_row_group_indices,
                    &positional_delete_indexes,
                )
            }?;

            // Merge the positional-delete row selection with any predicate-derived
            // row selection.
            row_selection = match row_selection {
                None => Some(delete_row_selection),
                Some(filter_row_selection) => {
                    Some(filter_row_selection.intersection(&delete_row_selection))
                }
            };
        }

        if let Some(row_selection) = row_selection {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_selection(row_selection);
        }

        if let Some(selected_row_group_indices) = selected_row_group_indices {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
        }

        // Transform each decoded batch to match the task's projected Iceberg schema.
        let record_batch_stream = record_batch_stream_builder
            .build()?
            .map(move |batch| match batch {
                Ok(batch) => record_batch_transformer.process_record_batch(batch),
                Err(err) => Err(err.into()),
            });

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }
}

impl ArrowReader {
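    /// Opens the data file at `data_file_path` through `file_io`, wrapping the
    /// underlying reader so that bytes read are accumulated into `bytes_read`, and
    /// loads the Parquet/Arrow reader metadata.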
    pub(crate) async fn open_parquet_file(
        data_file_path: &str,
        file_io: &FileIO,
        file_size_in_bytes: u64,
        parquet_read_options: ParquetReadOptions,
        bytes_read: &Arc<AtomicU64>,
    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
        let parquet_file = file_io.new_input(data_file_path)?;
        let counting_reader =
            CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read));
        Self::build_parquet_reader(
            Box::new(counting_reader),
            file_size_in_bytes,
            parquet_read_options,
        )
        .await
    }

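    /// Wraps the given reader in an [`ArrowFileReader`] configured with
    /// `parquet_read_options` and asynchronously loads its [`ArrowReaderMetadata`].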
    async fn build_parquet_reader(
        parquet_reader: Box<dyn FileRead>,
        file_size_in_bytes: u64,
        parquet_read_options: ParquetReadOptions,
    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
        let mut reader = ArrowFileReader::new(
            FileMetadata {
                size: file_size_in_bytes,
            },
            parquet_reader,
        )
        .with_parquet_read_options(parquet_read_options);

        let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
            .await
            .map_err(|e| {
                Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
            })?;

        Ok((reader, arrow_metadata))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{Array, ArrayRef, RecordBatch};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use futures::TryStreamExt;
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use crate::arrow::ArrowReaderBuilder;
    use crate::io::FileIO;
    use crate::scan::{FileScanTask, FileScanTaskStream};
    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, SchemaRef, Type};

    const UNIX_EPOCH_JULIAN: i64 = 2_440_588;
    const MICROS_PER_DAY: i64 = 86_400_000_000;
    const INT96_TEST_NANOS_WITHIN_DAY: u64 = 43_200_000_000_000;
    const INT96_TEST_JULIAN_DAY: u32 = 2_953_529;

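    // An INT96 timestamp stores nanoseconds within the day in its first eight bytes
    // (written as two u32 words, low then high) and the Julian day number in the
    // last four bytes. The constants above encode noon (43_200_000_000_000 ns) on
    // Julian day 2_953_529; expected values are microseconds since the Unix epoch
    // (Julian day 2_440_588).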
    fn make_int96_test_value() -> (parquet::data_type::Int96, i64) {
        let mut val = parquet::data_type::Int96::new();
        val.set_data(
            (INT96_TEST_NANOS_WITHIN_DAY & 0xFFFFFFFF) as u32,
            (INT96_TEST_NANOS_WITHIN_DAY >> 32) as u32,
            INT96_TEST_JULIAN_DAY,
        );
        let expected_micros = (INT96_TEST_JULIAN_DAY as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
            + (INT96_TEST_NANOS_WITHIN_DAY / 1_000) as i64;
        (val, expected_micros)
    }

    async fn read_int96_batches(
        file_path: &str,
        schema: SchemaRef,
        project_field_ids: Vec<i32>,
    ) -> Vec<RecordBatch> {
        let file_io = FileIO::new_with_fs();
        let reader = ArrowReaderBuilder::new(file_io).build();

        let file_size = std::fs::metadata(file_path).unwrap().len();
        let task = FileScanTask {
            file_size_in_bytes: file_size,
            start: 0,
            length: file_size,
            record_count: None,
            data_file_path: file_path.to_string(),
            data_file_format: DataFileFormat::Parquet,
            schema,
            project_field_ids,
            predicate: None,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
        reader
            .read(tasks)
            .unwrap()
            .stream()
            .try_collect()
            .await
            .unwrap()
    }

    fn write_int96_parquet_file(
        table_location: &str,
        filename: &str,
        with_field_ids: bool,
    ) -> (String, Vec<i64>) {
        use parquet::basic::{Repetition, Type as PhysicalType};
        use parquet::data_type::{Int32Type, Int96, Int96Type};
        use parquet::file::writer::SerializedFileWriter;
        use parquet::schema::types::Type as SchemaType;

        let file_path = format!("{table_location}/{filename}");

        let mut ts_builder = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
            .with_repetition(Repetition::OPTIONAL);
        let mut id_builder = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
            .with_repetition(Repetition::REQUIRED);

        if with_field_ids {
            ts_builder = ts_builder.with_id(Some(1));
            id_builder = id_builder.with_id(Some(2));
        }

        let schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![
                Arc::new(ts_builder.build().unwrap()),
                Arc::new(id_builder.build().unwrap()),
            ])
            .build()
            .unwrap();

        const NOON_NANOS: u64 = INT96_TEST_NANOS_WITHIN_DAY;
        const JULIAN_3333: u32 = INT96_TEST_JULIAN_DAY;
        const JULIAN_2100: u32 = 2_488_070;

        let test_data: Vec<(u32, u32, u32, i64)> = vec![
            (
                0,
                0,
                JULIAN_3333,
                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
            ),
            (
                (NOON_NANOS & 0xFFFFFFFF) as u32,
                (NOON_NANOS >> 32) as u32,
                JULIAN_3333,
                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
                    + (NOON_NANOS / 1_000) as i64,
            ),
            (
                0,
                0,
                JULIAN_2100,
                (JULIAN_2100 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
            ),
        ];

        let int96_values: Vec<Int96> = test_data
            .iter()
            .map(|(lo, hi, day, _)| {
                let mut v = Int96::new();
                v.set_data(*lo, *hi, *day);
                v
            })
            .collect();

        let id_values: Vec<i32> = (0..test_data.len() as i32).collect();
        let expected_micros: Vec<i64> = test_data.iter().map(|(_, _, _, m)| *m).collect();

        let file = File::create(&file_path).unwrap();
        let mut writer =
            SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();

        let mut row_group = writer.next_row_group().unwrap();
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int96Type>()
                .write_batch(&int96_values, Some(&vec![1; test_data.len()]), None)
                .unwrap();
            col.close().unwrap();
        }
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int32Type>()
                .write_batch(&id_values, None, None)
                .unwrap();
            col.close().unwrap();
        }
        row_group.close().unwrap();
        writer.close().unwrap();

        (file_path, expected_micros)
    }

    async fn assert_int96_read_matches(
        file_path: &str,
        schema: SchemaRef,
        project_field_ids: Vec<i32>,
        expected_micros: &[i64],
    ) {
        use arrow_array::TimestampMicrosecondArray;

        let batches = read_int96_batches(file_path, schema, project_field_ids).await;

        assert_eq!(batches.len(), 1);
        let ts_array = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("Expected TimestampMicrosecondArray");

        for (i, expected) in expected_micros.iter().enumerate() {
            assert_eq!(
                ts_array.value(i),
                *expected,
                "Row {i}: got {}, expected {expected}",
                ts_array.value(i)
            );
        }
    }

    #[tokio::test]
    async fn test_read_with_concurrency_one() {
        use arrow_array::Int32Array;

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
                        .into(),
                ])
                .build()
                .unwrap(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "2".to_string(),
            )])),
        ]));

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_io = FileIO::new_with_fs();

        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        for file_num in 0..3 {
            let id_data = Arc::new(Int32Array::from_iter_values(
                file_num * 10..(file_num + 1) * 10,
            )) as ArrayRef;
            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;

            let to_write =
                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();

            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
            let mut writer =
                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
            writer.write(&to_write).expect("Writing batch");
            writer.close().unwrap();
        }

        let reader = ArrowReaderBuilder::new(file_io)
            .with_data_file_concurrency_limit(1)
            .build();

        let tasks = vec![
            Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_0.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
            Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_1.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
            Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_2.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
        ];

        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;

        let result = reader
            .read(tasks_stream)
            .unwrap()
            .stream()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 30, "Should have 30 total rows");

        let mut all_ids = Vec::new();
        let mut all_file_nums = Vec::new();

        for batch in &result {
            let id_col = batch
                .column(0)
                .as_primitive::<arrow_array::types::Int32Type>();
            let file_num_col = batch
                .column(1)
                .as_primitive::<arrow_array::types::Int32Type>();

            for i in 0..batch.num_rows() {
                all_ids.push(id_col.value(i));
                all_file_nums.push(file_num_col.value(i));
            }
        }

        assert_eq!(all_ids.len(), 30);
        assert_eq!(all_file_nums.len(), 30);

        // With a concurrency limit of 1, batches must arrive in task order.
        for i in 0..10 {
            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
        }
        for i in 10..20 {
            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
        }
        for i in 20..30 {
            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
        }
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_with_field_ids() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
                        .into(),
                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let (file_path, expected_micros) =
            write_int96_parquet_file(&table_location, "with_ids.parquet", true);

        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_without_field_ids() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
                        .into(),
                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let (file_path, expected_micros) =
            write_int96_parquet_file(&table_location, "no_ids.parquet", false);

        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_in_struct() {
        use arrow_array::{StructArray, TimestampMicrosecondArray};
        use parquet::basic::{Repetition, Type as PhysicalType};
        use parquet::data_type::Int96Type;
        use parquet::file::writer::SerializedFileWriter;
        use parquet::schema::types::Type as SchemaType;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_path = format!("{table_location}/struct_int96.parquet");

        let ts_type = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(2))
            .build()
            .unwrap();

        let struct_type = SchemaType::group_type_builder("data")
            .with_repetition(Repetition::REQUIRED)
            .with_id(Some(1))
            .with_fields(vec![Arc::new(ts_type)])
            .build()
            .unwrap();

        let parquet_schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![Arc::new(struct_type)])
            .build()
            .unwrap();

        let (int96_val, expected_micros) = make_int96_test_value();

        let file = File::create(&file_path).unwrap();
        let mut writer =
            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();

        let mut row_group = writer.next_row_group().unwrap();
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int96Type>()
                .write_batch(&[int96_val], Some(&[1]), None)
                .unwrap();
            col.close().unwrap();
        }
        row_group.close().unwrap();
        writer.close().unwrap();

        let iceberg_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(
                        1,
                        "data",
                        Type::Struct(crate::spec::StructType::new(vec![
                            NestedField::optional(
                                2,
                                "ts",
                                Type::Primitive(PrimitiveType::Timestamp),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );

        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;

        assert_eq!(batches.len(), 1);
        let struct_array = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected StructArray");
        let ts_array = struct_array
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("Expected TimestampMicrosecondArray inside struct");

        assert_eq!(
            ts_array.value(0),
            expected_micros,
            "INT96 in struct: got {}, expected {expected_micros}",
            ts_array.value(0)
        );
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_in_list() {
        use arrow_array::{ListArray, TimestampMicrosecondArray};
        use parquet::basic::{Repetition, Type as PhysicalType};
        use parquet::data_type::Int96Type;
        use parquet::file::writer::SerializedFileWriter;
        use parquet::schema::types::Type as SchemaType;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_path = format!("{table_location}/list_int96.parquet");

        let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT96)
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(2))
            .build()
            .unwrap();

        let list_group = SchemaType::group_type_builder("list")
            .with_repetition(Repetition::REPEATED)
            .with_fields(vec![Arc::new(element_type)])
            .build()
            .unwrap();

        let list_type = SchemaType::group_type_builder("timestamps")
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(1))
            .with_logical_type(Some(parquet::basic::LogicalType::List))
            .with_fields(vec![Arc::new(list_group)])
            .build()
            .unwrap();

        let parquet_schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![Arc::new(list_type)])
            .build()
            .unwrap();

        let (int96_val, expected_micros) = make_int96_test_value();

        let file = File::create(&file_path).unwrap();
        let mut writer =
            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();

        let mut row_group = writer.next_row_group().unwrap();
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int96Type>()
                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
                .unwrap();
            col.close().unwrap();
        }
        row_group.close().unwrap();
        writer.close().unwrap();

        let iceberg_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(
                        1,
                        "timestamps",
                        Type::List(crate::spec::ListType {
                            element_field: NestedField::optional(
                                2,
                                "element",
                                Type::Primitive(PrimitiveType::Timestamp),
                            )
                            .into(),
                        }),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );

        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;

        assert_eq!(batches.len(), 1);
        let list_array = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .expect("Expected ListArray");
        let ts_array = list_array
            .values()
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("Expected TimestampMicrosecondArray inside list");

        assert_eq!(
            ts_array.value(0),
            expected_micros,
            "INT96 in list: got {}, expected {expected_micros}",
            ts_array.value(0)
        );
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_in_map() {
        use arrow_array::{MapArray, TimestampMicrosecondArray};
        use parquet::basic::{Repetition, Type as PhysicalType};
        use parquet::data_type::{ByteArrayType, Int96Type};
        use parquet::file::writer::SerializedFileWriter;
        use parquet::schema::types::Type as SchemaType;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_path = format!("{table_location}/map_int96.parquet");

        let key_type = SchemaType::primitive_type_builder("key", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::REQUIRED)
            .with_logical_type(Some(parquet::basic::LogicalType::String))
            .with_id(Some(2))
            .build()
            .unwrap();

        let value_type = SchemaType::primitive_type_builder("value", PhysicalType::INT96)
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(3))
            .build()
            .unwrap();

        let key_value_group = SchemaType::group_type_builder("key_value")
            .with_repetition(Repetition::REPEATED)
            .with_fields(vec![Arc::new(key_type), Arc::new(value_type)])
            .build()
            .unwrap();

        let map_type = SchemaType::group_type_builder("ts_map")
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(1))
            .with_logical_type(Some(parquet::basic::LogicalType::Map))
            .with_fields(vec![Arc::new(key_value_group)])
            .build()
            .unwrap();

        let parquet_schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![Arc::new(map_type)])
            .build()
            .unwrap();

        let (int96_val, expected_micros) = make_int96_test_value();

        let file = File::create(&file_path).unwrap();
        let mut writer =
            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();

        let mut row_group = writer.next_row_group().unwrap();
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<ByteArrayType>()
                .write_batch(
                    &[parquet::data_type::ByteArray::from("event_time")],
                    Some(&[2]),
                    Some(&[0]),
                )
                .unwrap();
            col.close().unwrap();
        }
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int96Type>()
                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
                .unwrap();
            col.close().unwrap();
        }
        row_group.close().unwrap();
        writer.close().unwrap();

        let iceberg_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(
                        1,
                        "ts_map",
                        Type::Map(crate::spec::MapType {
                            key_field: NestedField::required(
                                2,
                                "key",
                                Type::Primitive(PrimitiveType::String),
                            )
                            .into(),
                            value_field: NestedField::optional(
                                3,
                                "value",
                                Type::Primitive(PrimitiveType::Timestamp),
                            )
                            .into(),
                        }),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );

        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;

        assert_eq!(batches.len(), 1);
        let map_array = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<MapArray>()
            .expect("Expected MapArray");
        let ts_array = map_array
            .values()
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("Expected TimestampMicrosecondArray as map values");

        assert_eq!(
            ts_array.value(0),
            expected_micros,
            "INT96 in map: got {}, expected {expected_micros}",
            ts_array.value(0)
        );
    }
}