//! Table scan API.

mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};

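/// A stream of Arrow [`RecordBatch`]es.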
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

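/// Builds a [`TableScan`], letting the caller select columns, pick a
/// snapshot, apply a row filter, and tune concurrency before the scan is
/// created.
///
/// A minimal usage sketch (assumes a `Table` value named `table` is already
/// in scope, so it is not compiled as a doc-test):
///
/// ```ignore
/// let scan = table
///     .scan()
///     .select(["x", "y"])
///     .with_filter(Reference::new("y").less_than(Datum::long(3)))
///     .build()?;
/// let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
/// ```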
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    column_names: Option<Vec<String>>,
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

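    /// Sets the desired size of batches in the response
    /// to something other than the default.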
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

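    /// Sets the scan's case sensitivity.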
    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

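    /// Specifies a predicate to use as a filter.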
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        // Normalize the predicate by rewriting away `NOT` nodes before it
        // is bound and evaluated during planning.
        self.filter = Some(predicate.rewrite_not());
        self
    }

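    /// Selects all columns in the table.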
    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

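    /// Selects no columns. The resulting batches carry the correct row
    /// counts but no data, which is useful for counting rows.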
    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

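    /// Selects the given columns by name.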
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

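    /// Sets the snapshot to scan. When not set, the current snapshot is used.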
    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

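    /// Sets the manifest-file, manifest-entry, and data-file concurrency
    /// limits in a single call.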
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

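    /// Sets the data file concurrency limit for this scan.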
    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

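    /// Sets the manifest entry concurrency limit for this scan.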
    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

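    /// Determines whether to enable row group filtering.
    /// When enabled, Parquet row groups whose statistics show that they
    /// cannot contain rows matching the filter predicate are skipped
    /// entirely. Enabled by default.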
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

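    /// Determines whether to enable row selection.
    /// When enabled, more granular page-level statistics are used to skip
    /// individual ranges of rows that cannot match the filter predicate.
    /// Disabled by default.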
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

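    /// Builds the table scan, resolving the snapshot to use and validating
    /// the selected columns against its schema.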
    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                // A table may not have any snapshots yet; in that case the
                // scan is empty, so build a `TableScan` with no plan context.
                let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                if is_metadata_column_name(column_name) {
                    continue;
                }
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            if is_metadata_column_name(column_name) {
                field_ids.push(get_metadata_field_id(column_name)?);
                continue;
            }

            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                        ),
                    )
                })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicate) = self.filter {
            Some(predicate.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}

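/// An API for scanning a table's data files, created via `Table::scan`.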
#[derive(Debug)]
pub struct TableScan {
    /// `None` when the table has no snapshots; such a scan yields no files.
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,

    /// The maximum number of manifest files that will be
    /// fetched from storage concurrently.
    concurrency_limit_manifest_files: usize,

    /// The maximum number of manifest entries that will
    /// be processed in parallel.
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will be read
    /// in parallel when producing Arrow record batches.
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
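    /// Returns a stream of [`FileScanTask`]s describing the data files
    /// (and any associated delete files) that this scan needs to read.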
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        // A table without snapshots has nothing to scan.
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // Channels that stream manifest entry contexts between planning
        // stages: one for data manifests and one for delete manifests.
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // Channel that streams the resulting file scan tasks to the caller.
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all manifest files and stream their entries
        // into the data and delete channels above.
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete manifest entries in parallel. Note that this
        // task is awaited: all delete manifest entries are processed before
        // the data-file stage below is spawned.
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data manifest entries in parallel, emitting a file
        // scan task for each data file that survives predicate evaluation.
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }

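    /// Returns an [`ArrowRecordBatchStream`] yielding the scan's rows as
    /// Arrow record batches.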
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }

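    /// Returns the selected column names, or `None` if all columns are
    /// selected.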
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

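    /// Returns a reference to the snapshot being scanned, if the table
    /// has one.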
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // Skip this manifest entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter a delete file in a data manifest.
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // Skip any data file whose partition data indicates that it
            // cannot contain rows matching the filter.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // Skip any data file whose metrics show it cannot match the filter.
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // All predicate checks passed; emit a file scan task for this data file.
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // Skip this manifest entry if it has been marked as deleted.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // Abort the plan if we encounter a data file in a delete manifest.
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // Skip any delete file whose partition data indicates that it
            // cannot apply to rows matching the filter.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

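/// A scan's filter predicate in two bound forms: one bound to the snapshot
/// schema and one bound to the partition spec.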
pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

#[cfg(test)]
pub mod tests {
    #![allow(missing_docs)]

    use std::collections::HashMap;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
        StringArray,
    };
    use futures::{TryStreamExt, stream};
    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::TableIdent;
    use crate::arrow::ArrowReaderBuilder;
    use crate::expr::{BoundPredicate, Reference};
    use crate::io::{FileIO, OutputFile};
    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
    use crate::scan::FileScanTask;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
    };
    use crate::table::Table;

    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

    pub struct TableTestFixture {
        pub table_location: String,
        pub table: Table,
    }

    impl TableTestFixture {
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            // Replace the template's partition spec with an unpartitioned one.
            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            let parquet_file_size = self.write_parquet_data_files();

            // Write a data manifest containing added, deleted, and existing entries.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write the manifest list referencing the manifest written above.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }

        fn write_parquet_data_files(&self) -> u64 {
            std::fs::create_dir_all(&self.table_location).unwrap();

            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            // Build one 1024-row batch; every data file gets the same contents.
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                // The writer must be closed for the Parquet footer to be written.
                writer.close().unwrap();
            }

            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
                .unwrap()
                .len()
        }

        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            let parquet_file_size = self.write_parquet_data_files();

            // Write a data manifest with added, deleted, and existing entries,
            // all carrying an empty partition struct.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            let empty_partition = Struct::empty();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }

        pub async fn setup_deadlock_manifests(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let _parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write a data manifest with more entries than the scan's
            // concurrency limit so that the planning channels fill up.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            for i in 0..10 {
                writer
                    .add_entry(
                        ManifestEntry::builder()
                            .status(ManifestStatus::Added)
                            .data_file(
                                DataFileBuilder::default()
                                    .partition_spec_id(0)
                                    .content(DataContentType::Data)
                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
                                    .file_format(DataFileFormat::Parquet)
                                    .file_size_in_bytes(100)
                                    .record_count(1)
                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
                                    .key_metadata(None)
                                    .build()
                                    .unwrap(),
                            )
                            .build(),
                    )
                    .unwrap();
            }
            let data_manifest = writer.write_manifest_file().await.unwrap();

            // Write a delete manifest alongside the data manifest.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_deletes();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::PositionDeletes)
                                .file_path(format!("{}/del.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let delete_manifest = writer.write_manifest_file().await.unwrap();

            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
    }

    #[test]
    fn test_table_scan_columns() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
        assert_eq!(
            Some(vec!["x".to_string(), "y".to_string()]),
            table_scan.column_names
        );

        let table_scan = table
            .scan()
            .select(["x", "y"])
            .select(["z"])
            .build()
            .unwrap();
        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
    }

    #[test]
    fn test_select_all() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select_all().build().unwrap();
        assert!(table_scan.column_names.is_none());
    }

    #[test]
    fn test_select_no_exist_column() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_default_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().build().unwrap();
        assert_eq!(
            table.metadata().current_snapshot().unwrap().snapshot_id(),
            table_scan.snapshot().unwrap().snapshot_id()
        );
    }

    #[test]
    fn test_table_scan_non_exist_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().snapshot_id(1024).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_with_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table
            .scan()
            .snapshot_id(3051729675574597004)
            .with_row_selection_enabled(true)
            .build()
            .unwrap();
        assert_eq!(
            table_scan.snapshot().unwrap().snapshot_id(),
            3051729675574597004
        );
    }

    #[tokio::test]
    async fn test_plan_files_on_table_without_any_snapshots() {
        let table = TableTestFixture::new_empty().table;
        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert!(batches.is_empty());
    }

    #[tokio::test]
    async fn test_plan_files_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut tasks = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_fold(vec![], |mut acc, task| async move {
                acc.push(task);
                Ok(acc)
            })
            .await
            .unwrap();

        assert_eq!(tasks.len(), 2);

        tasks.sort_by_key(|t| t.data_file_path.to_string());

        assert_eq!(
            tasks[0].data_file_path,
            format!("{}/1.parquet", &fixture.table_location)
        );

        assert_eq!(
            tasks[1].data_file_path,
            format!("{}/3.parquet", &fixture.table_location)
        );
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        let col = batches[0].column_by_name("x").unwrap();

        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions_by_separate_reader() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut plan_tasks: Vec<_> = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        assert_eq!(plan_tasks.len(), 2);

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .clone()
            .read(Box::pin(stream::iter(vec![Ok(plan_tasks.remove(0))])))
            .unwrap();
        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .read(Box::pin(stream::iter(vec![Ok(plan_tasks.remove(0))])))
            .unwrap();
        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batch_1, batch_2);
    }

    #[tokio::test]
    async fn test_open_parquet_with_projection() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let col1 = batches[0].column_by_name("x").unwrap();
        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col2 = batches[0].column_by_name("z").unwrap();
        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 3);

        // An empty projection still yields batches with the right row count.
        let table_scan = fixture.table.scan().select_empty().build().unwrap();
        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 0);
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").less_than(Datum::long(3));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 2);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_gt_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 5);
    }

    #[tokio::test]
    async fn test_filter_double_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("dbl").unwrap();
        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
        assert_eq!(f64_arr.value(1), 150.0f64);
    }

    #[tokio::test]
    async fn test_filter_int_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i32").unwrap();
        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(i32_arr.value(1), 150i32);
    }

    #[tokio::test]
    async fn test_filter_long_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i64").unwrap();
        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(i64_arr.value(1), 150i64);
    }

    #[tokio::test]
    async fn test_filter_bool_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("bool").unwrap();
        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
        assert!(bool_arr.value(1));
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_not_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_not_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_and_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 500);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_or_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![2; 512];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        values.append(vec![5; 12].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let mut values = vec![3; 512];
        values.append(vec![4; 512].as_mut());
        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[test]
    fn test_file_scan_task_serialize_deserialize() {
        let test_fn = |task: FileScanTask| {
            let serialized = serde_json::to_string(&task).unwrap();
            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();

            assert_eq!(task.data_file_path, deserialized.data_file_path);
            assert_eq!(task.start, deserialized.start);
            assert_eq!(task.length, deserialized.length);
            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
            assert_eq!(task.predicate, deserialized.predicate);
            assert_eq!(task.schema, deserialized.schema);
        };

        // A task without a predicate.
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Binary),
                ))])
                .build()
                .unwrap(),
        );
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: None,
            schema: schema.clone(),
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);

        // A task with a predicate.
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: Some(BoundPredicate::AlwaysTrue),
            schema,
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);
    }

    #[tokio::test]
    async fn test_select_with_file_column() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let x_col = batches[0].column_by_name("x").unwrap();
        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(x_arr.value(0), 1);

        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
        assert!(
            file_col.is_some(),
            "_file column should be present in the batch"
        );

        let file_col = file_col.unwrap();
        assert!(
            matches!(
                file_col.data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );

        let run_array = file_col
            .as_any()
            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
            .expect("_file column should be a RunArray");

        let values = run_array.values();
        let string_values = values.as_string::<i32>();
        assert_eq!(string_values.len(), 1, "Should have a single file path");

        let file_path = string_values.value(0);
        assert!(
            file_path.ends_with(".parquet"),
            "File path should end with .parquet, got: {file_path}"
        );
    }

    #[tokio::test]
    async fn test_select_file_column_position() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE, "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        // The _file column should appear exactly where it was selected.
        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(2).name(), "z");

        assert!(batches[0].column_by_name("x").is_some());
        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
        assert!(batches[0].column_by_name("z").is_some());
    }

    #[tokio::test]
    async fn test_select_file_column_only() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 1);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);

        // Two data files with 1024 rows each survive the scan.
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2048);
    }

    #[tokio::test]
    async fn test_file_column_with_multiple_files() {
        use std::collections::HashSet;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Collect every distinct file path across all batches.
        let mut file_paths = HashSet::new();
        for batch in &batches {
            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
            let run_array = file_col
                .as_any()
                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
                .expect("_file column should be a RunArray");

            let values = run_array.values();
            let string_values = values.as_string::<i32>();
            for i in 0..string_values.len() {
                file_paths.insert(string_values.value(i).to_string());
            }
        }

        assert!(!file_paths.is_empty(), "Should have at least one file path");

        for path in &file_paths {
            assert!(
                path.ends_with(".parquet"),
                "All file paths should end with .parquet, got: {path}"
            );
        }
    }

    #[tokio::test]
    async fn test_file_column_at_start() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE, "x", "y"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(1).name(), "x");
        assert_eq!(schema.field(2).name(), "y");
    }

    #[tokio::test]
    async fn test_file_column_at_end() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "y", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), "y");
        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
    }

    #[tokio::test]
    async fn test_select_with_repeated_column_names() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([
                "x",
                RESERVED_COL_NAME_FILE,
                "x",
                "y",
                RESERVED_COL_NAME_FILE,
                "y",
            ])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(
            batches[0].num_columns(),
            6,
            "Should have exactly 6 columns with duplicates"
        );

        let schema = batches[0].schema();

        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
        assert_eq!(
            schema.field(1).name(),
            RESERVED_COL_NAME_FILE,
            "Column 1 should be _file"
        );
        assert_eq!(
            schema.field(2).name(),
            "x",
            "Column 2 should be x (duplicate)"
        );
        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
        assert_eq!(
            schema.field(4).name(),
            RESERVED_COL_NAME_FILE,
            "Column 4 should be _file (duplicate)"
        );
        assert_eq!(
            schema.field(5).name(),
            "y",
            "Column 5 should be y (duplicate)"
        );

        assert!(
            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
            "Column x should be Int64"
        );
        assert!(
            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
            "Column x (duplicate) should be Int64"
        );
        assert!(
            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
            "Column y should be Int64"
        );
        assert!(
            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
            "Column y (duplicate) should be Int64"
        );
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );
        assert!(
            matches!(
                schema.field(4).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column (duplicate) should use RunEndEncoded type"
        );
    }

    #[tokio::test]
    async fn test_scan_deadlock() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_deadlock_manifests().await;

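        // With the concurrency limit clamped to 1 and a snapshot containing
        // both a data manifest and a delete manifest, planning must still make
        // progress; timing out here would indicate that the channel-based
        // planning pipeline has deadlocked.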
        let table_scan = fixture
            .table
            .scan()
            .with_concurrency_limit(1)
            .build()
            .unwrap();

        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
            table_scan
                .plan_files()
                .await
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
        })
        .await;

        assert!(result.is_ok(), "Scan timed out - deadlock detected");
    }
}
2168}