1mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35pub use crate::arrow::{ScanMetrics, ScanResult};
36use crate::delete_file_index::DeleteFileIndex;
37use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
38use crate::expr::{Bind, BoundPredicate, Predicate};
39use crate::io::FileIO;
40use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
41use crate::runtime::Runtime;
42use crate::spec::{DataContentType, SnapshotRef};
43use crate::table::Table;
44use crate::util::available_parallelism;
45use crate::{Error, ErrorKind, Result};
46
/// A boxed `'static` stream whose items are Arrow [`RecordBatch`]es wrapped in the crate's
/// [`Result`]; this is the type returned by [`TableScan::to_arrow`].
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
49
/// Builder that collects scan options for a [`Table`] and produces a [`TableScan`].
pub struct TableScanBuilder<'a> {
    /// Table being scanned; borrowed for as long as the builder lives.
    table: &'a Table,
    /// Columns to select. `None` selects every top-level field of the snapshot schema,
    /// `Some(vec![])` selects no columns.
    column_names: Option<Vec<String>>,
    /// Snapshot to scan. `None` means "use the table's current snapshot".
    snapshot_id: Option<i64>,
    /// Optional batch size handed to the Arrow reader.
    batch_size: Option<usize>,
    /// Case-sensitivity flag stored in the plan context (defaults to `true`).
    case_sensitive: bool,
    /// Optional row filter for the scan.
    filter: Option<Predicate>,
    /// Concurrency limit handed to the Arrow reader for data files.
    concurrency_limit_data_files: usize,
    /// Concurrency limit used while processing manifest entries.
    concurrency_limit_manifest_entries: usize,
    /// Concurrency limit used while fetching manifest files.
    concurrency_limit_manifest_files: usize,
    /// Forwarded to the Arrow reader's row-group filtering switch (defaults to `true`).
    row_group_filtering_enabled: bool,
    /// Forwarded to the Arrow reader's row-selection switch (defaults to `false`).
    row_selection_enabled: bool,
}
65
66impl<'a> TableScanBuilder<'a> {
67 pub(crate) fn new(table: &'a Table) -> Self {
68 let num_cpus = available_parallelism().get();
69
70 Self {
71 table,
72 column_names: None,
73 snapshot_id: None,
74 batch_size: None,
75 case_sensitive: true,
76 filter: None,
77 concurrency_limit_data_files: num_cpus,
78 concurrency_limit_manifest_entries: num_cpus,
79 concurrency_limit_manifest_files: num_cpus,
80 row_group_filtering_enabled: true,
81 row_selection_enabled: false,
82 }
83 }
84
85 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
88 self.batch_size = batch_size;
89 self
90 }
91
92 pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
94 self.case_sensitive = case_sensitive;
95 self
96 }
97
98 pub fn with_filter(mut self, predicate: Predicate) -> Self {
100 self.filter = Some(predicate.rewrite_not());
103 self
104 }
105
106 pub fn select_all(mut self) -> Self {
108 self.column_names = None;
109 self
110 }
111
112 pub fn select_empty(mut self) -> Self {
114 self.column_names = Some(vec![]);
115 self
116 }
117
118 pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
120 self.column_names = Some(
121 column_names
122 .into_iter()
123 .map(|item| item.to_string())
124 .collect(),
125 );
126 self
127 }
128
129 pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
131 self.snapshot_id = Some(snapshot_id);
132 self
133 }
134
135 pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
138 self.concurrency_limit_manifest_files = limit;
139 self.concurrency_limit_manifest_entries = limit;
140 self.concurrency_limit_data_files = limit;
141 self
142 }
143
144 pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
146 self.concurrency_limit_data_files = limit;
147 self
148 }
149
150 pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
152 self.concurrency_limit_manifest_entries = limit;
153 self
154 }
155
156 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
165 self.row_group_filtering_enabled = row_group_filtering_enabled;
166 self
167 }
168
169 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
184 self.row_selection_enabled = row_selection_enabled;
185 self
186 }
187
188 pub fn build(self) -> Result<TableScan> {
190 let snapshot = match self.snapshot_id {
191 Some(snapshot_id) => self
192 .table
193 .metadata()
194 .snapshot_by_id(snapshot_id)
195 .ok_or_else(|| {
196 Error::new(
197 ErrorKind::DataInvalid,
198 format!("Snapshot with id {snapshot_id} not found"),
199 )
200 })?
201 .clone(),
202 None => {
203 let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
204 return Ok(TableScan {
205 batch_size: self.batch_size,
206 column_names: self.column_names,
207 file_io: self.table.file_io().clone(),
208 plan_context: None,
209 concurrency_limit_data_files: self.concurrency_limit_data_files,
210 concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
211 concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
212 row_group_filtering_enabled: self.row_group_filtering_enabled,
213 row_selection_enabled: self.row_selection_enabled,
214 runtime: self.table.runtime().clone(),
215 });
216 };
217 current_snapshot_id.clone()
218 }
219 };
220
221 let schema = snapshot.schema(self.table.metadata())?;
222
223 if let Some(column_names) = self.column_names.as_ref() {
225 for column_name in column_names {
226 if is_metadata_column_name(column_name) {
228 continue;
229 }
230 if schema.field_by_name(column_name).is_none() {
231 return Err(Error::new(
232 ErrorKind::DataInvalid,
233 format!("Column {column_name} not found in table. Schema: {schema}"),
234 ));
235 }
236 }
237 }
238
239 let mut field_ids = vec![];
240 let column_names = self.column_names.clone().unwrap_or_else(|| {
241 schema
242 .as_struct()
243 .fields()
244 .iter()
245 .map(|f| f.name.clone())
246 .collect()
247 });
248
249 for column_name in column_names.iter() {
250 if is_metadata_column_name(column_name) {
252 field_ids.push(get_metadata_field_id(column_name)?);
253 continue;
254 }
255
256 let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
257 Error::new(
258 ErrorKind::DataInvalid,
259 format!("Column {column_name} not found in table. Schema: {schema}"),
260 )
261 })?;
262
263 schema
264 .as_struct()
265 .field_by_id(field_id)
266 .ok_or_else(|| {
267 Error::new(
268 ErrorKind::FeatureUnsupported,
269 format!(
270 "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
271 ),
272 )
273 })?;
274
275 field_ids.push(field_id);
276 }
277
278 let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
279 Some(predicates.bind(schema.clone(), true)?)
280 } else {
281 None
282 };
283
284 let plan_context = PlanContext {
285 snapshot,
286 table_metadata: self.table.metadata_ref(),
287 snapshot_schema: schema,
288 case_sensitive: self.case_sensitive,
289 predicate: self.filter.map(Arc::new),
290 snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
291 object_cache: self.table.object_cache(),
292 field_ids: Arc::new(field_ids),
293 partition_filter_cache: Arc::new(PartitionFilterCache::new()),
294 manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
295 expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
296 };
297
298 Ok(TableScan {
299 batch_size: self.batch_size,
300 column_names: self.column_names,
301 file_io: self.table.file_io().clone(),
302 plan_context: Some(plan_context),
303 concurrency_limit_data_files: self.concurrency_limit_data_files,
304 concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
305 concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
306 row_group_filtering_enabled: self.row_group_filtering_enabled,
307 row_selection_enabled: self.row_selection_enabled,
308 runtime: self.table.runtime().clone(),
309 })
310 }
311}
312
/// A configured table scan, created by `TableScanBuilder::build`.
#[derive(Debug)]
pub struct TableScan {
    /// Planning state for the snapshot being scanned. `None` when the builder found no
    /// current snapshot on the table, in which case `plan_files` yields an empty stream.
    plan_context: Option<PlanContext>,
    /// Optional batch size handed to the Arrow reader.
    batch_size: Option<usize>,
    /// File IO handed to the Arrow reader.
    file_io: FileIO,
    /// Selected column names as given to the builder; `None` means all columns.
    column_names: Option<Vec<String>>,
    /// Concurrency limit for fetching manifest files; also used as the capacity of the
    /// manifest-entry channels.
    concurrency_limit_manifest_files: usize,

