mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::util::available_parallelism;
use crate::{Error, ErrorKind, Result};

/// A stream of arrow [`RecordBatch`]es.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

/// Builds a table scan.
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    // Defaults to None, which means select all columns
    column_names: Option<Vec<String>>,
    // Defaults to None, which means scan the current snapshot
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    /// Sets the desired size of batches in the response
    /// to something other than the default.
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the scan's case sensitivity.
    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

    /// Specifies a predicate to use as a filter.
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        // calls rewrite_not to remove Not nodes, which must be absent
        // when applying the manifest evaluator
        self.filter = Some(predicate.rewrite_not());
        self
    }

    /// Select all columns.
    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

    /// Select no columns.
    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

    /// Select some columns of the table.
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

    /// Set the snapshot to scan. When not set, the current snapshot is used.
    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

    /// Sets the concurrency limit for manifest files, manifest entries,
    /// and data files for this scan.
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the data file concurrency limit for this scan.
    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the manifest entry concurrency limit for this scan.
    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

    /// Determines whether to enable row group filtering.
    /// When enabled and a filter predicate is set, row group metadata in
    /// each parquet file is evaluated against the predicate so that row
    /// groups which cannot contain matching rows are skipped entirely.
    ///
    /// Defaults to enabled.
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    /// Determines whether to enable row selection.
    /// When enabled and a filter predicate is set, page-level metadata is
    /// used to compute a row selection so that only rows which may match
    /// the predicate are decoded from the file.
    ///
    /// Defaults to disabled.
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    /// Build the table scan.
    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
                    // A table with no snapshots yields an empty scan.
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot_id.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        // Check that all requested columns exist in the schema.
        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                // Reserved metadata columns (e.g. `_file`) are not part of the table schema.
                if is_metadata_column_name(column_name) {
                    continue;
                }
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            // Reserved metadata columns resolve to their reserved field ids.
            if is_metadata_column_name(column_name) {
                field_ids.push(get_metadata_field_id(column_name)?);
                continue;
            }

            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                        ),
                    )
                })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}
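
// A minimal usage sketch of the builder (the table name, columns, and
// predicate here are illustrative only; it assumes a `Table` already
// loaded from a catalog):
//
//     let scan = table
//         .scan()
//         .select(["x", "y"])
//         .with_filter(Reference::new("y").less_than(Datum::long(3)))
//         .with_batch_size(Some(1024))
//         .build()?;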

/// A scan of an Iceberg table, constructed via [`TableScanBuilder`].
#[derive(Debug)]
pub struct TableScan {
    // A table with no snapshots produces a scan without a plan context;
    // such a scan yields empty streams.
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// fetched from FileIO concurrently
    concurrency_limit_manifest_files: usize,

    /// The maximum number of manifest entries that will
    /// be processed in parallel
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will be
    /// read concurrently
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
    /// Returns a stream of [`FileScanTask`]s.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            // A table without any snapshots produces no file scan tasks.
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // used to stream ManifestEntryContexts between stages of the planning operation
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // used to stream the resulting file scan tasks back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        // Build a context per manifest file, filtering out manifests whose
        // partitions cannot match this scan's filter.
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all manifests and stream their manifest entries.
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete file manifest entries in parallel. This stage is
        // awaited so that the delete file index is fully populated before any
        // data file manifest entries are processed.
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data file manifest entries in parallel.
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }
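
    // A minimal consumption sketch (assuming a `scan` built as above): the
    // returned `FileScanTaskStream` can be drained with the `TryStreamExt`
    // combinators already imported in this module:
    //
    //     let tasks: Vec<FileScanTask> =
    //         scan.plan_files().await?.try_collect().await?;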

    /// Returns an [`ArrowRecordBatchStream`] that reads the planned files.
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }
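
    // A minimal read sketch: `to_arrow` plans the files internally, so an
    // entire scan can be collected into Arrow batches in one step (again
    // assuming a `scan` built as above):
    //
    //     let batches: Vec<RecordBatch> =
    //         scan.to_arrow().await?.try_collect().await?;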

    /// Returns the selected column names, or `None` if all columns are selected.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns the snapshot this scan reads from, if the table has one.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry for a delete file
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't
            // contain any rows that match this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // skip any data file whose metrics don't match this scan's filter
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // The manifest entry has survived all of the filters; create a
        // corresponding FileScanTask and push it onto the result stream.
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry for a data file
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // skip any delete file whose partition data indicates that it
            // can't apply to any rows that match this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

#[cfg(test)]
pub mod tests {
    #![allow(missing_docs)]

    use std::collections::HashMap;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
        StringArray,
    };
    use futures::{TryStreamExt, stream};
    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::TableIdent;
    use crate::arrow::ArrowReaderBuilder;
    use crate::expr::{BoundPredicate, Reference};
    use crate::io::{FileIO, OutputFile};
    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
    use crate::scan::FileScanTask;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
    };
    use crate::table::Table;

    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

    pub struct TableTestFixture {
        pub table_location: String,
        pub table: Table,
    }

    impl TableTestFixture {
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_with_deep_history() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let table_metadata = {
                let json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2_deep_history.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                serde_json::from_str::<TableMetadata>(&json_str).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            let parquet_file_size = self.write_parquet_data_files();

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }

        fn write_parquet_data_files(&self) -> u64 {
            std::fs::create_dir_all(&self.table_location).unwrap();

            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            // col "x": all 1024 rows are 1
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            // col "y": 512 rows of 2, then 200 of 3, 300 of 4, and 12 of 5
            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // col "z": 512 rows of 3 followed by 512 rows of 4
            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // col "a": 512 rows of "Apache" followed by 512 rows of "Iceberg"
            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            // col "dbl": 512 rows of 100.0, then 12 of 150.0 and 500 of 200.0
            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            // col "i32": same distribution as "dbl"
            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            // col "i64": same distribution as "dbl"
            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // col "bool": 512 rows of false followed by 512 rows of true
            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            // Write the same batch to 1.parquet, 2.parquet, and 3.parquet
            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                writer.close().unwrap();
            }

            // All three files are identical, so return the size of the first
            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
                .unwrap()
                .len()
        }

        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            let parquet_file_size = self.write_parquet_data_files();

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            let empty_partition = Struct::empty();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }

        /// Sets up a data manifest with ten entries plus a delete manifest,
        /// which `test_scan_deadlock` uses to exercise planning under a
        /// concurrency limit of one.
        pub async fn setup_deadlock_manifests(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let _parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Data manifest with 10 entries
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            for i in 0..10 {
                writer
                    .add_entry(
                        ManifestEntry::builder()
                            .status(ManifestStatus::Added)
                            .data_file(
                                DataFileBuilder::default()
                                    .partition_spec_id(0)
                                    .content(DataContentType::Data)
                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
                                    .file_format(DataFileFormat::Parquet)
                                    .file_size_in_bytes(100)
                                    .record_count(1)
                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
                                    .key_metadata(None)
                                    .build()
                                    .unwrap(),
                            )
                            .build(),
                    )
                    .unwrap();
            }
            let data_manifest = writer.write_manifest_file().await.unwrap();

            // Delete manifest with a single positional-delete entry
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_deletes();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::PositionDeletes)
                                .file_path(format!("{}/del.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let delete_manifest = writer.write_manifest_file().await.unwrap();

            // Manifest list referencing both manifests
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
    }

    #[test]
    fn test_table_scan_columns() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
        assert_eq!(
            Some(vec!["x".to_string(), "y".to_string()]),
            table_scan.column_names
        );

        let table_scan = table
            .scan()
            .select(["x", "y"])
            .select(["z"])
            .build()
            .unwrap();
        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
    }

    #[test]
    fn test_select_all() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select_all().build().unwrap();
        assert!(table_scan.column_names.is_none());
    }

    #[test]
    fn test_select_no_exist_column() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_default_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().build().unwrap();
        assert_eq!(
            table.metadata().current_snapshot().unwrap().snapshot_id(),
            table_scan.snapshot().unwrap().snapshot_id()
        );
    }

    #[test]
    fn test_table_scan_non_exist_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().snapshot_id(1024).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_with_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table
            .scan()
            .snapshot_id(3051729675574597004)
            .with_row_selection_enabled(true)
            .build()
            .unwrap();
        assert_eq!(
            table_scan.snapshot().unwrap().snapshot_id(),
            3051729675574597004
        );
    }

    #[tokio::test]
    async fn test_plan_files_on_table_without_any_snapshots() {
        let table = TableTestFixture::new_empty().table;
        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert!(batches.is_empty());
    }

    #[tokio::test]
    async fn test_plan_files_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create table scan for current snapshot and plan files
        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut tasks = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_fold(vec![], |mut acc, task| async move {
                acc.push(task);
                Ok(acc)
            })
            .await
            .unwrap();

        assert_eq!(tasks.len(), 2);

        tasks.sort_by_key(|t| t.data_file_path.to_string());

        // the added data file
        assert_eq!(
            tasks[0].data_file_path,
            format!("{}/1.parquet", &fixture.table_location)
        );

        // the existing data file; the deleted 2.parquet is filtered out
        assert_eq!(
            tasks[1].data_file_path,
            format!("{}/3.parquet", &fixture.table_location)
        );
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create table scan for current snapshot and plan files
        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        let col = batches[0].column_by_name("x").unwrap();

        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions_by_separate_reader() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut plan_task: Vec<_> = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        assert_eq!(plan_task.len(), 2);

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .clone()
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batch_1, batch_2);
    }

    #[tokio::test]
    async fn test_open_parquet_with_projection() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let col1 = batches[0].column_by_name("x").unwrap();
        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col2 = batches[0].column_by_name("z").unwrap();
        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 3);

        // Selecting no columns still yields batches that carry the row count
        let table_scan = fixture.table.scan().select_empty().build().unwrap();
        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 0);
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 3
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").less_than(Datum::long(3));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 2);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_gt_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y >= 5
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 5);
    }

    #[tokio::test]
    async fn test_filter_double_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: dbl == 150.0
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("dbl").unwrap();
        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
        assert_eq!(f64_arr.value(1), 150.0f64);
    }

    #[tokio::test]
    async fn test_filter_int_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i32").unwrap();
        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(i32_arr.value(1), 150i32);
    }

    #[tokio::test]
    async fn test_filter_long_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i64").unwrap();
        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(i64_arr.value(1), 150i64);
    }

    #[tokio::test]
    async fn test_filter_bool_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("bool").unwrap();
        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
        assert!(bool_arr.value(1));
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_not_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_not_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_and_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 500);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_or_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![2; 512];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        values.append(vec![5; 12].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let mut values = vec![3; 512];
        values.append(vec![4; 512].as_mut());
        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[test]
    fn test_file_scan_task_serialize_deserialize() {
        let test_fn = |task: FileScanTask| {
            let serialized = serde_json::to_string(&task).unwrap();
            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();

            assert_eq!(task.data_file_path, deserialized.data_file_path);
            assert_eq!(task.start, deserialized.start);
            assert_eq!(task.length, deserialized.length);
            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
            assert_eq!(task.predicate, deserialized.predicate);
            assert_eq!(task.schema, deserialized.schema);
        };

        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Binary),
                ))])
                .build()
                .unwrap(),
        );
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: None,
            schema: schema.clone(),
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);

        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: Some(BoundPredicate::AlwaysTrue),
            schema,
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);
    }

    #[tokio::test]
    async fn test_select_with_file_column() {
        use arrow_array::cast::AsArray;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select an ordinary column plus the reserved `_file` metadata column
        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let x_col = batches[0].column_by_name("x").unwrap();
        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(x_arr.value(0), 1);

        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
        assert!(
            file_col.is_some(),
            "_file column should be present in the batch"
        );

        let file_col = file_col.unwrap();
        assert!(
            matches!(
                file_col.data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );

        let run_array = file_col
            .as_any()
            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
            .expect("_file column should be a RunArray");

        let values = run_array.values();
        let string_values = values.as_string::<i32>();
        assert_eq!(string_values.len(), 1, "Should have a single file path");

        let file_path = string_values.value(0);
        assert!(
            file_path.ends_with(".parquet"),
            "File path should end with .parquet, got: {file_path}"
        );
    }

    #[tokio::test]
    async fn test_select_file_column_position() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE, "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        // The `_file` column keeps its selected position
        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(2).name(), "z");

        assert!(batches[0].column_by_name("x").is_some());
        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
        assert!(batches[0].column_by_name("z").is_some());
    }

    #[tokio::test]
    async fn test_select_file_column_only() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 1);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);

        // Two data files of 1024 rows each survive the plan
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2048);
    }

    #[tokio::test]
    async fn test_file_column_with_multiple_files() {
        use std::collections::HashSet;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Collect all distinct file paths across batches
        let mut file_paths = HashSet::new();
        for batch in &batches {
            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
            let run_array = file_col
                .as_any()
                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
                .expect("_file column should be a RunArray");

            let values = run_array.values();
            let string_values = values.as_string::<i32>();
            for i in 0..string_values.len() {
                file_paths.insert(string_values.value(i).to_string());
            }
        }

        assert!(!file_paths.is_empty(), "Should have at least one file path");

        for path in &file_paths {
            assert!(
                path.ends_with(".parquet"),
                "All file paths should end with .parquet, got: {path}"
            );
        }
    }

    #[tokio::test]
    async fn test_file_column_at_start() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE, "x", "y"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(1).name(), "x");
        assert_eq!(schema.field(2).name(), "y");
    }

    #[tokio::test]
    async fn test_file_column_at_end() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "y", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), "y");
        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
    }

    #[tokio::test]
    async fn test_select_with_repeated_column_names() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select columns with duplicates: x, _file, x, y, _file, y
        let table_scan = fixture
            .table
            .scan()
            .select([
                "x",
                RESERVED_COL_NAME_FILE,
                "x",
                "y",
                RESERVED_COL_NAME_FILE,
                "y",
            ])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(
            batches[0].num_columns(),
            6,
            "Should have exactly 6 columns with duplicates"
        );

        let schema = batches[0].schema();

        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
        assert_eq!(
            schema.field(1).name(),
            RESERVED_COL_NAME_FILE,
            "Column 1 should be _file"
        );
        assert_eq!(
            schema.field(2).name(),
            "x",
            "Column 2 should be x (duplicate)"
        );
        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
        assert_eq!(
            schema.field(4).name(),
            RESERVED_COL_NAME_FILE,
            "Column 4 should be _file (duplicate)"
        );
        assert_eq!(
            schema.field(5).name(),
            "y",
            "Column 5 should be y (duplicate)"
        );

        assert!(
            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
            "Column x should be Int64"
        );
        assert!(
            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
            "Column x (duplicate) should be Int64"
        );
        assert!(
            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
            "Column y should be Int64"
        );
        assert!(
            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
            "Column y (duplicate) should be Int64"
        );
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );
        assert!(
            matches!(
                schema.field(4).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column (duplicate) should use RunEndEncoded type"
        );
    }

    #[tokio::test]
    async fn test_scan_deadlock() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_deadlock_manifests().await;

        // Plan with a concurrency limit of 1, the configuration most likely
        // to expose ordering deadlocks between the planning stages
        let table_scan = fixture
            .table
            .scan()
            .with_concurrency_limit(1)
            .build()
            .unwrap();

        // Guard with a timeout so a deadlock fails the test instead of
        // hanging it forever
        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
            table_scan
                .plan_files()
                .await
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
        })
        .await;

        assert!(result.is_ok(), "Scan timed out - deadlock detected");
    }
}