mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};

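/// A stream of arrow [`RecordBatch`]es.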
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

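/// Builds table scans.
///
/// A minimal usage sketch (the `table` variable is assumed to be a
/// [`Table`] already loaded from a catalog; the names are illustrative):
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # async fn example(table: iceberg::table::Table) -> iceberg::Result<()> {
/// let scan = table
///     .scan()
///     .select(["x", "y"])
///     .with_batch_size(Some(1024))
///     .build()?;
/// let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
/// # Ok(())
/// # }
/// ```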
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    column_names: Option<Vec<String>>,
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

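    /// Sets the desired size of batches in the response; a reader-specific
    /// default is used if this is not set.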
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

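    /// Sets whether column names are matched case-sensitively. Defaults to true.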
    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

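    /// Specifies a predicate to use as a filter.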
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        // rewrite_not removes Not nodes, which must be absent
        // before the predicate is bound and evaluated
        self.filter = Some(predicate.rewrite_not());
        self
    }

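    /// Select all columns (the default).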
    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

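    /// Select no columns.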
    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

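    /// Select some columns of the table. Metadata columns such as `_file`
    /// may be selected alongside regular schema columns.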
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

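    /// Sets the snapshot to scan. When not set, the current snapshot is used.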
    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

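    /// Sets the concurrency limit for manifest files, manifest entries, and
    /// data files all at once.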
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

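    /// Sets the data file concurrency limit for this scan.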
    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

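    /// Sets the manifest entry concurrency limit for this scan.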
    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

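    /// Determines whether to enable row group filtering.
    ///
    /// When enabled and a filter predicate is set, the metadata of each
    /// parquet row group is evaluated against the predicate, and row groups
    /// that cannot contain matching rows are skipped entirely.
    ///
    /// Defaults to enabled.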
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

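    /// Determines whether to enable row selection.
    ///
    /// When enabled and a filter predicate is set, ranges of rows that
    /// cannot match the predicate are skipped within the row groups that
    /// are scanned (using the parquet page index, where available).
    ///
    /// Defaults to disabled, as the extra evaluation can regress some
    /// workloads.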
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

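    /// Builds the table scan.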
    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
                    // An empty table has no snapshot: return a TableScan with
                    // no plan context, which plans zero files.
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot_id.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        // Check that all selected column names exist in the schema.
        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                // Metadata columns (e.g. `_file`) are not part of the schema.
                if is_metadata_column_name(column_name) {
                    continue;
                }
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            if is_metadata_column_name(column_name) {
                field_ids.push(get_metadata_field_id(column_name)?);
                continue;
            }

            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                        ),
                    )
                })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}

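/// A table scan, created through a [`TableScanBuilder`].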
#[derive(Debug)]
pub struct TableScan {
    /// `None` when the table has no snapshots; such a scan plans zero files.
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently.
    concurrency_limit_manifest_files: usize,

    /// The maximum number of manifest entries that will
    /// be processed in parallel.
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will be
    /// read in parallel.
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
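    /// Plans the files of the scan, returning a stream of [`FileScanTask`]s.
    ///
    /// A sketch of consuming the planned tasks with a standalone reader (the
    /// `scan` and `file_io` parameters here are illustrative):
    ///
    /// ```no_run
    /// # use futures::TryStreamExt;
    /// # async fn example(scan: iceberg::scan::TableScan, file_io: iceberg::io::FileIO) -> iceberg::Result<()> {
    /// use iceberg::arrow::ArrowReaderBuilder;
    ///
    /// let tasks = scan.plan_files().await?;
    /// let reader = ArrowReaderBuilder::new(file_io).build();
    /// let batches: Vec<_> = reader.read(tasks)?.try_collect().await?;
    /// # Ok(())
    /// # }
    /// ```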
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            // A table without snapshots has nothing to plan.
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // Used to stream manifest entry contexts between stages of the plan.
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // Used to stream the resulting file scan tasks back to the caller.
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all manifests and stream their manifest entries.
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete file manifest entries in parallel. This task is
        // awaited so that the delete file index is fully populated before
        // any data file tasks are emitted.
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data file manifest entries in parallel.
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }

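    /// Returns an [`ArrowRecordBatchStream`] over this scan's data.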
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }

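    /// Returns a reference to the column names of the table scan, if a
    /// projection was set.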
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

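    /// Returns a reference to the snapshot being scanned, or `None` for a
    /// scan of a table without snapshots.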
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

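    /// Evaluates a data file manifest entry against the scan's predicates,
    /// emitting a [`FileScanTask`] for every entry that may contain matching
    /// rows.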
    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // Skip processing this manifest entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter a manifest entry for a delete file.
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // Skip any data file whose partition data indicates that it
            // cannot contain rows matching the filter predicate.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // Skip any data file whose metrics don't match the filter predicate.
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // The entry survived every filter; emit it as a FileScanTask.
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

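    /// Evaluates a delete file manifest entry against the scan's partition
    /// predicate, forwarding it to the delete file index when it may apply.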
    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // Skip processing this manifest entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter a manifest entry for a data file.
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // Skip any delete file whose partition data indicates that it
            // cannot apply to rows matching the filter predicate.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

#[cfg(test)]
pub mod tests {
    #![allow(missing_docs)]

    use std::collections::HashMap;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
        StringArray,
    };
    use futures::{TryStreamExt, stream};
    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::TableIdent;
    use crate::arrow::ArrowReaderBuilder;
    use crate::expr::{BoundPredicate, Reference};
    use crate::io::{FileIO, OutputFile};
    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
    use crate::scan::FileScanTask;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
    };
    use crate::table::Table;

    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

    pub struct TableTestFixture {
        pub table_location: String,
        pub table: Table,
    }

    impl TableTestFixture {
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            // Replace the default partition spec with an unpartitioned one.
            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write the parquet files first so the manifest entries below can
            // record the real on-disk file size.
            let parquet_file_size = self.write_parquet_data_files();

            // Write a manifest with one added, one deleted, and one existing entry.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write the manifest list referencing the manifest above.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }

        /// Writes three identical parquet files (`1.parquet`, `2.parquet`,
        /// `3.parquet`) into the table location and returns their size in bytes.
        fn write_parquet_data_files(&self) -> u64 {
            std::fs::create_dir_all(&self.table_location).unwrap();

            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            // Values in each column are laid out in fixed-size runs so that
            // the filter tests have predictable matching row counts.
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                // The writer must be closed to write the parquet footer.
                writer.close().unwrap();
            }

            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
                .unwrap()
                .len()
        }

        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            // Write the parquet files first so the manifest entries below can
            // record the real on-disk file size.
            let parquet_file_size = self.write_parquet_data_files();

            // Write a manifest with unpartitioned entries.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Unpartitioned tables use an empty struct as the partition value.
            let empty_partition = Struct::empty();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write the manifest list referencing the manifest above.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }

        pub async fn setup_deadlock_manifests(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let _parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write a data manifest with more entries than the scan's
            // concurrency limit, so the bounded channels fill up.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            for i in 0..10 {
                writer
                    .add_entry(
                        ManifestEntry::builder()
                            .status(ManifestStatus::Added)
                            .data_file(
                                DataFileBuilder::default()
                                    .partition_spec_id(0)
                                    .content(DataContentType::Data)
                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
                                    .file_format(DataFileFormat::Parquet)
                                    .file_size_in_bytes(100)
                                    .record_count(1)
                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
                                    .key_metadata(None)
                                    .build()
                                    .unwrap(),
                            )
                            .build(),
                    )
                    .unwrap();
            }
            let data_manifest = writer.write_manifest_file().await.unwrap();

            // Write a delete manifest with a single positional delete file.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_deletes();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::PositionDeletes)
                                .file_path(format!("{}/del.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let delete_manifest = writer.write_manifest_file().await.unwrap();

            // Write a manifest list referencing both manifests.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
    }

    #[test]
    fn test_table_scan_columns() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
        assert_eq!(
            Some(vec!["x".to_string(), "y".to_string()]),
            table_scan.column_names
        );

        let table_scan = table
            .scan()
            .select(["x", "y"])
            .select(["z"])
            .build()
            .unwrap();
        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
    }

    #[test]
    fn test_select_all() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select_all().build().unwrap();
        assert!(table_scan.column_names.is_none());
    }

    #[test]
    fn test_select_no_exist_column() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_default_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().build().unwrap();
        assert_eq!(
            table.metadata().current_snapshot().unwrap().snapshot_id(),
            table_scan.snapshot().unwrap().snapshot_id()
        );
    }

    #[test]
    fn test_table_scan_non_exist_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().snapshot_id(1024).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_with_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table
            .scan()
            .snapshot_id(3051729675574597004)
            .with_row_selection_enabled(true)
            .build()
            .unwrap();
        assert_eq!(
            table_scan.snapshot().unwrap().snapshot_id(),
            3051729675574597004
        );
    }

    #[tokio::test]
    async fn test_plan_files_on_table_without_any_snapshots() {
        let table = TableTestFixture::new_empty().table;
        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert!(batches.is_empty());
    }

    #[tokio::test]
    async fn test_plan_files_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut tasks = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_fold(vec![], |mut acc, task| async move {
                acc.push(task);
                Ok(acc)
            })
            .await
            .unwrap();

        // Only the added and existing entries are planned; the deleted
        // entry (2.parquet) is skipped.
        assert_eq!(tasks.len(), 2);

        tasks.sort_by_key(|t| t.data_file_path.to_string());

        assert_eq!(
            tasks[0].data_file_path,
            format!("{}/1.parquet", &fixture.table_location)
        );

        assert_eq!(
            tasks[1].data_file_path,
            format!("{}/3.parquet", &fixture.table_location)
        );
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        let col = batches[0].column_by_name("x").unwrap();

        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions_by_separate_reader() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut plan_task: Vec<_> = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        assert_eq!(plan_task.len(), 2);

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .clone()
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batch_1, batch_2);
    }

    #[tokio::test]
    async fn test_open_parquet_with_projection() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let col1 = batches[0].column_by_name("x").unwrap();
        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col2 = batches[0].column_by_name("z").unwrap();
        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 3);

        // An empty projection yields batches with a row count but no columns.
        let table_scan = fixture.table.scan().select_empty().build().unwrap();
        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 0);
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 3
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").less_than(Datum::long(3));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 2);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_gt_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y >= 5
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 5);
    }

    #[tokio::test]
    async fn test_filter_double_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: dbl == 150.0
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("dbl").unwrap();
        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
        assert_eq!(f64_arr.value(1), 150.0f64);
    }

    #[tokio::test]
    async fn test_filter_int_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: i32 == 150
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i32").unwrap();
        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(i32_arr.value(1), 150i32);
    }

    #[tokio::test]
    async fn test_filter_long_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: i64 == 150
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i64").unwrap();
        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(i64_arr.value(1), 150i64);
    }

    #[tokio::test]
    async fn test_filter_bool_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: bool == true
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("bool").unwrap();
        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
        assert!(bool_arr.value(1));
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y IS NULL (no column is nullable, so nothing matches)
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_not_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y IS NOT NULL
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_not_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_and_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 5 AND z >= 4
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 500);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_or_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 5 OR z >= 4
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![2; 512];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        values.append(vec![5; 12].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let mut values = vec![3; 512];
        values.append(vec![4; 512].as_mut());
        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a STARTSWITH "Ice"
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a NOT STARTSWITH "Ice"
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a IN ("Sioux", "Iceberg")
        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a NOT IN ("Sioux", "Iceberg")
        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[test]
    fn test_file_scan_task_serialize_deserialize() {
        let test_fn = |task: FileScanTask| {
            let serialized = serde_json::to_string(&task).unwrap();
            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();

            assert_eq!(task.data_file_path, deserialized.data_file_path);
            assert_eq!(task.start, deserialized.start);
            assert_eq!(task.length, deserialized.length);
            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
            assert_eq!(task.predicate, deserialized.predicate);
            assert_eq!(task.schema, deserialized.schema);
        };

        // Test a task without a predicate.
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Binary),
                ))])
                .build()
                .unwrap(),
        );
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: None,
            schema: schema.clone(),
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);

        // Test a task with a predicate.
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: Some(BoundPredicate::AlwaysTrue),
            schema,
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);
    }

    #[tokio::test]
    async fn test_select_with_file_column() {
        use arrow_array::cast::AsArray;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Verify both the regular column and the _file metadata column.
        assert_eq!(batches[0].num_columns(), 2);

        let x_col = batches[0].column_by_name("x").unwrap();
        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(x_arr.value(0), 1);

        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
        assert!(
            file_col.is_some(),
            "_file column should be present in the batch"
        );

        let file_col = file_col.unwrap();
        assert!(
            matches!(
                file_col.data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );

        // The _file column is run-end encoded, with one run per data file.
        let run_array = file_col
            .as_any()
            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
            .expect("_file column should be a RunArray");

        let values = run_array.values();
        let string_values = values.as_string::<i32>();
        assert_eq!(string_values.len(), 1, "Should have a single file path");

        let file_path = string_values.value(0);
        assert!(
            file_path.ends_with(".parquet"),
            "File path should end with .parquet, got: {file_path}"
        );
    }

    #[tokio::test]
    async fn test_select_file_column_position() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE, "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        // The _file column should stay in its selected position.
        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(2).name(), "z");

        assert!(batches[0].column_by_name("x").is_some());
        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
        assert!(batches[0].column_by_name("z").is_some());
    }

    #[tokio::test]
    async fn test_select_file_column_only() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 1);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);

        // Two live data files with 1024 rows each.
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2048);
    }

    #[tokio::test]
    async fn test_file_column_with_multiple_files() {
        use std::collections::HashSet;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Collect the distinct file paths across all batches.
        let mut file_paths = HashSet::new();
        for batch in &batches {
            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
            let run_array = file_col
                .as_any()
                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
                .expect("_file column should be a RunArray");

            let values = run_array.values();
            let string_values = values.as_string::<i32>();
            for i in 0..string_values.len() {
                file_paths.insert(string_values.value(i).to_string());
            }
        }

        assert!(!file_paths.is_empty(), "Should have at least one file path");

        for path in &file_paths {
            assert!(
                path.ends_with(".parquet"),
                "All file paths should end with .parquet, got: {path}"
            );
        }
    }

    #[tokio::test]
    async fn test_file_column_at_start() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE, "x", "y"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(1).name(), "x");
        assert_eq!(schema.field(2).name(), "y");
    }

    #[tokio::test]
    async fn test_file_column_at_end() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "y", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), "y");
        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
    }

    #[tokio::test]
    async fn test_select_with_repeated_column_names() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select the same columns several times, with the _file metadata
        // column repeated as well; every occurrence should be preserved.
        let table_scan = fixture
            .table
            .scan()
            .select([
                "x",
                RESERVED_COL_NAME_FILE,
                "x",
                "y",
                RESERVED_COL_NAME_FILE,
                "y",
            ])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(
            batches[0].num_columns(),
            6,
            "Should have exactly 6 columns with duplicates"
        );

        let schema = batches[0].schema();

        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
        assert_eq!(
            schema.field(1).name(),
            RESERVED_COL_NAME_FILE,
            "Column 1 should be _file"
        );
        assert_eq!(
            schema.field(2).name(),
            "x",
            "Column 2 should be x (duplicate)"
        );
        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
        assert_eq!(
            schema.field(4).name(),
            RESERVED_COL_NAME_FILE,
            "Column 4 should be _file (duplicate)"
        );
        assert_eq!(
            schema.field(5).name(),
            "y",
            "Column 5 should be y (duplicate)"
        );

        // Duplicated columns keep their respective Arrow types.
        assert!(
            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
            "Column x should be Int64"
        );
        assert!(
            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
            "Column x (duplicate) should be Int64"
        );
        assert!(
            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
            "Column y should be Int64"
        );
        assert!(
            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
            "Column y (duplicate) should be Int64"
        );
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );
        assert!(
            matches!(
                schema.field(4).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column (duplicate) should use RunEndEncoded type"
        );
    }

    #[tokio::test]
    async fn test_scan_deadlock() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_deadlock_manifests().await;

        // Regression test: with every concurrency limit set to 1 and both a
        // data manifest and a delete manifest present, planning must still
        // make progress rather than deadlock on the bounded channels.
        let table_scan = fixture
            .table
            .scan()
            .with_concurrency_limit(1)
            .build()
            .unwrap();

        // If planning deadlocks, the timeout elapses and the test fails.
        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
            table_scan
                .plan_files()
                .await
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
        })
        .await;

        assert!(result.is_ok(), "Scan timed out - deadlock detected");
    }
}