1use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use futures::{StreamExt, TryStreamExt};
27use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
28use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};
29
30use super::{
31 ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
32 apply_name_mapping_to_arrow_schema,
33};
34use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
35use crate::arrow::int96::coerce_int96_timestamps;
36use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
37use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
38use crate::error::Result;
39use crate::io::{FileIO, FileMetadata, FileRead};
40use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
41use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
42use crate::spec::Datum;
43use crate::{Error, ErrorKind};
44
45impl ArrowReader {
46 pub fn read(self, tasks: FileScanTaskStream) -> Result<ScanResult> {
49 let concurrency_limit_data_files = self.concurrency_limit_data_files;
50 let scan_metrics = ScanMetrics::new();
51
52 let task_reader = FileScanTaskReader {
53 batch_size: self.batch_size,
54 file_io: self.file_io,
55 delete_file_loader: self
56 .delete_file_loader
57 .with_scan_metrics(scan_metrics.clone()),
58 row_group_filtering_enabled: self.row_group_filtering_enabled,
59 row_selection_enabled: self.row_selection_enabled,
60 parquet_read_options: self.parquet_read_options,
61 scan_metrics: scan_metrics.clone(),
62 };
63
64 let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
66 Box::pin(
67 tasks
68 .and_then(move |task| task_reader.clone().process(task))
69 .map_err(|err| {
70 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
71 .with_source(err)
72 })
73 .try_flatten(),
74 )
75 } else {
76 Box::pin(
77 tasks
78 .map_ok(move |task| task_reader.clone().process(task))
79 .map_err(|err| {
80 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
81 .with_source(err)
82 })
83 .try_buffer_unordered(concurrency_limit_data_files)
84 .try_flatten_unordered(concurrency_limit_data_files),
85 )
86 };
87
88 Ok(ScanResult::new(stream, scan_metrics))
89 }
90}
91
#[derive(Clone)]
/// Per-scan state needed to turn one `FileScanTask` into a record batch stream.
///
/// Built once in `ArrowReader::read` and cloned for every task, because
/// `process` takes `self` by value.
struct FileScanTaskReader {
    /// Batch size passed to the Parquet stream builder when set; the builder is
    /// left untouched when `None`.
    batch_size: Option<usize>,
    /// Used to open each task's data file.
    file_io: FileIO,
    /// Loads the delete files attached to each task.
    delete_file_loader: CachingDeleteFileLoader,
    /// When true, the combined predicate is also used to narrow the set of row
    /// groups that are read.
    row_group_filtering_enabled: bool,
    /// When true, a row selection is derived from the combined predicate; it
    /// also turns on page index preloading for tasks that carry a predicate.
    row_selection_enabled: bool,
    /// Base Parquet read options; `preload_page_index` is overwritten per task.
    parquet_read_options: ParquetReadOptions,
    /// Supplies the bytes-read counter handed to every opened data file.
    scan_metrics: ScanMetrics,
}
104
105impl FileScanTaskReader {
106 async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
107 let should_load_page_index =
108 (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
109 let mut parquet_read_options = self.parquet_read_options;
110 parquet_read_options.preload_page_index = should_load_page_index;
111
112 let delete_filter_rx = self
113 .delete_file_loader
114 .load_deletes(&task.deletes, Arc::clone(&task.schema));
115
116 let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
118 &task.data_file_path,
119 &self.file_io,
120 task.file_size_in_bytes,
121 parquet_read_options,
122 self.scan_metrics.bytes_read_counter(),
123 )
124 .await?;
125
126 let missing_field_ids = arrow_metadata
130 .schema()
131 .fields()
132 .iter()
133 .next()
134 .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
135
136 let arrow_metadata = if missing_field_ids {
152 let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
154 apply_name_mapping_to_arrow_schema(
159 Arc::clone(arrow_metadata.schema()),
160 name_mapping,
161 )?
162 } else {
163 add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
166 };
167
168 let options = ArrowReaderOptions::new().with_schema(arrow_schema);
169 ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
170 |e| {
171 Error::new(
172 ErrorKind::Unexpected,
173 "Failed to create ArrowReaderMetadata with field ID schema",
174 )
175 .with_source(e)
176 },
177 )?
178 } else {
179 arrow_metadata
181 };
182
183 let arrow_metadata = if let Some(coerced_schema) =
186 coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
187 {
188 let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
189 ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
190 |e| {
191 Error::new(
192 ErrorKind::Unexpected,
193 format!(
194 "Failed to create ArrowReaderMetadata with INT96-coerced schema: {coerced_schema}"
195 ),
196 )
197 .with_source(e)
198 },
199 )?
200 } else {
201 arrow_metadata
202 };
203
204 let mut record_batch_stream_builder =
206 ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);
207
208 let project_field_ids_without_metadata: Vec<i32> = task
210 .project_field_ids
211 .iter()
212 .filter(|&&id| !is_metadata_field(id))
213 .copied()
214 .collect();
215
216 let projection_mask = ArrowReader::get_arrow_projection_mask(
221 &project_field_ids_without_metadata,
222 &task.schema,
223 record_batch_stream_builder.parquet_schema(),
224 record_batch_stream_builder.schema(),
225 missing_field_ids, )?;
227
228 record_batch_stream_builder =
229 record_batch_stream_builder.with_projection(projection_mask.clone());
230
231 let mut record_batch_transformer_builder =
235 RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
236
237 if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
239 let file_datum = Datum::string(task.data_file_path.clone());
240 record_batch_transformer_builder =
241 record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
242 }
243
244 if let (Some(partition_spec), Some(partition_data)) =
245 (task.partition_spec.clone(), task.partition.clone())
246 {
247 record_batch_transformer_builder =
248 record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
249 }
250
251 let mut record_batch_transformer = record_batch_transformer_builder.build();
252
253 if let Some(batch_size) = self.batch_size {
254 record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
255 }
256
257 let delete_filter = delete_filter_rx.await.unwrap()?;
258 let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
259
260 let final_predicate = match (&task.predicate, delete_predicate) {
265 (None, None) => None,
266 (Some(predicate), None) => Some(predicate.clone()),
267 (None, Some(ref predicate)) => Some(predicate.clone()),
268 (Some(filter_predicate), Some(delete_predicate)) => {
269 Some(filter_predicate.clone().and(delete_predicate))
270 }
271 };
272
273 let mut selected_row_group_indices = None;
289 let mut row_selection = None;
290
291 if task.start != 0 || task.length != 0 {
294 let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range(
295 record_batch_stream_builder.metadata(),
296 task.start,
297 task.length,
298 )?;
299 selected_row_group_indices = Some(byte_range_filtered_row_groups);
300 }
301
302 if let Some(predicate) = final_predicate {
303 let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
304 record_batch_stream_builder.parquet_schema(),
305 &predicate,
306 )?;
307
308 let row_filter = ArrowReader::get_row_filter(
309 &predicate,
310 record_batch_stream_builder.parquet_schema(),
311 &iceberg_field_ids,
312 &field_id_map,
313 )?;
314 record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
315
316 if self.row_group_filtering_enabled {
317 let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices(
318 &predicate,
319 record_batch_stream_builder.metadata(),
320 &field_id_map,
321 &task.schema,
322 )?;
323
324 selected_row_group_indices = match selected_row_group_indices {
327 Some(byte_range_filtered) => {
328 let intersection: Vec<usize> = byte_range_filtered
330 .into_iter()
331 .filter(|idx| predicate_filtered_row_groups.contains(idx))
332 .collect();
333 Some(intersection)
334 }
335 None => Some(predicate_filtered_row_groups),
336 };
337 }
338
339 if self.row_selection_enabled {
340 row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
341 &predicate,
342 record_batch_stream_builder.metadata(),
343 &selected_row_group_indices,
344 &field_id_map,
345 &task.schema,
346 )?);
347 }
348 }
349
350 let positional_delete_indexes = delete_filter.get_delete_vector(&task);
351
352 if let Some(positional_delete_indexes) = positional_delete_indexes {
353 let delete_row_selection = {
354 let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
355
356 ArrowReader::build_deletes_row_selection(
357 record_batch_stream_builder.metadata().row_groups(),
358 &selected_row_group_indices,
359 &positional_delete_indexes,
360 )
361 }?;
362
363 row_selection = match row_selection {
366 None => Some(delete_row_selection),
367 Some(filter_row_selection) => {
368 Some(filter_row_selection.intersection(&delete_row_selection))
369 }
370 };
371 }
372
373 if let Some(row_selection) = row_selection {
374 record_batch_stream_builder =
375 record_batch_stream_builder.with_row_selection(row_selection);
376 }
377
378 if let Some(selected_row_group_indices) = selected_row_group_indices {
379 record_batch_stream_builder =
380 record_batch_stream_builder.with_row_groups(selected_row_group_indices);
381 }
382
383 let record_batch_stream =
386 record_batch_stream_builder
387 .build()?
388 .map(move |batch| match batch {
389 Ok(batch) => {
390 record_batch_transformer.process_record_batch(batch)
392 }
393 Err(err) => Err(err.into()),
394 });
395
396 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
397 }
398}
399
400impl ArrowReader {
401 pub(crate) async fn open_parquet_file(
404 data_file_path: &str,
405 file_io: &FileIO,
406 file_size_in_bytes: u64,
407 parquet_read_options: ParquetReadOptions,
408 bytes_read: &Arc<AtomicU64>,
409 ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
410 let parquet_file = file_io.new_input(data_file_path)?;
411 let counting_reader =
412 CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read));
413 Self::build_parquet_reader(
414 Box::new(counting_reader),
415 file_size_in_bytes,
416 parquet_read_options,
417 )
418 .await
419 }
420
421 async fn build_parquet_reader(
422 parquet_reader: Box<dyn FileRead>,
423 file_size_in_bytes: u64,
424 parquet_read_options: ParquetReadOptions,
425 ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
426 let mut reader = ArrowFileReader::new(
427 FileMetadata {
428 size: file_size_in_bytes,
429 },
430 parquet_reader,
431 )
432 .with_parquet_read_options(parquet_read_options);
433
434 let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
435 .await
436 .map_err(|e| {
437 Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
438 })?;
439
440 Ok((reader, arrow_metadata))
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use std::collections::HashMap;
447 use std::fs::File;
448 use std::sync::Arc;
449
450 use arrow_array::cast::AsArray;
451 use arrow_array::{Array, ArrayRef, RecordBatch};
452 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
453 use futures::TryStreamExt;
454 use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
455 use parquet::basic::Compression;
456 use parquet::file::properties::WriterProperties;
457 use tempfile::TempDir;
458
459 use crate::Runtime;
460 use crate::arrow::ArrowReaderBuilder;
461 use crate::io::FileIO;
462 use crate::scan::{FileScanTask, FileScanTaskStream};
463 use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, SchemaRef, Type};
464
// Julian day number of the Unix epoch (1970-01-01).
const UNIX_EPOCH_JULIAN: i64 = 2_440_588;
// Microseconds in one day (86_400 seconds * 1_000_000).
const MICROS_PER_DAY: i64 = 86_400_000_000;
// Nanoseconds-within-day part of the shared INT96 test value: 12 hours.
const INT96_TEST_NANOS_WITHIN_DAY: u64 = 43_200_000_000_000;
// Julian-day part of the shared INT96 test value, 512_941 days after the Unix
// epoch.
// NOTE(review): that is roughly 1_400 years past 1970, presumably chosen to lie
// beyond what an i64 nanosecond timestamp can hold; confirm the intent.
const INT96_TEST_JULIAN_DAY: u32 = 2_953_529;
472
473 fn make_int96_test_value() -> (parquet::data_type::Int96, i64) {
474 let mut val = parquet::data_type::Int96::new();
475 val.set_data(
476 (INT96_TEST_NANOS_WITHIN_DAY & 0xFFFFFFFF) as u32,
477 (INT96_TEST_NANOS_WITHIN_DAY >> 32) as u32,
478 INT96_TEST_JULIAN_DAY,
479 );
480 let expected_micros = (INT96_TEST_JULIAN_DAY as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
481 + (INT96_TEST_NANOS_WITHIN_DAY / 1_000) as i64;
482 (val, expected_micros)
483 }
484
485 async fn read_int96_batches(
486 file_path: &str,
487 schema: SchemaRef,
488 project_field_ids: Vec<i32>,
489 ) -> Vec<RecordBatch> {
490 let file_io = FileIO::new_with_fs();
491 let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build();
492
493 let file_size = std::fs::metadata(file_path).unwrap().len();
494 let task = FileScanTask {
495 file_size_in_bytes: file_size,
496 start: 0,
497 length: file_size,
498 record_count: None,
499 data_file_path: file_path.to_string(),
500 data_file_format: DataFileFormat::Parquet,
501 schema,
502 project_field_ids,
503 predicate: None,
504 deletes: vec![],
505 partition: None,
506 partition_spec: None,
507 name_mapping: None,
508 case_sensitive: false,
509 };
510
511 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
512 reader
513 .read(tasks)
514 .unwrap()
515 .stream()
516 .try_collect()
517 .await
518 .unwrap()
519 }
520
/// Writes a small Parquet file with an optional INT96 column `ts` and a
/// required INT32 column `id`, using the low-level `SerializedFileWriter`.
///
/// When `with_field_ids` is true the columns are tagged with field IDs 1 (`ts`)
/// and 2 (`id`); otherwise no field IDs are written.
///
/// Returns the path of the written file and, per row, the timestamp in
/// microseconds since the Unix epoch that the INT96 value is expected to
/// decode to.
fn write_int96_parquet_file(
    table_location: &str,
    filename: &str,
    with_field_ids: bool,
) -> (String, Vec<i64>) {
    use parquet::basic::{Repetition, Type as PhysicalType};
    use parquet::data_type::{Int32Type, Int96, Int96Type};
    use parquet::file::writer::SerializedFileWriter;
    use parquet::schema::types::Type as SchemaType;

    let file_path = format!("{table_location}/{filename}");

    let mut ts_builder = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
        .with_repetition(Repetition::OPTIONAL);
    let mut id_builder = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
        .with_repetition(Repetition::REQUIRED);

    if with_field_ids {
        ts_builder = ts_builder.with_id(Some(1));
        id_builder = id_builder.with_id(Some(2));
    }

    let schema = SchemaType::group_type_builder("schema")
        .with_fields(vec![
            Arc::new(ts_builder.build().unwrap()),
            Arc::new(id_builder.build().unwrap()),
        ])
        .build()
        .unwrap();

    const NOON_NANOS: u64 = INT96_TEST_NANOS_WITHIN_DAY;
    // NOTE(review): despite the name, this day number is 512_941 days after the
    // Unix epoch, which by my arithmetic falls in the 3370s rather than 3333.
    // Expectations are derived from the same constant, so the test is
    // unaffected; confirm the intended date.
    const JULIAN_3333: u32 = INT96_TEST_JULIAN_DAY;
    // 47_482 days after the Unix epoch.
    const JULIAN_2100: u32 = 2_488_070;

    // Each row: (nanos low 32 bits, nanos high 32 bits, Julian day,
    // expected microseconds since the Unix epoch).
    let test_data: Vec<(u32, u32, u32, i64)> = vec![
        // Start of the far-future day.
        (
            0,
            0,
            JULIAN_3333,
            (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
        ),
        // Same day with a non-zero time-of-day component.
        (
            (NOON_NANOS & 0xFFFFFFFF) as u32,
            (NOON_NANOS >> 32) as u32,
            JULIAN_3333,
            (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
                + (NOON_NANOS / 1_000) as i64,
        ),
        // Start of the nearer day.
        (
            0,
            0,
            JULIAN_2100,
            (JULIAN_2100 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
        ),
    ];

    let int96_values: Vec<Int96> = test_data
        .iter()
        .map(|(lo, hi, day, _)| {
            let mut v = Int96::new();
            v.set_data(*lo, *hi, *day);
            v
        })
        .collect();

    // `id` is simply the row index.
    let id_values: Vec<i32> = (0..test_data.len() as i32).collect();
    let expected_micros: Vec<i64> = test_data.iter().map(|(_, _, _, m)| *m).collect();

    let file = File::create(&file_path).unwrap();
    let mut writer =
        SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();

    // Columns are written in schema order: `ts` first, then `id`.
    let mut row_group = writer.next_row_group().unwrap();
    {
        let mut col = row_group.next_column().unwrap().unwrap();
        // `ts` is OPTIONAL, so every row gets definition level 1 (value present).
        col.typed::<Int96Type>()
            .write_batch(&int96_values, Some(&vec![1; test_data.len()]), None)
            .unwrap();
        col.close().unwrap();
    }
    {
        let mut col = row_group.next_column().unwrap().unwrap();
        // `id` is REQUIRED, so no definition levels are supplied.
        col.typed::<Int32Type>()
            .write_batch(&id_values, None, None)
            .unwrap();
        col.close().unwrap();
    }
    row_group.close().unwrap();
    writer.close().unwrap();

    (file_path, expected_micros)
}
619
620 async fn assert_int96_read_matches(
621 file_path: &str,
622 schema: SchemaRef,
623 project_field_ids: Vec<i32>,
624 expected_micros: &[i64],
625 ) {
626 use arrow_array::TimestampMicrosecondArray;
627
628 let batches = read_int96_batches(file_path, schema, project_field_ids).await;
629
630 assert_eq!(batches.len(), 1);
631 let ts_array = batches[0]
632 .column(0)
633 .as_any()
634 .downcast_ref::<TimestampMicrosecondArray>()
635 .expect("Expected TimestampMicrosecondArray");
636
637 for (i, expected) in expected_micros.iter().enumerate() {
638 assert_eq!(
639 ts_array.value(i),
640 *expected,
641 "Row {i}: got {}, expected {expected}",
642 ts_array.value(i)
643 );
644 }
645 }
646
647 #[tokio::test]
650 async fn test_read_with_concurrency_one() {
651 use arrow_array::Int32Array;
652
653 let schema = Arc::new(
654 Schema::builder()
655 .with_schema_id(1)
656 .with_fields(vec![
657 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
658 NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
659 .into(),
660 ])
661 .build()
662 .unwrap(),
663 );
664
665 let arrow_schema = Arc::new(ArrowSchema::new(vec![
666 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
667 PARQUET_FIELD_ID_META_KEY.to_string(),
668 "1".to_string(),
669 )])),
670 Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
671 PARQUET_FIELD_ID_META_KEY.to_string(),
672 "2".to_string(),
673 )])),
674 ]));
675
676 let tmp_dir = TempDir::new().unwrap();
677 let table_location = tmp_dir.path().to_str().unwrap().to_string();
678 let file_io = FileIO::new_with_fs();
679
680 let props = WriterProperties::builder()
682 .set_compression(Compression::SNAPPY)
683 .build();
684
685 for file_num in 0..3 {
686 let id_data = Arc::new(Int32Array::from_iter_values(
687 file_num * 10..(file_num + 1) * 10,
688 )) as ArrayRef;
689 let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
690
691 let to_write =
692 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
693
694 let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
695 let mut writer =
696 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
697 writer.write(&to_write).expect("Writing batch");
698 writer.close().unwrap();
699 }
700
701 let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
703 .with_data_file_concurrency_limit(1)
704 .build();
705
706 let tasks = vec![
708 Ok(FileScanTask {
709 file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
710 .unwrap()
711 .len(),
712 start: 0,
713 length: 0,
714 record_count: None,
715 data_file_path: format!("{table_location}/file_0.parquet"),
716 data_file_format: DataFileFormat::Parquet,
717 schema: schema.clone(),
718 project_field_ids: vec![1, 2],
719 predicate: None,
720 deletes: vec![],
721 partition: None,
722 partition_spec: None,
723 name_mapping: None,
724 case_sensitive: false,
725 }),
726 Ok(FileScanTask {
727 file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
728 .unwrap()
729 .len(),
730 start: 0,
731 length: 0,
732 record_count: None,
733 data_file_path: format!("{table_location}/file_1.parquet"),
734 data_file_format: DataFileFormat::Parquet,
735 schema: schema.clone(),
736 project_field_ids: vec![1, 2],
737 predicate: None,
738 deletes: vec![],
739 partition: None,
740 partition_spec: None,
741 name_mapping: None,
742 case_sensitive: false,
743 }),
744 Ok(FileScanTask {
745 file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
746 .unwrap()
747 .len(),
748 start: 0,
749 length: 0,
750 record_count: None,
751 data_file_path: format!("{table_location}/file_2.parquet"),
752 data_file_format: DataFileFormat::Parquet,
753 schema: schema.clone(),
754 project_field_ids: vec![1, 2],
755 predicate: None,
756 deletes: vec![],
757 partition: None,
758 partition_spec: None,
759 name_mapping: None,
760 case_sensitive: false,
761 }),
762 ];
763
764 let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
765
766 let result = reader
767 .read(tasks_stream)
768 .unwrap()
769 .stream()
770 .try_collect::<Vec<RecordBatch>>()
771 .await
772 .unwrap();
773
774 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
776 assert_eq!(total_rows, 30, "Should have 30 total rows");
777
778 let mut all_ids = Vec::new();
780 let mut all_file_nums = Vec::new();
781
782 for batch in &result {
783 let id_col = batch
784 .column(0)
785 .as_primitive::<arrow_array::types::Int32Type>();
786 let file_num_col = batch
787 .column(1)
788 .as_primitive::<arrow_array::types::Int32Type>();
789
790 for i in 0..batch.num_rows() {
791 all_ids.push(id_col.value(i));
792 all_file_nums.push(file_num_col.value(i));
793 }
794 }
795
796 assert_eq!(all_ids.len(), 30);
797 assert_eq!(all_file_nums.len(), 30);
798
799 for i in 0..10 {
804 assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
805 assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
806 }
807 for i in 10..20 {
808 assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
809 assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
810 }
811 for i in 20..30 {
812 assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
813 assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
814 }
815 }
816
817 #[tokio::test]
818 async fn test_read_int96_timestamps_with_field_ids() {
819 let schema = Arc::new(
820 Schema::builder()
821 .with_schema_id(1)
822 .with_fields(vec![
823 NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
824 .into(),
825 NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
826 ])
827 .build()
828 .unwrap(),
829 );
830
831 let tmp_dir = TempDir::new().unwrap();
832 let table_location = tmp_dir.path().to_str().unwrap().to_string();
833 let (file_path, expected_micros) =
834 write_int96_parquet_file(&table_location, "with_ids.parquet", true);
835
836 assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
837 }
838
839 #[tokio::test]
840 async fn test_read_int96_timestamps_without_field_ids() {
841 let schema = Arc::new(
842 Schema::builder()
843 .with_schema_id(1)
844 .with_fields(vec![
845 NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
846 .into(),
847 NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
848 ])
849 .build()
850 .unwrap(),
851 );
852
853 let tmp_dir = TempDir::new().unwrap();
854 let table_location = tmp_dir.path().to_str().unwrap().to_string();
855 let (file_path, expected_micros) =
856 write_int96_parquet_file(&table_location, "no_ids.parquet", false);
857
858 assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
859 }
860
/// An INT96 column nested inside a struct must be read back as a
/// `TimestampMicrosecondArray` holding the expected microsecond value.
#[tokio::test]
async fn test_read_int96_timestamps_in_struct() {
    use arrow_array::{StructArray, TimestampMicrosecondArray};
    use parquet::basic::{Repetition, Type as PhysicalType};
    use parquet::data_type::Int96Type;
    use parquet::file::writer::SerializedFileWriter;
    use parquet::schema::types::Type as SchemaType;

    let tmp_dir = TempDir::new().unwrap();
    let table_location = tmp_dir.path().to_str().unwrap().to_string();
    let file_path = format!("{table_location}/struct_int96.parquet");

    // Parquet layout: required group `data` (id 1) { optional int96 `ts` (id 2) }.
    let ts_type = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
        .with_repetition(Repetition::OPTIONAL)
        .with_id(Some(2))
        .build()
        .unwrap();

    let struct_type = SchemaType::group_type_builder("data")
        .with_repetition(Repetition::REQUIRED)
        .with_id(Some(1))
        .with_fields(vec![Arc::new(ts_type)])
        .build()
        .unwrap();

    let parquet_schema = SchemaType::group_type_builder("schema")
        .with_fields(vec![Arc::new(struct_type)])
        .build()
        .unwrap();

    let (int96_val, expected_micros) = make_int96_test_value();

    let file = File::create(&file_path).unwrap();
    let mut writer =
        SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();

    let mut row_group = writer.next_row_group().unwrap();
    // Only `ts` is optional on the path (the struct is required), so a present
    // value has definition level 1. Nothing is repeated, hence no repetition
    // levels.
    {
        let mut col = row_group.next_column().unwrap().unwrap();
        col.typed::<Int96Type>()
            .write_batch(&[int96_val], Some(&[1]), None)
            .unwrap();
        col.close().unwrap();
    }
    row_group.close().unwrap();
    writer.close().unwrap();

    // Iceberg schema mirroring the Parquet layout, with matching field IDs.
    let iceberg_schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(
                    1,
                    "data",
                    Type::Struct(crate::spec::StructType::new(vec![
                        NestedField::optional(
                            2,
                            "ts",
                            Type::Primitive(PrimitiveType::Timestamp),
                        )
                        .into(),
                    ])),
                )
                .into(),
            ])
            .build()
            .unwrap(),
    );

    // Projecting the struct's field ID brings in the nested timestamp with it.
    let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;

    assert_eq!(batches.len(), 1);
    let struct_array = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("Expected StructArray");
    let ts_array = struct_array
        .column(0)
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .expect("Expected TimestampMicrosecondArray inside struct");

    assert_eq!(
        ts_array.value(0),
        expected_micros,
        "INT96 in struct: got {}, expected {expected_micros}",
        ts_array.value(0)
    );
}
953
/// An INT96 column used as a list element must be read back as a
/// `TimestampMicrosecondArray` holding the expected microsecond value.
#[tokio::test]
async fn test_read_int96_timestamps_in_list() {
    use arrow_array::{ListArray, TimestampMicrosecondArray};
    use parquet::basic::{Repetition, Type as PhysicalType};
    use parquet::data_type::Int96Type;
    use parquet::file::writer::SerializedFileWriter;
    use parquet::schema::types::Type as SchemaType;

    let tmp_dir = TempDir::new().unwrap();
    let table_location = tmp_dir.path().to_str().unwrap().to_string();
    let file_path = format!("{table_location}/list_int96.parquet");

    // Three-level Parquet list layout:
    //   optional group `timestamps` (LIST, id 1) {
    //     repeated group `list` {
    //       optional int96 `element` (id 2)
    //     }
    //   }
    let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT96)
        .with_repetition(Repetition::OPTIONAL)
        .with_id(Some(2))
        .build()
        .unwrap();

    let list_group = SchemaType::group_type_builder("list")
        .with_repetition(Repetition::REPEATED)
        .with_fields(vec![Arc::new(element_type)])
        .build()
        .unwrap();

    let list_type = SchemaType::group_type_builder("timestamps")
        .with_repetition(Repetition::OPTIONAL)
        .with_id(Some(1))
        .with_logical_type(Some(parquet::basic::LogicalType::List))
        .with_fields(vec![Arc::new(list_group)])
        .build()
        .unwrap();

    let parquet_schema = SchemaType::group_type_builder("schema")
        .with_fields(vec![Arc::new(list_type)])
        .build()
        .unwrap();

    let (int96_val, expected_micros) = make_int96_test_value();

    let file = File::create(&file_path).unwrap();
    let mut writer =
        SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();

    let mut row_group = writer.next_row_group().unwrap();
    // One row holding a one-element list. Definition level 3 means the optional
    // list, the repeated group and the optional element are all present;
    // repetition level 0 marks the start of a new row.
    {
        let mut col = row_group.next_column().unwrap().unwrap();
        col.typed::<Int96Type>()
            .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
            .unwrap();
        col.close().unwrap();
    }
    row_group.close().unwrap();
    writer.close().unwrap();

    // Iceberg schema mirroring the Parquet layout, with matching field IDs.
    let iceberg_schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::optional(
                    1,
                    "timestamps",
                    Type::List(crate::spec::ListType {
                        element_field: NestedField::optional(
                            2,
                            "element",
                            Type::Primitive(PrimitiveType::Timestamp),
                        )
                        .into(),
                    }),
                )
                .into(),
            ])
            .build()
            .unwrap(),
    );

    let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;

    assert_eq!(batches.len(), 1);
    let list_array = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .expect("Expected ListArray");
    // `values()` is the flattened child array holding the elements of all lists.
    let ts_array = list_array
        .values()
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .expect("Expected TimestampMicrosecondArray inside list");

    assert_eq!(
        ts_array.value(0),
        expected_micros,
        "INT96 in list: got {}, expected {expected_micros}",
        ts_array.value(0)
    );
}
1060
/// An INT96 column used as a map value must be read back as a
/// `TimestampMicrosecondArray` holding the expected microsecond value.
#[tokio::test]
async fn test_read_int96_timestamps_in_map() {
    use arrow_array::{MapArray, TimestampMicrosecondArray};
    use parquet::basic::{Repetition, Type as PhysicalType};
    use parquet::data_type::{ByteArrayType, Int96Type};
    use parquet::file::writer::SerializedFileWriter;
    use parquet::schema::types::Type as SchemaType;

    let tmp_dir = TempDir::new().unwrap();
    let table_location = tmp_dir.path().to_str().unwrap().to_string();
    let file_path = format!("{table_location}/map_int96.parquet");

    // Parquet map layout:
    //   optional group `ts_map` (MAP, id 1) {
    //     repeated group `key_value` {
    //       required binary `key` (STRING, id 2)
    //       optional int96 `value` (id 3)
    //     }
    //   }
    let key_type = SchemaType::primitive_type_builder("key", PhysicalType::BYTE_ARRAY)
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(parquet::basic::LogicalType::String))
        .with_id(Some(2))
        .build()
        .unwrap();

    let value_type = SchemaType::primitive_type_builder("value", PhysicalType::INT96)
        .with_repetition(Repetition::OPTIONAL)
        .with_id(Some(3))
        .build()
        .unwrap();

    let key_value_group = SchemaType::group_type_builder("key_value")
        .with_repetition(Repetition::REPEATED)
        .with_fields(vec![Arc::new(key_type), Arc::new(value_type)])
        .build()
        .unwrap();

    let map_type = SchemaType::group_type_builder("ts_map")
        .with_repetition(Repetition::OPTIONAL)
        .with_id(Some(1))
        .with_logical_type(Some(parquet::basic::LogicalType::Map))
        .with_fields(vec![Arc::new(key_value_group)])
        .build()
        .unwrap();

    let parquet_schema = SchemaType::group_type_builder("schema")
        .with_fields(vec![Arc::new(map_type)])
        .build()
        .unwrap();

    let (int96_val, expected_micros) = make_int96_test_value();

    let file = File::create(&file_path).unwrap();
    let mut writer =
        SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();

    // One row holding a single-entry map. Columns are written in schema order:
    // key first, then value. Repetition level 0 marks the start of a new row.
    let mut row_group = writer.next_row_group().unwrap();
    {
        let mut col = row_group.next_column().unwrap().unwrap();
        // Key path: optional map + repeated group, with a required key, so a
        // present key has definition level 2.
        col.typed::<ByteArrayType>()
            .write_batch(
                &[parquet::data_type::ByteArray::from("event_time")],
                Some(&[2]),
                Some(&[0]),
            )
            .unwrap();
        col.close().unwrap();
    }
    {
        let mut col = row_group.next_column().unwrap().unwrap();
        // Value path adds one more optional level, so a present value has
        // definition level 3.
        col.typed::<Int96Type>()
            .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
            .unwrap();
        col.close().unwrap();
    }
    row_group.close().unwrap();
    writer.close().unwrap();

    // Iceberg schema mirroring the Parquet layout, with matching field IDs.
    let iceberg_schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::optional(
                    1,
                    "ts_map",
                    Type::Map(crate::spec::MapType {
                        key_field: NestedField::required(
                            2,
                            "key",
                            Type::Primitive(PrimitiveType::String),
                        )
                        .into(),
                        value_field: NestedField::optional(
                            3,
                            "value",
                            Type::Primitive(PrimitiveType::Timestamp),
                        )
                        .into(),
                    }),
                )
                .into(),
            ])
            .build()
            .unwrap(),
    );

    let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;

    assert_eq!(batches.len(), 1);
    let map_array = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<MapArray>()
        .expect("Expected MapArray");
    // `values()` is the flattened child array holding the values of all entries.
    let ts_array = map_array
        .values()
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .expect("Expected TimestampMicrosecondArray as map values");

    assert_eq!(
        ts_array.value(0),
        expected_micros,
        "INT96 in map: got {}, expected {expected_micros}",
        ts_array.value(0)
    );
}
1193}