1mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35pub use crate::arrow::{ScanMetrics, ScanResult};
36use crate::delete_file_index::DeleteFileIndex;
37use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
38use crate::expr::{Bind, BoundPredicate, Predicate};
39use crate::io::FileIO;
40use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
41use crate::runtime::Runtime;
42use crate::spec::{DEFAULT_SCHEMA_NAME_MAPPING, DataContentType, NameMapping, SnapshotRef};
43use crate::table::Table;
44use crate::util::available_parallelism;
45use crate::{Error, ErrorKind, Result};
46
/// A boxed, `'static` stream whose items are `Result<RecordBatch>`.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
49
/// Builder used to configure and create a [`TableScan`].
pub struct TableScanBuilder<'a> {
    /// The table that will be scanned.
    table: &'a Table,
    /// Names of the columns to project. `None` selects every top-level column
    /// of the snapshot schema; `Some(vec![])` selects no columns.
    column_names: Option<Vec<String>>,
    /// Id of the snapshot to scan. `None` means the table's current snapshot.
    snapshot_id: Option<i64>,
    /// Optional batch size handed to the Arrow reader when reading record batches.
    batch_size: Option<usize>,
    /// Case-sensitivity flag carried into the plan context. Defaults to `true`.
    case_sensitive: bool,
    /// Optional row filter, stored after `rewrite_not()` has been applied.
    filter: Option<Predicate>,
    /// Concurrency limit forwarded to the Arrow reader for data files.
    concurrency_limit_data_files: usize,
    /// Concurrency limit used when processing manifest entries.
    concurrency_limit_manifest_entries: usize,
    /// Concurrency limit used when fetching manifest files.
    concurrency_limit_manifest_files: usize,
    /// Flag forwarded to the Arrow reader's row-group filtering option. Defaults to `true`.
    row_group_filtering_enabled: bool,
    /// Flag forwarded to the Arrow reader's row-selection option. Defaults to `false`.
    row_selection_enabled: bool,
}
65
66impl<'a> TableScanBuilder<'a> {
67 pub(crate) fn new(table: &'a Table) -> Self {
68 let num_cpus = available_parallelism().get();
69
70 Self {
71 table,
72 column_names: None,
73 snapshot_id: None,
74 batch_size: None,
75 case_sensitive: true,
76 filter: None,
77 concurrency_limit_data_files: num_cpus,
78 concurrency_limit_manifest_entries: num_cpus,
79 concurrency_limit_manifest_files: num_cpus,
80 row_group_filtering_enabled: true,
81 row_selection_enabled: false,
82 }
83 }
84
85 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
88 self.batch_size = batch_size;
89 self
90 }
91
92 pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
94 self.case_sensitive = case_sensitive;
95 self
96 }
97
98 pub fn with_filter(mut self, predicate: Predicate) -> Self {
100 self.filter = Some(predicate.rewrite_not());
103 self
104 }
105
106 pub fn select_all(mut self) -> Self {
108 self.column_names = None;
109 self
110 }
111
112 pub fn select_empty(mut self) -> Self {
114 self.column_names = Some(vec![]);
115 self
116 }
117
118 pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
120 self.column_names = Some(
121 column_names
122 .into_iter()
123 .map(|item| item.to_string())
124 .collect(),
125 );
126 self
127 }
128
129 pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
131 self.snapshot_id = Some(snapshot_id);
132 self
133 }
134
135 pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
138 self.concurrency_limit_manifest_files = limit;
139 self.concurrency_limit_manifest_entries = limit;
140 self.concurrency_limit_data_files = limit;
141 self
142 }
143
144 pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
146 self.concurrency_limit_data_files = limit;
147 self
148 }
149
150 pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
152 self.concurrency_limit_manifest_entries = limit;
153 self
154 }
155
156 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
165 self.row_group_filtering_enabled = row_group_filtering_enabled;
166 self
167 }
168
169 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
184 self.row_selection_enabled = row_selection_enabled;
185 self
186 }
187
188 pub fn build(self) -> Result<TableScan> {
190 let snapshot = match self.snapshot_id {
191 Some(snapshot_id) => self
192 .table
193 .metadata()
194 .snapshot_by_id(snapshot_id)
195 .ok_or_else(|| {
196 Error::new(
197 ErrorKind::DataInvalid,
198 format!("Snapshot with id {snapshot_id} not found"),
199 )
200 })?
201 .clone(),
202 None => {
203 let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
204 return Ok(TableScan {
205 batch_size: self.batch_size,
206 column_names: self.column_names,
207 file_io: self.table.file_io().clone(),
208 plan_context: None,
209 concurrency_limit_data_files: self.concurrency_limit_data_files,
210 concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
211 concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
212 row_group_filtering_enabled: self.row_group_filtering_enabled,
213 row_selection_enabled: self.row_selection_enabled,
214 runtime: self.table.runtime().clone(),
215 });
216 };
217 current_snapshot_id.clone()
218 }
219 };
220
221 let schema = snapshot.schema(self.table.metadata())?;
222
223 if let Some(column_names) = self.column_names.as_ref() {
225 for column_name in column_names {
226 if is_metadata_column_name(column_name) {
228 continue;
229 }
230 if schema.field_by_name(column_name).is_none() {
231 return Err(Error::new(
232 ErrorKind::DataInvalid,
233 format!("Column {column_name} not found in table. Schema: {schema}"),
234 ));
235 }
236 }
237 }
238
239 let mut field_ids = vec![];
240 let column_names = self.column_names.clone().unwrap_or_else(|| {
241 schema
242 .as_struct()
243 .fields()
244 .iter()
245 .map(|f| f.name.clone())
246 .collect()
247 });
248
249 for column_name in column_names.iter() {
250 if is_metadata_column_name(column_name) {
252 field_ids.push(get_metadata_field_id(column_name)?);
253 continue;
254 }
255
256 let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
257 Error::new(
258 ErrorKind::DataInvalid,
259 format!("Column {column_name} not found in table. Schema: {schema}"),
260 )
261 })?;
262
263 schema
264 .as_struct()
265 .field_by_id(field_id)
266 .ok_or_else(|| {
267 Error::new(
268 ErrorKind::FeatureUnsupported,
269 format!(
270 "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
271 ),
272 )
273 })?;
274
275 field_ids.push(field_id);
276 }
277
278 let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
279 Some(predicates.bind(schema.clone(), true)?)
280 } else {
281 None
282 };
283
284 let name_mapping = self
285 .table
286 .metadata()
287 .properties()
288 .get(DEFAULT_SCHEMA_NAME_MAPPING)
289 .map(|raw| {
290 serde_json::from_str::<NameMapping>(raw).map_err(|e| {
291 Error::new(
292 ErrorKind::DataInvalid,
293 format!(
294 "Failed to parse table property {DEFAULT_SCHEMA_NAME_MAPPING} as a NameMapping"
295 ),
296 )
297 .with_source(e)
298 })
299 })
300 .transpose()?
301 .map(Arc::new);
302
303 let plan_context = PlanContext {
304 snapshot,
305 table_metadata: self.table.metadata_ref(),
306 snapshot_schema: schema,
307 case_sensitive: self.case_sensitive,
308 predicate: self.filter.map(Arc::new),
309 snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
310 object_cache: self.table.object_cache(),
311 field_ids: Arc::new(field_ids),
312 name_mapping,
313 partition_filter_cache: Arc::new(PartitionFilterCache::new()),
314 manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
315 expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
316 };
317
318 Ok(TableScan {
319 batch_size: self.batch_size,
320 column_names: self.column_names,
321 file_io: self.table.file_io().clone(),
322 plan_context: Some(plan_context),
323 concurrency_limit_data_files: self.concurrency_limit_data_files,
324 concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
325 concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
326 row_group_filtering_enabled: self.row_group_filtering_enabled,
327 row_selection_enabled: self.row_selection_enabled,
328 runtime: self.table.runtime().clone(),
329 })
330 }
331}
332
/// A configured table scan, created by `TableScanBuilder::build`.
#[derive(Debug)]
pub struct TableScan {
    /// Planning state for the selected snapshot. `None` when the scan was built
    /// for a table without a current snapshot; `plan_files` then returns an
    /// empty stream.
    plan_context: Option<PlanContext>,
    /// Optional batch size passed to the Arrow reader in `to_arrow`.
    batch_size: Option<usize>,
    /// File IO handle passed to the Arrow reader in `to_arrow`.
    file_io: FileIO,
    /// The column projection given to the builder, if any.
    column_names: Option<Vec<String>>,
    /// Concurrency limit used in `plan_files` when fetching manifest files;
    /// also the capacity of the manifest-entry context channels.
    concurrency_limit_manifest_files: usize,

