mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
pub use crate::arrow::{ScanMetrics, ScanResult};
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::util::available_parallelism;
use crate::{Error, ErrorKind, Result};

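/// A stream of Arrow [`RecordBatch`]es.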
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

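/// Builder for a [`TableScan`]: configures column projection, filter
/// predicates, snapshot selection, concurrency limits, and reader options.
///
/// A minimal usage sketch (assumes an existing `Table` named `table`, as in
/// the tests at the bottom of this module):
///
/// ```ignore
/// let scan = table
///     .scan()
///     .select(["x", "y"])
///     .with_filter(Reference::new("y").less_than(Datum::long(3)))
///     .build()?;
/// ```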
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    column_names: Option<Vec<String>>,
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

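    /// Sets the desired Arrow record batch size for scan results; `None`
    /// leaves the reader's default in place.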
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

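    /// Sets whether column name matching is case sensitive. Defaults to `true`.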
    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

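    /// Adds a filter predicate to the scan. The predicate is normalized with
    /// `rewrite_not` before being stored.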
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        self.filter = Some(predicate.rewrite_not());
        self
    }

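    /// Selects all columns of the table.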
    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

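    /// Selects no columns; resulting batches carry row counts but no data.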
    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

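    /// Selects the given columns by name, replacing any previous selection.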
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

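    /// Sets the snapshot to scan. Defaults to the table's current snapshot.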
    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

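    /// Sets a single concurrency limit that applies to manifest files,
    /// manifest entries, and data files alike.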
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

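    /// Sets the concurrency limit for reading data files only.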
    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

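    /// Sets the concurrency limit for processing manifest entries only.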
    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

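    /// Enables or disables Parquet row-group filtering. Enabled by default;
    /// when enabled, row groups whose statistics show they cannot match the
    /// filter predicate are skipped entirely.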
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

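    /// Enables or disables row selection. Disabled by default; when enabled,
    /// page-level statistics are used to skip ranges of rows within row groups.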
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

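    /// Builds the [`TableScan`], validating the requested snapshot and the
    /// selected column names against the snapshot schema.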
    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot_id.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                if is_metadata_column_name(column_name) {
                    continue;
                }
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            if is_metadata_column_name(column_name) {
                field_ids.push(get_metadata_field_id(column_name)?);
                continue;
            }

            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                        ),
                    )
                })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}

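/// An executable table scan produced by [`TableScanBuilder::build`].
///
/// A minimal usage sketch (assumes an existing `Table` named `table`):
///
/// ```ignore
/// let stream = table.scan().build()?.to_arrow().await?;
/// let batches: Vec<_> = stream.try_collect().await?;
/// ```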
#[derive(Debug)]
pub struct TableScan {
    /// `None` when the table has no snapshots; such a scan plans no files.
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,
    /// Maximum number of manifest files fetched concurrently.
    concurrency_limit_manifest_files: usize,

    /// Maximum number of manifest entries processed concurrently.
    concurrency_limit_manifest_entries: usize,

    /// Maximum number of data files fetched concurrently.
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
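    /// Plans the files to scan, returning a stream of [`FileScanTask`]s.
    ///
    /// Manifest files, delete-file entries, and data-file entries are each
    /// processed concurrently, bounded by the configured concurrency limits.
    /// Delete-file entries are processed to completion first, so the delete
    /// file index is populated before data-file tasks are emitted.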
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // Bounded channels that stream manifest entry contexts between the
        // stages of the file-planning pipeline.
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // Channel that streams the resulting file scan tasks back to the caller.
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        // Build one context per manifest file that could match this scan.
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently fetch the manifest files and stream their entries.
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete-file manifest entries in parallel. This task is
        // awaited so that the delete file index is fully populated before any
        // data-file tasks are produced.
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data-file manifest entries in parallel.
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }

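    /// Reads the planned files and returns an [`ArrowRecordBatchStream`].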
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder
            .build()
            .read(self.plan_files().await?)
            .map(|result| result.stream())
    }

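    /// Returns the selected column names, or `None` if all columns were selected.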
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

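    /// Returns the snapshot being scanned, or `None` for a table without snapshots.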
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // Skip this entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter an entry for a delete file here.
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // Skip any data file whose partition data shows it cannot contain
            // rows matching this scan's filter.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // Skip any data file whose metrics rule out a match.
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // The entry survived all filters: emit a FileScanTask for it.
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // Skip this entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter an entry for a data file here.
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // Skip any delete file whose partition data shows it cannot apply
            // to rows matching this scan's filter.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

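/// Filter predicate bound in the two forms planning needs: against the
/// partition schema and against the snapshot schema.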
pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

#[cfg(test)]
pub mod tests {
    #![allow(missing_docs)]

    use std::collections::HashMap;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
        StringArray,
    };
    use futures::{TryStreamExt, stream};
    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::TableIdent;
    use crate::arrow::ArrowReaderBuilder;
    use crate::expr::{BoundPredicate, Reference};
    use crate::io::{FileIO, OutputFile};
    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
    use crate::scan::FileScanTask;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
    };
    use crate::table::Table;

    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

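    /// Test fixture that materializes a table in a temp directory from the
    /// example metadata templates in `testdata/`.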
    pub struct TableTestFixture {
        pub table_location: String,
        pub table: Table,
    }

    impl TableTestFixture {
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_with_deep_history() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let table_metadata = {
                let json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2_deep_history.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                serde_json::from_str::<TableMetadata>(&json_str).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            // Rewrite the default spec so the table is unpartitioned.
            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write the parquet files referenced below and record their size.
            let parquet_file_size = self.write_parquet_data_files();

            // Write the data file manifest: one added, one deleted, one existing entry.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write the manifest list for the current snapshot.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }

        fn write_parquet_data_files(&self) -> u64 {
            std::fs::create_dir_all(&self.table_location).unwrap();

            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            // x: all 1024 rows are 1.
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            // y: 512 x 2, then 200 x 3, 300 x 4, 12 x 5.
            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // z: 512 x 3, then 512 x 4.
            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // a: 512 x "Apache", then 512 x "Iceberg".
            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            // dbl: 512 x 100.0, then 12 x 150.0, 500 x 200.0.
            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            // i32: same distribution as dbl.
            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            // i64: same distribution as dbl.
            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // bool: 512 x false, then 512 x true.
            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            // Write the same batch to 1.parquet, 2.parquet, and 3.parquet.
            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                // The writer must be closed to write the parquet footer.
                writer.close().unwrap();
            }

            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
                .unwrap()
                .len()
        }

        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            let parquet_file_size = self.write_parquet_data_files();

            // Write the data file manifest using the unpartitioned spec.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Unpartitioned tables use an empty partition value.
            let empty_partition = Struct::empty();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write the manifest list for the current snapshot.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }

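        /// Writes a data manifest with ten entries plus a positional-delete
        /// manifest, reproducing the layout used by `test_scan_deadlock` to
        /// guard against planning deadlocks under a concurrency limit of 1.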
        pub async fn setup_deadlock_manifests(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let _parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Data manifest with enough entries to fill the bounded planning channels.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            for i in 0..10 {
                writer
                    .add_entry(
                        ManifestEntry::builder()
                            .status(ManifestStatus::Added)
                            .data_file(
                                DataFileBuilder::default()
                                    .partition_spec_id(0)
                                    .content(DataContentType::Data)
                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
                                    .file_format(DataFileFormat::Parquet)
                                    .file_size_in_bytes(100)
                                    .record_count(1)
                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
                                    .key_metadata(None)
                                    .build()
                                    .unwrap(),
                            )
                            .build(),
                    )
                    .unwrap();
            }
            let data_manifest = writer.write_manifest_file().await.unwrap();

            // A positional-delete manifest alongside the data manifest.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_deletes();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::PositionDeletes)
                                .file_path(format!("{}/del.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let delete_manifest = writer.write_manifest_file().await.unwrap();

            // Manifest list containing both manifests.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
    }

    #[test]
    fn test_table_scan_columns() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
        assert_eq!(
            Some(vec!["x".to_string(), "y".to_string()]),
            table_scan.column_names
        );

        let table_scan = table
            .scan()
            .select(["x", "y"])
            .select(["z"])
            .build()
            .unwrap();
        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
    }

    #[test]
    fn test_select_all() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select_all().build().unwrap();
        assert!(table_scan.column_names.is_none());
    }

    #[test]
    fn test_select_no_exist_column() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_default_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().build().unwrap();
        assert_eq!(
            table.metadata().current_snapshot().unwrap().snapshot_id(),
            table_scan.snapshot().unwrap().snapshot_id()
        );
    }

    #[test]
    fn test_table_scan_non_exist_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().snapshot_id(1024).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_with_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table
            .scan()
            .snapshot_id(3051729675574597004)
            .with_row_selection_enabled(true)
            .build()
            .unwrap();
        assert_eq!(
            table_scan.snapshot().unwrap().snapshot_id(),
            3051729675574597004
        );
    }

    #[tokio::test]
    async fn test_plan_files_on_table_without_any_snapshots() {
        let table = TableTestFixture::new_empty().table;
        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert!(batches.is_empty());
    }

    #[tokio::test]
    async fn test_plan_files_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create a table scan for the current snapshot and plan files.
        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut tasks = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_fold(vec![], |mut acc, task| async move {
                acc.push(task);
                Ok(acc)
            })
            .await
            .unwrap();

        assert_eq!(tasks.len(), 2);

        tasks.sort_by_key(|t| t.data_file_path.to_string());

        // The deleted entry (2.parquet) is filtered out; the added and
        // existing files remain.
        assert_eq!(
            tasks[0].data_file_path,
            format!("{}/1.parquet", &fixture.table_location)
        );

        assert_eq!(
            tasks[1].data_file_path,
            format!("{}/3.parquet", &fixture.table_location)
        );
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        let col = batches[0].column_by_name("x").unwrap();

        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions_by_separate_reader() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut plan_task: Vec<_> = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        assert_eq!(plan_task.len(), 2);

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .clone()
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap()
            .stream();
        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap()
            .stream();
        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batch_1, batch_2);
    }

    #[tokio::test]
    async fn test_open_parquet_with_projection() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let col1 = batches[0].column_by_name("x").unwrap();
        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col2 = batches[0].column_by_name("z").unwrap();
        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 3);

        // An empty projection yields batches with no columns but the full row count.
        let table_scan = fixture.table.scan().select_empty().build().unwrap();
        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 0);
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 3
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").less_than(Datum::long(3));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 2);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_gt_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y >= 5
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 5);
    }

    #[tokio::test]
    async fn test_filter_double_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: dbl == 150.0
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("dbl").unwrap();
        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
        assert_eq!(f64_arr.value(1), 150.0f64);
    }

    #[tokio::test]
    async fn test_filter_int_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: i32 == 150
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i32").unwrap();
        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(i32_arr.value(1), 150i32);
    }

    #[tokio::test]
    async fn test_filter_long_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: i64 == 150
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i64").unwrap();
        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(i64_arr.value(1), 150i64);
    }

    #[tokio::test]
    async fn test_filter_bool_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: bool == true
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("bool").unwrap();
        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
        assert!(bool_arr.value(1));
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y IS NULL
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_not_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y IS NOT NULL
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_not_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_and_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 5 AND z >= 4
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 500);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_or_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 5 OR z >= 4
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![2; 512];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        values.append(vec![5; 12].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let mut values = vec![3; 512];
        values.append(vec![4; 512].as_mut());
        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a STARTSWITH "Ice"
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a NOT STARTSWITH "Ice"
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a IN ("Sioux", "Iceberg")
        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a NOT IN ("Sioux", "Iceberg")
        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[test]
    fn test_file_scan_task_serialize_deserialize() {
        let test_fn = |task: FileScanTask| {
            let serialized = serde_json::to_string(&task).unwrap();
            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();

            assert_eq!(task.data_file_path, deserialized.data_file_path);
            assert_eq!(task.start, deserialized.start);
            assert_eq!(task.length, deserialized.length);
            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
            assert_eq!(task.predicate, deserialized.predicate);
            assert_eq!(task.schema, deserialized.schema);
        };

        // Round-trip a task without a predicate.
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Binary),
                ))])
                .build()
                .unwrap(),
        );
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: None,
            schema: schema.clone(),
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);

        // Round-trip a task with a predicate.
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: Some(BoundPredicate::AlwaysTrue),
            schema,
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);
    }

    #[tokio::test]
    async fn test_select_with_file_column() {
        use arrow_array::cast::AsArray;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select a regular column plus the reserved _file metadata column.
        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Should have exactly two columns: x and _file.
        assert_eq!(batches[0].num_columns(), 2);

        // Verify the data column.
        let x_col = batches[0].column_by_name("x").unwrap();
        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(x_arr.value(0), 1);

        // Verify the _file column exists.
        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
        assert!(
            file_col.is_some(),
            "_file column should be present in the batch"
        );

        // The _file column is run-end encoded: one path repeated per batch.
        let file_col = file_col.unwrap();
        assert!(
            matches!(
                file_col.data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );

        let run_array = file_col
            .as_any()
            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
            .expect("_file column should be a RunArray");

        let values = run_array.values();
        let string_values = values.as_string::<i32>();
        assert_eq!(string_values.len(), 1, "Should have a single file path");

        let file_path = string_values.value(0);
        assert!(
            file_path.ends_with(".parquet"),
            "File path should end with .parquet, got: {file_path}"
        );
    }

    #[tokio::test]
    async fn test_select_file_column_position() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // The _file column should keep its requested position in the projection.
        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE, "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(2).name(), "z");

        assert!(batches[0].column_by_name("x").is_some());
        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
        assert!(batches[0].column_by_name("z").is_some());
    }

    #[tokio::test]
    async fn test_select_file_column_only() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 1);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);

        // Two live data files at 1024 rows each.
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2048);
    }

    #[tokio::test]
    async fn test_file_column_with_multiple_files() {
        use std::collections::HashSet;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Collect the distinct file paths across all batches.
        let mut file_paths = HashSet::new();
        for batch in &batches {
            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
            let run_array = file_col
                .as_any()
                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
                .expect("_file column should be a RunArray");

            let values = run_array.values();
            let string_values = values.as_string::<i32>();
            for i in 0..string_values.len() {
                file_paths.insert(string_values.value(i).to_string());
            }
        }

        assert!(!file_paths.is_empty(), "Should have at least one file path");

        for path in &file_paths {
            assert!(
                path.ends_with(".parquet"),
                "All file paths should end with .parquet, got: {path}"
            );
        }
    }

    #[tokio::test]
    async fn test_file_column_at_start() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE, "x", "y"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(1).name(), "x");
        assert_eq!(schema.field(2).name(), "y");
    }

    #[tokio::test]
    async fn test_file_column_at_end() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "y", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), "y");
        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
    }

    #[tokio::test]
    async fn test_select_with_repeated_column_names() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Duplicated column names, including the _file metadata column, are
        // allowed and preserved in order.
        let table_scan = fixture
            .table
            .scan()
            .select([
                "x",
                RESERVED_COL_NAME_FILE,
                "x",
                "y",
                RESERVED_COL_NAME_FILE,
                "y",
            ])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(
            batches[0].num_columns(),
            6,
            "Should have exactly 6 columns with duplicates"
        );

        let schema = batches[0].schema();

        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
        assert_eq!(
            schema.field(1).name(),
            RESERVED_COL_NAME_FILE,
            "Column 1 should be _file"
        );
        assert_eq!(
            schema.field(2).name(),
            "x",
            "Column 2 should be x (duplicate)"
        );
        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
        assert_eq!(
            schema.field(4).name(),
            RESERVED_COL_NAME_FILE,
            "Column 4 should be _file (duplicate)"
        );
        assert_eq!(
            schema.field(5).name(),
            "y",
            "Column 5 should be y (duplicate)"
        );

        assert!(
            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
            "Column x should be Int64"
        );
        assert!(
            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
            "Column x (duplicate) should be Int64"
        );
        assert!(
            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
            "Column y should be Int64"
        );
        assert!(
            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
            "Column y (duplicate) should be Int64"
        );
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );
        assert!(
            matches!(
                schema.field(4).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column (duplicate) should use RunEndEncoded type"
        );
    }

    #[tokio::test]
    async fn test_scan_deadlock() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_deadlock_manifests().await;

        // With every concurrency limit forced to 1, planning must still make
        // progress even when the bounded channels fill up.
        let table_scan = fixture
            .table
            .scan()
            .with_concurrency_limit(1)
            .build()
            .unwrap();

        // If planning deadlocks, the timeout elapses and the test fails.
        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
            table_scan
                .plan_files()
                .await
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
        })
        .await;

        assert!(result.is_ok(), "Scan timed out - deadlock detected");
    }
}