mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};

/// A stream of arrow [`RecordBatch`]es.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

/// Builds a table scan.
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    // Defaults to None, which means all columns are selected.
    column_names: Option<Vec<String>>,
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    /// Sets the desired size of batches in the response
    /// to something other than the default.
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the scan's case sensitivity.
    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

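    /// Specifies a [`Predicate`] to use as a filter for the scan. `NOT`
    /// nodes are rewritten away up front via `Predicate::rewrite_not`,
    /// since the downstream evaluators expect NOT-free predicates.
    ///
    /// A minimal usage sketch (assumes `Reference` and `Datum` are imported
    /// from this crate's `expr` and `spec` modules):
    ///
    /// ```ignore
    /// let scan = table
    ///     .scan()
    ///     .with_filter(Reference::new("y").less_than(Datum::long(3)))
    ///     .build()?;
    /// ```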
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        // Calls rewrite_not to remove Not nodes, which must be absent
        // when applying the manifest evaluator.
        self.filter = Some(predicate.rewrite_not());
        self
    }

    /// Select all columns.
    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

    /// Select no columns.
    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

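    /// Selects the given column names. A later call replaces any earlier
    /// selection rather than extending it, as sketched here:
    ///
    /// ```ignore
    /// // Scans only column "z": the second `select` call wins.
    /// let scan = table.scan().select(["x", "y"]).select(["z"]).build()?;
    /// ```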
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

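    /// Sets the snapshot to scan; when unset, the scan uses the table's
    /// current snapshot. A hedged time-travel sketch (the id must exist
    /// in the table's metadata, or `build` returns an error):
    ///
    /// ```ignore
    /// let scan = table.scan().snapshot_id(3051729675574597004).build()?;
    /// ```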
    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

    /// Sets the concurrency limit for the entire scan: manifest file
    /// retrieval, manifest entry processing, and data file reading.
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the data file concurrency limit for this scan.
    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the manifest entry concurrency limit for this scan.
    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

    /// Determines whether to enable row group filtering.
    ///
    /// When enabled and a filter predicate is set, the metadata for each
    /// Parquet row group is evaluated against the predicate, and row
    /// groups that cannot contain matching rows are skipped entirely.
    /// Defaults to enabled.
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    /// Determines whether to enable row selection.
    ///
    /// When enabled and a filter predicate is set, the Parquet page index
    /// is used to skip ranges of rows that cannot contain matching rows,
    /// reducing the number of rows decoded. Defaults to disabled.
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
                    // A table with no snapshots is valid: it simply contains
                    // no data. Return a TableScan with no plan context so
                    // that `plan_files` yields an empty stream.
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot_id.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        // Check that all requested column names exist in the schema.
        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                        ),
                    )
                })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}

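/// A table scan, created via [`TableScanBuilder`].
///
/// A hedged end-to-end sketch (assumes a `Table` value named `table`, an
/// async context, and `futures::TryStreamExt` in scope):
///
/// ```ignore
/// let scan = table.scan().select(["x", "y"]).build()?;
/// let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
/// ```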
#[derive(Debug)]
pub struct TableScan {
    // A `None` plan context means the table has no snapshots;
    // such a scan yields no file scan tasks.
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently.
    concurrency_limit_manifest_files: usize,

    /// The maximum number of manifest entries that will
    /// be processed in parallel.
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will be
    /// processed in parallel.
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
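    /// Returns a stream of [`FileScanTask`]s describing the data files
    /// that this scan needs to read.
    ///
    /// A hedged sketch of consuming the stream (assumes an async context
    /// and `futures::TryStreamExt` in scope):
    ///
    /// ```ignore
    /// let tasks: Vec<FileScanTask> = scan.plan_files().await?.try_collect().await?;
    /// ```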
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            // No plan context means the table has no snapshots,
            // so there are no files to scan.
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // Used to stream ManifestEntryContexts between stages of the planning operation.
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // Used to stream the results back to the caller.
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        // Build a planning context for each manifest file in the snapshot's
        // manifest list, routing data and delete manifests to their
        // respective channels.
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all manifests and stream their manifest entries.
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete file manifest entries in parallel.
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data file manifest entries in parallel.
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }

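    /// Returns an [`ArrowRecordBatchStream`] that reads the planned files.
    ///
    /// A hedged sketch of collecting the stream (assumes an async context
    /// and `futures::TryStreamExt` in scope):
    ///
    /// ```ignore
    /// let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;
    /// ```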
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }

    /// Returns a reference to the column names of the table scan, if any.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns a reference to the snapshot of the table scan, if any.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // Skip processing this manifest entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter a manifest entry for a delete file.
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // Skip any data file whose partition data indicates that it
            // can't contain any rows that match the filter predicate.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // Skip any data file whose column-level metrics indicate that
            // it can't contain any rows that match the filter predicate.
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // The manifest entry has passed all the filters and is part of the
        // plan: convert it into a FileScanTask and stream it to the caller.
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // Skip processing this manifest entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter a manifest entry for a data file.
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // Skip any delete file whose partition data indicates that it
            // can't contain any rows that match the filter predicate.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

#[cfg(test)]
pub mod tests {
    #![allow(missing_docs)]

    use std::collections::HashMap;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::{
        ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
    };
    use futures::{TryStreamExt, stream};
    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::TableIdent;
    use crate::arrow::ArrowReaderBuilder;
    use crate::expr::{BoundPredicate, Reference};
    use crate::io::{FileIO, OutputFile};
    use crate::scan::FileScanTask;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
    };
    use crate::table::Table;

    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

    pub struct TableTestFixture {
        pub table_location: String,
        pub table: Table,
    }

    impl TableTestFixture {
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            // Replace the template's partition spec with an unpartitioned one.
            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write data file manifest entries.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write the manifest list referencing the manifest file above.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            // Prepare an Arrow record batch and write it to each data file.
            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                // The writer must be closed to write the Parquet footer.
                writer.close().unwrap();
            }
        }

        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            // Write data file manifest entries with empty partition data.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            let empty_partition = Struct::empty();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write the manifest list referencing the manifest file above.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            // Prepare an Arrow record batch and write it to each data file.
            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                // The writer must be closed to write the Parquet footer.
                writer.close().unwrap();
            }
        }
    }

    #[test]
    fn test_table_scan_columns() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
        assert_eq!(
            Some(vec!["x".to_string(), "y".to_string()]),
            table_scan.column_names
        );

        let table_scan = table
            .scan()
            .select(["x", "y"])
            .select(["z"])
            .build()
            .unwrap();
        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
    }

    #[test]
    fn test_select_all() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select_all().build().unwrap();
        assert!(table_scan.column_names.is_none());
    }

    #[test]
    fn test_select_no_exist_column() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_default_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().build().unwrap();
        assert_eq!(
            table.metadata().current_snapshot().unwrap().snapshot_id(),
            table_scan.snapshot().unwrap().snapshot_id()
        );
    }

    #[test]
    fn test_table_scan_non_exist_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().snapshot_id(1024).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_with_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table
            .scan()
            .snapshot_id(3051729675574597004)
            .with_row_selection_enabled(true)
            .build()
            .unwrap();
        assert_eq!(
            table_scan.snapshot().unwrap().snapshot_id(),
            3051729675574597004
        );
    }

    #[tokio::test]
    async fn test_plan_files_on_table_without_any_snapshots() {
        let table = TableTestFixture::new_empty().table;
        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert!(batches.is_empty());
    }

    #[tokio::test]
    async fn test_plan_files_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create a table scan for the current snapshot and plan files.
        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut tasks = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_fold(vec![], |mut acc, task| async move {
                acc.push(task);
                Ok(acc)
            })
            .await
            .unwrap();

        assert_eq!(tasks.len(), 2);

        tasks.sort_by_key(|t| t.data_file_path.to_string());

        // Task 1 should be for data file 1.parquet (the Added entry).
        assert_eq!(
            tasks[0].data_file_path,
            format!("{}/1.parquet", &fixture.table_location)
        );

        // Task 2 should be for data file 3.parquet (the Existing entry);
        // 2.parquet was marked Deleted and is excluded from the plan.
        assert_eq!(
            tasks[1].data_file_path,
            format!("{}/3.parquet", &fixture.table_location)
        );
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create a table scan for the current snapshot and read it as Arrow.
        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        let col = batches[0].column_by_name("x").unwrap();

        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions_by_separate_reader() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut plan_task: Vec<_> = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        assert_eq!(plan_task.len(), 2);

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .clone()
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batch_1, batch_2);
    }

    #[tokio::test]
    async fn test_open_parquet_with_projection() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create a table scan that projects only columns "x" and "z".
        let table_scan = fixture
            .table
            .scan()
            .select(["x", "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let col1 = batches[0].column_by_name("x").unwrap();
        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col2 = batches[0].column_by_name("z").unwrap();
        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 3);

        // An empty projection should yield batches with no columns but the
        // correct row count.
        let table_scan = fixture.table.scan().select_empty().build().unwrap();
        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 0);
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: y < 3
        let predicate = Reference::new("y").less_than(Datum::long(3));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 2);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_gt_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: y >= 5
        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 5);
    }

    #[tokio::test]
    async fn test_filter_double_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: dbl == 150.0
        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("dbl").unwrap();
        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
        assert_eq!(f64_arr.value(1), 150.0f64);
    }

    #[tokio::test]
    async fn test_filter_int_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: i32 == 150
        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i32").unwrap();
        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(i32_arr.value(1), 150i32);
    }

    #[tokio::test]
    async fn test_filter_long_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: i64 == 150
        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i64").unwrap();
        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(i64_arr.value(1), 150i64);
    }

    #[tokio::test]
    async fn test_filter_bool_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: bool == true
        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("bool").unwrap();
        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
        assert!(bool_arr.value(1));
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: y is null
        let predicate = Reference::new("y").is_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        // All rows have a non-null y, so the scan yields no batches.
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_not_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: y is not null
        let predicate = Reference::new("y").is_not_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_and_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: y < 5 AND z >= 4
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 500);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_or_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: y < 5 OR z >= 4
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![2; 512];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        values.append(vec![5; 12].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let mut values = vec![3; 512];
        values.append(vec![4; 512].as_mut());
        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: a STARTSWITH "Ice"
        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: a NOT STARTSWITH "Ice"
        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: a IN ("Sioux", "Iceberg")
        let predicate =
            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        // Filter: a NOT IN ("Sioux", "Iceberg")
        let predicate =
            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[test]
    fn test_file_scan_task_serialize_deserialize() {
        let test_fn = |task: FileScanTask| {
            let serialized = serde_json::to_string(&task).unwrap();
            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();

            assert_eq!(task.data_file_path, deserialized.data_file_path);
            assert_eq!(task.start, deserialized.start);
            assert_eq!(task.length, deserialized.length);
            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
            assert_eq!(task.predicate, deserialized.predicate);
            assert_eq!(task.schema, deserialized.schema);
        };

        // Round-trip a task without a predicate.
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Binary),
                ))])
                .build()
                .unwrap(),
        );
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: None,
            schema: schema.clone(),
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
        };
        test_fn(task);

        // Round-trip a task with a predicate.
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: Some(BoundPredicate::AlwaysTrue),
            schema,
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
        };
        test_fn(task);
    }
}