    /// Concurrency limit used in `plan_files` when processing manifest entries;
    /// also the capacity of the file scan task channel.
    concurrency_limit_manifest_entries: usize,

    /// Concurrency limit forwarded to the Arrow reader for data files.
    concurrency_limit_data_files: usize,

    /// Flag forwarded to the Arrow reader's row-group filtering option.
    row_group_filtering_enabled: bool,
    /// Flag forwarded to the Arrow reader's row-selection option.
    row_selection_enabled: bool,

    /// Runtime used to spawn planning tasks and to construct the Arrow reader.
    runtime: Runtime,
}
360
361impl TableScan {
    /// Plans the scan and returns a stream of `Result<FileScanTask>` items.
    ///
    /// When the scan has no plan context, an empty stream is returned.
    /// Otherwise the manifest list is loaded and three background tasks are
    /// spawned: one that fetches manifests and streams their entries, one that
    /// processes delete manifest entries, and one that processes data manifest
    /// entries and emits the file scan tasks. An error in any of these tasks is
    /// forwarded as an `Err` item on the returned stream.
    ///
    /// # Errors
    ///
    /// Returns an error directly if loading the manifest list or building the
    /// manifest file contexts fails.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // Channels carrying manifest entry contexts from the manifest-fetching
        // task to the data and delete entry processors, respectively.
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // Channel whose receiving half becomes the stream returned to the caller.
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(self.runtime.clone());

        let manifest_list = plan_context.get_manifest_list().await?;

        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        // Each background task gets its own sender clone so it can report a
        // failure to the consumer of the returned stream.
        let mut channel_for_manifest_error = file_scan_task_tx.clone();
        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        let rt = self.runtime.clone();

        // Task 1: fetch the manifests concurrently and stream their entries.
        rt.io().spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                // The send result is ignored on purpose (`let _`).
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        {
            // Task 2: process delete manifest entries, each in its own spawned
            // task, sending the resulting contexts to `delete_file_tx`.
            let rt = rt.clone();
            let rt_inner = rt.clone();
            rt.cpu().spawn(async move {
                let result = manifest_entry_delete_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_delete_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_delete_manifest_entry_error
                        .send(Err(error))
                        .await;
                }
            });
        }

        {
            // Task 3: process data manifest entries, each in its own spawned
            // task, sending the resulting file scan tasks to the output channel.
            let rt_inner = rt.clone();
            rt.cpu().spawn(async move {
                let result = manifest_entry_data_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_data_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
                }
            });
        }

        Ok(file_scan_task_rx.boxed())
    }