    /// Concurrency limit for processing manifest entries; also used as the capacity of
    /// the file-scan-task channel.
    concurrency_limit_manifest_entries: usize,

    /// Concurrency limit handed to the Arrow reader for data files.
    concurrency_limit_data_files: usize,

    /// Forwarded to the Arrow reader's row-group filtering switch.
    row_group_filtering_enabled: bool,
    /// Forwarded to the Arrow reader's row-selection switch.
    row_selection_enabled: bool,

    /// Runtime used to spawn the planning tasks; also handed to the Arrow reader and
    /// the delete file index.
    runtime: Runtime,
}
340
impl TableScan {
    /// Plans the scan and returns a stream of [`FileScanTask`]s.
    ///
    /// The work is split across three spawned tasks connected by bounded channels: one
    /// fetches manifests and streams their entries, one processes delete-manifest entries,
    /// and one processes data-manifest entries. An error from any task is sent into the
    /// returned stream as an `Err` item.
    ///
    /// Returns an empty stream when the scan has no plan context.
    ///
    /// # Errors
    ///
    /// Fails before any task is spawned if the manifest list cannot be loaded or the
    /// manifest file contexts cannot be built.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        // No plan context means there is nothing to plan.
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // Channels carrying manifest entry contexts from the manifest-fetching stage to
        // the data-entry and delete-entry stages.
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // Channel carrying the resulting file scan tasks (and errors) back to the caller.
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        // The delete file index is fed through `delete_file_tx` by the delete-entry stage.
        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(self.runtime.clone());

        let manifest_list = plan_context.get_manifest_list().await?;

        // Build one context per manifest file; each context receives the senders it needs
        // and a handle to the delete file index.
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        // Each stage gets its own clone of the result sender so it can report an error.
        let mut channel_for_manifest_error = file_scan_task_tx.clone();
        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        let rt = self.runtime.clone();

