1use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use futures::{StreamExt, TryStreamExt};
27use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
28use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};
29
30use super::{
31 ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
32 apply_name_mapping_to_arrow_schema,
33};
34use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
35use crate::arrow::int96::coerce_int96_timestamps;
36use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
37use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
38use crate::error::Result;
39use crate::io::{FileIO, FileMetadata, FileRead};
40use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
41use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
42use crate::spec::Datum;
43use crate::{Error, ErrorKind};
44
45impl ArrowReader {
46 pub fn read(self, tasks: FileScanTaskStream) -> Result<ScanResult> {
49 let concurrency_limit_data_files = self.concurrency_limit_data_files;
50 let scan_metrics = ScanMetrics::new();
51
52 let task_reader = FileScanTaskReader {
53 batch_size: self.batch_size,
54 file_io: self.file_io,
55 delete_file_loader: self
56 .delete_file_loader
57 .with_scan_metrics(scan_metrics.clone()),
58 row_group_filtering_enabled: self.row_group_filtering_enabled,
59 row_selection_enabled: self.row_selection_enabled,
60 parquet_read_options: self.parquet_read_options,
61 scan_metrics: scan_metrics.clone(),
62 };
63
64 let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
66 Box::pin(
67 tasks
68 .and_then(move |task| task_reader.clone().process(task))
69 .map_err(|err| {
70 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
71 .with_source(err)
72 })
73 .try_flatten(),
74 )
75 } else {
76 Box::pin(
77 tasks
78 .map_ok(move |task| task_reader.clone().process(task))
79 .map_err(|err| {
80 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
81 .with_source(err)
82 })
83 .try_buffer_unordered(concurrency_limit_data_files)
84 .try_flatten_unordered(concurrency_limit_data_files),
85 )
86 };
87
88 Ok(ScanResult::new(stream, scan_metrics))
89 }
90}
91
/// Per-scan settings needed to turn a single `FileScanTask` into a record batch stream.
///
/// The struct is `Clone` because `ArrowReader::read` clones it once per task before
/// calling `process`, which takes `self` by value.
#[derive(Clone)]
struct FileScanTaskReader {
    /// Optional batch size; when `Some`, it is applied to the Parquet stream builder.
    batch_size: Option<usize>,
    /// I/O handle used to open the task's data file.
    file_io: FileIO,
    /// Loader asked to load the task's delete files before reading.
    delete_file_loader: CachingDeleteFileLoader,
    /// When `true` and a predicate applies, row groups are pruned using that predicate.
    row_group_filtering_enabled: bool,
    /// When `true` and a predicate applies, a row selection is derived from that predicate.
    row_selection_enabled: bool,
    /// Base Parquet read options; `preload_page_index` is overridden per task in `process`.
    parquet_read_options: ParquetReadOptions,
    /// Scan metrics handle; its bytes-read counter is passed along when opening the file.
    scan_metrics: ScanMetrics,
}
104
105impl FileScanTaskReader {
106 async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
107 let should_load_page_index =
108 (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
109 let mut parquet_read_options = self.parquet_read_options;
110 parquet_read_options.preload_page_index = should_load_page_index;
111
112 let delete_filter_rx = self
113 .delete_file_loader
114 .load_deletes(&task.deletes, Arc::clone(&task.schema));
115
116 let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
118 &task.data_file_path,
119 &self.file_io,
120 task.file_size_in_bytes,
121 parquet_read_options,
122 self.scan_metrics.bytes_read_counter(),
123 )
124 .await?;
125
126 let missing_field_ids = arrow_metadata
130 .schema()
131 .fields()
132 .iter()
133 .next()
134 .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
135
136 let arrow_metadata = if missing_field_ids {
152 let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
154 apply_name_mapping_to_arrow_schema(
159 Arc::clone(arrow_metadata.schema()),
160 name_mapping,
161 )?
162 } else {
163 add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
166 };
167
168 let options = ArrowReaderOptions::new().with_schema(arrow_schema);
169 ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
170 |e| {
171 Error::new(
172 ErrorKind::Unexpected,
173 "Failed to create ArrowReaderMetadata with field ID schema",
174 )
175 .with_source(e)
176 },
177 )?
178 } else {
179 arrow_metadata
181 };
182
183 let arrow_metadata = if let Some(coerced_schema) =
186 coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
187 {
188 let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
189 ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
190 |e| {
191 Error::new(
192 ErrorKind::Unexpected,
193 format!(
194 "Failed to create ArrowReaderMetadata with INT96-coerced schema: {coerced_schema}"
195 ),
196 )
197 .with_source(e)
198 },
199 )?
200 } else {
201 arrow_metadata
202 };
203
204 let mut record_batch_stream_builder =
206 ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);
207
208 let project_field_ids_without_metadata: Vec<i32> = task
210 .project_field_ids
211 .iter()
212 .filter(|&&id| !is_metadata_field(id))
213 .copied()
214 .collect();
215
216 let projection_mask = ArrowReader::get_arrow_projection_mask(
221 &project_field_ids_without_metadata,
222 &task.schema,
223 record_batch_stream_builder.parquet_schema(),
224 record_batch_stream_builder.schema(),
225 missing_field_ids, )?;
227
228 record_batch_stream_builder =
229 record_batch_stream_builder.with_projection(projection_mask.clone());
230
231 let mut record_batch_transformer_builder =
235 RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
236
237 if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
239 let file_datum = Datum::string(task.data_file_path.clone());
240 record_batch_transformer_builder =
241 record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
242 }
243
244 if let (Some(partition_spec), Some(partition_data)) =
245 (task.partition_spec.clone(), task.partition.clone())
246 {
247 record_batch_transformer_builder =
248 record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
249 }
250
251 let mut record_batch_transformer = record_batch_transformer_builder.build();
252
253 if let Some(batch_size) = self.batch_size {
254 record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
255 }
256
257 let delete_filter = delete_filter_rx.await.unwrap()?;
258 let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
259
260 let final_predicate = match (&task.predicate, delete_predicate) {
265 (None, None) => None,
266 (Some(predicate), None) => Some(predicate.clone()),
267 (None, Some(ref predicate)) => Some(predicate.clone()),
268 (Some(filter_predicate), Some(delete_predicate)) => {
269 Some(filter_predicate.clone().and(delete_predicate))
270 }
271 };
272
273 let mut selected_row_group_indices = None;
289 let mut row_selection = None;
290
291 if task.start != 0 || task.length != 0 {
294 let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range(
295 record_batch_stream_builder.metadata(),
296 task.start,
297 task.length,
298 )?;
299 selected_row_group_indices = Some(byte_range_filtered_row_groups);
300 }
301
302 if let Some(predicate) = final_predicate {
303 let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
304 record_batch_stream_builder.parquet_schema(),
305 &predicate,
306 )?;
307
308 let row_filter = ArrowReader::get_row_filter(
309 &predicate,
310 record_batch_stream_builder.parquet_schema(),
311 &iceberg_field_ids,
312 &field_id_map,
313 )?;
314 record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
315
316 if self.row_group_filtering_enabled {
317 let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices(
318 &predicate,
319 record_batch_stream_builder.metadata(),
320 &field_id_map,
321 &task.schema,
322 )?;
323
324 selected_row_group_indices = match selected_row_group_indices {
327 Some(byte_range_filtered) => {
328 let intersection: Vec<usize> = byte_range_filtered
330 .into_iter()
331 .filter(|idx| predicate_filtered_row_groups.contains(idx))
332 .collect();
333 Some(intersection)
334 }
335 None => Some(predicate_filtered_row_groups),
336 };
337 }
338
339 if self.row_selection_enabled {
340 row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
341 &predicate,
342 record_batch_stream_builder.metadata(),
343 &selected_row_group_indices,
344 &field_id_map,
345 &task.schema,
346 )?);
347 }
348 }
349
350 let positional_delete_indexes = delete_filter.get_delete_vector(&task);
351
352 if let Some(positional_delete_indexes) = positional_delete_indexes {
353 let delete_row_selection = {
354 let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
355
356 ArrowReader::build_deletes_row_selection(
357 record_batch_stream_builder.metadata().row_groups(),
358 &selected_row_group_indices,
359 &positional_delete_indexes,
360 )
361 }?;
362
363 row_selection = match row_selection {
366 None => Some(delete_row_selection),
367 Some(filter_row_selection) => {
368 Some(filter_row_selection.intersection(&delete_row_selection))
369 }
370 };
371 }
372
373 if let Some(row_selection) = row_selection {
374 record_batch_stream_builder =
375 record_batch_stream_builder.with_row_selection(row_selection);
376 }
377
378 if let Some(selected_row_group_indices) = selected_row_group_indices {
379 record_batch_stream_builder =
380 record_batch_stream_builder.with_row_groups(selected_row_group_indices);
381 }
382
383 let record_batch_stream =
386 record_batch_stream_builder
387 .build()?
388 .map(move |batch| match batch {
389 Ok(batch) => {
390 record_batch_transformer.process_record_batch(batch)
392 }
393 Err(err) => Err(err.into()),
394 });
395
396 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
397 }
398}
399
400impl ArrowReader {
401 pub(crate) async fn open_parquet_file(
404 data_file_path: &str,
405 file_io: &FileIO,
406 file_size_in_bytes: u64,
407 parquet_read_options: ParquetReadOptions,
408 bytes_read: &Arc<AtomicU64>,
409 ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
410 let parquet_file = file_io.new_input(data_file_path)?;
411 let counting_reader =
412 CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read));
413 Self::build_parquet_reader(
414 Box::new(counting_reader),
415 file_size_in_bytes,
416 parquet_read_options,
417 )
418 .await
419 }
420
421 async fn build_parquet_reader(
422 parquet_reader: Box<dyn FileRead>,
423 file_size_in_bytes: u64,
424 parquet_read_options: ParquetReadOptions,
425 ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
426 let mut reader = ArrowFileReader::new(
427 FileMetadata {
428 size: file_size_in_bytes,
429 },
430 parquet_reader,
431 )
432 .with_parquet_read_options(parquet_read_options);
433
434 let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
435 .await
436 .map_err(|e| {
437 Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
438 })?;
439
440 Ok((reader, arrow_metadata))
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use std::collections::HashMap;
447 use std::fs::File;
448 use std::sync::Arc;
449
450 use arrow_array::cast::AsArray;
451 use arrow_array::{Array, ArrayRef, RecordBatch};
452 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
453 use futures::TryStreamExt;
454 use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
455 use parquet::basic::Compression;
456 use parquet::file::properties::WriterProperties;
457 use tempfile::TempDir;
458
459 use crate::Runtime;
460 use crate::arrow::ArrowReaderBuilder;
461 use crate::io::FileIO;
462 use crate::scan::{FileScanTask, FileScanTaskStream};
463 use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, SchemaRef, Type};
464
    /// Julian day number of the Unix epoch (1970-01-01); subtracted from a Julian day to
    /// obtain days since the epoch.
    const UNIX_EPOCH_JULIAN: i64 = 2_440_588;
    /// Number of microseconds in one day (24 * 60 * 60 * 1_000_000).
    const MICROS_PER_DAY: i64 = 86_400_000_000;
    /// Nanoseconds-within-day component of the shared INT96 test value: 12 hours (noon).
    const INT96_TEST_NANOS_WITHIN_DAY: u64 = 43_200_000_000_000;
    /// Julian day component of the shared INT96 test value: 512_941 days after the Unix
    /// epoch, which is too far out to be expressed as a signed 64-bit nanosecond count.
    const INT96_TEST_JULIAN_DAY: u32 = 2_953_529;