482
483 pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
485 let mut arrow_reader_builder =
486 ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone())
487 .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
488 .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
489 .with_row_selection_enabled(self.row_selection_enabled);
490
491 if let Some(batch_size) = self.batch_size {
492 arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
493 }
494
495 arrow_reader_builder
496 .build()
497 .read(self.plan_files().await?)
498 .map(|result| result.stream())
499 }
500
501 pub fn column_names(&self) -> Option<&[String]> {
503 self.column_names.as_deref()
504 }
505
506 pub fn snapshot(&self) -> Option<&SnapshotRef> {
508 self.plan_context.as_ref().map(|x| &x.snapshot)
509 }
510
511 async fn process_data_manifest_entry(
512 manifest_entry_context: ManifestEntryContext,
513 mut file_scan_task_tx: Sender<Result<FileScanTask>>,
514 ) -> Result<()> {
515 if !manifest_entry_context.manifest_entry.is_alive() {
517 return Ok(());
518 }
519
520 if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
522 return Err(Error::new(
523 ErrorKind::FeatureUnsupported,
524 "Encountered an entry for a delete file in a data file manifest",
525 ));
526 }
527
528 if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
529 let BoundPredicates {
530 snapshot_bound_predicate,
531 partition_bound_predicate,
532 } = bound_predicates.as_ref();
533
534 let expression_evaluator_cache =
535 manifest_entry_context.expression_evaluator_cache.as_ref();
536
537 let expression_evaluator = expression_evaluator_cache.get(
538 manifest_entry_context.partition_spec_id,
539 partition_bound_predicate,
540 )?;
541
542 if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
545 return Ok(());
546 }
547
548 if !InclusiveMetricsEvaluator::eval(
550 snapshot_bound_predicate,
551 manifest_entry_context.manifest_entry.data_file(),
552 false,
553 )? {
554 return Ok(());
555 }
556 }
557
558 file_scan_task_tx
562 .send(Ok(manifest_entry_context.into_file_scan_task().await?))
563 .await?;
564
565 Ok(())
566 }
567
568 async fn process_delete_manifest_entry(
569 manifest_entry_context: ManifestEntryContext,
570 mut delete_file_ctx_tx: Sender<DeleteFileContext>,
571 ) -> Result<()> {
572 if !manifest_entry_context.manifest_entry.is_alive() {
574 return Ok(());
575 }
576
577 if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
579 return Err(Error::new(
580 ErrorKind::FeatureUnsupported,
581 "Encountered an entry for a data file in a delete manifest",
582 ));
583 }
584
585 if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
586 let expression_evaluator_cache =
587 manifest_entry_context.expression_evaluator_cache.as_ref();
588
589 let expression_evaluator = expression_evaluator_cache.get(
590 manifest_entry_context.partition_spec_id,
591 &bound_predicates.partition_bound_predicate,
592 )?;
593
594 if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
597 return Ok(());
598 }
599 }
600
601 delete_file_ctx_tx
602 .send(DeleteFileContext {
603 manifest_entry: manifest_entry_context.manifest_entry.clone(),
604 partition_spec_id: manifest_entry_context.partition_spec_id,
605 })
606 .await?;
607
608 Ok(())
609 }
610}
611
/// The pair of bound predicates used when evaluating a manifest entry.
pub(crate) struct BoundPredicates {
    /// Predicate handed to the expression evaluator cache together with the
    /// entry's partition spec id.
    partition_bound_predicate: BoundPredicate,
    /// Predicate handed to `InclusiveMetricsEvaluator::eval` for data files.
    snapshot_bound_predicate: BoundPredicate,
}
616
617#[cfg(test)]
618pub mod tests {
619 #![allow(missing_docs)]
621
622 use std::collections::HashMap;
623 use std::fs;
624 use std::fs::File;
625 use std::sync::Arc;
626
627 use arrow_array::cast::AsArray;
628 use arrow_array::{
629 Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
630 StringArray,
631 };
632 use futures::{TryStreamExt, stream};
633 use minijinja::value::Value;
634 use minijinja::{AutoEscape, Environment, context};
635 use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
636 use parquet::basic::Compression;
637 use parquet::file::properties::WriterProperties;
638 use tempfile::TempDir;
639 use uuid::Uuid;
640
641 use crate::arrow::ArrowReaderBuilder;
642 use crate::expr::{BoundPredicate, Reference};
643 use crate::io::{FileIO, OutputFile};
644 use crate::metadata_columns::RESERVED_COL_NAME_FILE;
645 use crate::scan::FileScanTask;
646 use crate::spec::{
647 DEFAULT_SCHEMA_NAME_MAPPING, DataContentType, DataFileBuilder, DataFileFormat, Datum,
648 Literal, ManifestEntry, ManifestListWriter, ManifestStatus, ManifestWriterBuilder,
649 NestedField, PartitionSpec, PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
650 };
651 use crate::table::Table;
652 use crate::test_utils::test_runtime;
653 use crate::{ErrorKind, TableIdent};
654
655 fn render_template(template: &str, ctx: Value) -> String {
656 let mut env = Environment::new();
657 env.set_auto_escape_callback(|_| AutoEscape::None);
658 env.render_str(template, ctx).unwrap()
659 }
660
    /// Test fixture bundling a [`Table`] with the location it was created for.
    pub struct TableTestFixture {
        /// String form of the table's location path.
        pub table_location: String,
        /// The table built from the test metadata.
        pub table: Table,
    }
665
666 impl TableTestFixture {
667 #[allow(clippy::new_without_default)]
668 pub fn new() -> Self {
669 let tmp_dir = TempDir::new().unwrap();
670 let table_location = tmp_dir.path().join("table1");
671 let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
672 let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
673 let table_metadata1_location = table_location.join("metadata/v1.json");
674
675 let file_io = FileIO::new_with_fs();
676
677 let table_metadata = {
678 let template_json_str = fs::read_to_string(format!(
679 "{}/testdata/example_table_metadata_v2.json",
680 env!("CARGO_MANIFEST_DIR")
681 ))
682 .unwrap();
683 let metadata_json = render_template(&template_json_str, context! {
684 table_location => &table_location,
685 manifest_list_1_location => &manifest_list1_location,
686 manifest_list_2_location => &manifest_list2_location,
687 table_metadata_1_location => &table_metadata1_location,
688 });
689 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
690 };
691
692 let table = Table::builder()
693 .metadata(table_metadata)
694 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
695 .file_io(file_io.clone())
696 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
697 .runtime(test_runtime())
698 .build()
699 .unwrap();
700
701 Self {
702 table_location: table_location.to_str().unwrap().to_string(),
703 table,
704 }
705 }
706
707 #[allow(clippy::new_without_default)]
708 pub fn new_empty() -> Self {
709 let tmp_dir = TempDir::new().unwrap();
710 let table_location = tmp_dir.path().join("table1");
711 let table_metadata1_location = table_location.join("metadata/v1.json");
712
713 let file_io = FileIO::new_with_fs();
714
715 let table_metadata = {
716 let template_json_str = fs::read_to_string(format!(
717 "{}/testdata/example_empty_table_metadata_v2.json",
718 env!("CARGO_MANIFEST_DIR")
719 ))
720 .unwrap();
721 let metadata_json = render_template(&template_json_str, context! {
722 table_location => &table_location,
723 table_metadata_1_location => &table_metadata1_location,
724 });
725 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
726 };
727
728 let table = Table::builder()
729 .metadata(table_metadata)
730 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
731 .file_io(file_io.clone())
732 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
733 .runtime(test_runtime())
734 .build()
735 .unwrap();
736
737 Self {
738 table_location: table_location.to_str().unwrap().to_string(),
739 table,
740 }
741 }
742
743 pub fn new_with_deep_history() -> Self {
747 let tmp_dir = TempDir::new().unwrap();
748 let table_location = tmp_dir.path().join("table1");
749 let table_metadata1_location = table_location.join("metadata/v1.json");
750
751 let file_io = FileIO::new_with_fs();
752
753 let table_metadata = {
754 let json_str = fs::read_to_string(format!(
755 "{}/testdata/example_table_metadata_v2_deep_history.json",
756 env!("CARGO_MANIFEST_DIR")
757 ))
758 .unwrap();
759 serde_json::from_str::<TableMetadata>(&json_str).unwrap()
760 };
761
762 let table = Table::builder()
763 .metadata(table_metadata)
764 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
765 .file_io(file_io.clone())
766 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
767 .runtime(test_runtime())
768 .build()
769 .unwrap();
770
771 Self {
772 table_location: table_location.to_str().unwrap().to_string(),
773 table,
774 }
775 }
776
777 pub fn new_unpartitioned() -> Self {
778 let tmp_dir = TempDir::new().unwrap();
779 let table_location = tmp_dir.path().join("table1");
780 let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
781 let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
782 let table_metadata1_location = table_location.join("metadata/v1.json");
783
784 let file_io = FileIO::new_with_fs();
785
786 let mut table_metadata = {
787 let template_json_str = fs::read_to_string(format!(
788 "{}/testdata/example_table_metadata_v2.json",
789 env!("CARGO_MANIFEST_DIR")
790 ))
791 .unwrap();
792 let metadata_json = render_template(&template_json_str, context! {
793 table_location => &table_location,
794 manifest_list_1_location => &manifest_list1_location,
795 manifest_list_2_location => &manifest_list2_location,
796 table_metadata_1_location => &table_metadata1_location,
797 });
798 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
799 };
800
801 table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
802 table_metadata.partition_specs.clear();
803 table_metadata.default_partition_type = StructType::new(vec![]);
804 table_metadata
805 .partition_specs
806 .insert(0, table_metadata.default_spec.clone());
807
808 let table = Table::builder()
809 .metadata(table_metadata)
810 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
811 .file_io(file_io.clone())
812 .metadata_location(table_metadata1_location.to_str().unwrap())
813 .runtime(test_runtime())
814 .build()
815 .unwrap();
816
817 Self {
818 table_location: table_location.to_str().unwrap().to_string(),
819 table,
820 }
821 }
822
823 fn next_manifest_file(&self) -> OutputFile {
824 self.table
825 .file_io()
826 .new_output(format!(
827 "{}/metadata/manifest_{}.avro",
828 self.table_location,
829 Uuid::new_v4()
830 ))
831 .unwrap()
832 }
833
834 pub async fn setup_manifest_files(&mut self) {
835 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
836 let parent_snapshot = current_snapshot
837 .parent_snapshot(self.table.metadata())
838 .unwrap();
839 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
840 let current_partition_spec = self.table.metadata().default_partition_spec();
841
842 let parquet_file_size = self.write_parquet_data_files();
844
845 let mut writer = ManifestWriterBuilder::new(
846 self.next_manifest_file(),
847 Some(current_snapshot.snapshot_id()),
848 None,
849 current_schema.clone(),
850 current_partition_spec.as_ref().clone(),
851 )
852 .build_v2_data();
853 writer
854 .add_entry(
855 ManifestEntry::builder()
856 .status(ManifestStatus::Added)
857 .data_file(
858 DataFileBuilder::default()
859 .partition_spec_id(0)
860 .content(DataContentType::Data)
861 .file_path(format!("{}/1.parquet", &self.table_location))
862 .file_format(DataFileFormat::Parquet)
863 .file_size_in_bytes(parquet_file_size)
864 .record_count(1)
865 .partition(Struct::from_iter([Some(Literal::long(100))]))
866 .key_metadata(None)
867 .build()
868 .unwrap(),
869 )
870 .build(),
871 )
872 .unwrap();
873 writer
874 .add_delete_entry(
875 ManifestEntry::builder()
876 .status(ManifestStatus::Deleted)
877 .snapshot_id(parent_snapshot.snapshot_id())
878 .sequence_number(parent_snapshot.sequence_number())
879 .file_sequence_number(parent_snapshot.sequence_number())
880 .data_file(
881 DataFileBuilder::default()
882 .partition_spec_id(0)
883 .content(DataContentType::Data)
884 .file_path(format!("{}/2.parquet", &self.table_location))
885 .file_format(DataFileFormat::Parquet)
886 .file_size_in_bytes(parquet_file_size)
887 .record_count(1)
888 .partition(Struct::from_iter([Some(Literal::long(200))]))
889 .build()
890 .unwrap(),
891 )
892 .build(),
893 )
894 .unwrap();
895 writer
896 .add_existing_entry(
897 ManifestEntry::builder()
898 .status(ManifestStatus::Existing)
899 .snapshot_id(parent_snapshot.snapshot_id())
900 .sequence_number(parent_snapshot.sequence_number())
901 .file_sequence_number(parent_snapshot.sequence_number())
902 .data_file(
903 DataFileBuilder::default()
904 .partition_spec_id(0)
905 .content(DataContentType::Data)
906 .file_path(format!("{}/3.parquet", &self.table_location))
907 .file_format(DataFileFormat::Parquet)
908 .file_size_in_bytes(parquet_file_size)
909 .record_count(1)
910 .partition(Struct::from_iter([Some(Literal::long(300))]))
911 .build()
912 .unwrap(),
913 )
914 .build(),
915 )
916 .unwrap();
917 let data_file_manifest = writer.write_manifest_file().await.unwrap();
918
919 let manifest_list_writer = self
921 .table
922 .file_io()
923 .new_output(current_snapshot.manifest_list())
924 .unwrap()
925 .writer()
926 .await
927 .unwrap();
928 let mut manifest_list_write = ManifestListWriter::v2(
929 manifest_list_writer,
930 current_snapshot.snapshot_id(),
931 current_snapshot.parent_snapshot_id(),
932 current_snapshot.sequence_number(),
933 );
934 manifest_list_write
935 .add_manifests(vec![data_file_manifest].into_iter())
936 .unwrap();
937 manifest_list_write.close().await.unwrap();
938 }
939
940 fn write_parquet_data_files(&self) -> u64 {
943 std::fs::create_dir_all(&self.table_location).unwrap();
944
945 let schema = {
946 let fields = vec![
947 arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
948 .with_metadata(HashMap::from([(
949 PARQUET_FIELD_ID_META_KEY.to_string(),
950 "1".to_string(),
951 )])),
952 arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
953 .with_metadata(HashMap::from([(
954 PARQUET_FIELD_ID_META_KEY.to_string(),
955 "2".to_string(),
956 )])),
957 arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
958 .with_metadata(HashMap::from([(
959 PARQUET_FIELD_ID_META_KEY.to_string(),
960 "3".to_string(),
961 )])),
962 arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
963 .with_metadata(HashMap::from([(
964 PARQUET_FIELD_ID_META_KEY.to_string(),
965 "4".to_string(),
966 )])),
967 arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
968 .with_metadata(HashMap::from([(
969 PARQUET_FIELD_ID_META_KEY.to_string(),
970 "5".to_string(),
971 )])),
972 arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
973 .with_metadata(HashMap::from([(
974 PARQUET_FIELD_ID_META_KEY.to_string(),
975 "6".to_string(),
976 )])),
977 arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
978 .with_metadata(HashMap::from([(
979 PARQUET_FIELD_ID_META_KEY.to_string(),
980 "7".to_string(),
981 )])),
982 arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
983 .with_metadata(HashMap::from([(
984 PARQUET_FIELD_ID_META_KEY.to_string(),
985 "8".to_string(),
986 )])),
987 ];
988 Arc::new(arrow_schema::Schema::new(fields))
989 };
990 let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
992
993 let mut values = vec![2; 512];
994 values.append(vec![3; 200].as_mut());
995 values.append(vec![4; 300].as_mut());
996 values.append(vec![5; 12].as_mut());
997
998 let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1000
1001 let mut values = vec![3; 512];
1002 values.append(vec![4; 512].as_mut());
1003
1004 let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1006
1007 let mut values = vec!["Apache"; 512];
1009 values.append(vec!["Iceberg"; 512].as_mut());
1010 let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
1011
1012 let mut values = vec![100.0f64; 512];
1014 values.append(vec![150.0f64; 12].as_mut());
1015 values.append(vec![200.0f64; 500].as_mut());
1016 let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
1017
1018 let mut values = vec![100i32; 512];
1020 values.append(vec![150i32; 12].as_mut());
1021 values.append(vec![200i32; 500].as_mut());
1022 let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
1023
1024 let mut values = vec![100i64; 512];
1026 values.append(vec![150i64; 12].as_mut());
1027 values.append(vec![200i64; 500].as_mut());
1028 let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1029
1030 let mut values = vec![false; 512];
1032 values.append(vec![true; 512].as_mut());
1033 let values: BooleanArray = values.into();
1034 let col8 = Arc::new(values) as ArrayRef;
1035
1036 let to_write = RecordBatch::try_new(schema.clone(), vec![
1037 col1, col2, col3, col4, col5, col6, col7, col8,
1038 ])
1039 .unwrap();
1040
1041 let props = WriterProperties::builder()
1043 .set_compression(Compression::SNAPPY)
1044 .build();
1045
1046 for n in 1..=3 {
1047 let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
1048 let mut writer =
1049 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1050
1051 writer.write(&to_write).expect("Writing batch");
1052
1053 writer.close().unwrap();
1055 }
1056
1057 std::fs::metadata(format!("{}/1.parquet", &self.table_location))
1058 .unwrap()
1059 .len()
1060 }
1061
1062 pub async fn setup_unpartitioned_manifest_files(&mut self) {
1063 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1064 let parent_snapshot = current_snapshot
1065 .parent_snapshot(self.table.metadata())
1066 .unwrap();
1067 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1068 let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
1069
1070 let parquet_file_size = self.write_parquet_data_files();
1072
1073 let mut writer = ManifestWriterBuilder::new(
1075 self.next_manifest_file(),
1076 Some(current_snapshot.snapshot_id()),
1077 None,
1078 current_schema.clone(),
1079 current_partition_spec.as_ref().clone(),
1080 )
1081 .build_v2_data();
1082
1083 let empty_partition = Struct::empty();
1085
1086 writer
1087 .add_entry(
1088 ManifestEntry::builder()
1089 .status(ManifestStatus::Added)
1090 .data_file(
1091 DataFileBuilder::default()
1092 .partition_spec_id(0)
1093 .content(DataContentType::Data)
1094 .file_path(format!("{}/1.parquet", &self.table_location))
1095 .file_format(DataFileFormat::Parquet)
1096 .file_size_in_bytes(parquet_file_size)
1097 .record_count(1)
1098 .partition(empty_partition.clone())
1099 .key_metadata(None)
1100 .build()
1101 .unwrap(),
1102 )
1103 .build(),
1104 )
1105 .unwrap();
1106
1107 writer
1108 .add_delete_entry(
1109 ManifestEntry::builder()
1110 .status(ManifestStatus::Deleted)
1111 .snapshot_id(parent_snapshot.snapshot_id())
1112 .sequence_number(parent_snapshot.sequence_number())
1113 .file_sequence_number(parent_snapshot.sequence_number())
1114 .data_file(
1115 DataFileBuilder::default()
1116 .partition_spec_id(0)
1117 .content(DataContentType::Data)
1118 .file_path(format!("{}/2.parquet", &self.table_location))
1119 .file_format(DataFileFormat::Parquet)
1120 .file_size_in_bytes(parquet_file_size)
1121 .record_count(1)
1122 .partition(empty_partition.clone())
1123 .build()
1124 .unwrap(),
1125 )
1126 .build(),
1127 )
1128 .unwrap();
1129
1130 writer
1131 .add_existing_entry(
1132 ManifestEntry::builder()
1133 .status(ManifestStatus::Existing)
1134 .snapshot_id(parent_snapshot.snapshot_id())
1135 .sequence_number(parent_snapshot.sequence_number())
1136 .file_sequence_number(parent_snapshot.sequence_number())
1137 .data_file(
1138 DataFileBuilder::default()
1139 .partition_spec_id(0)
1140 .content(DataContentType::Data)
1141 .file_path(format!("{}/3.parquet", &self.table_location))
1142 .file_format(DataFileFormat::Parquet)
1143 .file_size_in_bytes(parquet_file_size)
1144 .record_count(1)
1145 .partition(empty_partition.clone())
1146 .build()
1147 .unwrap(),
1148 )
1149 .build(),
1150 )
1151 .unwrap();
1152
1153 let data_file_manifest = writer.write_manifest_file().await.unwrap();
1154
1155 let manifest_list_writer = self
1157 .table
1158 .file_io()
1159 .new_output(current_snapshot.manifest_list())
1160 .unwrap()
1161 .writer()
1162 .await
1163 .unwrap();
1164 let mut manifest_list_write = ManifestListWriter::v2(
1165 manifest_list_writer,
1166 current_snapshot.snapshot_id(),
1167 current_snapshot.parent_snapshot_id(),
1168 current_snapshot.sequence_number(),
1169 );
1170 manifest_list_write
1171 .add_manifests(vec![data_file_manifest].into_iter())
1172 .unwrap();
1173 manifest_list_write.close().await.unwrap();
1174 }
1175
1176 pub async fn setup_deadlock_manifests(&mut self) {
1177 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1178 let _parent_snapshot = current_snapshot
1179 .parent_snapshot(self.table.metadata())
1180 .unwrap();
1181 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1182 let current_partition_spec = self.table.metadata().default_partition_spec();
1183
1184 let mut writer = ManifestWriterBuilder::new(
1186 self.next_manifest_file(),
1187 Some(current_snapshot.snapshot_id()),
1188 None,
1189 current_schema.clone(),
1190 current_partition_spec.as_ref().clone(),
1191 )
1192 .build_v2_data();
1193
1194 for i in 0..10 {
1196 writer
1197 .add_entry(
1198 ManifestEntry::builder()
1199 .status(ManifestStatus::Added)
1200 .data_file(
1201 DataFileBuilder::default()
1202 .partition_spec_id(0)
1203 .content(DataContentType::Data)
1204 .file_path(format!("{}/{}.parquet", &self.table_location, i))
1205 .file_format(DataFileFormat::Parquet)
1206 .file_size_in_bytes(100)
1207 .record_count(1)
1208 .partition(Struct::from_iter([Some(Literal::long(100))]))
1209 .key_metadata(None)
1210 .build()
1211 .unwrap(),
1212 )
1213 .build(),
1214 )
1215 .unwrap();
1216 }
1217 let data_manifest = writer.write_manifest_file().await.unwrap();
1218
1219 let mut writer = ManifestWriterBuilder::new(
1221 self.next_manifest_file(),
1222 Some(current_snapshot.snapshot_id()),
1223 None,
1224 current_schema.clone(),
1225 current_partition_spec.as_ref().clone(),
1226 )
1227 .build_v2_deletes();
1228
1229 writer
1230 .add_entry(
1231 ManifestEntry::builder()
1232 .status(ManifestStatus::Added)
1233 .data_file(
1234 DataFileBuilder::default()
1235 .partition_spec_id(0)
1236 .content(DataContentType::PositionDeletes)
1237 .file_path(format!("{}/del.parquet", &self.table_location))
1238 .file_format(DataFileFormat::Parquet)
1239 .file_size_in_bytes(100)
1240 .record_count(1)
1241 .partition(Struct::from_iter([Some(Literal::long(100))]))
1242 .build()
1243 .unwrap(),
1244 )
1245 .build(),
1246 )
1247 .unwrap();
1248 let delete_manifest = writer.write_manifest_file().await.unwrap();
1249
1250 let manifest_list_writer = self
1253 .table
1254 .file_io()
1255 .new_output(current_snapshot.manifest_list())
1256 .unwrap()
1257 .writer()
1258 .await
1259 .unwrap();
1260 let mut manifest_list_write = ManifestListWriter::v2(
1261 manifest_list_writer,
1262 current_snapshot.snapshot_id(),
1263 current_snapshot.parent_snapshot_id(),
1264 current_snapshot.sequence_number(),
1265 );
1266 manifest_list_write
1267 .add_manifests(vec![data_manifest, delete_manifest].into_iter())
1268 .unwrap();
1269 manifest_list_write.close().await.unwrap();
1270 }
1271 }
1272
1273 #[tokio::test]
1274 async fn test_table_scan_columns() {
1275 let table = TableTestFixture::new().table;
1276
1277 let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1278 assert_eq!(
1279 Some(vec!["x".to_string(), "y".to_string()]),
1280 table_scan.column_names
1281 );
1282
1283 let table_scan = table
1284 .scan()
1285 .select(["x", "y"])
1286 .select(["z"])
1287 .build()
1288 .unwrap();
1289 assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1290 }
1291
1292 #[tokio::test]
1293 async fn test_select_all() {
1294 let table = TableTestFixture::new().table;
1295
1296 let table_scan = table.scan().select_all().build().unwrap();
1297 assert!(table_scan.column_names.is_none());
1298 }
1299
/// Building a scan with this five-name selection must fail.
#[test]
fn test_select_no_exist_column() {
    let table = TableTestFixture::new().table;

    let requested_columns = ["x", "y", "z", "a", "b"];
    let build_outcome = table.scan().select(requested_columns).build();

    assert!(build_outcome.is_err());
}
1307
1308 #[tokio::test]
1309 async fn test_table_scan_default_snapshot_id() {
1310 let table = TableTestFixture::new().table;
1311
1312 let table_scan = table.scan().build().unwrap();
1313 assert_eq!(
1314 table.metadata().current_snapshot().unwrap().snapshot_id(),
1315 table_scan.snapshot().unwrap().snapshot_id()
1316 );
1317 }
1318
/// Building a scan against snapshot id 1024 must fail.
#[test]
fn test_table_scan_non_exist_snapshot_id() {
    let table = TableTestFixture::new().table;

    let build_outcome = table.scan().snapshot_id(1024).build();

    assert!(build_outcome.is_err());
}
1326
1327 #[tokio::test]
1328 async fn test_table_scan_with_snapshot_id() {
1329 let table = TableTestFixture::new().table;
1330
1331 let table_scan = table
1332 .scan()
1333 .snapshot_id(3051729675574597004)
1334 .with_row_selection_enabled(true)
1335 .build()
1336 .unwrap();
1337 assert_eq!(
1338 table_scan.snapshot().unwrap().snapshot_id(),
1339 3051729675574597004
1340 );
1341 }
1342
1343 fn table_with_property(key: &str, value: &str) -> Table {
1344 let fixture = TableTestFixture::new();
1345 let mut metadata = fixture.table.metadata().clone();
1346 metadata
1347 .properties
1348 .insert(key.to_string(), value.to_string());
1349 Table::builder()
1350 .metadata(metadata)
1351 .identifier(fixture.table.identifier().clone())
1352 .file_io(fixture.table.file_io().clone())
1353 .metadata_location(fixture.table.metadata_location().unwrap().to_string())
1354 .runtime(test_runtime())
1355 .build()
1356 .unwrap()
1357 }
1358
/// A scan over the plain fixture table has a plan context with no name
/// mapping.
#[test]
fn test_table_scan_without_name_mapping_property() {
    let table = TableTestFixture::new().table;

    let scan = table.scan().build().unwrap();
    let plan_context = scan.plan_context.as_ref().unwrap();

    assert!(plan_context.name_mapping.is_none());
}
1373
/// When the table carries the default name-mapping property, the scan's plan
/// context exposes the parsed mapping: one field with id 1 and the names
/// `id` and `record_id`.
#[test]
fn test_table_scan_with_name_mapping_property() {
    let table = table_with_property(
        DEFAULT_SCHEMA_NAME_MAPPING,
        r#"[{"field-id":1,"names":["id","record_id"]}]"#,
    );

    let scan = table.scan().build().unwrap();
    let plan_context = scan.plan_context.as_ref().unwrap();
    let parsed_mapping = plan_context
        .name_mapping
        .as_ref()
        .expect("name_mapping should be parsed from the table property");

    let mapped_fields = parsed_mapping.fields();
    assert_eq!(mapped_fields.len(), 1);

    let only_field = &mapped_fields[0];
    assert_eq!(only_field.field_id(), Some(1));

    let expected_names = ["id".to_string(), "record_id".to_string()];
    assert_eq!(only_field.names(), &expected_names);
}
1395
/// A name-mapping property that is not valid JSON makes `build` fail with
/// `ErrorKind::DataInvalid`.
#[test]
fn test_table_scan_with_malformed_name_mapping_property() {
    let table = table_with_property(DEFAULT_SCHEMA_NAME_MAPPING, "{ not valid json");

    let build_outcome = table.scan().build();
    let build_error = build_outcome.expect_err("malformed name mapping should fail to parse");

    assert_eq!(build_error.kind(), ErrorKind::DataInvalid);
}
1406
1407 #[tokio::test]
1408 async fn test_plan_files_carries_name_mapping_into_file_scan_task() {
1409 let mut fixture = TableTestFixture::new();
1410 fixture.setup_manifest_files().await;
1411
1412 let mapping_json = r#"[{"field-id":1,"names":["id","record_id"]}]"#;
1413 let mut metadata = fixture.table.metadata().clone();
1414 metadata.properties.insert(
1415 DEFAULT_SCHEMA_NAME_MAPPING.to_string(),
1416 mapping_json.to_string(),
1417 );
1418 let table = Table::builder()
1419 .metadata(metadata)
1420 .identifier(fixture.table.identifier().clone())
1421 .file_io(fixture.table.file_io().clone())
1422 .metadata_location(fixture.table.metadata_location().unwrap().to_string())
1423 .runtime(test_runtime())
1424 .build()
1425 .unwrap();
1426
1427 let tasks: Vec<_> = table
1428 .scan()
1429 .build()
1430 .unwrap()
1431 .plan_files()
1432 .await
1433 .unwrap()
1434 .try_collect()
1435 .await
1436 .unwrap();
1437
1438 assert!(!tasks.is_empty(), "expected at least one FileScanTask");
1439 for task in &tasks {
1440 let mapping = task
1441 .name_mapping
1442 .as_ref()
1443 .expect("name_mapping should reach the FileScanTask");
1444 assert_eq!(mapping.fields().len(), 1);
1445 assert_eq!(mapping.fields()[0].field_id(), Some(1));
1446 }
1447 }
1448
1449 #[tokio::test]
1450 async fn test_plan_files_on_table_without_any_snapshots() {
1451 let table = TableTestFixture::new_empty().table;
1452 let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1453 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1454 assert!(batches.is_empty());
1455 }
1456
1457 #[tokio::test]
1458 async fn test_plan_files_no_deletions() {
1459 let mut fixture = TableTestFixture::new();
1460 fixture.setup_manifest_files().await;
1461
1462 let table_scan = fixture
1464 .table
1465 .scan()
1466 .with_row_selection_enabled(true)
1467 .build()
1468 .unwrap();
1469
1470 let mut tasks = table_scan
1471 .plan_files()
1472 .await
1473 .unwrap()
1474 .try_fold(vec![], |mut acc, task| async move {
1475 acc.push(task);
1476 Ok(acc)
1477 })
1478 .await
1479 .unwrap();
1480
1481 assert_eq!(tasks.len(), 2);
1482
1483 tasks.sort_by_key(|t| t.data_file_path.to_string());
1484
1485 assert_eq!(
1487 tasks[0].data_file_path,
1488 format!("{}/1.parquet", &fixture.table_location)
1489 );
1490
1491 assert_eq!(
1493 tasks[1].data_file_path,
1494 format!("{}/3.parquet", &fixture.table_location)
1495 );
1496 }
1497
1498 #[tokio::test]
1499 async fn test_open_parquet_no_deletions() {
1500 let mut fixture = TableTestFixture::new();
1501 fixture.setup_manifest_files().await;
1502
1503 let table_scan = fixture
1505 .table
1506 .scan()
1507 .with_row_selection_enabled(true)
1508 .build()
1509 .unwrap();
1510
1511 let batch_stream = table_scan.to_arrow().await.unwrap();
1512
1513 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1514
1515 let col = batches[0].column_by_name("x").unwrap();
1516
1517 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1518 assert_eq!(int64_arr.value(0), 1);
1519 }
1520
1521 #[tokio::test]
1522 async fn test_open_parquet_no_deletions_by_separate_reader() {
1523 let mut fixture = TableTestFixture::new();
1524 fixture.setup_manifest_files().await;
1525
1526 let table_scan = fixture
1528 .table
1529 .scan()
1530 .with_row_selection_enabled(true)
1531 .build()
1532 .unwrap();
1533
1534 let mut plan_task: Vec<_> = table_scan
1535 .plan_files()
1536 .await
1537 .unwrap()
1538 .try_collect()
1539 .await
1540 .unwrap();
1541 assert_eq!(plan_task.len(), 2);
1542
1543 let reader = ArrowReaderBuilder::new(
1544 fixture.table.file_io().clone(),
1545 fixture.table.runtime().clone(),
1546 )
1547 .build();
1548 let batch_stream = reader
1549 .clone()
1550 .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1551 .unwrap()
1552 .stream();
1553 let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1554
1555 let reader = ArrowReaderBuilder::new(
1556 fixture.table.file_io().clone(),
1557 fixture.table.runtime().clone(),
1558 )
1559 .build();
1560 let batch_stream = reader
1561 .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1562 .unwrap()
1563 .stream();
1564 let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1565
1566 assert_eq!(batch_1, batch_2);
1567 }
1568
1569 #[tokio::test]
1570 async fn test_open_parquet_with_projection() {
1571 let mut fixture = TableTestFixture::new();
1572 fixture.setup_manifest_files().await;
1573
1574 let table_scan = fixture
1576 .table
1577 .scan()
1578 .select(["x", "z"])
1579 .with_row_selection_enabled(true)
1580 .build()
1581 .unwrap();
1582
1583 let batch_stream = table_scan.to_arrow().await.unwrap();
1584
1585 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1586
1587 assert_eq!(batches[0].num_columns(), 2);
1588
1589 let col1 = batches[0].column_by_name("x").unwrap();
1590 let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1591 assert_eq!(int64_arr.value(0), 1);
1592
1593 let col2 = batches[0].column_by_name("z").unwrap();
1594 let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1595 assert_eq!(int64_arr.value(0), 3);
1596
1597 let table_scan = fixture.table.scan().select_empty().build().unwrap();
1599 let batch_stream = table_scan.to_arrow().await.unwrap();
1600 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1601
1602 assert_eq!(batches[0].num_columns(), 0);
1603 assert_eq!(batches[0].num_rows(), 1024);
1604 }
1605
1606 #[tokio::test]
1607 async fn test_filter_on_arrow_lt() {
1608 let mut fixture = TableTestFixture::new();
1609 fixture.setup_manifest_files().await;
1610
1611 let mut builder = fixture.table.scan();
1613 let predicate = Reference::new("y").less_than(Datum::long(3));
1614 builder = builder
1615 .with_filter(predicate)
1616 .with_row_selection_enabled(true);
1617 let table_scan = builder.build().unwrap();
1618
1619 let batch_stream = table_scan.to_arrow().await.unwrap();
1620
1621 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1622
1623 assert_eq!(batches[0].num_rows(), 512);
1624
1625 let col = batches[0].column_by_name("x").unwrap();
1626 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1627 assert_eq!(int64_arr.value(0), 1);
1628
1629 let col = batches[0].column_by_name("y").unwrap();
1630 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1631 assert_eq!(int64_arr.value(0), 2);
1632 }
1633
1634 #[tokio::test]
1635 async fn test_filter_on_arrow_gt_eq() {
1636 let mut fixture = TableTestFixture::new();
1637 fixture.setup_manifest_files().await;
1638
1639 let mut builder = fixture.table.scan();
1641 let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1642 builder = builder
1643 .with_filter(predicate)
1644 .with_row_selection_enabled(true);
1645 let table_scan = builder.build().unwrap();
1646
1647 let batch_stream = table_scan.to_arrow().await.unwrap();
1648
1649 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1650
1651 assert_eq!(batches[0].num_rows(), 12);
1652
1653 let col = batches[0].column_by_name("x").unwrap();
1654 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1655 assert_eq!(int64_arr.value(0), 1);
1656
1657 let col = batches[0].column_by_name("y").unwrap();
1658 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1659 assert_eq!(int64_arr.value(0), 5);
1660 }
1661
1662 #[tokio::test]
1663 async fn test_filter_double_eq() {
1664 let mut fixture = TableTestFixture::new();
1665 fixture.setup_manifest_files().await;
1666
1667 let mut builder = fixture.table.scan();
1669 let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1670 builder = builder
1671 .with_filter(predicate)
1672 .with_row_selection_enabled(true);
1673 let table_scan = builder.build().unwrap();
1674
1675 let batch_stream = table_scan.to_arrow().await.unwrap();
1676
1677 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1678
1679 assert_eq!(batches.len(), 2);
1680 assert_eq!(batches[0].num_rows(), 12);
1681
1682 let col = batches[0].column_by_name("dbl").unwrap();
1683 let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1684 assert_eq!(f64_arr.value(1), 150.0f64);
1685 }
1686
1687 #[tokio::test]
1688 async fn test_filter_int_eq() {
1689 let mut fixture = TableTestFixture::new();
1690 fixture.setup_manifest_files().await;
1691
1692 let mut builder = fixture.table.scan();
1694 let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1695 builder = builder
1696 .with_filter(predicate)
1697 .with_row_selection_enabled(true);
1698 let table_scan = builder.build().unwrap();
1699
1700 let batch_stream = table_scan.to_arrow().await.unwrap();
1701
1702 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1703
1704 assert_eq!(batches.len(), 2);
1705 assert_eq!(batches[0].num_rows(), 12);
1706
1707 let col = batches[0].column_by_name("i32").unwrap();
1708 let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1709 assert_eq!(i32_arr.value(1), 150i32);
1710 }
1711
1712 #[tokio::test]
1713 async fn test_filter_long_eq() {
1714 let mut fixture = TableTestFixture::new();
1715 fixture.setup_manifest_files().await;
1716
1717 let mut builder = fixture.table.scan();
1719 let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1720 builder = builder
1721 .with_filter(predicate)
1722 .with_row_selection_enabled(true);
1723 let table_scan = builder.build().unwrap();
1724
1725 let batch_stream = table_scan.to_arrow().await.unwrap();
1726
1727 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1728
1729 assert_eq!(batches.len(), 2);
1730 assert_eq!(batches[0].num_rows(), 12);
1731
1732 let col = batches[0].column_by_name("i64").unwrap();
1733 let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1734 assert_eq!(i64_arr.value(1), 150i64);
1735 }
1736
1737 #[tokio::test]
1738 async fn test_filter_bool_eq() {
1739 let mut fixture = TableTestFixture::new();
1740 fixture.setup_manifest_files().await;
1741
1742 let mut builder = fixture.table.scan();
1744 let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1745 builder = builder
1746 .with_filter(predicate)
1747 .with_row_selection_enabled(true);
1748 let table_scan = builder.build().unwrap();
1749
1750 let batch_stream = table_scan.to_arrow().await.unwrap();
1751
1752 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1753
1754 assert_eq!(batches.len(), 2);
1755 assert_eq!(batches[0].num_rows(), 512);
1756
1757 let col = batches[0].column_by_name("bool").unwrap();
1758 let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1759 assert!(bool_arr.value(1));
1760 }
1761
1762 #[tokio::test]
1763 async fn test_filter_on_arrow_is_null() {
1764 let mut fixture = TableTestFixture::new();
1765 fixture.setup_manifest_files().await;
1766
1767 let mut builder = fixture.table.scan();
1769 let predicate = Reference::new("y").is_null();
1770 builder = builder
1771 .with_filter(predicate)
1772 .with_row_selection_enabled(true);
1773 let table_scan = builder.build().unwrap();
1774
1775 let batch_stream = table_scan.to_arrow().await.unwrap();
1776
1777 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1778 assert_eq!(batches.len(), 0);
1779 }
1780
1781 #[tokio::test]
1782 async fn test_filter_on_arrow_is_not_null() {
1783 let mut fixture = TableTestFixture::new();
1784 fixture.setup_manifest_files().await;
1785
1786 let mut builder = fixture.table.scan();
1788 let predicate = Reference::new("y").is_not_null();
1789 builder = builder
1790 .with_filter(predicate)
1791 .with_row_selection_enabled(true);
1792 let table_scan = builder.build().unwrap();
1793
1794 let batch_stream = table_scan.to_arrow().await.unwrap();
1795
1796 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1797 assert_eq!(batches[0].num_rows(), 1024);
1798 }
1799
1800 #[tokio::test]
1801 async fn test_filter_on_arrow_lt_and_gt() {
1802 let mut fixture = TableTestFixture::new();
1803 fixture.setup_manifest_files().await;
1804
1805 let mut builder = fixture.table.scan();
1807 let predicate = Reference::new("y")
1808 .less_than(Datum::long(5))
1809 .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1810 builder = builder
1811 .with_filter(predicate)
1812 .with_row_selection_enabled(true);
1813 let table_scan = builder.build().unwrap();
1814
1815 let batch_stream = table_scan.to_arrow().await.unwrap();
1816
1817 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1818 assert_eq!(batches[0].num_rows(), 500);
1819
1820 let col = batches[0].column_by_name("x").unwrap();
1821 let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1822 assert_eq!(col, &expected_x);
1823
1824 let col = batches[0].column_by_name("y").unwrap();
1825 let mut values = vec![];
1826 values.append(vec![3; 200].as_mut());
1827 values.append(vec![4; 300].as_mut());
1828 let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1829 assert_eq!(col, &expected_y);
1830
1831 let col = batches[0].column_by_name("z").unwrap();
1832 let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1833 assert_eq!(col, &expected_z);
1834 }
1835
1836 #[tokio::test]
1837 async fn test_filter_on_arrow_lt_or_gt() {
1838 let mut fixture = TableTestFixture::new();
1839 fixture.setup_manifest_files().await;
1840
1841 let mut builder = fixture.table.scan();
1843 let predicate = Reference::new("y")
1844 .less_than(Datum::long(5))
1845 .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1846 builder = builder
1847 .with_filter(predicate)
1848 .with_row_selection_enabled(true);
1849 let table_scan = builder.build().unwrap();
1850
1851 let batch_stream = table_scan.to_arrow().await.unwrap();
1852
1853 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1854 assert_eq!(batches[0].num_rows(), 1024);
1855
1856 let col = batches[0].column_by_name("x").unwrap();
1857 let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1858 assert_eq!(col, &expected_x);
1859
1860 let col = batches[0].column_by_name("y").unwrap();
1861 let mut values = vec![2; 512];
1862 values.append(vec![3; 200].as_mut());
1863 values.append(vec![4; 300].as_mut());
1864 values.append(vec![5; 12].as_mut());
1865 let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1866 assert_eq!(col, &expected_y);
1867
1868 let col = batches[0].column_by_name("z").unwrap();
1869 let mut values = vec![3; 512];
1870 values.append(vec![4; 512].as_mut());
1871 let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1872 assert_eq!(col, &expected_z);
1873 }
1874
1875 #[tokio::test]
1876 async fn test_filter_on_arrow_startswith() {
1877 let mut fixture = TableTestFixture::new();
1878 fixture.setup_manifest_files().await;
1879
1880 let mut builder = fixture.table.scan();
1882 let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1883 builder = builder
1884 .with_filter(predicate)
1885 .with_row_selection_enabled(true);
1886 let table_scan = builder.build().unwrap();
1887
1888 let batch_stream = table_scan.to_arrow().await.unwrap();
1889
1890 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1891
1892 assert_eq!(batches[0].num_rows(), 512);
1893
1894 let col = batches[0].column_by_name("a").unwrap();
1895 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1896 assert_eq!(string_arr.value(0), "Iceberg");
1897 }
1898
1899 #[tokio::test]
1900 async fn test_filter_on_arrow_not_startswith() {
1901 let mut fixture = TableTestFixture::new();
1902 fixture.setup_manifest_files().await;
1903
1904 let mut builder = fixture.table.scan();
1906 let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1907 builder = builder
1908 .with_filter(predicate)
1909 .with_row_selection_enabled(true);
1910 let table_scan = builder.build().unwrap();
1911
1912 let batch_stream = table_scan.to_arrow().await.unwrap();
1913
1914 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1915
1916 assert_eq!(batches[0].num_rows(), 512);
1917
1918 let col = batches[0].column_by_name("a").unwrap();
1919 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1920 assert_eq!(string_arr.value(0), "Apache");
1921 }
1922
1923 #[tokio::test]
1924 async fn test_filter_on_arrow_in() {
1925 let mut fixture = TableTestFixture::new();
1926 fixture.setup_manifest_files().await;
1927
1928 let mut builder = fixture.table.scan();
1930 let predicate =
1931 Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1932 builder = builder
1933 .with_filter(predicate)
1934 .with_row_selection_enabled(true);
1935 let table_scan = builder.build().unwrap();
1936
1937 let batch_stream = table_scan.to_arrow().await.unwrap();
1938
1939 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1940
1941 assert_eq!(batches[0].num_rows(), 512);
1942
1943 let col = batches[0].column_by_name("a").unwrap();
1944 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1945 assert_eq!(string_arr.value(0), "Iceberg");
1946 }
1947
1948 #[tokio::test]
1949 async fn test_filter_on_arrow_not_in() {
1950 let mut fixture = TableTestFixture::new();
1951 fixture.setup_manifest_files().await;
1952
1953 let mut builder = fixture.table.scan();
1955 let predicate =
1956 Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1957 builder = builder
1958 .with_filter(predicate)
1959 .with_row_selection_enabled(true);
1960 let table_scan = builder.build().unwrap();
1961
1962 let batch_stream = table_scan.to_arrow().await.unwrap();
1963
1964 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1965
1966 assert_eq!(batches[0].num_rows(), 512);
1967
1968 let col = batches[0].column_by_name("a").unwrap();
1969 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1970 assert_eq!(string_arr.value(0), "Apache");
1971 }
1972
/// Round-trips two `FileScanTask` values through JSON (one without and one
/// with a predicate) and checks that the compared fields survive unchanged.
#[test]
fn test_file_scan_task_serialize_deserialize() {
    // Serializes `task` to JSON, parses it back, and compares path, start,
    // length, projected field ids, predicate and schema.
    fn assert_json_round_trip(task: FileScanTask) {
        let json = serde_json::to_string(&task).unwrap();
        let restored: FileScanTask = serde_json::from_str(&json).unwrap();

        assert_eq!(task.data_file_path, restored.data_file_path);
        assert_eq!(task.start, restored.start);
        assert_eq!(task.length, restored.length);
        assert_eq!(task.project_field_ids, restored.project_field_ids);
        assert_eq!(task.predicate, restored.predicate);
        assert_eq!(task.schema, restored.schema);
    }

    let binary_field = NestedField::required(1, "x", Type::Primitive(PrimitiveType::Binary));
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![Arc::new(binary_field)])
            .build()
            .unwrap(),
    );

    // Parquet task with a record count and no predicate.
    let parquet_task = FileScanTask::builder()
        .with_data_file_path("data_file_path".to_string())
        .with_file_size_in_bytes(0)
        .with_start(0)
        .with_length(100)
        .with_project_field_ids(vec![1, 2, 3])
        .with_schema(schema.clone())
        .with_record_count(Some(100))
        .with_data_file_format(DataFileFormat::Parquet)
        .with_case_sensitive(false)
        .build();
    assert_json_round_trip(parquet_task);

    // Avro task with an always-true predicate.
    let avro_task = FileScanTask::builder()
        .with_data_file_path("data_file_path".to_string())
        .with_file_size_in_bytes(0)
        .with_start(0)
        .with_length(100)
        .with_project_field_ids(vec![1, 2, 3])
        .with_predicate(Some(BoundPredicate::AlwaysTrue))
        .with_schema(schema)
        .with_data_file_format(DataFileFormat::Avro)
        .with_case_sensitive(false)
        .build();
    assert_json_round_trip(avro_task);
}
2025
2026 #[tokio::test]
2027 async fn test_select_with_file_column() {
2028 use arrow_array::cast::AsArray;
2029
2030 let mut fixture = TableTestFixture::new();
2031 fixture.setup_manifest_files().await;
2032
2033 let table_scan = fixture
2035 .table
2036 .scan()
2037 .select(["x", RESERVED_COL_NAME_FILE])
2038 .with_row_selection_enabled(true)
2039 .build()
2040 .unwrap();
2041
2042 let batch_stream = table_scan.to_arrow().await.unwrap();
2043 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2044
2045 assert_eq!(batches[0].num_columns(), 2);
2047
2048 let x_col = batches[0].column_by_name("x").unwrap();
2050 let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
2051 assert_eq!(x_arr.value(0), 1);
2052
2053 let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
2055 assert!(
2056 file_col.is_some(),
2057 "_file column should be present in the batch"
2058 );
2059
2060 let file_col = file_col.unwrap();
2062 assert!(
2063 matches!(
2064 file_col.data_type(),
2065 arrow_schema::DataType::RunEndEncoded(_, _)
2066 ),
2067 "_file column should use RunEndEncoded type"
2068 );
2069
2070 let run_array = file_col
2072 .as_any()
2073 .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2074 .expect("_file column should be a RunArray");
2075
2076 let values = run_array.values();
2077 let string_values = values.as_string::<i32>();
2078 assert_eq!(string_values.len(), 1, "Should have a single file path");
2079
2080 let file_path = string_values.value(0);
2081 assert!(
2082 file_path.ends_with(".parquet"),
2083 "File path should end with .parquet, got: {file_path}"
2084 );
2085 }
2086
2087 #[tokio::test]
2088 async fn test_select_file_column_position() {
2089 let mut fixture = TableTestFixture::new();
2090 fixture.setup_manifest_files().await;
2091
2092 let table_scan = fixture
2094 .table
2095 .scan()
2096 .select(["x", RESERVED_COL_NAME_FILE, "z"])
2097 .with_row_selection_enabled(true)
2098 .build()
2099 .unwrap();
2100
2101 let batch_stream = table_scan.to_arrow().await.unwrap();
2102 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2103
2104 assert_eq!(batches[0].num_columns(), 3);
2105
2106 let schema = batches[0].schema();
2108 assert_eq!(schema.field(0).name(), "x");
2109 assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
2110 assert_eq!(schema.field(2).name(), "z");
2111
2112 assert!(batches[0].column_by_name("x").is_some());
2114 assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
2115 assert!(batches[0].column_by_name("z").is_some());
2116 }
2117
2118 #[tokio::test]
2119 async fn test_select_file_column_only() {
2120 let mut fixture = TableTestFixture::new();
2121 fixture.setup_manifest_files().await;
2122
2123 let table_scan = fixture
2125 .table
2126 .scan()
2127 .select([RESERVED_COL_NAME_FILE])
2128 .with_row_selection_enabled(true)
2129 .build()
2130 .unwrap();
2131
2132 let batch_stream = table_scan.to_arrow().await.unwrap();
2133 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2134
2135 assert_eq!(batches[0].num_columns(), 1);
2137
2138 let schema = batches[0].schema();
2140 assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2141
2142 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2146 assert_eq!(total_rows, 2048);
2147 }
2148
2149 #[tokio::test]
2150 async fn test_file_column_with_multiple_files() {
2151 use std::collections::HashSet;
2152
2153 let mut fixture = TableTestFixture::new();
2154 fixture.setup_manifest_files().await;
2155
2156 let table_scan = fixture
2158 .table
2159 .scan()
2160 .select(["x", RESERVED_COL_NAME_FILE])
2161 .with_row_selection_enabled(true)
2162 .build()
2163 .unwrap();
2164
2165 let batch_stream = table_scan.to_arrow().await.unwrap();
2166 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2167
2168 let mut file_paths = HashSet::new();
2170 for batch in &batches {
2171 let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
2172 let run_array = file_col
2173 .as_any()
2174 .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2175 .expect("_file column should be a RunArray");
2176
2177 let values = run_array.values();
2178 let string_values = values.as_string::<i32>();
2179 for i in 0..string_values.len() {
2180 file_paths.insert(string_values.value(i).to_string());
2181 }
2182 }
2183
2184 assert!(!file_paths.is_empty(), "Should have at least one file path");
2186
2187 for path in &file_paths {
2189 assert!(
2190 path.ends_with(".parquet"),
2191 "All file paths should end with .parquet, got: {path}"
2192 );
2193 }
2194 }
2195
2196 #[tokio::test]
2197 async fn test_file_column_at_start() {
2198 let mut fixture = TableTestFixture::new();
2199 fixture.setup_manifest_files().await;
2200
2201 let table_scan = fixture
2203 .table
2204 .scan()
2205 .select([RESERVED_COL_NAME_FILE, "x", "y"])
2206 .with_row_selection_enabled(true)
2207 .build()
2208 .unwrap();
2209
2210 let batch_stream = table_scan.to_arrow().await.unwrap();
2211 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2212
2213 assert_eq!(batches[0].num_columns(), 3);
2214
2215 let schema = batches[0].schema();
2217 assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2218 assert_eq!(schema.field(1).name(), "x");
2219 assert_eq!(schema.field(2).name(), "y");
2220 }
2221
2222 #[tokio::test]
2223 async fn test_file_column_at_end() {
2224 let mut fixture = TableTestFixture::new();
2225 fixture.setup_manifest_files().await;
2226
2227 let table_scan = fixture
2229 .table
2230 .scan()
2231 .select(["x", "y", RESERVED_COL_NAME_FILE])
2232 .with_row_selection_enabled(true)
2233 .build()
2234 .unwrap();
2235
2236 let batch_stream = table_scan.to_arrow().await.unwrap();
2237 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2238
2239 assert_eq!(batches[0].num_columns(), 3);
2240
2241 let schema = batches[0].schema();
2243 assert_eq!(schema.field(0).name(), "x");
2244 assert_eq!(schema.field(1).name(), "y");
2245 assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2246 }
2247
2248 #[tokio::test]
2249 async fn test_select_with_repeated_column_names() {
2250 let mut fixture = TableTestFixture::new();
2251 fixture.setup_manifest_files().await;
2252
2253 let table_scan = fixture
2256 .table
2257 .scan()
2258 .select([
2259 "x",
2260 RESERVED_COL_NAME_FILE,
2261 "x", "y",
2263 RESERVED_COL_NAME_FILE, "y", ])
2266 .with_row_selection_enabled(true)
2267 .build()
2268 .unwrap();
2269
2270 let batch_stream = table_scan.to_arrow().await.unwrap();
2271 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2272
2273 assert_eq!(
2275 batches[0].num_columns(),
2276 6,
2277 "Should have exactly 6 columns with duplicates"
2278 );
2279
2280 let schema = batches[0].schema();
2281
2282 assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2284 assert_eq!(
2285 schema.field(1).name(),
2286 RESERVED_COL_NAME_FILE,
2287 "Column 1 should be _file"
2288 );
2289 assert_eq!(
2290 schema.field(2).name(),
2291 "x",
2292 "Column 2 should be x (duplicate)"
2293 );
2294 assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2295 assert_eq!(
2296 schema.field(4).name(),
2297 RESERVED_COL_NAME_FILE,
2298 "Column 4 should be _file (duplicate)"
2299 );
2300 assert_eq!(
2301 schema.field(5).name(),
2302 "y",
2303 "Column 5 should be y (duplicate)"
2304 );
2305
2306 assert!(
2308 matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2309 "Column x should be Int64"
2310 );
2311 assert!(
2312 matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2313 "Column x (duplicate) should be Int64"
2314 );
2315 assert!(
2316 matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2317 "Column y should be Int64"
2318 );
2319 assert!(
2320 matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2321 "Column y (duplicate) should be Int64"
2322 );
2323 assert!(
2324 matches!(
2325 schema.field(1).data_type(),
2326 arrow_schema::DataType::RunEndEncoded(_, _)
2327 ),
2328 "_file column should use RunEndEncoded type"
2329 );
2330 assert!(
2331 matches!(
2332 schema.field(4).data_type(),
2333 arrow_schema::DataType::RunEndEncoded(_, _)
2334 ),
2335 "_file column (duplicate) should use RunEndEncoded type"
2336 );
2337 }
2338
2339 #[tokio::test]
2340 async fn test_scan_deadlock() {
2341 let mut fixture = TableTestFixture::new();
2342 fixture.setup_deadlock_manifests().await;
2343
2344 let table_scan = fixture
2351 .table
2352 .scan()
2353 .with_concurrency_limit(1)
2354 .build()
2355 .unwrap();
2356
2357 let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2360 table_scan
2361 .plan_files()
2362 .await
2363 .unwrap()
2364 .try_collect::<Vec<_>>()
2365 .await
2366 })
2367 .await;
2368
2369 assert!(result.is_ok(), "Scan timed out - deadlock detected");
2371 }
2372}