        // Stage 1 (IO executor): fetch manifests concurrently and stream their entries.
        rt.io().spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            // The send result is deliberately ignored: it can only fail if the receiving
            // side is no longer listening.
            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        {
            // Stage 2 (CPU executor): process entries from delete manifests. Each entry is
            // handled in its own spawned CPU task, at most
            // `concurrency_limit_manifest_entries` at a time.
            let rt = rt.clone();
            let rt_inner = rt.clone();
            rt.cpu().spawn(async move {
                let result = manifest_entry_delete_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                // `?` propagates a failure to join the spawned task; the
                                // task's own `Result` becomes this future's output.
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_delete_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_delete_manifest_entry_error
                        .send(Err(error))
                        .await;
                }
            });
        }

        {
            // Stage 3 (CPU executor): process entries from data manifests, following the
            // same pattern as the delete stage, and emit file scan tasks.
            let rt_inner = rt.clone();
            rt.cpu().spawn(async move {
                let result = manifest_entry_data_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_data_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
                }
            });
        }

        Ok(file_scan_task_rx.boxed())
    }

    /// Plans the scan and returns a stream of Arrow [`RecordBatch`]es.
    ///
    /// Configures an [`ArrowReaderBuilder`] with this scan's data-file concurrency limit,
    /// row-group filtering and row-selection flags, and batch size when one was set, then
    /// reads the tasks produced by [`Self::plan_files`].
    ///
    /// # Errors
    ///
    /// Returns any error from planning the files or from starting the read.
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder =
            ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone())
                .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
                .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
                .with_row_selection_enabled(self.row_selection_enabled);

        // Only override the reader's batch size when one was configured.
        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder
            .build()
            .read(self.plan_files().await?)
            .map(|result| result.stream())
    }

    /// Returns the selected column names, or `None` if all columns are selected.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns the snapshot being scanned, or `None` if the scan has no plan context.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    /// Processes one entry of a data manifest and, unless it is filtered out, sends the
    /// resulting [`FileScanTask`] on `file_scan_task_tx`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::FeatureUnsupported` if the entry's content type is not
    /// `DataContentType::Data`, and propagates evaluator, task-construction and
    /// channel-send errors.
    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // Skip entries that are not alive (presumably those marked as deleted).
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // A data manifest must only contain data files; anything else aborts the plan.
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            // Evaluators are looked up in the cache by partition spec id.
            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // Skip data files that the partition predicate evaluator rejects.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // Skip data files that the inclusive metrics evaluator rejects.
            // NOTE(review): the meaning of the trailing `false` argument is not visible
            // here; confirm against `InclusiveMetricsEvaluator::eval`.
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // The entry survived all filters: convert it into a task and emit it.
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    /// Processes one entry of a delete manifest and, unless it is filtered out, sends a
    /// [`DeleteFileContext`] for it on `delete_file_ctx_tx`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::FeatureUnsupported` if the entry's content type is
    /// `DataContentType::Data`, and propagates evaluator and channel-send errors.
    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // Skip entries that are not alive (presumably those marked as deleted).
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // A delete manifest must not contain data files; finding one aborts the plan.
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // Skip delete files that the partition predicate evaluator rejects.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}
591
/// The pair of bound predicates used when filtering manifest entries during planning.
pub(crate) struct BoundPredicates {
    /// Predicate handed to the cached expression evaluator for an entry's partition spec.
    partition_bound_predicate: BoundPredicate,
    /// Predicate handed to the inclusive metrics evaluator for a data file.
    snapshot_bound_predicate: BoundPredicate,
}
596
597#[cfg(test)]
598pub mod tests {
599 #![allow(missing_docs)]
601
602 use std::collections::HashMap;
603 use std::fs;
604 use std::fs::File;
605 use std::sync::Arc;
606
607 use arrow_array::cast::AsArray;
608 use arrow_array::{
609 Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
610 StringArray,
611 };
612 use futures::{TryStreamExt, stream};
613 use minijinja::value::Value;
614 use minijinja::{AutoEscape, Environment, context};
615 use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
616 use parquet::basic::Compression;
617 use parquet::file::properties::WriterProperties;
618 use tempfile::TempDir;
619 use uuid::Uuid;
620
621 use crate::TableIdent;
622 use crate::arrow::ArrowReaderBuilder;
623 use crate::expr::{BoundPredicate, Reference};
624 use crate::io::{FileIO, OutputFile};
625 use crate::metadata_columns::RESERVED_COL_NAME_FILE;
626 use crate::scan::FileScanTask;
627 use crate::spec::{
628 DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
629 ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
630 PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
631 };
632 use crate::table::Table;
633 use crate::test_utils::test_runtime;
634
635 fn render_template(template: &str, ctx: Value) -> String {
636 let mut env = Environment::new();
637 env.set_auto_escape_callback(|_| AutoEscape::None);
638 env.render_str(template, ctx).unwrap()
639 }
640
    /// Test fixture bundling a [`Table`] with the location its files are written to.
    pub struct TableTestFixture {
        /// Root location of the table, as a string path.
        pub table_location: String,
        /// The table under test.
        pub table: Table,
    }
645
646 impl TableTestFixture {
647 #[allow(clippy::new_without_default)]
648 pub fn new() -> Self {
649 let tmp_dir = TempDir::new().unwrap();
650 let table_location = tmp_dir.path().join("table1");
651 let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
652 let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
653 let table_metadata1_location = table_location.join("metadata/v1.json");
654
655 let file_io = FileIO::new_with_fs();
656
657 let table_metadata = {
658 let template_json_str = fs::read_to_string(format!(
659 "{}/testdata/example_table_metadata_v2.json",
660 env!("CARGO_MANIFEST_DIR")
661 ))
662 .unwrap();
663 let metadata_json = render_template(&template_json_str, context! {
664 table_location => &table_location,
665 manifest_list_1_location => &manifest_list1_location,
666 manifest_list_2_location => &manifest_list2_location,
667 table_metadata_1_location => &table_metadata1_location,
668 });
669 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
670 };
671
672 let table = Table::builder()
673 .metadata(table_metadata)
674 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
675 .file_io(file_io.clone())
676 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
677 .runtime(test_runtime())
678 .build()
679 .unwrap();
680
681 Self {
682 table_location: table_location.to_str().unwrap().to_string(),
683 table,
684 }
685 }
686
687 #[allow(clippy::new_without_default)]
688 pub fn new_empty() -> Self {
689 let tmp_dir = TempDir::new().unwrap();
690 let table_location = tmp_dir.path().join("table1");
691 let table_metadata1_location = table_location.join("metadata/v1.json");
692
693 let file_io = FileIO::new_with_fs();
694
695 let table_metadata = {
696 let template_json_str = fs::read_to_string(format!(
697 "{}/testdata/example_empty_table_metadata_v2.json",
698 env!("CARGO_MANIFEST_DIR")
699 ))
700 .unwrap();
701 let metadata_json = render_template(&template_json_str, context! {
702 table_location => &table_location,
703 table_metadata_1_location => &table_metadata1_location,
704 });
705 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
706 };
707
708 let table = Table::builder()
709 .metadata(table_metadata)
710 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
711 .file_io(file_io.clone())
712 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
713 .runtime(test_runtime())
714 .build()
715 .unwrap();
716
717 Self {
718 table_location: table_location.to_str().unwrap().to_string(),
719 table,
720 }
721 }
722
723 pub fn new_with_deep_history() -> Self {
727 let tmp_dir = TempDir::new().unwrap();
728 let table_location = tmp_dir.path().join("table1");
729 let table_metadata1_location = table_location.join("metadata/v1.json");
730
731 let file_io = FileIO::new_with_fs();
732
733 let table_metadata = {
734 let json_str = fs::read_to_string(format!(
735 "{}/testdata/example_table_metadata_v2_deep_history.json",
736 env!("CARGO_MANIFEST_DIR")
737 ))
738 .unwrap();
739 serde_json::from_str::<TableMetadata>(&json_str).unwrap()
740 };
741
742 let table = Table::builder()
743 .metadata(table_metadata)
744 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
745 .file_io(file_io.clone())
746 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
747 .runtime(test_runtime())
748 .build()
749 .unwrap();
750
751 Self {
752 table_location: table_location.to_str().unwrap().to_string(),
753 table,
754 }
755 }
756
757 pub fn new_unpartitioned() -> Self {
758 let tmp_dir = TempDir::new().unwrap();
759 let table_location = tmp_dir.path().join("table1");
760 let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
761 let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
762 let table_metadata1_location = table_location.join("metadata/v1.json");
763
764 let file_io = FileIO::new_with_fs();
765
766 let mut table_metadata = {
767 let template_json_str = fs::read_to_string(format!(
768 "{}/testdata/example_table_metadata_v2.json",
769 env!("CARGO_MANIFEST_DIR")
770 ))
771 .unwrap();
772 let metadata_json = render_template(&template_json_str, context! {
773 table_location => &table_location,
774 manifest_list_1_location => &manifest_list1_location,
775 manifest_list_2_location => &manifest_list2_location,
776 table_metadata_1_location => &table_metadata1_location,
777 });
778 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
779 };
780
781 table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
782 table_metadata.partition_specs.clear();
783 table_metadata.default_partition_type = StructType::new(vec![]);
784 table_metadata
785 .partition_specs
786 .insert(0, table_metadata.default_spec.clone());
787
788 let table = Table::builder()
789 .metadata(table_metadata)
790 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
791 .file_io(file_io.clone())
792 .metadata_location(table_metadata1_location.to_str().unwrap())
793 .runtime(test_runtime())
794 .build()
795 .unwrap();
796
797 Self {
798 table_location: table_location.to_str().unwrap().to_string(),
799 table,
800 }
801 }
802
803 fn next_manifest_file(&self) -> OutputFile {
804 self.table
805 .file_io()
806 .new_output(format!(
807 "{}/metadata/manifest_{}.avro",
808 self.table_location,
809 Uuid::new_v4()
810 ))
811 .unwrap()
812 }
813
814 pub async fn setup_manifest_files(&mut self) {
815 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
816 let parent_snapshot = current_snapshot
817 .parent_snapshot(self.table.metadata())
818 .unwrap();
819 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
820 let current_partition_spec = self.table.metadata().default_partition_spec();
821
822 let parquet_file_size = self.write_parquet_data_files();
824
825 let mut writer = ManifestWriterBuilder::new(
826 self.next_manifest_file(),
827 Some(current_snapshot.snapshot_id()),
828 None,
829 current_schema.clone(),
830 current_partition_spec.as_ref().clone(),
831 )
832 .build_v2_data();
833 writer
834 .add_entry(
835 ManifestEntry::builder()
836 .status(ManifestStatus::Added)
837 .data_file(
838 DataFileBuilder::default()
839 .partition_spec_id(0)
840 .content(DataContentType::Data)
841 .file_path(format!("{}/1.parquet", &self.table_location))
842 .file_format(DataFileFormat::Parquet)
843 .file_size_in_bytes(parquet_file_size)
844 .record_count(1)
845 .partition(Struct::from_iter([Some(Literal::long(100))]))
846 .key_metadata(None)
847 .build()
848 .unwrap(),
849 )
850 .build(),
851 )
852 .unwrap();
853 writer
854 .add_delete_entry(
855 ManifestEntry::builder()
856 .status(ManifestStatus::Deleted)
857 .snapshot_id(parent_snapshot.snapshot_id())
858 .sequence_number(parent_snapshot.sequence_number())
859 .file_sequence_number(parent_snapshot.sequence_number())
860 .data_file(
861 DataFileBuilder::default()
862 .partition_spec_id(0)
863 .content(DataContentType::Data)
864 .file_path(format!("{}/2.parquet", &self.table_location))
865 .file_format(DataFileFormat::Parquet)
866 .file_size_in_bytes(parquet_file_size)
867 .record_count(1)
868 .partition(Struct::from_iter([Some(Literal::long(200))]))
869 .build()
870 .unwrap(),
871 )
872 .build(),
873 )
874 .unwrap();
875 writer
876 .add_existing_entry(
877 ManifestEntry::builder()
878 .status(ManifestStatus::Existing)
879 .snapshot_id(parent_snapshot.snapshot_id())
880 .sequence_number(parent_snapshot.sequence_number())
881 .file_sequence_number(parent_snapshot.sequence_number())
882 .data_file(
883 DataFileBuilder::default()
884 .partition_spec_id(0)
885 .content(DataContentType::Data)
886 .file_path(format!("{}/3.parquet", &self.table_location))
887 .file_format(DataFileFormat::Parquet)
888 .file_size_in_bytes(parquet_file_size)
889 .record_count(1)
890 .partition(Struct::from_iter([Some(Literal::long(300))]))
891 .build()
892 .unwrap(),
893 )
894 .build(),
895 )
896 .unwrap();
897 let data_file_manifest = writer.write_manifest_file().await.unwrap();
898
899 let mut manifest_list_write = ManifestListWriter::v2(
901 self.table
902 .file_io()
903 .new_output(current_snapshot.manifest_list())
904 .unwrap(),
905 current_snapshot.snapshot_id(),
906 current_snapshot.parent_snapshot_id(),
907 current_snapshot.sequence_number(),
908 );
909 manifest_list_write
910 .add_manifests(vec![data_file_manifest].into_iter())
911 .unwrap();
912 manifest_list_write.close().await.unwrap();
913 }
914
915 fn write_parquet_data_files(&self) -> u64 {
918 std::fs::create_dir_all(&self.table_location).unwrap();
919
920 let schema = {
921 let fields = vec![
922 arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
923 .with_metadata(HashMap::from([(
924 PARQUET_FIELD_ID_META_KEY.to_string(),
925 "1".to_string(),
926 )])),
927 arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
928 .with_metadata(HashMap::from([(
929 PARQUET_FIELD_ID_META_KEY.to_string(),
930 "2".to_string(),
931 )])),
932 arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
933 .with_metadata(HashMap::from([(
934 PARQUET_FIELD_ID_META_KEY.to_string(),
935 "3".to_string(),
936 )])),
937 arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
938 .with_metadata(HashMap::from([(
939 PARQUET_FIELD_ID_META_KEY.to_string(),
940 "4".to_string(),
941 )])),
942 arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
943 .with_metadata(HashMap::from([(
944 PARQUET_FIELD_ID_META_KEY.to_string(),
945 "5".to_string(),
946 )])),
947 arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
948 .with_metadata(HashMap::from([(
949 PARQUET_FIELD_ID_META_KEY.to_string(),
950 "6".to_string(),
951 )])),
952 arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
953 .with_metadata(HashMap::from([(
954 PARQUET_FIELD_ID_META_KEY.to_string(),
955 "7".to_string(),
956 )])),
957 arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
958 .with_metadata(HashMap::from([(
959 PARQUET_FIELD_ID_META_KEY.to_string(),
960 "8".to_string(),
961 )])),
962 ];
963 Arc::new(arrow_schema::Schema::new(fields))
964 };
965 let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
967
968 let mut values = vec![2; 512];
969 values.append(vec![3; 200].as_mut());
970 values.append(vec![4; 300].as_mut());
971 values.append(vec![5; 12].as_mut());
972
973 let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
975
976 let mut values = vec![3; 512];
977 values.append(vec![4; 512].as_mut());
978
979 let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
981
982 let mut values = vec!["Apache"; 512];
984 values.append(vec!["Iceberg"; 512].as_mut());
985 let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
986
987 let mut values = vec![100.0f64; 512];
989 values.append(vec![150.0f64; 12].as_mut());
990 values.append(vec![200.0f64; 500].as_mut());
991 let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
992
993 let mut values = vec![100i32; 512];
995 values.append(vec![150i32; 12].as_mut());
996 values.append(vec![200i32; 500].as_mut());
997 let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
998
999 let mut values = vec![100i64; 512];
1001 values.append(vec![150i64; 12].as_mut());
1002 values.append(vec![200i64; 500].as_mut());
1003 let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1004
1005 let mut values = vec![false; 512];
1007 values.append(vec![true; 512].as_mut());
1008 let values: BooleanArray = values.into();
1009 let col8 = Arc::new(values) as ArrayRef;
1010
1011 let to_write = RecordBatch::try_new(schema.clone(), vec![
1012 col1, col2, col3, col4, col5, col6, col7, col8,
1013 ])
1014 .unwrap();
1015
1016 let props = WriterProperties::builder()
1018 .set_compression(Compression::SNAPPY)
1019 .build();
1020
1021 for n in 1..=3 {
1022 let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
1023 let mut writer =
1024 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1025
1026 writer.write(&to_write).expect("Writing batch");
1027
1028 writer.close().unwrap();
1030 }
1031
1032 std::fs::metadata(format!("{}/1.parquet", &self.table_location))
1033 .unwrap()
1034 .len()
1035 }
1036
1037 pub async fn setup_unpartitioned_manifest_files(&mut self) {
1038 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1039 let parent_snapshot = current_snapshot
1040 .parent_snapshot(self.table.metadata())
1041 .unwrap();
1042 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1043 let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
1044
1045 let parquet_file_size = self.write_parquet_data_files();
1047
1048 let mut writer = ManifestWriterBuilder::new(
1050 self.next_manifest_file(),
1051 Some(current_snapshot.snapshot_id()),
1052 None,
1053 current_schema.clone(),
1054 current_partition_spec.as_ref().clone(),
1055 )
1056 .build_v2_data();
1057
1058 let empty_partition = Struct::empty();
1060
1061 writer
1062 .add_entry(
1063 ManifestEntry::builder()
1064 .status(ManifestStatus::Added)
1065 .data_file(
1066 DataFileBuilder::default()
1067 .partition_spec_id(0)
1068 .content(DataContentType::Data)
1069 .file_path(format!("{}/1.parquet", &self.table_location))
1070 .file_format(DataFileFormat::Parquet)
1071 .file_size_in_bytes(parquet_file_size)
1072 .record_count(1)
1073 .partition(empty_partition.clone())
1074 .key_metadata(None)
1075 .build()
1076 .unwrap(),
1077 )
1078 .build(),
1079 )
1080 .unwrap();
1081
1082 writer
1083 .add_delete_entry(
1084 ManifestEntry::builder()
1085 .status(ManifestStatus::Deleted)
1086 .snapshot_id(parent_snapshot.snapshot_id())
1087 .sequence_number(parent_snapshot.sequence_number())
1088 .file_sequence_number(parent_snapshot.sequence_number())
1089 .data_file(
1090 DataFileBuilder::default()
1091 .partition_spec_id(0)
1092 .content(DataContentType::Data)
1093 .file_path(format!("{}/2.parquet", &self.table_location))
1094 .file_format(DataFileFormat::Parquet)
1095 .file_size_in_bytes(parquet_file_size)
1096 .record_count(1)
1097 .partition(empty_partition.clone())
1098 .build()
1099 .unwrap(),
1100 )
1101 .build(),
1102 )
1103 .unwrap();
1104
1105 writer
1106 .add_existing_entry(
1107 ManifestEntry::builder()
1108 .status(ManifestStatus::Existing)
1109 .snapshot_id(parent_snapshot.snapshot_id())
1110 .sequence_number(parent_snapshot.sequence_number())
1111 .file_sequence_number(parent_snapshot.sequence_number())
1112 .data_file(
1113 DataFileBuilder::default()
1114 .partition_spec_id(0)
1115 .content(DataContentType::Data)
1116 .file_path(format!("{}/3.parquet", &self.table_location))
1117 .file_format(DataFileFormat::Parquet)
1118 .file_size_in_bytes(parquet_file_size)
1119 .record_count(1)
1120 .partition(empty_partition.clone())
1121 .build()
1122 .unwrap(),
1123 )
1124 .build(),
1125 )
1126 .unwrap();
1127
1128 let data_file_manifest = writer.write_manifest_file().await.unwrap();
1129
1130 let mut manifest_list_write = ManifestListWriter::v2(
1132 self.table
1133 .file_io()
1134 .new_output(current_snapshot.manifest_list())
1135 .unwrap(),
1136 current_snapshot.snapshot_id(),
1137 current_snapshot.parent_snapshot_id(),
1138 current_snapshot.sequence_number(),
1139 );
1140 manifest_list_write
1141 .add_manifests(vec![data_file_manifest].into_iter())
1142 .unwrap();
1143 manifest_list_write.close().await.unwrap();
1144 }
1145
1146 pub async fn setup_deadlock_manifests(&mut self) {
1147 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1148 let _parent_snapshot = current_snapshot
1149 .parent_snapshot(self.table.metadata())
1150 .unwrap();
1151 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1152 let current_partition_spec = self.table.metadata().default_partition_spec();
1153
1154 let mut writer = ManifestWriterBuilder::new(
1156 self.next_manifest_file(),
1157 Some(current_snapshot.snapshot_id()),
1158 None,
1159 current_schema.clone(),
1160 current_partition_spec.as_ref().clone(),
1161 )
1162 .build_v2_data();
1163
1164 for i in 0..10 {
1166 writer
1167 .add_entry(
1168 ManifestEntry::builder()
1169 .status(ManifestStatus::Added)
1170 .data_file(
1171 DataFileBuilder::default()
1172 .partition_spec_id(0)
1173 .content(DataContentType::Data)
1174 .file_path(format!("{}/{}.parquet", &self.table_location, i))
1175 .file_format(DataFileFormat::Parquet)
1176 .file_size_in_bytes(100)
1177 .record_count(1)
1178 .partition(Struct::from_iter([Some(Literal::long(100))]))
1179 .key_metadata(None)
1180 .build()
1181 .unwrap(),
1182 )
1183 .build(),
1184 )
1185 .unwrap();
1186 }
1187 let data_manifest = writer.write_manifest_file().await.unwrap();
1188
1189 let mut writer = ManifestWriterBuilder::new(
1191 self.next_manifest_file(),
1192 Some(current_snapshot.snapshot_id()),
1193 None,
1194 current_schema.clone(),
1195 current_partition_spec.as_ref().clone(),
1196 )
1197 .build_v2_deletes();
1198
1199 writer
1200 .add_entry(
1201 ManifestEntry::builder()
1202 .status(ManifestStatus::Added)
1203 .data_file(
1204 DataFileBuilder::default()
1205 .partition_spec_id(0)
1206 .content(DataContentType::PositionDeletes)
1207 .file_path(format!("{}/del.parquet", &self.table_location))
1208 .file_format(DataFileFormat::Parquet)
1209 .file_size_in_bytes(100)
1210 .record_count(1)
1211 .partition(Struct::from_iter([Some(Literal::long(100))]))
1212 .build()
1213 .unwrap(),
1214 )
1215 .build(),
1216 )
1217 .unwrap();
1218 let delete_manifest = writer.write_manifest_file().await.unwrap();
1219
1220 let mut manifest_list_write = ManifestListWriter::v2(
1223 self.table
1224 .file_io()
1225 .new_output(current_snapshot.manifest_list())
1226 .unwrap(),
1227 current_snapshot.snapshot_id(),
1228 current_snapshot.parent_snapshot_id(),
1229 current_snapshot.sequence_number(),
1230 );
1231 manifest_list_write
1232 .add_manifests(vec![data_manifest, delete_manifest].into_iter())
1233 .unwrap();
1234 manifest_list_write.close().await.unwrap();
1235 }
1236 }
1237
1238 #[tokio::test]
1239 async fn test_table_scan_columns() {
1240 let table = TableTestFixture::new().table;
1241
1242 let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1243 assert_eq!(
1244 Some(vec!["x".to_string(), "y".to_string()]),
1245 table_scan.column_names
1246 );
1247
1248 let table_scan = table
1249 .scan()
1250 .select(["x", "y"])
1251 .select(["z"])
1252 .build()
1253 .unwrap();
1254 assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1255 }
1256
1257 #[tokio::test]
1258 async fn test_select_all() {
1259 let table = TableTestFixture::new().table;
1260
1261 let table_scan = table.scan().select_all().build().unwrap();
1262 assert!(table_scan.column_names.is_none());
1263 }
1264
/// Building a scan fails when the selection `x, y, z, a, b` is requested.
// NOTE(review): presumably at least one of these names is absent from the
// fixture schema; the schema itself is defined outside this view.
#[test]
fn test_select_no_exist_column() {
    let fixture_table = TableTestFixture::new().table;

    let requested = ["x", "y", "z", "a", "b"];
    let build_outcome = fixture_table.scan().select(requested).build();

    assert!(build_outcome.is_err());
}
1272
1273 #[tokio::test]
1274 async fn test_table_scan_default_snapshot_id() {
1275 let table = TableTestFixture::new().table;
1276
1277 let table_scan = table.scan().build().unwrap();
1278 assert_eq!(
1279 table.metadata().current_snapshot().unwrap().snapshot_id(),
1280 table_scan.snapshot().unwrap().snapshot_id()
1281 );
1282 }
1283
/// Building a scan for snapshot id `1024` is expected to fail on the fixture
/// table.
#[test]
fn test_table_scan_non_exist_snapshot_id() {
    let fixture_table = TableTestFixture::new().table;

    let build_outcome = fixture_table.scan().snapshot_id(1024).build();

    assert!(build_outcome.is_err());
}
1291
1292 #[tokio::test]
1293 async fn test_table_scan_with_snapshot_id() {
1294 let table = TableTestFixture::new().table;
1295
1296 let table_scan = table
1297 .scan()
1298 .snapshot_id(3051729675574597004)
1299 .with_row_selection_enabled(true)
1300 .build()
1301 .unwrap();
1302 assert_eq!(
1303 table_scan.snapshot().unwrap().snapshot_id(),
1304 3051729675574597004
1305 );
1306 }
1307
1308 #[tokio::test]
1309 async fn test_plan_files_on_table_without_any_snapshots() {
1310 let table = TableTestFixture::new_empty().table;
1311 let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1312 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1313 assert!(batches.is_empty());
1314 }
1315
1316 #[tokio::test]
1317 async fn test_plan_files_no_deletions() {
1318 let mut fixture = TableTestFixture::new();
1319 fixture.setup_manifest_files().await;
1320
1321 let table_scan = fixture
1323 .table
1324 .scan()
1325 .with_row_selection_enabled(true)
1326 .build()
1327 .unwrap();
1328
1329 let mut tasks = table_scan
1330 .plan_files()
1331 .await
1332 .unwrap()
1333 .try_fold(vec![], |mut acc, task| async move {
1334 acc.push(task);
1335 Ok(acc)
1336 })
1337 .await
1338 .unwrap();
1339
1340 assert_eq!(tasks.len(), 2);
1341
1342 tasks.sort_by_key(|t| t.data_file_path.to_string());
1343
1344 assert_eq!(
1346 tasks[0].data_file_path,
1347 format!("{}/1.parquet", &fixture.table_location)
1348 );
1349
1350 assert_eq!(
1352 tasks[1].data_file_path,
1353 format!("{}/3.parquet", &fixture.table_location)
1354 );
1355 }
1356
1357 #[tokio::test]
1358 async fn test_open_parquet_no_deletions() {
1359 let mut fixture = TableTestFixture::new();
1360 fixture.setup_manifest_files().await;
1361
1362 let table_scan = fixture
1364 .table
1365 .scan()
1366 .with_row_selection_enabled(true)
1367 .build()
1368 .unwrap();
1369
1370 let batch_stream = table_scan.to_arrow().await.unwrap();
1371
1372 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1373
1374 let col = batches[0].column_by_name("x").unwrap();
1375
1376 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1377 assert_eq!(int64_arr.value(0), 1);
1378 }
1379
1380 #[tokio::test]
1381 async fn test_open_parquet_no_deletions_by_separate_reader() {
1382 let mut fixture = TableTestFixture::new();
1383 fixture.setup_manifest_files().await;
1384
1385 let table_scan = fixture
1387 .table
1388 .scan()
1389 .with_row_selection_enabled(true)
1390 .build()
1391 .unwrap();
1392
1393 let mut plan_task: Vec<_> = table_scan
1394 .plan_files()
1395 .await
1396 .unwrap()
1397 .try_collect()
1398 .await
1399 .unwrap();
1400 assert_eq!(plan_task.len(), 2);
1401
1402 let reader = ArrowReaderBuilder::new(
1403 fixture.table.file_io().clone(),
1404 fixture.table.runtime().clone(),
1405 )
1406 .build();
1407 let batch_stream = reader
1408 .clone()
1409 .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1410 .unwrap()
1411 .stream();
1412 let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1413
1414 let reader = ArrowReaderBuilder::new(
1415 fixture.table.file_io().clone(),
1416 fixture.table.runtime().clone(),
1417 )
1418 .build();
1419 let batch_stream = reader
1420 .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1421 .unwrap()
1422 .stream();
1423 let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1424
1425 assert_eq!(batch_1, batch_2);
1426 }
1427
1428 #[tokio::test]
1429 async fn test_open_parquet_with_projection() {
1430 let mut fixture = TableTestFixture::new();
1431 fixture.setup_manifest_files().await;
1432
1433 let table_scan = fixture
1435 .table
1436 .scan()
1437 .select(["x", "z"])
1438 .with_row_selection_enabled(true)
1439 .build()
1440 .unwrap();
1441
1442 let batch_stream = table_scan.to_arrow().await.unwrap();
1443
1444 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1445
1446 assert_eq!(batches[0].num_columns(), 2);
1447
1448 let col1 = batches[0].column_by_name("x").unwrap();
1449 let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1450 assert_eq!(int64_arr.value(0), 1);
1451
1452 let col2 = batches[0].column_by_name("z").unwrap();
1453 let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1454 assert_eq!(int64_arr.value(0), 3);
1455
1456 let table_scan = fixture.table.scan().select_empty().build().unwrap();
1458 let batch_stream = table_scan.to_arrow().await.unwrap();
1459 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1460
1461 assert_eq!(batches[0].num_columns(), 0);
1462 assert_eq!(batches[0].num_rows(), 1024);
1463 }
1464
1465 #[tokio::test]
1466 async fn test_filter_on_arrow_lt() {
1467 let mut fixture = TableTestFixture::new();
1468 fixture.setup_manifest_files().await;
1469
1470 let mut builder = fixture.table.scan();
1472 let predicate = Reference::new("y").less_than(Datum::long(3));
1473 builder = builder
1474 .with_filter(predicate)
1475 .with_row_selection_enabled(true);
1476 let table_scan = builder.build().unwrap();
1477
1478 let batch_stream = table_scan.to_arrow().await.unwrap();
1479
1480 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1481
1482 assert_eq!(batches[0].num_rows(), 512);
1483
1484 let col = batches[0].column_by_name("x").unwrap();
1485 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1486 assert_eq!(int64_arr.value(0), 1);
1487
1488 let col = batches[0].column_by_name("y").unwrap();
1489 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1490 assert_eq!(int64_arr.value(0), 2);
1491 }
1492
1493 #[tokio::test]
1494 async fn test_filter_on_arrow_gt_eq() {
1495 let mut fixture = TableTestFixture::new();
1496 fixture.setup_manifest_files().await;
1497
1498 let mut builder = fixture.table.scan();
1500 let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1501 builder = builder
1502 .with_filter(predicate)
1503 .with_row_selection_enabled(true);
1504 let table_scan = builder.build().unwrap();
1505
1506 let batch_stream = table_scan.to_arrow().await.unwrap();
1507
1508 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1509
1510 assert_eq!(batches[0].num_rows(), 12);
1511
1512 let col = batches[0].column_by_name("x").unwrap();
1513 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1514 assert_eq!(int64_arr.value(0), 1);
1515
1516 let col = batches[0].column_by_name("y").unwrap();
1517 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1518 assert_eq!(int64_arr.value(0), 5);
1519 }
1520
1521 #[tokio::test]
1522 async fn test_filter_double_eq() {
1523 let mut fixture = TableTestFixture::new();
1524 fixture.setup_manifest_files().await;
1525
1526 let mut builder = fixture.table.scan();
1528 let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1529 builder = builder
1530 .with_filter(predicate)
1531 .with_row_selection_enabled(true);
1532 let table_scan = builder.build().unwrap();
1533
1534 let batch_stream = table_scan.to_arrow().await.unwrap();
1535
1536 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1537
1538 assert_eq!(batches.len(), 2);
1539 assert_eq!(batches[0].num_rows(), 12);
1540
1541 let col = batches[0].column_by_name("dbl").unwrap();
1542 let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1543 assert_eq!(f64_arr.value(1), 150.0f64);
1544 }
1545
1546 #[tokio::test]
1547 async fn test_filter_int_eq() {
1548 let mut fixture = TableTestFixture::new();
1549 fixture.setup_manifest_files().await;
1550
1551 let mut builder = fixture.table.scan();
1553 let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1554 builder = builder
1555 .with_filter(predicate)
1556 .with_row_selection_enabled(true);
1557 let table_scan = builder.build().unwrap();
1558
1559 let batch_stream = table_scan.to_arrow().await.unwrap();
1560
1561 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1562
1563 assert_eq!(batches.len(), 2);
1564 assert_eq!(batches[0].num_rows(), 12);
1565
1566 let col = batches[0].column_by_name("i32").unwrap();
1567 let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1568 assert_eq!(i32_arr.value(1), 150i32);
1569 }
1570
1571 #[tokio::test]
1572 async fn test_filter_long_eq() {
1573 let mut fixture = TableTestFixture::new();
1574 fixture.setup_manifest_files().await;
1575
1576 let mut builder = fixture.table.scan();
1578 let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1579 builder = builder
1580 .with_filter(predicate)
1581 .with_row_selection_enabled(true);
1582 let table_scan = builder.build().unwrap();
1583
1584 let batch_stream = table_scan.to_arrow().await.unwrap();
1585
1586 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1587
1588 assert_eq!(batches.len(), 2);
1589 assert_eq!(batches[0].num_rows(), 12);
1590
1591 let col = batches[0].column_by_name("i64").unwrap();
1592 let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1593 assert_eq!(i64_arr.value(1), 150i64);
1594 }
1595
1596 #[tokio::test]
1597 async fn test_filter_bool_eq() {
1598 let mut fixture = TableTestFixture::new();
1599 fixture.setup_manifest_files().await;
1600
1601 let mut builder = fixture.table.scan();
1603 let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1604 builder = builder
1605 .with_filter(predicate)
1606 .with_row_selection_enabled(true);
1607 let table_scan = builder.build().unwrap();
1608
1609 let batch_stream = table_scan.to_arrow().await.unwrap();
1610
1611 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1612
1613 assert_eq!(batches.len(), 2);
1614 assert_eq!(batches[0].num_rows(), 512);
1615
1616 let col = batches[0].column_by_name("bool").unwrap();
1617 let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1618 assert!(bool_arr.value(1));
1619 }
1620
1621 #[tokio::test]
1622 async fn test_filter_on_arrow_is_null() {
1623 let mut fixture = TableTestFixture::new();
1624 fixture.setup_manifest_files().await;
1625
1626 let mut builder = fixture.table.scan();
1628 let predicate = Reference::new("y").is_null();
1629 builder = builder
1630 .with_filter(predicate)
1631 .with_row_selection_enabled(true);
1632 let table_scan = builder.build().unwrap();
1633
1634 let batch_stream = table_scan.to_arrow().await.unwrap();
1635
1636 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1637 assert_eq!(batches.len(), 0);
1638 }
1639
1640 #[tokio::test]
1641 async fn test_filter_on_arrow_is_not_null() {
1642 let mut fixture = TableTestFixture::new();
1643 fixture.setup_manifest_files().await;
1644
1645 let mut builder = fixture.table.scan();
1647 let predicate = Reference::new("y").is_not_null();
1648 builder = builder
1649 .with_filter(predicate)
1650 .with_row_selection_enabled(true);
1651 let table_scan = builder.build().unwrap();
1652
1653 let batch_stream = table_scan.to_arrow().await.unwrap();
1654
1655 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1656 assert_eq!(batches[0].num_rows(), 1024);
1657 }
1658
1659 #[tokio::test]
1660 async fn test_filter_on_arrow_lt_and_gt() {
1661 let mut fixture = TableTestFixture::new();
1662 fixture.setup_manifest_files().await;
1663
1664 let mut builder = fixture.table.scan();
1666 let predicate = Reference::new("y")
1667 .less_than(Datum::long(5))
1668 .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1669 builder = builder
1670 .with_filter(predicate)
1671 .with_row_selection_enabled(true);
1672 let table_scan = builder.build().unwrap();
1673
1674 let batch_stream = table_scan.to_arrow().await.unwrap();
1675
1676 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1677 assert_eq!(batches[0].num_rows(), 500);
1678
1679 let col = batches[0].column_by_name("x").unwrap();
1680 let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1681 assert_eq!(col, &expected_x);
1682
1683 let col = batches[0].column_by_name("y").unwrap();
1684 let mut values = vec![];
1685 values.append(vec![3; 200].as_mut());
1686 values.append(vec![4; 300].as_mut());
1687 let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1688 assert_eq!(col, &expected_y);
1689
1690 let col = batches[0].column_by_name("z").unwrap();
1691 let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1692 assert_eq!(col, &expected_z);
1693 }
1694
1695 #[tokio::test]
1696 async fn test_filter_on_arrow_lt_or_gt() {
1697 let mut fixture = TableTestFixture::new();
1698 fixture.setup_manifest_files().await;
1699
1700 let mut builder = fixture.table.scan();
1702 let predicate = Reference::new("y")
1703 .less_than(Datum::long(5))
1704 .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1705 builder = builder
1706 .with_filter(predicate)
1707 .with_row_selection_enabled(true);
1708 let table_scan = builder.build().unwrap();
1709
1710 let batch_stream = table_scan.to_arrow().await.unwrap();
1711
1712 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1713 assert_eq!(batches[0].num_rows(), 1024);
1714
1715 let col = batches[0].column_by_name("x").unwrap();
1716 let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1717 assert_eq!(col, &expected_x);
1718
1719 let col = batches[0].column_by_name("y").unwrap();
1720 let mut values = vec![2; 512];
1721 values.append(vec![3; 200].as_mut());
1722 values.append(vec![4; 300].as_mut());
1723 values.append(vec![5; 12].as_mut());
1724 let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1725 assert_eq!(col, &expected_y);
1726
1727 let col = batches[0].column_by_name("z").unwrap();
1728 let mut values = vec![3; 512];
1729 values.append(vec![4; 512].as_mut());
1730 let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1731 assert_eq!(col, &expected_z);
1732 }
1733
1734 #[tokio::test]
1735 async fn test_filter_on_arrow_startswith() {
1736 let mut fixture = TableTestFixture::new();
1737 fixture.setup_manifest_files().await;
1738
1739 let mut builder = fixture.table.scan();
1741 let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1742 builder = builder
1743 .with_filter(predicate)
1744 .with_row_selection_enabled(true);
1745 let table_scan = builder.build().unwrap();
1746
1747 let batch_stream = table_scan.to_arrow().await.unwrap();
1748
1749 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1750
1751 assert_eq!(batches[0].num_rows(), 512);
1752
1753 let col = batches[0].column_by_name("a").unwrap();
1754 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1755 assert_eq!(string_arr.value(0), "Iceberg");
1756 }
1757
1758 #[tokio::test]
1759 async fn test_filter_on_arrow_not_startswith() {
1760 let mut fixture = TableTestFixture::new();
1761 fixture.setup_manifest_files().await;
1762
1763 let mut builder = fixture.table.scan();
1765 let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1766 builder = builder
1767 .with_filter(predicate)
1768 .with_row_selection_enabled(true);
1769 let table_scan = builder.build().unwrap();
1770
1771 let batch_stream = table_scan.to_arrow().await.unwrap();
1772
1773 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1774
1775 assert_eq!(batches[0].num_rows(), 512);
1776
1777 let col = batches[0].column_by_name("a").unwrap();
1778 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1779 assert_eq!(string_arr.value(0), "Apache");
1780 }
1781
1782 #[tokio::test]
1783 async fn test_filter_on_arrow_in() {
1784 let mut fixture = TableTestFixture::new();
1785 fixture.setup_manifest_files().await;
1786
1787 let mut builder = fixture.table.scan();
1789 let predicate =
1790 Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1791 builder = builder
1792 .with_filter(predicate)
1793 .with_row_selection_enabled(true);
1794 let table_scan = builder.build().unwrap();
1795
1796 let batch_stream = table_scan.to_arrow().await.unwrap();
1797
1798 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1799
1800 assert_eq!(batches[0].num_rows(), 512);
1801
1802 let col = batches[0].column_by_name("a").unwrap();
1803 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1804 assert_eq!(string_arr.value(0), "Iceberg");
1805 }
1806
1807 #[tokio::test]
1808 async fn test_filter_on_arrow_not_in() {
1809 let mut fixture = TableTestFixture::new();
1810 fixture.setup_manifest_files().await;
1811
1812 let mut builder = fixture.table.scan();
1814 let predicate =
1815 Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1816 builder = builder
1817 .with_filter(predicate)
1818 .with_row_selection_enabled(true);
1819 let table_scan = builder.build().unwrap();
1820
1821 let batch_stream = table_scan.to_arrow().await.unwrap();
1822
1823 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1824
1825 assert_eq!(batches[0].num_rows(), 512);
1826
1827 let col = batches[0].column_by_name("a").unwrap();
1828 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1829 assert_eq!(string_arr.value(0), "Apache");
1830 }
1831
/// Round-trips two `FileScanTask` values through `serde_json` and compares the
/// path, byte range, projected field ids, predicate and schema before and
/// after.
///
/// The two tasks differ in `predicate` (`None` vs `AlwaysTrue`),
/// `record_count` and `data_file_format`.
#[test]
fn test_file_scan_task_serialize_deserialize() {
    let assert_round_trip = |original: FileScanTask| {
        let json = serde_json::to_string(&original).unwrap();
        let restored: FileScanTask = serde_json::from_str(&json).unwrap();

        assert_eq!(original.data_file_path, restored.data_file_path);
        assert_eq!(original.start, restored.start);
        assert_eq!(original.length, restored.length);
        assert_eq!(original.project_field_ids, restored.project_field_ids);
        assert_eq!(original.predicate, restored.predicate);
        assert_eq!(original.schema, restored.schema);
    };

    // Single required binary column `x` with field id 1.
    let binary_field = NestedField::required(1, "x", Type::Primitive(PrimitiveType::Binary));
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![Arc::new(binary_field)])
            .build()
            .unwrap(),
    );

    let parquet_task = FileScanTask {
        data_file_path: "data_file_path".to_string(),
        file_size_in_bytes: 0,
        start: 0,
        length: 100,
        project_field_ids: vec![1, 2, 3],
        predicate: None,
        schema: schema.clone(),
        record_count: Some(100),
        data_file_format: DataFileFormat::Parquet,
        deletes: vec![],
        partition: None,
        partition_spec: None,
        name_mapping: None,
        case_sensitive: false,
    };
    assert_round_trip(parquet_task);

    let avro_task = FileScanTask {
        data_file_path: "data_file_path".to_string(),
        file_size_in_bytes: 0,
        start: 0,
        length: 100,
        project_field_ids: vec![1, 2, 3],
        predicate: Some(BoundPredicate::AlwaysTrue),
        schema,
        record_count: None,
        data_file_format: DataFileFormat::Avro,
        deletes: vec![],
        partition: None,
        partition_spec: None,
        name_mapping: None,
        case_sensitive: false,
    };
    assert_round_trip(avro_task);
}
1894
1895 #[tokio::test]
1896 async fn test_select_with_file_column() {
1897 use arrow_array::cast::AsArray;
1898
1899 let mut fixture = TableTestFixture::new();
1900 fixture.setup_manifest_files().await;
1901
1902 let table_scan = fixture
1904 .table
1905 .scan()
1906 .select(["x", RESERVED_COL_NAME_FILE])
1907 .with_row_selection_enabled(true)
1908 .build()
1909 .unwrap();
1910
1911 let batch_stream = table_scan.to_arrow().await.unwrap();
1912 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1913
1914 assert_eq!(batches[0].num_columns(), 2);
1916
1917 let x_col = batches[0].column_by_name("x").unwrap();
1919 let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
1920 assert_eq!(x_arr.value(0), 1);
1921
1922 let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
1924 assert!(
1925 file_col.is_some(),
1926 "_file column should be present in the batch"
1927 );
1928
1929 let file_col = file_col.unwrap();
1931 assert!(
1932 matches!(
1933 file_col.data_type(),
1934 arrow_schema::DataType::RunEndEncoded(_, _)
1935 ),
1936 "_file column should use RunEndEncoded type"
1937 );
1938
1939 let run_array = file_col
1941 .as_any()
1942 .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1943 .expect("_file column should be a RunArray");
1944
1945 let values = run_array.values();
1946 let string_values = values.as_string::<i32>();
1947 assert_eq!(string_values.len(), 1, "Should have a single file path");
1948
1949 let file_path = string_values.value(0);
1950 assert!(
1951 file_path.ends_with(".parquet"),
1952 "File path should end with .parquet, got: {file_path}"
1953 );
1954 }
1955
1956 #[tokio::test]
1957 async fn test_select_file_column_position() {
1958 let mut fixture = TableTestFixture::new();
1959 fixture.setup_manifest_files().await;
1960
1961 let table_scan = fixture
1963 .table
1964 .scan()
1965 .select(["x", RESERVED_COL_NAME_FILE, "z"])
1966 .with_row_selection_enabled(true)
1967 .build()
1968 .unwrap();
1969
1970 let batch_stream = table_scan.to_arrow().await.unwrap();
1971 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1972
1973 assert_eq!(batches[0].num_columns(), 3);
1974
1975 let schema = batches[0].schema();
1977 assert_eq!(schema.field(0).name(), "x");
1978 assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1979 assert_eq!(schema.field(2).name(), "z");
1980
1981 assert!(batches[0].column_by_name("x").is_some());
1983 assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1984 assert!(batches[0].column_by_name("z").is_some());
1985 }
1986
1987 #[tokio::test]
1988 async fn test_select_file_column_only() {
1989 let mut fixture = TableTestFixture::new();
1990 fixture.setup_manifest_files().await;
1991
1992 let table_scan = fixture
1994 .table
1995 .scan()
1996 .select([RESERVED_COL_NAME_FILE])
1997 .with_row_selection_enabled(true)
1998 .build()
1999 .unwrap();
2000
2001 let batch_stream = table_scan.to_arrow().await.unwrap();
2002 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2003
2004 assert_eq!(batches[0].num_columns(), 1);
2006
2007 let schema = batches[0].schema();
2009 assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2010
2011 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2015 assert_eq!(total_rows, 2048);
2016 }
2017
2018 #[tokio::test]
2019 async fn test_file_column_with_multiple_files() {
2020 use std::collections::HashSet;
2021
2022 let mut fixture = TableTestFixture::new();
2023 fixture.setup_manifest_files().await;
2024
2025 let table_scan = fixture
2027 .table
2028 .scan()
2029 .select(["x", RESERVED_COL_NAME_FILE])
2030 .with_row_selection_enabled(true)
2031 .build()
2032 .unwrap();
2033
2034 let batch_stream = table_scan.to_arrow().await.unwrap();
2035 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2036
2037 let mut file_paths = HashSet::new();
2039 for batch in &batches {
2040 let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
2041 let run_array = file_col
2042 .as_any()
2043 .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2044 .expect("_file column should be a RunArray");
2045
2046 let values = run_array.values();
2047 let string_values = values.as_string::<i32>();
2048 for i in 0..string_values.len() {
2049 file_paths.insert(string_values.value(i).to_string());
2050 }
2051 }
2052
2053 assert!(!file_paths.is_empty(), "Should have at least one file path");
2055
2056 for path in &file_paths {
2058 assert!(
2059 path.ends_with(".parquet"),
2060 "All file paths should end with .parquet, got: {path}"
2061 );
2062 }
2063 }
2064
2065 #[tokio::test]
2066 async fn test_file_column_at_start() {
2067 let mut fixture = TableTestFixture::new();
2068 fixture.setup_manifest_files().await;
2069
2070 let table_scan = fixture
2072 .table
2073 .scan()
2074 .select([RESERVED_COL_NAME_FILE, "x", "y"])
2075 .with_row_selection_enabled(true)
2076 .build()
2077 .unwrap();
2078
2079 let batch_stream = table_scan.to_arrow().await.unwrap();
2080 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2081
2082 assert_eq!(batches[0].num_columns(), 3);
2083
2084 let schema = batches[0].schema();
2086 assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2087 assert_eq!(schema.field(1).name(), "x");
2088 assert_eq!(schema.field(2).name(), "y");
2089 }
2090
2091 #[tokio::test]
2092 async fn test_file_column_at_end() {
2093 let mut fixture = TableTestFixture::new();
2094 fixture.setup_manifest_files().await;
2095
2096 let table_scan = fixture
2098 .table
2099 .scan()
2100 .select(["x", "y", RESERVED_COL_NAME_FILE])
2101 .with_row_selection_enabled(true)
2102 .build()
2103 .unwrap();
2104
2105 let batch_stream = table_scan.to_arrow().await.unwrap();
2106 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2107
2108 assert_eq!(batches[0].num_columns(), 3);
2109
2110 let schema = batches[0].schema();
2112 assert_eq!(schema.field(0).name(), "x");
2113 assert_eq!(schema.field(1).name(), "y");
2114 assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2115 }
2116
2117 #[tokio::test]
2118 async fn test_select_with_repeated_column_names() {
2119 let mut fixture = TableTestFixture::new();
2120 fixture.setup_manifest_files().await;
2121
2122 let table_scan = fixture
2125 .table
2126 .scan()
2127 .select([
2128 "x",
2129 RESERVED_COL_NAME_FILE,
2130 "x", "y",
2132 RESERVED_COL_NAME_FILE, "y", ])
2135 .with_row_selection_enabled(true)
2136 .build()
2137 .unwrap();
2138
2139 let batch_stream = table_scan.to_arrow().await.unwrap();
2140 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2141
2142 assert_eq!(
2144 batches[0].num_columns(),
2145 6,
2146 "Should have exactly 6 columns with duplicates"
2147 );
2148
2149 let schema = batches[0].schema();
2150
2151 assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2153 assert_eq!(
2154 schema.field(1).name(),
2155 RESERVED_COL_NAME_FILE,
2156 "Column 1 should be _file"
2157 );
2158 assert_eq!(
2159 schema.field(2).name(),
2160 "x",
2161 "Column 2 should be x (duplicate)"
2162 );
2163 assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2164 assert_eq!(
2165 schema.field(4).name(),
2166 RESERVED_COL_NAME_FILE,
2167 "Column 4 should be _file (duplicate)"
2168 );
2169 assert_eq!(
2170 schema.field(5).name(),
2171 "y",
2172 "Column 5 should be y (duplicate)"
2173 );
2174
2175 assert!(
2177 matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2178 "Column x should be Int64"
2179 );
2180 assert!(
2181 matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2182 "Column x (duplicate) should be Int64"
2183 );
2184 assert!(
2185 matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2186 "Column y should be Int64"
2187 );
2188 assert!(
2189 matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2190 "Column y (duplicate) should be Int64"
2191 );
2192 assert!(
2193 matches!(
2194 schema.field(1).data_type(),
2195 arrow_schema::DataType::RunEndEncoded(_, _)
2196 ),
2197 "_file column should use RunEndEncoded type"
2198 );
2199 assert!(
2200 matches!(
2201 schema.field(4).data_type(),
2202 arrow_schema::DataType::RunEndEncoded(_, _)
2203 ),
2204 "_file column (duplicate) should use RunEndEncoded type"
2205 );
2206 }
2207
2208 #[tokio::test]
2209 async fn test_scan_deadlock() {
2210 let mut fixture = TableTestFixture::new();
2211 fixture.setup_deadlock_manifests().await;
2212
2213 let table_scan = fixture
2220 .table
2221 .scan()
2222 .with_concurrency_limit(1)
2223 .build()
2224 .unwrap();
2225
2226 let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2229 table_scan
2230 .plan_files()
2231 .await
2232 .unwrap()
2233 .try_collect::<Vec<_>>()
2234 .await
2235 })
2236 .await;
2237
2238 assert!(result.is_ok(), "Scan timed out - deadlock detected");
2240 }
2241}