//! Table scan API.

mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};

/// A stream of Arrow [`RecordBatch`]es.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

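/// Builder used to create and configure a [`TableScan`].
///
/// A builder is obtained from [`Table::scan`] and allows the column projection,
/// snapshot, filter predicate, batch size, and concurrency limits to be set
/// before calling [`TableScanBuilder::build`]. A minimal, illustrative usage
/// (assuming an existing `table` handle) might look like:
///
/// ```ignore
/// let scan = table
///     .scan()
///     .select(["x", "y"])
///     .with_filter(Reference::new("y").less_than(Datum::long(3)))
///     .build()?;
/// let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
/// ```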
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    column_names: Option<Vec<String>>,
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

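    /// Sets the maximum number of rows per Arrow [`RecordBatch`] produced by
    /// the reader. If unset, the reader's default batch size is used.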
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

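    /// Adds a filter [`Predicate`] to the scan.
    ///
    /// The predicate is rewritten with `rewrite_not` to eliminate explicit
    /// `NOT` nodes before it is used to prune manifests and data files.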
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        self.filter = Some(predicate.rewrite_not());
        self
    }

    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

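    /// Selects the columns to project, by name. Reserved metadata column names
    /// (such as the `_file` path column) are accepted alongside regular schema
    /// columns.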
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

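    /// Sets a single concurrency limit that applies to fetching manifest files,
    /// processing manifest entries, and fetching data files alike.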
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

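    /// Enables or disables Parquet row-group filtering (enabled by default).
    /// When enabled, the scan's filter predicate is evaluated against row-group
    /// statistics so that row groups which cannot contain matching rows are
    /// skipped.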
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

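    /// Enables or disables row selection (disabled by default). When enabled,
    /// the reader uses finer-grained statistics to narrow reads down to the
    /// row ranges within a row group that can match the filter predicate.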
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

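    /// Builds the [`TableScan`], resolving the snapshot to read from and
    /// validating the selected columns against that snapshot's schema.
    /// Returns an error if the requested snapshot does not exist or if a
    /// selected column is not part of the schema.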
    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
                    // A table without any snapshots produces an empty scan.
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot_id.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        // Check that every selected column exists in the schema; metadata
        // columns are resolved separately below.
        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                if is_metadata_column_name(column_name) {
                    continue;
                }
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            if is_metadata_column_name(column_name) {
                field_ids.push(get_metadata_field_id(column_name)?);
                continue;
            }

            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            // Only top-level columns can be projected; nested fields are not
            // supported yet.
            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                        ),
                    )
                })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}

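/// A scan of an Iceberg table that can be planned into [`FileScanTask`]s or
/// read directly into a stream of Arrow [`RecordBatch`]es.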
#[derive(Debug)]
pub struct TableScan {
    /// `None` when the table has no snapshots; such a scan yields no data.
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,

    /// The maximum number of manifest files that will be fetched concurrently.
    concurrency_limit_manifest_files: usize,

    /// The maximum number of manifest entries that will be processed concurrently.
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will be fetched concurrently.
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
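    /// Plans the scan and returns a stream of [`FileScanTask`]s, one per data
    /// file that needs to be read after pruning with the scan's filter
    /// predicate. For a table without any snapshots the stream is empty.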
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // Channels that stream manifest entry contexts from the manifest-file
        // stage to the data and delete manifest-entry processing stages.
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // Channel that streams the resulting file scan tasks back to the caller.
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently fetch all manifests and stream their entries.
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Concurrently process the delete-file manifest entries, populating the
        // delete file index.
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Concurrently process the data-file manifest entries, emitting a
        // `FileScanTask` for every entry that passes pruning.
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }

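    /// Plans the scan and reads the resulting [`FileScanTask`]s into a stream
    /// of Arrow [`RecordBatch`]es, honoring the configured batch size,
    /// row-group filtering, and row-selection settings.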
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }

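    /// Returns the names of the selected columns, or `None` if all columns are
    /// selected.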
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

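    /// Returns the snapshot this scan reads from, or `None` if the table has
    /// no snapshots.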
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // Skip this manifest entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter an entry for a delete file inside a
        // data file manifest.
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // Skip the file if its partition values cannot match the filter.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // Skip the file if its column metrics rule out any matching rows.
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // Skip this manifest entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter an entry for a data file inside a
        // delete manifest.
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // Skip the delete file if its partition values cannot match the filter.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

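/// The scan's filter predicate in the two bound forms needed for pruning: one
/// bound to the snapshot schema (for metrics evaluation) and one projected onto
/// the partition fields (for partition-value evaluation).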
pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

#[cfg(test)]
pub mod tests {
    #![allow(missing_docs)]

    use std::collections::HashMap;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
        StringArray,
    };
    use futures::{TryStreamExt, stream};
    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::TableIdent;
    use crate::arrow::ArrowReaderBuilder;
    use crate::expr::{BoundPredicate, Reference};
    use crate::io::{FileIO, OutputFile};
    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
    use crate::scan::FileScanTask;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
    };
    use crate::table::Table;

    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

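    /// Test fixture providing a [`Table`] backed by a temporary location,
    /// together with helpers for writing manifests, manifest lists, and
    /// Parquet data files into it.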
    pub struct TableTestFixture {
        pub table_location: String,
        pub table: Table,
    }

    impl TableTestFixture {
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

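        /// Writes a manifest with one added, one deleted, and one existing data
        /// file entry, a matching manifest list for the current snapshot, and
        /// three Parquet files of 1024 rows each under the table location.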
        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());

            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());

            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                writer.close().unwrap();
            }
        }

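        /// Same as [`Self::setup_manifest_files`], but for the unpartitioned
        /// fixture: every manifest entry carries an empty partition value.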
        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            let empty_partition = Struct::empty();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                writer.close().unwrap();
            }
        }
    }

    #[test]
    fn test_table_scan_columns() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
        assert_eq!(
            Some(vec!["x".to_string(), "y".to_string()]),
            table_scan.column_names
        );

        let table_scan = table
            .scan()
            .select(["x", "y"])
            .select(["z"])
            .build()
            .unwrap();
        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
    }

    #[test]
    fn test_select_all() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select_all().build().unwrap();
        assert!(table_scan.column_names.is_none());
    }

    #[test]
    fn test_select_no_exist_column() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_default_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().build().unwrap();
        assert_eq!(
            table.metadata().current_snapshot().unwrap().snapshot_id(),
            table_scan.snapshot().unwrap().snapshot_id()
        );
    }

    #[test]
    fn test_table_scan_non_exist_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().snapshot_id(1024).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_with_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table
            .scan()
            .snapshot_id(3051729675574597004)
            .with_row_selection_enabled(true)
            .build()
            .unwrap();
        assert_eq!(
            table_scan.snapshot().unwrap().snapshot_id(),
            3051729675574597004
        );
    }

    #[tokio::test]
    async fn test_plan_files_on_table_without_any_snapshots() {
        let table = TableTestFixture::new_empty().table;
        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert!(batches.is_empty());
    }

    #[tokio::test]
    async fn test_plan_files_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut tasks = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_fold(vec![], |mut acc, task| async move {
                acc.push(task);
                Ok(acc)
            })
            .await
            .unwrap();

        assert_eq!(tasks.len(), 2);

        tasks.sort_by_key(|t| t.data_file_path.to_string());

        assert_eq!(
            tasks[0].data_file_path,
            format!("{}/1.parquet", &fixture.table_location)
        );

        assert_eq!(
            tasks[1].data_file_path,
            format!("{}/3.parquet", &fixture.table_location)
        );
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        let col = batches[0].column_by_name("x").unwrap();

        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions_by_separate_reader() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut plan_task: Vec<_> = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        assert_eq!(plan_task.len(), 2);

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .clone()
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batch_1, batch_2);
    }

    #[tokio::test]
    async fn test_open_parquet_with_projection() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let col1 = batches[0].column_by_name("x").unwrap();
        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col2 = batches[0].column_by_name("z").unwrap();
        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 3);

        let table_scan = fixture.table.scan().select_empty().build().unwrap();
        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 0);
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").less_than(Datum::long(3));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 2);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_gt_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 5);
    }

    #[tokio::test]
    async fn test_filter_double_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("dbl").unwrap();
        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
        assert_eq!(f64_arr.value(1), 150.0f64);
    }

    #[tokio::test]
    async fn test_filter_int_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i32").unwrap();
        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(i32_arr.value(1), 150i32);
    }

    #[tokio::test]
    async fn test_filter_long_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i64").unwrap();
        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(i64_arr.value(1), 150i64);
    }

    #[tokio::test]
    async fn test_filter_bool_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("bool").unwrap();
        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
        assert!(bool_arr.value(1));
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_not_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_not_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_and_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 500);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_or_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![2; 512];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        values.append(vec![5; 12].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let mut values = vec![3; 512];
        values.append(vec![4; 512].as_mut());
        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[test]
    fn test_file_scan_task_serialize_deserialize() {
        let test_fn = |task: FileScanTask| {
            let serialized = serde_json::to_string(&task).unwrap();
            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();

            assert_eq!(task.data_file_path, deserialized.data_file_path);
            assert_eq!(task.start, deserialized.start);
            assert_eq!(task.length, deserialized.length);
            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
            assert_eq!(task.predicate, deserialized.predicate);
            assert_eq!(task.schema, deserialized.schema);
        };

        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Binary),
                ))])
                .build()
                .unwrap(),
        );
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: None,
            schema: schema.clone(),
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
        };
        test_fn(task);

        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: Some(BoundPredicate::AlwaysTrue),
            schema,
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
        };
        test_fn(task);
    }

    #[tokio::test]
    async fn test_select_with_file_column() {
        use arrow_array::cast::AsArray;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let x_col = batches[0].column_by_name("x").unwrap();
        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(x_arr.value(0), 1);

        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
        assert!(
            file_col.is_some(),
            "_file column should be present in the batch"
        );

        let file_col = file_col.unwrap();
        assert!(
            matches!(
                file_col.data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );

        let run_array = file_col
            .as_any()
            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
            .expect("_file column should be a RunArray");

        let values = run_array.values();
        let string_values = values.as_string::<i32>();
        assert_eq!(string_values.len(), 1, "Should have a single file path");

        let file_path = string_values.value(0);
        assert!(
            file_path.ends_with(".parquet"),
            "File path should end with .parquet, got: {file_path}"
        );
    }

    #[tokio::test]
    async fn test_select_file_column_position() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE, "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(2).name(), "z");

        assert!(batches[0].column_by_name("x").is_some());
        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
        assert!(batches[0].column_by_name("z").is_some());
    }

    #[tokio::test]
    async fn test_select_file_column_only() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 1);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2048);
    }

    #[tokio::test]
    async fn test_file_column_with_multiple_files() {
        use std::collections::HashSet;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        let mut file_paths = HashSet::new();
        for batch in &batches {
            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
            let run_array = file_col
                .as_any()
                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
                .expect("_file column should be a RunArray");

            let values = run_array.values();
            let string_values = values.as_string::<i32>();
            for i in 0..string_values.len() {
                file_paths.insert(string_values.value(i).to_string());
            }
        }

        assert!(!file_paths.is_empty(), "Should have at least one file path");

        for path in &file_paths {
            assert!(
                path.ends_with(".parquet"),
                "All file paths should end with .parquet, got: {path}"
            );
        }
    }

    #[tokio::test]
    async fn test_file_column_at_start() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE, "x", "y"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(1).name(), "x");
        assert_eq!(schema.field(2).name(), "y");
    }

    #[tokio::test]
    async fn test_file_column_at_end() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "y", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), "y");
        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
    }

    #[tokio::test]
    async fn test_select_with_repeated_column_names() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([
                "x",
                RESERVED_COL_NAME_FILE,
                "x",
                "y",
                RESERVED_COL_NAME_FILE,
                "y",
            ])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(
            batches[0].num_columns(),
            6,
            "Should have exactly 6 columns with duplicates"
        );

        let schema = batches[0].schema();

        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
        assert_eq!(
            schema.field(1).name(),
            RESERVED_COL_NAME_FILE,
            "Column 1 should be _file"
        );
        assert_eq!(
            schema.field(2).name(),
            "x",
            "Column 2 should be x (duplicate)"
        );
        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
        assert_eq!(
            schema.field(4).name(),
            RESERVED_COL_NAME_FILE,
            "Column 4 should be _file (duplicate)"
        );
        assert_eq!(
            schema.field(5).name(),
            "y",
            "Column 5 should be y (duplicate)"
        );

        assert!(
            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
            "Column x should be Int64"
        );
        assert!(
            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
            "Column x (duplicate) should be Int64"
        );
        assert!(
            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
            "Column y should be Int64"
        );
        assert!(
            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
            "Column y (duplicate) should be Int64"
        );
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );
        assert!(
            matches!(
                schema.field(4).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column (duplicate) should use RunEndEncoded type"
        );
    }
}