472
473 fn make_int96_test_value() -> (parquet::data_type::Int96, i64) {
474 let mut val = parquet::data_type::Int96::new();
475 val.set_data(
476 (INT96_TEST_NANOS_WITHIN_DAY & 0xFFFFFFFF) as u32,
477 (INT96_TEST_NANOS_WITHIN_DAY >> 32) as u32,
478 INT96_TEST_JULIAN_DAY,
479 );
480 let expected_micros = (INT96_TEST_JULIAN_DAY as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
481 + (INT96_TEST_NANOS_WITHIN_DAY / 1_000) as i64;
482 (val, expected_micros)
483 }
484
485 async fn read_int96_batches(
486 file_path: &str,
487 schema: SchemaRef,
488 project_field_ids: Vec<i32>,
489 ) -> Vec<RecordBatch> {
490 let file_io = FileIO::new_with_fs();
491 let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build();
492
493 let file_size = std::fs::metadata(file_path).unwrap().len();
494 let task = FileScanTask::builder()
495 .with_file_size_in_bytes(file_size)
496 .with_start(0)
497 .with_length(file_size)
498 .with_data_file_path(file_path.to_string())
499 .with_data_file_format(DataFileFormat::Parquet)
500 .with_schema(schema)
501 .with_project_field_ids(project_field_ids)
502 .with_case_sensitive(false)
503 .build();
504
505 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
506 reader
507 .read(tasks)
508 .unwrap()
509 .stream()
510 .try_collect()
511 .await
512 .unwrap()
513 }
514
515 fn write_int96_parquet_file(
517 table_location: &str,
518 filename: &str,
519 with_field_ids: bool,
520 ) -> (String, Vec<i64>) {
521 use parquet::basic::{Repetition, Type as PhysicalType};
522 use parquet::data_type::{Int32Type, Int96, Int96Type};
523 use parquet::file::writer::SerializedFileWriter;
524 use parquet::schema::types::Type as SchemaType;
525
526 let file_path = format!("{table_location}/{filename}");
527
528 let mut ts_builder = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
529 .with_repetition(Repetition::OPTIONAL);
530 let mut id_builder = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
531 .with_repetition(Repetition::REQUIRED);
532
533 if with_field_ids {
534 ts_builder = ts_builder.with_id(Some(1));
535 id_builder = id_builder.with_id(Some(2));
536 }
537
538 let schema = SchemaType::group_type_builder("schema")
539 .with_fields(vec![
540 Arc::new(ts_builder.build().unwrap()),
541 Arc::new(id_builder.build().unwrap()),
542 ])
543 .build()
544 .unwrap();
545
546 const NOON_NANOS: u64 = INT96_TEST_NANOS_WITHIN_DAY;
548 const JULIAN_3333: u32 = INT96_TEST_JULIAN_DAY;
549 const JULIAN_2100: u32 = 2_488_070;
550
551 let test_data: Vec<(u32, u32, u32, i64)> = vec![
552 (
554 0,
555 0,
556 JULIAN_3333,
557 (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
558 ),
559 (
561 (NOON_NANOS & 0xFFFFFFFF) as u32,
562 (NOON_NANOS >> 32) as u32,
563 JULIAN_3333,
564 (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
565 + (NOON_NANOS / 1_000) as i64,
566 ),
567 (
569 0,
570 0,
571 JULIAN_2100,
572 (JULIAN_2100 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
573 ),
574 ];
575
576 let int96_values: Vec<Int96> = test_data
577 .iter()
578 .map(|(lo, hi, day, _)| {
579 let mut v = Int96::new();
580 v.set_data(*lo, *hi, *day);
581 v
582 })
583 .collect();
584
585 let id_values: Vec<i32> = (0..test_data.len() as i32).collect();
586 let expected_micros: Vec<i64> = test_data.iter().map(|(_, _, _, m)| *m).collect();
587
588 let file = File::create(&file_path).unwrap();
589 let mut writer =
590 SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();
591
592 let mut row_group = writer.next_row_group().unwrap();
593 {
594 let mut col = row_group.next_column().unwrap().unwrap();
596 col.typed::<Int96Type>()
597 .write_batch(&int96_values, Some(&vec![1; test_data.len()]), None)
598 .unwrap();
599 col.close().unwrap();
600 }
601 {
602 let mut col = row_group.next_column().unwrap().unwrap();
603 col.typed::<Int32Type>()
604 .write_batch(&id_values, None, None)
605 .unwrap();
606 col.close().unwrap();
607 }
608 row_group.close().unwrap();
609 writer.close().unwrap();
610
611 (file_path, expected_micros)
612 }
613
614 async fn assert_int96_read_matches(
615 file_path: &str,
616 schema: SchemaRef,
617 project_field_ids: Vec<i32>,
618 expected_micros: &[i64],
619 ) {
620 use arrow_array::TimestampMicrosecondArray;
621
622 let batches = read_int96_batches(file_path, schema, project_field_ids).await;
623
624 assert_eq!(batches.len(), 1);
625 let ts_array = batches[0]
626 .column(0)
627 .as_any()
628 .downcast_ref::<TimestampMicrosecondArray>()
629 .expect("Expected TimestampMicrosecondArray");
630
631 for (i, expected) in expected_micros.iter().enumerate() {
632 assert_eq!(
633 ts_array.value(i),
634 *expected,
635 "Row {i}: got {}, expected {expected}",
636 ts_array.value(i)
637 );
638 }
639 }
640
641 #[tokio::test]
644 async fn test_read_with_concurrency_one() {
645 use arrow_array::Int32Array;
646
647 let schema = Arc::new(
648 Schema::builder()
649 .with_schema_id(1)
650 .with_fields(vec![
651 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
652 NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
653 .into(),
654 ])
655 .build()
656 .unwrap(),
657 );
658
659 let arrow_schema = Arc::new(ArrowSchema::new(vec![
660 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
661 PARQUET_FIELD_ID_META_KEY.to_string(),
662 "1".to_string(),
663 )])),
664 Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
665 PARQUET_FIELD_ID_META_KEY.to_string(),
666 "2".to_string(),
667 )])),
668 ]));
669
670 let tmp_dir = TempDir::new().unwrap();
671 let table_location = tmp_dir.path().to_str().unwrap().to_string();
672 let file_io = FileIO::new_with_fs();
673
674 let props = WriterProperties::builder()
676 .set_compression(Compression::SNAPPY)
677 .build();
678
679 for file_num in 0..3 {
680 let id_data = Arc::new(Int32Array::from_iter_values(
681 file_num * 10..(file_num + 1) * 10,
682 )) as ArrayRef;
683 let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
684
685 let to_write =
686 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
687
688 let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
689 let mut writer =
690 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
691 writer.write(&to_write).expect("Writing batch");
692 writer.close().unwrap();
693 }
694
695 let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
697 .with_data_file_concurrency_limit(1)
698 .build();
699
700 let tasks = vec![
702 Ok(FileScanTask::builder()
703 .with_file_size_in_bytes(
704 std::fs::metadata(format!("{table_location}/file_0.parquet"))
705 .unwrap()
706 .len(),
707 )
708 .with_start(0)
709 .with_length(0)
710 .with_data_file_path(format!("{table_location}/file_0.parquet"))
711 .with_data_file_format(DataFileFormat::Parquet)
712 .with_schema(schema.clone())
713 .with_project_field_ids(vec![1, 2])
714 .with_case_sensitive(false)
715 .build()),
716 Ok(FileScanTask::builder()
717 .with_file_size_in_bytes(
718 std::fs::metadata(format!("{table_location}/file_1.parquet"))
719 .unwrap()
720 .len(),
721 )
722 .with_start(0)
723 .with_length(0)
724 .with_data_file_path(format!("{table_location}/file_1.parquet"))
725 .with_data_file_format(DataFileFormat::Parquet)
726 .with_schema(schema.clone())
727 .with_project_field_ids(vec![1, 2])
728 .with_case_sensitive(false)
729 .build()),
730 Ok(FileScanTask::builder()
731 .with_file_size_in_bytes(
732 std::fs::metadata(format!("{table_location}/file_2.parquet"))
733 .unwrap()
734 .len(),
735 )
736 .with_start(0)
737 .with_length(0)
738 .with_data_file_path(format!("{table_location}/file_2.parquet"))
739 .with_data_file_format(DataFileFormat::Parquet)
740 .with_schema(schema.clone())
741 .with_project_field_ids(vec![1, 2])
742 .with_case_sensitive(false)
743 .build()),
744 ];
745
746 let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
747
748 let result = reader
749 .read(tasks_stream)
750 .unwrap()
751 .stream()
752 .try_collect::<Vec<RecordBatch>>()
753 .await
754 .unwrap();
755
756 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
758 assert_eq!(total_rows, 30, "Should have 30 total rows");
759
760 let mut all_ids = Vec::new();
762 let mut all_file_nums = Vec::new();
763
764 for batch in &result {
765 let id_col = batch
766 .column(0)
767 .as_primitive::<arrow_array::types::Int32Type>();
768 let file_num_col = batch
769 .column(1)
770 .as_primitive::<arrow_array::types::Int32Type>();
771
772 for i in 0..batch.num_rows() {
773 all_ids.push(id_col.value(i));
774 all_file_nums.push(file_num_col.value(i));
775 }
776 }
777
778 assert_eq!(all_ids.len(), 30);
779 assert_eq!(all_file_nums.len(), 30);
780
781 for i in 0..10 {
786 assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
787 assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
788 }
789 for i in 10..20 {
790 assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
791 assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
792 }
793 for i in 20..30 {
794 assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
795 assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
796 }
797 }
798
799 #[tokio::test]
800 async fn test_read_int96_timestamps_with_field_ids() {
801 let schema = Arc::new(
802 Schema::builder()
803 .with_schema_id(1)
804 .with_fields(vec![
805 NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
806 .into(),
807 NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
808 ])
809 .build()
810 .unwrap(),
811 );
812
813 let tmp_dir = TempDir::new().unwrap();
814 let table_location = tmp_dir.path().to_str().unwrap().to_string();
815 let (file_path, expected_micros) =
816 write_int96_parquet_file(&table_location, "with_ids.parquet", true);
817
818 assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
819 }
820
821 #[tokio::test]
822 async fn test_read_int96_timestamps_without_field_ids() {
823 let schema = Arc::new(
824 Schema::builder()
825 .with_schema_id(1)
826 .with_fields(vec![
827 NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
828 .into(),
829 NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
830 ])
831 .build()
832 .unwrap(),
833 );
834
835 let tmp_dir = TempDir::new().unwrap();
836 let table_location = tmp_dir.path().to_str().unwrap().to_string();
837 let (file_path, expected_micros) =
838 write_int96_parquet_file(&table_location, "no_ids.parquet", false);
839
840 assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
841 }
842
843 #[tokio::test]
844 async fn test_read_int96_timestamps_in_struct() {
845 use arrow_array::{StructArray, TimestampMicrosecondArray};
846 use parquet::basic::{Repetition, Type as PhysicalType};
847 use parquet::data_type::Int96Type;
848 use parquet::file::writer::SerializedFileWriter;
849 use parquet::schema::types::Type as SchemaType;
850
851 let tmp_dir = TempDir::new().unwrap();
852 let table_location = tmp_dir.path().to_str().unwrap().to_string();
853 let file_path = format!("{table_location}/struct_int96.parquet");
854
855 let ts_type = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
856 .with_repetition(Repetition::OPTIONAL)
857 .with_id(Some(2))
858 .build()
859 .unwrap();
860
861 let struct_type = SchemaType::group_type_builder("data")
862 .with_repetition(Repetition::REQUIRED)
863 .with_id(Some(1))
864 .with_fields(vec![Arc::new(ts_type)])
865 .build()
866 .unwrap();
867
868 let parquet_schema = SchemaType::group_type_builder("schema")
869 .with_fields(vec![Arc::new(struct_type)])
870 .build()
871 .unwrap();
872
873 let (int96_val, expected_micros) = make_int96_test_value();
874
875 let file = File::create(&file_path).unwrap();
876 let mut writer =
877 SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
878
879 let mut row_group = writer.next_row_group().unwrap();
882 {
883 let mut col = row_group.next_column().unwrap().unwrap();
884 col.typed::<Int96Type>()
885 .write_batch(&[int96_val], Some(&[1]), None)
886 .unwrap();
887 col.close().unwrap();
888 }
889 row_group.close().unwrap();
890 writer.close().unwrap();
891
892 let iceberg_schema = Arc::new(
893 Schema::builder()
894 .with_schema_id(1)
895 .with_fields(vec![
896 NestedField::required(
897 1,
898 "data",
899 Type::Struct(crate::spec::StructType::new(vec![
900 NestedField::optional(
901 2,
902 "ts",
903 Type::Primitive(PrimitiveType::Timestamp),
904 )
905 .into(),
906 ])),
907 )
908 .into(),
909 ])
910 .build()
911 .unwrap(),
912 );
913
914 let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
915
916 assert_eq!(batches.len(), 1);
917 let struct_array = batches[0]
918 .column(0)
919 .as_any()
920 .downcast_ref::<StructArray>()
921 .expect("Expected StructArray");
922 let ts_array = struct_array
923 .column(0)
924 .as_any()
925 .downcast_ref::<TimestampMicrosecondArray>()
926 .expect("Expected TimestampMicrosecondArray inside struct");
927
928 assert_eq!(
929 ts_array.value(0),
930 expected_micros,
931 "INT96 in struct: got {}, expected {expected_micros}",
932 ts_array.value(0)
933 );
934 }
935
936 #[tokio::test]
937 async fn test_read_int96_timestamps_in_list() {
938 use arrow_array::{ListArray, TimestampMicrosecondArray};
939 use parquet::basic::{Repetition, Type as PhysicalType};
940 use parquet::data_type::Int96Type;
941 use parquet::file::writer::SerializedFileWriter;
942 use parquet::schema::types::Type as SchemaType;
943
944 let tmp_dir = TempDir::new().unwrap();
945 let table_location = tmp_dir.path().to_str().unwrap().to_string();
946 let file_path = format!("{table_location}/list_int96.parquet");
947
948 let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT96)
955 .with_repetition(Repetition::OPTIONAL)
956 .with_id(Some(2))
957 .build()
958 .unwrap();
959
960 let list_group = SchemaType::group_type_builder("list")
961 .with_repetition(Repetition::REPEATED)
962 .with_fields(vec![Arc::new(element_type)])
963 .build()
964 .unwrap();
965
966 let list_type = SchemaType::group_type_builder("timestamps")
967 .with_repetition(Repetition::OPTIONAL)
968 .with_id(Some(1))
969 .with_logical_type(Some(parquet::basic::LogicalType::List))
970 .with_fields(vec![Arc::new(list_group)])
971 .build()
972 .unwrap();
973
974 let parquet_schema = SchemaType::group_type_builder("schema")
975 .with_fields(vec![Arc::new(list_type)])
976 .build()
977 .unwrap();
978
979 let (int96_val, expected_micros) = make_int96_test_value();
980
981 let file = File::create(&file_path).unwrap();
982 let mut writer =
983 SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
984
985 let mut row_group = writer.next_row_group().unwrap();
989 {
990 let mut col = row_group.next_column().unwrap().unwrap();
991 col.typed::<Int96Type>()
992 .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
993 .unwrap();
994 col.close().unwrap();
995 }
996 row_group.close().unwrap();
997 writer.close().unwrap();
998
999 let iceberg_schema = Arc::new(
1000 Schema::builder()
1001 .with_schema_id(1)
1002 .with_fields(vec![
1003 NestedField::optional(
1004 1,
1005 "timestamps",
1006 Type::List(crate::spec::ListType {
1007 element_field: NestedField::optional(
1008 2,
1009 "element",
1010 Type::Primitive(PrimitiveType::Timestamp),
1011 )
1012 .into(),
1013 }),
1014 )
1015 .into(),
1016 ])
1017 .build()
1018 .unwrap(),
1019 );
1020
1021 let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1022
1023 assert_eq!(batches.len(), 1);
1024 let list_array = batches[0]
1025 .column(0)
1026 .as_any()
1027 .downcast_ref::<ListArray>()
1028 .expect("Expected ListArray");
1029 let ts_array = list_array
1030 .values()
1031 .as_any()
1032 .downcast_ref::<TimestampMicrosecondArray>()
1033 .expect("Expected TimestampMicrosecondArray inside list");
1034
1035 assert_eq!(
1036 ts_array.value(0),
1037 expected_micros,
1038 "INT96 in list: got {}, expected {expected_micros}",
1039 ts_array.value(0)
1040 );
1041 }
1042
1043 #[tokio::test]
1044 async fn test_read_int96_timestamps_in_map() {
1045 use arrow_array::{MapArray, TimestampMicrosecondArray};
1046 use parquet::basic::{Repetition, Type as PhysicalType};
1047 use parquet::data_type::{ByteArrayType, Int96Type};
1048 use parquet::file::writer::SerializedFileWriter;
1049 use parquet::schema::types::Type as SchemaType;
1050
1051 let tmp_dir = TempDir::new().unwrap();
1052 let table_location = tmp_dir.path().to_str().unwrap().to_string();
1053 let file_path = format!("{table_location}/map_int96.parquet");
1054
1055 let key_type = SchemaType::primitive_type_builder("key", PhysicalType::BYTE_ARRAY)
1063 .with_repetition(Repetition::REQUIRED)
1064 .with_logical_type(Some(parquet::basic::LogicalType::String))
1065 .with_id(Some(2))
1066 .build()
1067 .unwrap();
1068
1069 let value_type = SchemaType::primitive_type_builder("value", PhysicalType::INT96)
1070 .with_repetition(Repetition::OPTIONAL)
1071 .with_id(Some(3))
1072 .build()
1073 .unwrap();
1074
1075 let key_value_group = SchemaType::group_type_builder("key_value")
1076 .with_repetition(Repetition::REPEATED)
1077 .with_fields(vec![Arc::new(key_type), Arc::new(value_type)])
1078 .build()
1079 .unwrap();
1080
1081 let map_type = SchemaType::group_type_builder("ts_map")
1082 .with_repetition(Repetition::OPTIONAL)
1083 .with_id(Some(1))
1084 .with_logical_type(Some(parquet::basic::LogicalType::Map))
1085 .with_fields(vec![Arc::new(key_value_group)])
1086 .build()
1087 .unwrap();
1088
1089 let parquet_schema = SchemaType::group_type_builder("schema")
1090 .with_fields(vec![Arc::new(map_type)])
1091 .build()
1092 .unwrap();
1093
1094 let (int96_val, expected_micros) = make_int96_test_value();
1095
1096 let file = File::create(&file_path).unwrap();
1097 let mut writer =
1098 SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
1099
1100 let mut row_group = writer.next_row_group().unwrap();
1105 {
1106 let mut col = row_group.next_column().unwrap().unwrap();
1107 col.typed::<ByteArrayType>()
1108 .write_batch(
1109 &[parquet::data_type::ByteArray::from("event_time")],
1110 Some(&[2]),
1111 Some(&[0]),
1112 )
1113 .unwrap();
1114 col.close().unwrap();
1115 }
1116 {
1117 let mut col = row_group.next_column().unwrap().unwrap();
1118 col.typed::<Int96Type>()
1119 .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
1120 .unwrap();
1121 col.close().unwrap();
1122 }
1123 row_group.close().unwrap();
1124 writer.close().unwrap();
1125
1126 let iceberg_schema = Arc::new(
1127 Schema::builder()
1128 .with_schema_id(1)
1129 .with_fields(vec![
1130 NestedField::optional(
1131 1,
1132 "ts_map",
1133 Type::Map(crate::spec::MapType {
1134 key_field: NestedField::required(
1135 2,
1136 "key",
1137 Type::Primitive(PrimitiveType::String),
1138 )
1139 .into(),
1140 value_field: NestedField::optional(
1141 3,
1142 "value",
1143 Type::Primitive(PrimitiveType::Timestamp),
1144 )
1145 .into(),
1146 }),
1147 )
1148 .into(),
1149 ])
1150 .build()
1151 .unwrap(),
1152 );
1153
1154 let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1155
1156 assert_eq!(batches.len(), 1);
1157 let map_array = batches[0]
1158 .column(0)
1159 .as_any()
1160 .downcast_ref::<MapArray>()
1161 .expect("Expected MapArray");
1162 let ts_array = map_array
1163 .values()
1164 .as_any()
1165 .downcast_ref::<TimestampMicrosecondArray>()
1166 .expect("Expected TimestampMicrosecondArray as map values");
1167
1168 assert_eq!(
1169 ts_array.value(0),
1170 expected_micros,
1171 "INT96 in map: got {}, expected {expected_micros}",
1172 ts_array.value(0)
1173 );
1174 }
1175}