1use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use futures::{StreamExt, TryStreamExt};
27use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
28use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};
29
30use super::{
31 ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
32 apply_name_mapping_to_arrow_schema,
33};
34use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
35use crate::arrow::int96::coerce_int96_timestamps;
36use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
37use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
38use crate::error::Result;
39use crate::io::{FileIO, FileMetadata, FileRead};
40use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
41use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
42use crate::spec::Datum;
43use crate::{Error, ErrorKind};
44
45impl ArrowReader {
46 pub fn read(self, tasks: FileScanTaskStream) -> Result<ScanResult> {
49 let concurrency_limit_data_files = self.concurrency_limit_data_files;
50 let scan_metrics = ScanMetrics::new();
51
52 let task_reader = FileScanTaskReader {
53 batch_size: self.batch_size,
54 file_io: self.file_io,
55 delete_file_loader: self
56 .delete_file_loader
57 .with_scan_metrics(scan_metrics.clone()),
58 row_group_filtering_enabled: self.row_group_filtering_enabled,
59 row_selection_enabled: self.row_selection_enabled,
60 parquet_read_options: self.parquet_read_options,
61 scan_metrics: scan_metrics.clone(),
62 };
63
64 let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
66 Box::pin(
67 tasks
68 .and_then(move |task| task_reader.clone().process(task))
69 .map_err(|err| {
70 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
71 .with_source(err)
72 })
73 .try_flatten(),
74 )
75 } else {
76 Box::pin(
77 tasks
78 .map_ok(move |task| task_reader.clone().process(task))
79 .map_err(|err| {
80 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
81 .with_source(err)
82 })
83 .try_buffer_unordered(concurrency_limit_data_files)
84 .try_flatten_unordered(concurrency_limit_data_files),
85 )
86 };
87
88 Ok(ScanResult::new(stream, scan_metrics))
89 }
90}
91
92#[derive(Clone)]
95struct FileScanTaskReader {
96 batch_size: Option<usize>,
97 file_io: FileIO,
98 delete_file_loader: CachingDeleteFileLoader,
99 row_group_filtering_enabled: bool,
100 row_selection_enabled: bool,
101 parquet_read_options: ParquetReadOptions,
102 scan_metrics: ScanMetrics,
103}
104
105impl FileScanTaskReader {
106 async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
107 let should_load_page_index =
108 (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
109 let mut parquet_read_options = self.parquet_read_options;
110 parquet_read_options.preload_page_index = should_load_page_index;
111
112 let delete_filter_rx = self
113 .delete_file_loader
114 .load_deletes(&task.deletes, Arc::clone(&task.schema));
115
116 let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
118 &task.data_file_path,
119 &self.file_io,
120 task.file_size_in_bytes,
121 parquet_read_options,
122 self.scan_metrics.bytes_read_counter(),
123 )
124 .await?;
125
126 let missing_field_ids = arrow_metadata
130 .schema()
131 .fields()
132 .iter()
133 .next()
134 .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
135
136 let use_position_fallback = missing_field_ids && task.name_mapping.is_none();
141
142 let arrow_metadata = if missing_field_ids {
158 let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
160 apply_name_mapping_to_arrow_schema(
165 Arc::clone(arrow_metadata.schema()),
166 name_mapping,
167 )?
168 } else {
169 add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
172 };
173
174 let options = ArrowReaderOptions::new().with_schema(arrow_schema);
175 ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
176 |e| {
177 Error::new(
178 ErrorKind::Unexpected,
179 "Failed to create ArrowReaderMetadata with field ID schema",
180 )
181 .with_source(e)
182 },
183 )?
184 } else {
185 arrow_metadata
187 };
188
189 let arrow_metadata = if let Some(coerced_schema) =
192 coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
193 {
194 let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
195 ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
196 |e| {
197 Error::new(
198 ErrorKind::Unexpected,
199 format!(
200 "Failed to create ArrowReaderMetadata with INT96-coerced schema: {coerced_schema}"
201 ),
202 )
203 .with_source(e)
204 },
205 )?
206 } else {
207 arrow_metadata
208 };
209
210 let mut record_batch_stream_builder =
212 ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);
213
214 let project_field_ids_without_metadata: Vec<i32> = task
216 .project_field_ids
217 .iter()
218 .filter(|&&id| !is_metadata_field(id))
219 .copied()
220 .collect();
221
222 let projection_mask = ArrowReader::get_arrow_projection_mask(
228 &project_field_ids_without_metadata,
229 &task.schema,
230 record_batch_stream_builder.parquet_schema(),
231 record_batch_stream_builder.schema(),
232 use_position_fallback, )?;
234
235 record_batch_stream_builder =
236 record_batch_stream_builder.with_projection(projection_mask.clone());
237
238 let mut record_batch_transformer_builder =
242 RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
243
244 if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
246 let file_datum = Datum::string(task.data_file_path.clone());
247 record_batch_transformer_builder =
248 record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
249 }
250
251 if let (Some(partition_spec), Some(partition_data)) =
252 (task.partition_spec.clone(), task.partition.clone())
253 {
254 record_batch_transformer_builder =
255 record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
256 }
257
258 let mut record_batch_transformer = record_batch_transformer_builder.build();
259
260 if let Some(batch_size) = self.batch_size {
261 record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
262 }
263
264 let delete_filter = delete_filter_rx.await.unwrap()?;
265 let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
266
267 let final_predicate = match (&task.predicate, delete_predicate) {
272 (None, None) => None,
273 (Some(predicate), None) => Some(predicate.clone()),
274 (None, Some(ref predicate)) => Some(predicate.clone()),
275 (Some(filter_predicate), Some(delete_predicate)) => {
276 Some(filter_predicate.clone().and(delete_predicate))
277 }
278 };
279
280 let mut selected_row_group_indices = None;
296 let mut row_selection = None;
297
298 if task.start != 0 || task.length != 0 {
301 let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range(
302 record_batch_stream_builder.metadata(),
303 task.start,
304 task.length,
305 )?;
306 selected_row_group_indices = Some(byte_range_filtered_row_groups);
307 }
308
309 if let Some(predicate) = final_predicate {
310 let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
311 record_batch_stream_builder.parquet_schema(),
312 record_batch_stream_builder.schema(),
313 &predicate,
314 use_position_fallback,
315 )?;
316
317 let row_filter = ArrowReader::get_row_filter(
318 &predicate,
319 record_batch_stream_builder.parquet_schema(),
320 &iceberg_field_ids,
321 &field_id_map,
322 )?;
323 record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
324
325 if self.row_group_filtering_enabled {
326 let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices(
327 &predicate,
328 record_batch_stream_builder.metadata(),
329 &field_id_map,
330 &task.schema,
331 )?;
332
333 selected_row_group_indices = match selected_row_group_indices {
336 Some(byte_range_filtered) => {
337 let intersection: Vec<usize> = byte_range_filtered
339 .into_iter()
340 .filter(|idx| predicate_filtered_row_groups.contains(idx))
341 .collect();
342 Some(intersection)
343 }
344 None => Some(predicate_filtered_row_groups),
345 };
346 }
347
348 if self.row_selection_enabled {
349 row_selection = ArrowReader::get_row_selection_for_filter_predicate(
350 &predicate,
351 record_batch_stream_builder.metadata(),
352 &selected_row_group_indices,
353 &field_id_map,
354 &task.schema,
355 )?;
356 }
357 }
358
359 let positional_delete_indexes = delete_filter.get_delete_vector(&task);
360
361 if let Some(positional_delete_indexes) = positional_delete_indexes {
362 let delete_row_selection = {
363 let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
364
365 ArrowReader::build_deletes_row_selection(
366 record_batch_stream_builder.metadata().row_groups(),
367 &selected_row_group_indices,
368 &positional_delete_indexes,
369 )
370 }?;
371
372 row_selection = match row_selection {
375 None => Some(delete_row_selection),
376 Some(filter_row_selection) => {
377 Some(filter_row_selection.intersection(&delete_row_selection))
378 }
379 };
380 }
381
382 if let Some(row_selection) = row_selection {
383 record_batch_stream_builder =
384 record_batch_stream_builder.with_row_selection(row_selection);
385 }
386
387 if let Some(selected_row_group_indices) = selected_row_group_indices {
388 record_batch_stream_builder =
389 record_batch_stream_builder.with_row_groups(selected_row_group_indices);
390 }
391
392 let record_batch_stream =
395 record_batch_stream_builder
396 .build()?
397 .map(move |batch| match batch {
398 Ok(batch) => {
399 record_batch_transformer.process_record_batch(batch)
401 }
402 Err(err) => Err(err.into()),
403 });
404
405 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
406 }
407}
408
409impl ArrowReader {
410 pub(crate) async fn open_parquet_file(
413 data_file_path: &str,
414 file_io: &FileIO,
415 file_size_in_bytes: u64,
416 parquet_read_options: ParquetReadOptions,
417 bytes_read: &Arc<AtomicU64>,
418 ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
419 let parquet_file = file_io.new_input(data_file_path)?;
420 let counting_reader =
421 CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read));
422 Self::build_parquet_reader(
423 Box::new(counting_reader),
424 file_size_in_bytes,
425 parquet_read_options,
426 )
427 .await
428 }
429
430 async fn build_parquet_reader(
431 parquet_reader: Box<dyn FileRead>,
432 file_size_in_bytes: u64,
433 parquet_read_options: ParquetReadOptions,
434 ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
435 let mut reader = ArrowFileReader::new(
436 FileMetadata {
437 size: file_size_in_bytes,
438 },
439 parquet_reader,
440 )
441 .with_parquet_read_options(parquet_read_options);
442
443 let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
444 .await
445 .map_err(|e| {
446 Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
447 })?;
448
449 Ok((reader, arrow_metadata))
450 }
451}
452
453#[cfg(test)]
454mod tests {
455 use std::collections::HashMap;
456 use std::fs::File;
457 use std::sync::Arc;
458
459 use arrow_array::cast::AsArray;
460 use arrow_array::{Array, ArrayRef, RecordBatch};
461 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
462 use futures::TryStreamExt;
463 use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
464 use parquet::basic::Compression;
465 use parquet::file::properties::WriterProperties;
466 use tempfile::TempDir;
467
468 use crate::Runtime;
469 use crate::arrow::ArrowReaderBuilder;
470 use crate::io::FileIO;
471 use crate::scan::{FileScanTask, FileScanTaskStream};
472 use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, SchemaRef, Type};
473
/// Julian day number used as the Unix-epoch origin when converting INT96
/// day values to epoch-relative microseconds.
const UNIX_EPOCH_JULIAN: i64 = 2_440_588;
/// Number of microseconds in one day.
const MICROS_PER_DAY: i64 = 24 * 60 * 60 * 1_000_000;
/// Nanoseconds-within-day component of the shared INT96 test value
/// (twelve hours).
const INT96_TEST_NANOS_WITHIN_DAY: u64 = 12 * 60 * 60 * 1_000_000_000;
/// Julian-day component of the shared INT96 test value.
const INT96_TEST_JULIAN_DAY: u32 = 2_953_529;
481
482 fn make_int96_test_value() -> (parquet::data_type::Int96, i64) {
483 let mut val = parquet::data_type::Int96::new();
484 val.set_data(
485 (INT96_TEST_NANOS_WITHIN_DAY & 0xFFFFFFFF) as u32,
486 (INT96_TEST_NANOS_WITHIN_DAY >> 32) as u32,
487 INT96_TEST_JULIAN_DAY,
488 );
489 let expected_micros = (INT96_TEST_JULIAN_DAY as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
490 + (INT96_TEST_NANOS_WITHIN_DAY / 1_000) as i64;
491 (val, expected_micros)
492 }
493
494 async fn read_int96_batches(
495 file_path: &str,
496 schema: SchemaRef,
497 project_field_ids: Vec<i32>,
498 ) -> Vec<RecordBatch> {
499 let file_io = FileIO::new_with_fs();
500 let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build();
501
502 let file_size = std::fs::metadata(file_path).unwrap().len();
503 let task = FileScanTask::builder()
504 .with_file_size_in_bytes(file_size)
505 .with_start(0)
506 .with_length(file_size)
507 .with_data_file_path(file_path.to_string())
508 .with_data_file_format(DataFileFormat::Parquet)
509 .with_schema(schema)
510 .with_project_field_ids(project_field_ids)
511 .with_case_sensitive(false)
512 .build();
513
514 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
515 reader
516 .read(tasks)
517 .unwrap()
518 .stream()
519 .try_collect()
520 .await
521 .unwrap()
522 }
523
524 fn write_int96_parquet_file(
526 table_location: &str,
527 filename: &str,
528 with_field_ids: bool,
529 ) -> (String, Vec<i64>) {
530 use parquet::basic::{Repetition, Type as PhysicalType};
531 use parquet::data_type::{Int32Type, Int96, Int96Type};
532 use parquet::file::writer::SerializedFileWriter;
533 use parquet::schema::types::Type as SchemaType;
534
535 let file_path = format!("{table_location}/{filename}");
536
537 let mut ts_builder = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
538 .with_repetition(Repetition::OPTIONAL);
539 let mut id_builder = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
540 .with_repetition(Repetition::REQUIRED);
541
542 if with_field_ids {
543 ts_builder = ts_builder.with_id(Some(1));
544 id_builder = id_builder.with_id(Some(2));
545 }
546
547 let schema = SchemaType::group_type_builder("schema")
548 .with_fields(vec![
549 Arc::new(ts_builder.build().unwrap()),
550 Arc::new(id_builder.build().unwrap()),
551 ])
552 .build()
553 .unwrap();
554
555 const NOON_NANOS: u64 = INT96_TEST_NANOS_WITHIN_DAY;
557 const JULIAN_3333: u32 = INT96_TEST_JULIAN_DAY;
558 const JULIAN_2100: u32 = 2_488_070;
559
560 let test_data: Vec<(u32, u32, u32, i64)> = vec![
561 (
563 0,
564 0,
565 JULIAN_3333,
566 (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
567 ),
568 (
570 (NOON_NANOS & 0xFFFFFFFF) as u32,
571 (NOON_NANOS >> 32) as u32,
572 JULIAN_3333,
573 (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
574 + (NOON_NANOS / 1_000) as i64,
575 ),
576 (
578 0,
579 0,
580 JULIAN_2100,
581 (JULIAN_2100 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
582 ),
583 ];
584
585 let int96_values: Vec<Int96> = test_data
586 .iter()
587 .map(|(lo, hi, day, _)| {
588 let mut v = Int96::new();
589 v.set_data(*lo, *hi, *day);
590 v
591 })
592 .collect();
593
594 let id_values: Vec<i32> = (0..test_data.len() as i32).collect();
595 let expected_micros: Vec<i64> = test_data.iter().map(|(_, _, _, m)| *m).collect();
596
597 let file = File::create(&file_path).unwrap();
598 let mut writer =
599 SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();
600
601 let mut row_group = writer.next_row_group().unwrap();
602 {
603 let mut col = row_group.next_column().unwrap().unwrap();
605 col.typed::<Int96Type>()
606 .write_batch(&int96_values, Some(&vec![1; test_data.len()]), None)
607 .unwrap();
608 col.close().unwrap();
609 }
610 {
611 let mut col = row_group.next_column().unwrap().unwrap();
612 col.typed::<Int32Type>()
613 .write_batch(&id_values, None, None)
614 .unwrap();
615 col.close().unwrap();
616 }
617 row_group.close().unwrap();
618 writer.close().unwrap();
619
620 (file_path, expected_micros)
621 }
622
623 async fn assert_int96_read_matches(
624 file_path: &str,
625 schema: SchemaRef,
626 project_field_ids: Vec<i32>,
627 expected_micros: &[i64],
628 ) {
629 use arrow_array::TimestampMicrosecondArray;
630
631 let batches = read_int96_batches(file_path, schema, project_field_ids).await;
632
633 assert_eq!(batches.len(), 1);
634 let ts_array = batches[0]
635 .column(0)
636 .as_any()
637 .downcast_ref::<TimestampMicrosecondArray>()
638 .expect("Expected TimestampMicrosecondArray");
639
640 for (i, expected) in expected_micros.iter().enumerate() {
641 assert_eq!(
642 ts_array.value(i),
643 *expected,
644 "Row {i}: got {}, expected {expected}",
645 ts_array.value(i)
646 );
647 }
648 }
649
650 #[tokio::test]
653 async fn test_read_with_concurrency_one() {
654 use arrow_array::Int32Array;
655
656 let schema = Arc::new(
657 Schema::builder()
658 .with_schema_id(1)
659 .with_fields(vec![
660 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
661 NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
662 .into(),
663 ])
664 .build()
665 .unwrap(),
666 );
667
668 let arrow_schema = Arc::new(ArrowSchema::new(vec![
669 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
670 PARQUET_FIELD_ID_META_KEY.to_string(),
671 "1".to_string(),
672 )])),
673 Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
674 PARQUET_FIELD_ID_META_KEY.to_string(),
675 "2".to_string(),
676 )])),
677 ]));
678
679 let tmp_dir = TempDir::new().unwrap();
680 let table_location = tmp_dir.path().to_str().unwrap().to_string();
681 let file_io = FileIO::new_with_fs();
682
683 let props = WriterProperties::builder()
685 .set_compression(Compression::SNAPPY)
686 .build();
687
688 for file_num in 0..3 {
689 let id_data = Arc::new(Int32Array::from_iter_values(
690 file_num * 10..(file_num + 1) * 10,
691 )) as ArrayRef;
692 let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
693
694 let to_write =
695 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
696
697 let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
698 let mut writer =
699 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
700 writer.write(&to_write).expect("Writing batch");
701 writer.close().unwrap();
702 }
703
704 let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
706 .with_data_file_concurrency_limit(1)
707 .build();
708
709 let tasks = vec![
711 Ok(FileScanTask::builder()
712 .with_file_size_in_bytes(
713 std::fs::metadata(format!("{table_location}/file_0.parquet"))
714 .unwrap()
715 .len(),
716 )
717 .with_start(0)
718 .with_length(0)
719 .with_data_file_path(format!("{table_location}/file_0.parquet"))
720 .with_data_file_format(DataFileFormat::Parquet)
721 .with_schema(schema.clone())
722 .with_project_field_ids(vec![1, 2])
723 .with_case_sensitive(false)
724 .build()),
725 Ok(FileScanTask::builder()
726 .with_file_size_in_bytes(
727 std::fs::metadata(format!("{table_location}/file_1.parquet"))
728 .unwrap()
729 .len(),
730 )
731 .with_start(0)
732 .with_length(0)
733 .with_data_file_path(format!("{table_location}/file_1.parquet"))
734 .with_data_file_format(DataFileFormat::Parquet)
735 .with_schema(schema.clone())
736 .with_project_field_ids(vec![1, 2])
737 .with_case_sensitive(false)
738 .build()),
739 Ok(FileScanTask::builder()
740 .with_file_size_in_bytes(
741 std::fs::metadata(format!("{table_location}/file_2.parquet"))
742 .unwrap()
743 .len(),
744 )
745 .with_start(0)
746 .with_length(0)
747 .with_data_file_path(format!("{table_location}/file_2.parquet"))
748 .with_data_file_format(DataFileFormat::Parquet)
749 .with_schema(schema.clone())
750 .with_project_field_ids(vec![1, 2])
751 .with_case_sensitive(false)
752 .build()),
753 ];
754
755 let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
756
757 let result = reader
758 .read(tasks_stream)
759 .unwrap()
760 .stream()
761 .try_collect::<Vec<RecordBatch>>()
762 .await
763 .unwrap();
764
765 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
767 assert_eq!(total_rows, 30, "Should have 30 total rows");
768
769 let mut all_ids = Vec::new();
771 let mut all_file_nums = Vec::new();
772
773 for batch in &result {
774 let id_col = batch
775 .column(0)
776 .as_primitive::<arrow_array::types::Int32Type>();
777 let file_num_col = batch
778 .column(1)
779 .as_primitive::<arrow_array::types::Int32Type>();
780
781 for i in 0..batch.num_rows() {
782 all_ids.push(id_col.value(i));
783 all_file_nums.push(file_num_col.value(i));
784 }
785 }
786
787 assert_eq!(all_ids.len(), 30);
788 assert_eq!(all_file_nums.len(), 30);
789
790 for i in 0..10 {
795 assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
796 assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
797 }
798 for i in 10..20 {
799 assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
800 assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
801 }
802 for i in 20..30 {
803 assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
804 assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
805 }
806 }
807
808 #[tokio::test]
809 async fn test_read_int96_timestamps_with_field_ids() {
810 let schema = Arc::new(
811 Schema::builder()
812 .with_schema_id(1)
813 .with_fields(vec![
814 NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
815 .into(),
816 NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
817 ])
818 .build()
819 .unwrap(),
820 );
821
822 let tmp_dir = TempDir::new().unwrap();
823 let table_location = tmp_dir.path().to_str().unwrap().to_string();
824 let (file_path, expected_micros) =
825 write_int96_parquet_file(&table_location, "with_ids.parquet", true);
826
827 assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
828 }
829
830 #[tokio::test]
831 async fn test_read_int96_timestamps_without_field_ids() {
832 let schema = Arc::new(
833 Schema::builder()
834 .with_schema_id(1)
835 .with_fields(vec![
836 NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
837 .into(),
838 NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
839 ])
840 .build()
841 .unwrap(),
842 );
843
844 let tmp_dir = TempDir::new().unwrap();
845 let table_location = tmp_dir.path().to_str().unwrap().to_string();
846 let (file_path, expected_micros) =
847 write_int96_parquet_file(&table_location, "no_ids.parquet", false);
848
849 assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
850 }
851
852 #[tokio::test]
853 async fn test_read_int96_timestamps_in_struct() {
854 use arrow_array::{StructArray, TimestampMicrosecondArray};
855 use parquet::basic::{Repetition, Type as PhysicalType};
856 use parquet::data_type::Int96Type;
857 use parquet::file::writer::SerializedFileWriter;
858 use parquet::schema::types::Type as SchemaType;
859
860 let tmp_dir = TempDir::new().unwrap();
861 let table_location = tmp_dir.path().to_str().unwrap().to_string();
862 let file_path = format!("{table_location}/struct_int96.parquet");
863
864 let ts_type = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
865 .with_repetition(Repetition::OPTIONAL)
866 .with_id(Some(2))
867 .build()
868 .unwrap();
869
870 let struct_type = SchemaType::group_type_builder("data")
871 .with_repetition(Repetition::REQUIRED)
872 .with_id(Some(1))
873 .with_fields(vec![Arc::new(ts_type)])
874 .build()
875 .unwrap();
876
877 let parquet_schema = SchemaType::group_type_builder("schema")
878 .with_fields(vec![Arc::new(struct_type)])
879 .build()
880 .unwrap();
881
882 let (int96_val, expected_micros) = make_int96_test_value();
883
884 let file = File::create(&file_path).unwrap();
885 let mut writer =
886 SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
887
888 let mut row_group = writer.next_row_group().unwrap();
891 {
892 let mut col = row_group.next_column().unwrap().unwrap();
893 col.typed::<Int96Type>()
894 .write_batch(&[int96_val], Some(&[1]), None)
895 .unwrap();
896 col.close().unwrap();
897 }
898 row_group.close().unwrap();
899 writer.close().unwrap();
900
901 let iceberg_schema = Arc::new(
902 Schema::builder()
903 .with_schema_id(1)
904 .with_fields(vec![
905 NestedField::required(
906 1,
907 "data",
908 Type::Struct(crate::spec::StructType::new(vec![
909 NestedField::optional(
910 2,
911 "ts",
912 Type::Primitive(PrimitiveType::Timestamp),
913 )
914 .into(),
915 ])),
916 )
917 .into(),
918 ])
919 .build()
920 .unwrap(),
921 );
922
923 let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
924
925 assert_eq!(batches.len(), 1);
926 let struct_array = batches[0]
927 .column(0)
928 .as_any()
929 .downcast_ref::<StructArray>()
930 .expect("Expected StructArray");
931 let ts_array = struct_array
932 .column(0)
933 .as_any()
934 .downcast_ref::<TimestampMicrosecondArray>()
935 .expect("Expected TimestampMicrosecondArray inside struct");
936
937 assert_eq!(
938 ts_array.value(0),
939 expected_micros,
940 "INT96 in struct: got {}, expected {expected_micros}",
941 ts_array.value(0)
942 );
943 }
944
945 #[tokio::test]
946 async fn test_read_int96_timestamps_in_list() {
947 use arrow_array::{ListArray, TimestampMicrosecondArray};
948 use parquet::basic::{Repetition, Type as PhysicalType};
949 use parquet::data_type::Int96Type;
950 use parquet::file::writer::SerializedFileWriter;
951 use parquet::schema::types::Type as SchemaType;
952
953 let tmp_dir = TempDir::new().unwrap();
954 let table_location = tmp_dir.path().to_str().unwrap().to_string();
955 let file_path = format!("{table_location}/list_int96.parquet");
956
957 let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT96)
964 .with_repetition(Repetition::OPTIONAL)
965 .with_id(Some(2))
966 .build()
967 .unwrap();
968
969 let list_group = SchemaType::group_type_builder("list")
970 .with_repetition(Repetition::REPEATED)
971 .with_fields(vec![Arc::new(element_type)])
972 .build()
973 .unwrap();
974
975 let list_type = SchemaType::group_type_builder("timestamps")
976 .with_repetition(Repetition::OPTIONAL)
977 .with_id(Some(1))
978 .with_logical_type(Some(parquet::basic::LogicalType::List))
979 .with_fields(vec![Arc::new(list_group)])
980 .build()
981 .unwrap();
982
983 let parquet_schema = SchemaType::group_type_builder("schema")
984 .with_fields(vec![Arc::new(list_type)])
985 .build()
986 .unwrap();
987
988 let (int96_val, expected_micros) = make_int96_test_value();
989
990 let file = File::create(&file_path).unwrap();
991 let mut writer =
992 SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
993
994 let mut row_group = writer.next_row_group().unwrap();
998 {
999 let mut col = row_group.next_column().unwrap().unwrap();
1000 col.typed::<Int96Type>()
1001 .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
1002 .unwrap();
1003 col.close().unwrap();
1004 }
1005 row_group.close().unwrap();
1006 writer.close().unwrap();
1007
1008 let iceberg_schema = Arc::new(
1009 Schema::builder()
1010 .with_schema_id(1)
1011 .with_fields(vec![
1012 NestedField::optional(
1013 1,
1014 "timestamps",
1015 Type::List(crate::spec::ListType {
1016 element_field: NestedField::optional(
1017 2,
1018 "element",
1019 Type::Primitive(PrimitiveType::Timestamp),
1020 )
1021 .into(),
1022 }),
1023 )
1024 .into(),
1025 ])
1026 .build()
1027 .unwrap(),
1028 );
1029
1030 let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1031
1032 assert_eq!(batches.len(), 1);
1033 let list_array = batches[0]
1034 .column(0)
1035 .as_any()
1036 .downcast_ref::<ListArray>()
1037 .expect("Expected ListArray");
1038 let ts_array = list_array
1039 .values()
1040 .as_any()
1041 .downcast_ref::<TimestampMicrosecondArray>()
1042 .expect("Expected TimestampMicrosecondArray inside list");
1043
1044 assert_eq!(
1045 ts_array.value(0),
1046 expected_micros,
1047 "INT96 in list: got {}, expected {expected_micros}",
1048 ts_array.value(0)
1049 );
1050 }
1051
1052 #[tokio::test]
1053 async fn test_read_int96_timestamps_in_map() {
1054 use arrow_array::{MapArray, TimestampMicrosecondArray};
1055 use parquet::basic::{Repetition, Type as PhysicalType};
1056 use parquet::data_type::{ByteArrayType, Int96Type};
1057 use parquet::file::writer::SerializedFileWriter;
1058 use parquet::schema::types::Type as SchemaType;
1059
1060 let tmp_dir = TempDir::new().unwrap();
1061 let table_location = tmp_dir.path().to_str().unwrap().to_string();
1062 let file_path = format!("{table_location}/map_int96.parquet");
1063
1064 let key_type = SchemaType::primitive_type_builder("key", PhysicalType::BYTE_ARRAY)
1072 .with_repetition(Repetition::REQUIRED)
1073 .with_logical_type(Some(parquet::basic::LogicalType::String))
1074 .with_id(Some(2))
1075 .build()
1076 .unwrap();
1077
1078 let value_type = SchemaType::primitive_type_builder("value", PhysicalType::INT96)
1079 .with_repetition(Repetition::OPTIONAL)
1080 .with_id(Some(3))
1081 .build()
1082 .unwrap();
1083
1084 let key_value_group = SchemaType::group_type_builder("key_value")
1085 .with_repetition(Repetition::REPEATED)
1086 .with_fields(vec![Arc::new(key_type), Arc::new(value_type)])
1087 .build()
1088 .unwrap();
1089
1090 let map_type = SchemaType::group_type_builder("ts_map")
1091 .with_repetition(Repetition::OPTIONAL)
1092 .with_id(Some(1))
1093 .with_logical_type(Some(parquet::basic::LogicalType::Map))
1094 .with_fields(vec![Arc::new(key_value_group)])
1095 .build()
1096 .unwrap();
1097
1098 let parquet_schema = SchemaType::group_type_builder("schema")
1099 .with_fields(vec![Arc::new(map_type)])
1100 .build()
1101 .unwrap();
1102
1103 let (int96_val, expected_micros) = make_int96_test_value();
1104
1105 let file = File::create(&file_path).unwrap();
1106 let mut writer =
1107 SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
1108
1109 let mut row_group = writer.next_row_group().unwrap();
1114 {
1115 let mut col = row_group.next_column().unwrap().unwrap();
1116 col.typed::<ByteArrayType>()
1117 .write_batch(
1118 &[parquet::data_type::ByteArray::from("event_time")],
1119 Some(&[2]),
1120 Some(&[0]),
1121 )
1122 .unwrap();
1123 col.close().unwrap();
1124 }
1125 {
1126 let mut col = row_group.next_column().unwrap().unwrap();
1127 col.typed::<Int96Type>()
1128 .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
1129 .unwrap();
1130 col.close().unwrap();
1131 }
1132 row_group.close().unwrap();
1133 writer.close().unwrap();
1134
1135 let iceberg_schema = Arc::new(
1136 Schema::builder()
1137 .with_schema_id(1)
1138 .with_fields(vec![
1139 NestedField::optional(
1140 1,
1141 "ts_map",
1142 Type::Map(crate::spec::MapType {
1143 key_field: NestedField::required(
1144 2,
1145 "key",
1146 Type::Primitive(PrimitiveType::String),
1147 )
1148 .into(),
1149 value_field: NestedField::optional(
1150 3,
1151 "value",
1152 Type::Primitive(PrimitiveType::Timestamp),
1153 )
1154 .into(),
1155 }),
1156 )
1157 .into(),
1158 ])
1159 .build()
1160 .unwrap(),
1161 );
1162
1163 let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1164
1165 assert_eq!(batches.len(), 1);
1166 let map_array = batches[0]
1167 .column(0)
1168 .as_any()
1169 .downcast_ref::<MapArray>()
1170 .expect("Expected MapArray");
1171 let ts_array = map_array
1172 .values()
1173 .as_any()
1174 .downcast_ref::<TimestampMicrosecondArray>()
1175 .expect("Expected TimestampMicrosecondArray as map values");
1176
1177 assert_eq!(
1178 ts_array.value(0),
1179 expected_micros,
1180 "INT96 in map: got {}, expected {expected_micros}",
1181 ts_array.value(0)
1182 );
1183 }
1184}