iceberg/scan/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Table scan API.
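//!
//! The high-level flow is to obtain a [`Table`](crate::table::Table), call its
//! `scan()` method to get a [`TableScanBuilder`], configure and `build()` a
//! [`TableScan`], and then either stream [`FileScanTask`]s via
//! [`TableScan::plan_files`] or read Arrow record batches via
//! [`TableScan::to_arrow`].
//!
//! A minimal sketch, assuming a `table: Table` handle has already been loaded
//! from a catalog (not compiled as a doctest):
//!
//! ```ignore
//! use futures::TryStreamExt;
//!
//! let scan = table
//!     .scan()
//!     .select(["x", "y"])
//!     .with_batch_size(Some(1024))
//!     .build()?;
//! let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
//! ```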
19
20mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35use crate::delete_file_index::DeleteFileIndex;
36use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
37use crate::expr::{Bind, BoundPredicate, Predicate};
38use crate::io::FileIO;
39use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
40use crate::runtime::spawn;
41use crate::spec::{DataContentType, SnapshotRef};
42use crate::table::Table;
43use crate::utils::available_parallelism;
44use crate::{Error, ErrorKind, Result};
45
46/// A stream of Arrow [`RecordBatch`]es.
47pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
48
49/// Builder to create table scan.
50pub struct TableScanBuilder<'a> {
51    table: &'a Table,
52    // Defaults to `None`, which means all columns are selected
53    column_names: Option<Vec<String>>,
54    snapshot_id: Option<i64>,
55    batch_size: Option<usize>,
56    case_sensitive: bool,
57    filter: Option<Predicate>,
58    concurrency_limit_data_files: usize,
59    concurrency_limit_manifest_entries: usize,
60    concurrency_limit_manifest_files: usize,
61    row_group_filtering_enabled: bool,
62    row_selection_enabled: bool,
63}
64
65impl<'a> TableScanBuilder<'a> {
66    pub(crate) fn new(table: &'a Table) -> Self {
67        let num_cpus = available_parallelism().get();
68
69        Self {
70            table,
71            column_names: None,
72            snapshot_id: None,
73            batch_size: None,
74            case_sensitive: true,
75            filter: None,
76            concurrency_limit_data_files: num_cpus,
77            concurrency_limit_manifest_entries: num_cpus,
78            concurrency_limit_manifest_files: num_cpus,
79            row_group_filtering_enabled: true,
80            row_selection_enabled: false,
81        }
82    }
83
84    /// Sets the desired size of record batches returned by the scan
85    /// to something other than the default
86    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
87        self.batch_size = batch_size;
88        self
89    }
90
91    /// Sets the scan's case sensitivity
92    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
93        self.case_sensitive = case_sensitive;
94        self
95    }
96
97    /// Specifies a predicate to use as a filter
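    ///
    /// A sketch of a typical filter, assuming a hypothetical long column `x`
    /// (not compiled as a doctest):
    ///
    /// ```ignore
    /// use iceberg::expr::Reference;
    /// use iceberg::spec::Datum;
    ///
    /// let scan = table
    ///     .scan()
    ///     .with_filter(Reference::new("x").less_than(Datum::long(10)))
    ///     .build()?;
    /// ```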
98    pub fn with_filter(mut self, predicate: Predicate) -> Self {
99        // calls rewrite_not to remove Not nodes, which must be absent
100        // when applying the manifest evaluator
101        self.filter = Some(predicate.rewrite_not());
102        self
103    }
104
105    /// Select all columns.
106    pub fn select_all(mut self) -> Self {
107        self.column_names = None;
108        self
109    }
110
111    /// Select empty columns.
112    pub fn select_empty(mut self) -> Self {
113        self.column_names = Some(vec![]);
114        self
115    }
116
117    /// Select some columns of the table.
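    ///
    /// For example, assuming columns `x` and `y` exist in the schema
    /// (not compiled as a doctest):
    ///
    /// ```ignore
    /// let scan = table.scan().select(["x", "y"]).build()?;
    /// ```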
118    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
119        self.column_names = Some(
120            column_names
121                .into_iter()
122                .map(|item| item.to_string())
123                .collect(),
124        );
125        self
126    }
127
128    /// Sets the snapshot to scan. When not set, the current snapshot is used.
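    ///
    /// For example, pinning the scan to a specific snapshot (illustrative id
    /// taken from the tests below; not compiled as a doctest):
    ///
    /// ```ignore
    /// let scan = table.scan().snapshot_id(3051729675574597004).build()?;
    /// ```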
129    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
130        self.snapshot_id = Some(snapshot_id);
131        self
132    }
133
134    /// Sets the concurrency limit for manifest files, manifest entries,
135    /// and data files for this scan
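    ///
    /// For example, capping all three stages at four concurrent operations
    /// (illustrative value; not compiled as a doctest):
    ///
    /// ```ignore
    /// let scan = table.scan().with_concurrency_limit(4).build()?;
    /// ```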
136    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
137        self.concurrency_limit_manifest_files = limit;
138        self.concurrency_limit_manifest_entries = limit;
139        self.concurrency_limit_data_files = limit;
140        self
141    }
142
143    /// Sets the data file concurrency limit for this scan
144    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
145        self.concurrency_limit_data_files = limit;
146        self
147    }
148
149    /// Sets the manifest entry concurrency limit for this scan
150    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
151        self.concurrency_limit_manifest_entries = limit;
152        self
153    }
154
155    /// Determines whether to enable row group filtering.
156    /// When enabled, if a read is performed with a filter predicate,
157    /// then the metadata for each row group in the parquet file is
158    /// evaluated against the filter predicate, and row groups
159    /// that can't contain matching rows are skipped entirely.
160    ///
161    /// Defaults to enabled, as it generally improves performance or
162    /// leaves it unchanged; performance degradation is unlikely.
163    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
164        self.row_group_filtering_enabled = row_group_filtering_enabled;
165        self
166    }
167
168    /// Determines whether to enable row selection.
169    /// When enabled, if a read is performed with a filter predicate,
170    /// then (for row groups that have not been skipped) the page index
171    /// for each row group in a parquet file is parsed and evaluated
172    /// against the filter predicate to determine if ranges of rows
173    /// within a row group can be skipped, based upon the page-level
174    /// statistics for each column.
175    ///
176    /// Defaults to disabled. Enabling it requires parsing the parquet page
177    /// index, which can be slow enough that the cost of parsing outweighs any
178    /// gains from the reduced number of rows that need scanning.
179    /// It is recommended to experiment with the Iceberg partitioning, sorting,
180    /// row group size, page size, and page row limit settings of the table being
181    /// scanned in order to get the best performance from row selection.
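    ///
    /// An illustrative sketch of enabling row selection alongside row group
    /// filtering (not compiled as a doctest):
    ///
    /// ```ignore
    /// let scan = table
    ///     .scan()
    ///     .with_row_group_filtering_enabled(true)
    ///     .with_row_selection_enabled(true)
    ///     .build()?;
    /// ```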
182    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
183        self.row_selection_enabled = row_selection_enabled;
184        self
185    }
186
187    /// Build the table scan.
188    pub fn build(self) -> Result<TableScan> {
189        let snapshot = match self.snapshot_id {
190            Some(snapshot_id) => self
191                .table
192                .metadata()
193                .snapshot_by_id(snapshot_id)
194                .ok_or_else(|| {
195                    Error::new(
196                        ErrorKind::DataInvalid,
197                        format!("Snapshot with id {snapshot_id} not found"),
198                    )
199                })?
200                .clone(),
201            None => {
202                let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
203                    return Ok(TableScan {
204                        batch_size: self.batch_size,
205                        column_names: self.column_names,
206                        file_io: self.table.file_io().clone(),
207                        plan_context: None,
208                        concurrency_limit_data_files: self.concurrency_limit_data_files,
209                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
210                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
211                        row_group_filtering_enabled: self.row_group_filtering_enabled,
212                        row_selection_enabled: self.row_selection_enabled,
213                    });
214                };
215                current_snapshot.clone()
216            }
217        };
218
219        let schema = snapshot.schema(self.table.metadata())?;
220
221        // Check that all column names exist in the schema (skip reserved columns).
222        if let Some(column_names) = self.column_names.as_ref() {
223            for column_name in column_names {
224                // Skip reserved columns that don't exist in the schema
225                if is_metadata_column_name(column_name) {
226                    continue;
227                }
228                if schema.field_by_name(column_name).is_none() {
229                    return Err(Error::new(
230                        ErrorKind::DataInvalid,
231                        format!("Column {column_name} not found in table. Schema: {schema}"),
232                    ));
233                }
234            }
235        }
236
237        let mut field_ids = vec![];
238        let column_names = self.column_names.clone().unwrap_or_else(|| {
239            schema
240                .as_struct()
241                .fields()
242                .iter()
243                .map(|f| f.name.clone())
244                .collect()
245        });
246
247        for column_name in column_names.iter() {
248            // Handle metadata columns (like "_file")
249            if is_metadata_column_name(column_name) {
250                field_ids.push(get_metadata_field_id(column_name)?);
251                continue;
252            }
253
254            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
255                Error::new(
256                    ErrorKind::DataInvalid,
257                    format!("Column {column_name} not found in table. Schema: {schema}"),
258                )
259            })?;
260
261            schema
262                .as_struct()
263                .field_by_id(field_id)
264                .ok_or_else(|| {
265                    Error::new(
266                        ErrorKind::FeatureUnsupported,
267                        format!(
268                        "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
269                    ),
270                )
271            })?;
272
273            field_ids.push(field_id);
274        }
275
276        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
277            Some(predicates.bind(schema.clone(), true)?)
278        } else {
279            None
280        };
281
282        let plan_context = PlanContext {
283            snapshot,
284            table_metadata: self.table.metadata_ref(),
285            snapshot_schema: schema,
286            case_sensitive: self.case_sensitive,
287            predicate: self.filter.map(Arc::new),
288            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
289            object_cache: self.table.object_cache(),
290            field_ids: Arc::new(field_ids),
291            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
292            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
293            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
294        };
295
296        Ok(TableScan {
297            batch_size: self.batch_size,
298            column_names: self.column_names,
299            file_io: self.table.file_io().clone(),
300            plan_context: Some(plan_context),
301            concurrency_limit_data_files: self.concurrency_limit_data_files,
302            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
303            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
304            row_group_filtering_enabled: self.row_group_filtering_enabled,
305            row_selection_enabled: self.row_selection_enabled,
306        })
307    }
308}
309
310/// Table scan.
311#[derive(Debug)]
312pub struct TableScan {
313    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
314    ///
315    /// If this is None, then the scan contains no rows.
316    plan_context: Option<PlanContext>,
317    batch_size: Option<usize>,
318    file_io: FileIO,
319    column_names: Option<Vec<String>>,
320    /// The maximum number of manifest files that will be
321    /// retrieved from [`FileIO`] concurrently
322    concurrency_limit_manifest_files: usize,
323
324    /// The maximum number of [`ManifestEntry`]s that will
325    /// be processed in parallel
326    concurrency_limit_manifest_entries: usize,
327
328    /// The maximum number of data files that will
329    /// be processed in parallel
330    concurrency_limit_data_files: usize,
331
332    row_group_filtering_enabled: bool,
333    row_selection_enabled: bool,
334}
335
336impl TableScan {
337    /// Returns a stream of [`FileScanTask`]s.
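    ///
    /// A sketch of collecting the planned tasks, assuming a built
    /// `scan: TableScan` (not compiled as a doctest):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    /// ```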
338    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
339        let Some(plan_context) = self.plan_context.as_ref() else {
340            return Ok(Box::pin(futures::stream::empty()));
341        };
342
343        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
344        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
345
346        // used to stream ManifestEntryContexts between stages of the file plan operation
347        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
348            channel(concurrency_limit_manifest_files);
349        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
350            channel(concurrency_limit_manifest_files);
351
352        // used to stream the results back to the caller
353        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
354
355        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
356
357        let manifest_list = plan_context.get_manifest_list().await?;
358
359        // get the [`ManifestFile`]s from the [`ManifestList`],
360        // filtering out any whose partitions cannot match
361        // this scan's filter
362        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
363            manifest_list,
364            manifest_entry_data_ctx_tx,
365            delete_file_idx.clone(),
366            manifest_entry_delete_ctx_tx,
367        )?;
368
369        let mut channel_for_manifest_error = file_scan_task_tx.clone();
370
371        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
372        spawn(async move {
373            let result = futures::stream::iter(manifest_file_contexts)
374                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
375                    ctx.fetch_manifest_and_stream_manifest_entries().await
376                })
377                .await;
378
379            if let Err(error) = result {
380                let _ = channel_for_manifest_error.send(Err(error)).await;
381            }
382        });
383
384        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
385        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
386
387        // Process the delete file [`ManifestEntry`] stream in parallel
388        spawn(async move {
389            let result = manifest_entry_delete_ctx_rx
390                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
391                .try_for_each_concurrent(
392                    concurrency_limit_manifest_entries,
393                    |(manifest_entry_context, tx)| async move {
394                        spawn(async move {
395                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
396                        })
397                        .await
398                    },
399                )
400                .await;
401
402            if let Err(error) = result {
403                let _ = channel_for_delete_manifest_entry_error
404                    .send(Err(error))
405                    .await;
406            }
407        })
408        .await;
409
410        // Process the data file [`ManifestEntry`] stream in parallel
411        spawn(async move {
412            let result = manifest_entry_data_ctx_rx
413                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
414                .try_for_each_concurrent(
415                    concurrency_limit_manifest_entries,
416                    |(manifest_entry_context, tx)| async move {
417                        spawn(async move {
418                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
419                        })
420                        .await
421                    },
422                )
423                .await;
424
425            if let Err(error) = result {
426                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
427            }
428        });
429
430        Ok(file_scan_task_rx.boxed())
431    }
432
433    /// Returns an [`ArrowRecordBatchStream`].
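    ///
    /// A sketch of reading all record batches, assuming a built
    /// `scan: TableScan` (not compiled as a doctest):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    /// ```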
434    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
435        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
436            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
437            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
438            .with_row_selection_enabled(self.row_selection_enabled);
439
440        if let Some(batch_size) = self.batch_size {
441            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
442        }
443
444        arrow_reader_builder.build().read(self.plan_files().await?)
445    }
446
447    /// Returns a reference to the column names of the table scan.
448    pub fn column_names(&self) -> Option<&[String]> {
449        self.column_names.as_deref()
450    }
451
452    /// Returns a reference to the snapshot of the table scan.
453    pub fn snapshot(&self) -> Option<&SnapshotRef> {
454        self.plan_context.as_ref().map(|x| &x.snapshot)
455    }
456
457    async fn process_data_manifest_entry(
458        manifest_entry_context: ManifestEntryContext,
459        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
460    ) -> Result<()> {
461        // skip processing this manifest entry if it has been marked as deleted
462        if !manifest_entry_context.manifest_entry.is_alive() {
463            return Ok(());
464        }
465
466        // abort the plan if we encounter a manifest entry for a delete file
467        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
468            return Err(Error::new(
469                ErrorKind::FeatureUnsupported,
470                "Encountered an entry for a delete file in a data file manifest",
471            ));
472        }
473
474        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
475            let BoundPredicates {
476                snapshot_bound_predicate,
477                partition_bound_predicate,
478            } = bound_predicates.as_ref();
479
480            let expression_evaluator_cache =
481                manifest_entry_context.expression_evaluator_cache.as_ref();
482
483            let expression_evaluator = expression_evaluator_cache.get(
484                manifest_entry_context.partition_spec_id,
485                partition_bound_predicate,
486            )?;
487
488            // skip any data file whose partition data indicates that it can't contain
489            // any data that matches this scan's filter
490            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
491                return Ok(());
492            }
493
494            // skip any data file whose metrics don't match this scan's filter
495            if !InclusiveMetricsEvaluator::eval(
496                snapshot_bound_predicate,
497                manifest_entry_context.manifest_entry.data_file(),
498                false,
499            )? {
500                return Ok(());
501            }
502        }
503
504        // congratulations! the manifest entry has made its way through the
505        // entire plan without getting filtered out. Create a corresponding
506        // FileScanTask and push it to the result stream
507        file_scan_task_tx
508            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
509            .await?;
510
511        Ok(())
512    }
513
514    async fn process_delete_manifest_entry(
515        manifest_entry_context: ManifestEntryContext,
516        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
517    ) -> Result<()> {
518        // skip processing this manifest entry if it has been marked as deleted
519        if !manifest_entry_context.manifest_entry.is_alive() {
520            return Ok(());
521        }
522
523        // abort the plan if we encounter a manifest entry that is not for a delete file
524        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
525            return Err(Error::new(
526                ErrorKind::FeatureUnsupported,
527                "Encountered an entry for a data file in a delete manifest",
528            ));
529        }
530
531        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
532            let expression_evaluator_cache =
533                manifest_entry_context.expression_evaluator_cache.as_ref();
534
535            let expression_evaluator = expression_evaluator_cache.get(
536                manifest_entry_context.partition_spec_id,
537                &bound_predicates.partition_bound_predicate,
538            )?;
539
540            // skip any delete file whose partition data indicates that it can't contain
541            // any rows that match this scan's filter
542            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
543                return Ok(());
544            }
545        }
546
547        delete_file_ctx_tx
548            .send(DeleteFileContext {
549                manifest_entry: manifest_entry_context.manifest_entry.clone(),
550                partition_spec_id: manifest_entry_context.partition_spec_id,
551            })
552            .await?;
553
554        Ok(())
555    }
556}
557
558pub(crate) struct BoundPredicates {
559    partition_bound_predicate: BoundPredicate,
560    snapshot_bound_predicate: BoundPredicate,
561}
562
563#[cfg(test)]
564pub mod tests {
565    //! Shared tests for the table scan API.
566    #![allow(missing_docs)]
567
568    use std::collections::HashMap;
569    use std::fs;
570    use std::fs::File;
571    use std::sync::Arc;
572
573    use arrow_array::cast::AsArray;
574    use arrow_array::{
575        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
576        StringArray,
577    };
578    use futures::{TryStreamExt, stream};
579    use minijinja::value::Value;
580    use minijinja::{AutoEscape, Environment, context};
581    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
582    use parquet::basic::Compression;
583    use parquet::file::properties::WriterProperties;
584    use tempfile::TempDir;
585    use uuid::Uuid;
586
587    use crate::TableIdent;
588    use crate::arrow::ArrowReaderBuilder;
589    use crate::expr::{BoundPredicate, Reference};
590    use crate::io::{FileIO, OutputFile};
591    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
592    use crate::scan::FileScanTask;
593    use crate::spec::{
594        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
595        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
596        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
597    };
598    use crate::table::Table;
599
600    fn render_template(template: &str, ctx: Value) -> String {
601        let mut env = Environment::new();
602        env.set_auto_escape_callback(|_| AutoEscape::None);
603        env.render_str(template, ctx).unwrap()
604    }
605
606    pub struct TableTestFixture {
607        pub table_location: String,
608        pub table: Table,
609    }
610
611    impl TableTestFixture {
612        #[allow(clippy::new_without_default)]
613        pub fn new() -> Self {
614            let tmp_dir = TempDir::new().unwrap();
615            let table_location = tmp_dir.path().join("table1");
616            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
617            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
618            let table_metadata1_location = table_location.join("metadata/v1.json");
619
620            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
621                .unwrap()
622                .build()
623                .unwrap();
624
625            let table_metadata = {
626                let template_json_str = fs::read_to_string(format!(
627                    "{}/testdata/example_table_metadata_v2.json",
628                    env!("CARGO_MANIFEST_DIR")
629                ))
630                .unwrap();
631                let metadata_json = render_template(&template_json_str, context! {
632                    table_location => &table_location,
633                    manifest_list_1_location => &manifest_list1_location,
634                    manifest_list_2_location => &manifest_list2_location,
635                    table_metadata_1_location => &table_metadata1_location,
636                });
637                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
638            };
639
640            let table = Table::builder()
641                .metadata(table_metadata)
642                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
643                .file_io(file_io.clone())
644                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
645                .build()
646                .unwrap();
647
648            Self {
649                table_location: table_location.to_str().unwrap().to_string(),
650                table,
651            }
652        }
653
654        #[allow(clippy::new_without_default)]
655        pub fn new_empty() -> Self {
656            let tmp_dir = TempDir::new().unwrap();
657            let table_location = tmp_dir.path().join("table1");
658            let table_metadata1_location = table_location.join("metadata/v1.json");
659
660            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
661                .unwrap()
662                .build()
663                .unwrap();
664
665            let table_metadata = {
666                let template_json_str = fs::read_to_string(format!(
667                    "{}/testdata/example_empty_table_metadata_v2.json",
668                    env!("CARGO_MANIFEST_DIR")
669                ))
670                .unwrap();
671                let metadata_json = render_template(&template_json_str, context! {
672                    table_location => &table_location,
673                    table_metadata_1_location => &table_metadata1_location,
674                });
675                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
676            };
677
678            let table = Table::builder()
679                .metadata(table_metadata)
680                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
681                .file_io(file_io.clone())
682                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
683                .build()
684                .unwrap();
685
686            Self {
687                table_location: table_location.to_str().unwrap().to_string(),
688                table,
689            }
690        }
691
692        pub fn new_unpartitioned() -> Self {
693            let tmp_dir = TempDir::new().unwrap();
694            let table_location = tmp_dir.path().join("table1");
695            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
696            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
697            let table_metadata1_location = table_location.join("metadata/v1.json");
698
699            let file_io = FileIO::from_path(table_location.to_str().unwrap())
700                .unwrap()
701                .build()
702                .unwrap();
703
704            let mut table_metadata = {
705                let template_json_str = fs::read_to_string(format!(
706                    "{}/testdata/example_table_metadata_v2.json",
707                    env!("CARGO_MANIFEST_DIR")
708                ))
709                .unwrap();
710                let metadata_json = render_template(&template_json_str, context! {
711                    table_location => &table_location,
712                    manifest_list_1_location => &manifest_list1_location,
713                    manifest_list_2_location => &manifest_list2_location,
714                    table_metadata_1_location => &table_metadata1_location,
715                });
716                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
717            };
718
719            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
720            table_metadata.partition_specs.clear();
721            table_metadata.default_partition_type = StructType::new(vec![]);
722            table_metadata
723                .partition_specs
724                .insert(0, table_metadata.default_spec.clone());
725
726            let table = Table::builder()
727                .metadata(table_metadata)
728                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
729                .file_io(file_io.clone())
730                .metadata_location(table_metadata1_location.to_str().unwrap())
731                .build()
732                .unwrap();
733
734            Self {
735                table_location: table_location.to_str().unwrap().to_string(),
736                table,
737            }
738        }
739
740        fn next_manifest_file(&self) -> OutputFile {
741            self.table
742                .file_io()
743                .new_output(format!(
744                    "{}/metadata/manifest_{}.avro",
745                    self.table_location,
746                    Uuid::new_v4()
747                ))
748                .unwrap()
749        }
750
751        pub async fn setup_manifest_files(&mut self) {
752            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
753            let parent_snapshot = current_snapshot
754                .parent_snapshot(self.table.metadata())
755                .unwrap();
756            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
757            let current_partition_spec = self.table.metadata().default_partition_spec();
758
759            // Write data files
760            let mut writer = ManifestWriterBuilder::new(
761                self.next_manifest_file(),
762                Some(current_snapshot.snapshot_id()),
763                None,
764                current_schema.clone(),
765                current_partition_spec.as_ref().clone(),
766            )
767            .build_v2_data();
768            writer
769                .add_entry(
770                    ManifestEntry::builder()
771                        .status(ManifestStatus::Added)
772                        .data_file(
773                            DataFileBuilder::default()
774                                .partition_spec_id(0)
775                                .content(DataContentType::Data)
776                                .file_path(format!("{}/1.parquet", &self.table_location))
777                                .file_format(DataFileFormat::Parquet)
778                                .file_size_in_bytes(100)
779                                .record_count(1)
780                                .partition(Struct::from_iter([Some(Literal::long(100))]))
781                                .key_metadata(None)
782                                .build()
783                                .unwrap(),
784                        )
785                        .build(),
786                )
787                .unwrap();
788            writer
789                .add_delete_entry(
790                    ManifestEntry::builder()
791                        .status(ManifestStatus::Deleted)
792                        .snapshot_id(parent_snapshot.snapshot_id())
793                        .sequence_number(parent_snapshot.sequence_number())
794                        .file_sequence_number(parent_snapshot.sequence_number())
795                        .data_file(
796                            DataFileBuilder::default()
797                                .partition_spec_id(0)
798                                .content(DataContentType::Data)
799                                .file_path(format!("{}/2.parquet", &self.table_location))
800                                .file_format(DataFileFormat::Parquet)
801                                .file_size_in_bytes(100)
802                                .record_count(1)
803                                .partition(Struct::from_iter([Some(Literal::long(200))]))
804                                .build()
805                                .unwrap(),
806                        )
807                        .build(),
808                )
809                .unwrap();
810            writer
811                .add_existing_entry(
812                    ManifestEntry::builder()
813                        .status(ManifestStatus::Existing)
814                        .snapshot_id(parent_snapshot.snapshot_id())
815                        .sequence_number(parent_snapshot.sequence_number())
816                        .file_sequence_number(parent_snapshot.sequence_number())
817                        .data_file(
818                            DataFileBuilder::default()
819                                .partition_spec_id(0)
820                                .content(DataContentType::Data)
821                                .file_path(format!("{}/3.parquet", &self.table_location))
822                                .file_format(DataFileFormat::Parquet)
823                                .file_size_in_bytes(100)
824                                .record_count(1)
825                                .partition(Struct::from_iter([Some(Literal::long(300))]))
826                                .build()
827                                .unwrap(),
828                        )
829                        .build(),
830                )
831                .unwrap();
832            let data_file_manifest = writer.write_manifest_file().await.unwrap();
833
834            // Write to manifest list
835            let mut manifest_list_write = ManifestListWriter::v2(
836                self.table
837                    .file_io()
838                    .new_output(current_snapshot.manifest_list())
839                    .unwrap(),
840                current_snapshot.snapshot_id(),
841                current_snapshot.parent_snapshot_id(),
842                current_snapshot.sequence_number(),
843            );
844            manifest_list_write
845                .add_manifests(vec![data_file_manifest].into_iter())
846                .unwrap();
847            manifest_list_write.close().await.unwrap();
848
849            // prepare data
850            let schema = {
851                let fields = vec![
852                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
853                        .with_metadata(HashMap::from([(
854                            PARQUET_FIELD_ID_META_KEY.to_string(),
855                            "1".to_string(),
856                        )])),
857                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
858                        .with_metadata(HashMap::from([(
859                            PARQUET_FIELD_ID_META_KEY.to_string(),
860                            "2".to_string(),
861                        )])),
862                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
863                        .with_metadata(HashMap::from([(
864                            PARQUET_FIELD_ID_META_KEY.to_string(),
865                            "3".to_string(),
866                        )])),
867                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
868                        .with_metadata(HashMap::from([(
869                            PARQUET_FIELD_ID_META_KEY.to_string(),
870                            "4".to_string(),
871                        )])),
872                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
873                        .with_metadata(HashMap::from([(
874                            PARQUET_FIELD_ID_META_KEY.to_string(),
875                            "5".to_string(),
876                        )])),
877                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
878                        .with_metadata(HashMap::from([(
879                            PARQUET_FIELD_ID_META_KEY.to_string(),
880                            "6".to_string(),
881                        )])),
882                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
883                        .with_metadata(HashMap::from([(
884                            PARQUET_FIELD_ID_META_KEY.to_string(),
885                            "7".to_string(),
886                        )])),
887                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
888                        .with_metadata(HashMap::from([(
889                            PARQUET_FIELD_ID_META_KEY.to_string(),
890                            "8".to_string(),
891                        )])),
892                ];
893                Arc::new(arrow_schema::Schema::new(fields))
894            };
895            // x: [1, 1, 1, 1, ...]
896            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
897
898            let mut values = vec![2; 512];
899            values.append(vec![3; 200].as_mut());
900            values.append(vec![4; 300].as_mut());
901            values.append(vec![5; 12].as_mut());
902
903            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
904            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
905
906            let mut values = vec![3; 512];
907            values.append(vec![4; 512].as_mut());
908
909            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
910            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
911
912            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
913            let mut values = vec!["Apache"; 512];
914            values.append(vec!["Iceberg"; 512].as_mut());
915            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
916
917            // dbl:
918            let mut values = vec![100.0f64; 512];
919            values.append(vec![150.0f64; 12].as_mut());
920            values.append(vec![200.0f64; 500].as_mut());
921            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
922
923            // i32:
924            let mut values = vec![100i32; 512];
925            values.append(vec![150i32; 12].as_mut());
926            values.append(vec![200i32; 500].as_mut());
927            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
928
929            // i64:
930            let mut values = vec![100i64; 512];
931            values.append(vec![150i64; 12].as_mut());
932            values.append(vec![200i64; 500].as_mut());
933            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
934
935            // bool:
936            let mut values = vec![false; 512];
937            values.append(vec![true; 512].as_mut());
938            let values: BooleanArray = values.into();
939            let col8 = Arc::new(values) as ArrayRef;
940
941            let to_write = RecordBatch::try_new(schema.clone(), vec![
942                col1, col2, col3, col4, col5, col6, col7, col8,
943            ])
944            .unwrap();
945
946            // Write the Parquet files
947            let props = WriterProperties::builder()
948                .set_compression(Compression::SNAPPY)
949                .build();
950
951            for n in 1..=3 {
952                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
953                let mut writer =
954                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
955
956                writer.write(&to_write).expect("Writing batch");
957
958                // writer must be closed to write footer
959                writer.close().unwrap();
960            }
961        }
962
963        pub async fn setup_unpartitioned_manifest_files(&mut self) {
964            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
965            let parent_snapshot = current_snapshot
966                .parent_snapshot(self.table.metadata())
967                .unwrap();
968            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
969            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
970
971            // Write data files using an empty partition for unpartitioned tables.
972            let mut writer = ManifestWriterBuilder::new(
973                self.next_manifest_file(),
974                Some(current_snapshot.snapshot_id()),
975                None,
976                current_schema.clone(),
977                current_partition_spec.as_ref().clone(),
978            )
979            .build_v2_data();
980
981            // Create an empty partition value.
982            let empty_partition = Struct::empty();
983
984            writer
985                .add_entry(
986                    ManifestEntry::builder()
987                        .status(ManifestStatus::Added)
988                        .data_file(
989                            DataFileBuilder::default()
990                                .partition_spec_id(0)
991                                .content(DataContentType::Data)
992                                .file_path(format!("{}/1.parquet", &self.table_location))
993                                .file_format(DataFileFormat::Parquet)
994                                .file_size_in_bytes(100)
995                                .record_count(1)
996                                .partition(empty_partition.clone())
997                                .key_metadata(None)
998                                .build()
999                                .unwrap(),
1000                        )
1001                        .build(),
1002                )
1003                .unwrap();
1004
1005            writer
1006                .add_delete_entry(
1007                    ManifestEntry::builder()
1008                        .status(ManifestStatus::Deleted)
1009                        .snapshot_id(parent_snapshot.snapshot_id())
1010                        .sequence_number(parent_snapshot.sequence_number())
1011                        .file_sequence_number(parent_snapshot.sequence_number())
1012                        .data_file(
1013                            DataFileBuilder::default()
1014                                .partition_spec_id(0)
1015                                .content(DataContentType::Data)
1016                                .file_path(format!("{}/2.parquet", &self.table_location))
1017                                .file_format(DataFileFormat::Parquet)
1018                                .file_size_in_bytes(100)
1019                                .record_count(1)
1020                                .partition(empty_partition.clone())
1021                                .build()
1022                                .unwrap(),
1023                        )
1024                        .build(),
1025                )
1026                .unwrap();
1027
1028            writer
1029                .add_existing_entry(
1030                    ManifestEntry::builder()
1031                        .status(ManifestStatus::Existing)
1032                        .snapshot_id(parent_snapshot.snapshot_id())
1033                        .sequence_number(parent_snapshot.sequence_number())
1034                        .file_sequence_number(parent_snapshot.sequence_number())
1035                        .data_file(
1036                            DataFileBuilder::default()
1037                                .partition_spec_id(0)
1038                                .content(DataContentType::Data)
1039                                .file_path(format!("{}/3.parquet", &self.table_location))
1040                                .file_format(DataFileFormat::Parquet)
1041                                .file_size_in_bytes(100)
1042                                .record_count(1)
1043                                .partition(empty_partition.clone())
1044                                .build()
1045                                .unwrap(),
1046                        )
1047                        .build(),
1048                )
1049                .unwrap();
1050
1051            let data_file_manifest = writer.write_manifest_file().await.unwrap();
1052
1053            // Write to manifest list
1054            let mut manifest_list_write = ManifestListWriter::v2(
1055                self.table
1056                    .file_io()
1057                    .new_output(current_snapshot.manifest_list())
1058                    .unwrap(),
1059                current_snapshot.snapshot_id(),
1060                current_snapshot.parent_snapshot_id(),
1061                current_snapshot.sequence_number(),
1062            );
1063            manifest_list_write
1064                .add_manifests(vec![data_file_manifest].into_iter())
1065                .unwrap();
1066            manifest_list_write.close().await.unwrap();
1067
1068            // prepare data for parquet files
1069            let schema = {
1070                let fields = vec![
1071                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
1072                        .with_metadata(HashMap::from([(
1073                            PARQUET_FIELD_ID_META_KEY.to_string(),
1074                            "1".to_string(),
1075                        )])),
1076                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
1077                        .with_metadata(HashMap::from([(
1078                            PARQUET_FIELD_ID_META_KEY.to_string(),
1079                            "2".to_string(),
1080                        )])),
1081                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
1082                        .with_metadata(HashMap::from([(
1083                            PARQUET_FIELD_ID_META_KEY.to_string(),
1084                            "3".to_string(),
1085                        )])),
1086                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
1087                        .with_metadata(HashMap::from([(
1088                            PARQUET_FIELD_ID_META_KEY.to_string(),
1089                            "4".to_string(),
1090                        )])),
1091                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
1092                        .with_metadata(HashMap::from([(
1093                            PARQUET_FIELD_ID_META_KEY.to_string(),
1094                            "5".to_string(),
1095                        )])),
1096                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
1097                        .with_metadata(HashMap::from([(
1098                            PARQUET_FIELD_ID_META_KEY.to_string(),
1099                            "6".to_string(),
1100                        )])),
1101                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
1102                        .with_metadata(HashMap::from([(
1103                            PARQUET_FIELD_ID_META_KEY.to_string(),
1104                            "7".to_string(),
1105                        )])),
1106                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
1107                        .with_metadata(HashMap::from([(
1108                            PARQUET_FIELD_ID_META_KEY.to_string(),
1109                            "8".to_string(),
1110                        )])),
1111                ];
1112                Arc::new(arrow_schema::Schema::new(fields))
1113            };
1114
1115            // Build the arrays for the RecordBatch
1116            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1117
1118            let mut values = vec![2; 512];
1119            values.append(vec![3; 200].as_mut());
1120            values.append(vec![4; 300].as_mut());
1121            values.append(vec![5; 12].as_mut());
1122            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1123
1124            let mut values = vec![3; 512];
1125            values.append(vec![4; 512].as_mut());
1126            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1127
1128            let mut values = vec!["Apache"; 512];
1129            values.append(vec!["Iceberg"; 512].as_mut());
1130            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
1131
1132            let mut values = vec![100.0f64; 512];
1133            values.append(vec![150.0f64; 12].as_mut());
1134            values.append(vec![200.0f64; 500].as_mut());
1135            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
1136
1137            let mut values = vec![100i32; 512];
1138            values.append(vec![150i32; 12].as_mut());
1139            values.append(vec![200i32; 500].as_mut());
1140            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
1141
1142            let mut values = vec![100i64; 512];
1143            values.append(vec![150i64; 12].as_mut());
1144            values.append(vec![200i64; 500].as_mut());
1145            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1146
1147            let mut values = vec![false; 512];
1148            values.append(vec![true; 512].as_mut());
1149            let values: BooleanArray = values.into();
1150            let col8 = Arc::new(values) as ArrayRef;
1151
1152            let to_write = RecordBatch::try_new(schema.clone(), vec![
1153                col1, col2, col3, col4, col5, col6, col7, col8,
1154            ])
1155            .unwrap();
1156
1157            // Write the Parquet files
1158            let props = WriterProperties::builder()
1159                .set_compression(Compression::SNAPPY)
1160                .build();
1161
1162            for n in 1..=3 {
1163                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
1164                let mut writer =
1165                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1166
1167                writer.write(&to_write).expect("Writing batch");
1168
1169                // writer must be closed to write footer
1170                writer.close().unwrap();
1171            }
1172        }
1173    }
1174
1175    #[test]
1176    fn test_table_scan_columns() {
1177        let table = TableTestFixture::new().table;
1178
1179        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1180        assert_eq!(
1181            Some(vec!["x".to_string(), "y".to_string()]),
1182            table_scan.column_names
1183        );
1184
1185        let table_scan = table
1186            .scan()
1187            .select(["x", "y"])
1188            .select(["z"])
1189            .build()
1190            .unwrap();
1191        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1192    }
1193
1194    #[test]
1195    fn test_select_all() {
1196        let table = TableTestFixture::new().table;
1197
1198        let table_scan = table.scan().select_all().build().unwrap();
1199        assert!(table_scan.column_names.is_none());
1200    }
1201
1202    #[test]
1203    fn test_select_no_exist_column() {
1204        let table = TableTestFixture::new().table;
1205
1206        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1207        assert!(table_scan.is_err());
1208    }
1209
1210    #[test]
1211    fn test_table_scan_default_snapshot_id() {
1212        let table = TableTestFixture::new().table;
1213
1214        let table_scan = table.scan().build().unwrap();
1215        assert_eq!(
1216            table.metadata().current_snapshot().unwrap().snapshot_id(),
1217            table_scan.snapshot().unwrap().snapshot_id()
1218        );
1219    }
1220
1221    #[test]
1222    fn test_table_scan_non_exist_snapshot_id() {
1223        let table = TableTestFixture::new().table;
1224
1225        let table_scan = table.scan().snapshot_id(1024).build();
1226        assert!(table_scan.is_err());
1227    }
1228
1229    #[test]
1230    fn test_table_scan_with_snapshot_id() {
1231        let table = TableTestFixture::new().table;
1232
1233        let table_scan = table
1234            .scan()
1235            .snapshot_id(3051729675574597004)
1236            .with_row_selection_enabled(true)
1237            .build()
1238            .unwrap();
1239        assert_eq!(
1240            table_scan.snapshot().unwrap().snapshot_id(),
1241            3051729675574597004
1242        );
1243    }
1244
1245    #[tokio::test]
1246    async fn test_plan_files_on_table_without_any_snapshots() {
1247        let table = TableTestFixture::new_empty().table;
1248        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1249        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1250        assert!(batches.is_empty());
1251    }
1252
1253    #[tokio::test]
1254    async fn test_plan_files_no_deletions() {
1255        let mut fixture = TableTestFixture::new();
1256        fixture.setup_manifest_files().await;
1257
1258        // Create table scan for current snapshot and plan files
1259        let table_scan = fixture
1260            .table
1261            .scan()
1262            .with_row_selection_enabled(true)
1263            .build()
1264            .unwrap();
1265
1266        let mut tasks = table_scan
1267            .plan_files()
1268            .await
1269            .unwrap()
1270            .try_fold(vec![], |mut acc, task| async move {
1271                acc.push(task);
1272                Ok(acc)
1273            })
1274            .await
1275            .unwrap();
1276
1277        assert_eq!(tasks.len(), 2);
1278
1279        tasks.sort_by_key(|t| t.data_file_path.to_string());
1280
1281        // Check first task is added data file
1282        assert_eq!(
1283            tasks[0].data_file_path,
1284            format!("{}/1.parquet", &fixture.table_location)
1285        );
1286
1287        // Check second task is existing data file
1288        assert_eq!(
1289            tasks[1].data_file_path,
1290            format!("{}/3.parquet", &fixture.table_location)
1291        );
1292    }
1293
1294    #[tokio::test]
1295    async fn test_open_parquet_no_deletions() {
1296        let mut fixture = TableTestFixture::new();
1297        fixture.setup_manifest_files().await;
1298
1299        // Create table scan for current snapshot and plan files
1300        let table_scan = fixture
1301            .table
1302            .scan()
1303            .with_row_selection_enabled(true)
1304            .build()
1305            .unwrap();
1306
1307        let batch_stream = table_scan.to_arrow().await.unwrap();
1308
1309        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1310
1311        let col = batches[0].column_by_name("x").unwrap();
1312
1313        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1314        assert_eq!(int64_arr.value(0), 1);
1315    }
1316
1317    #[tokio::test]
1318    async fn test_open_parquet_no_deletions_by_separate_reader() {
1319        let mut fixture = TableTestFixture::new();
1320        fixture.setup_manifest_files().await;
1321
1322        // Create table scan for current snapshot and plan files
1323        let table_scan = fixture
1324            .table
1325            .scan()
1326            .with_row_selection_enabled(true)
1327            .build()
1328            .unwrap();
1329
1330        let mut plan_task: Vec<_> = table_scan
1331            .plan_files()
1332            .await
1333            .unwrap()
1334            .try_collect()
1335            .await
1336            .unwrap();
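        // As in test_plan_files_no_deletions, planning yields one task per live data file.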
1337        assert_eq!(plan_task.len(), 2);
1338
1339        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1340        let batch_stream = reader
1341            .clone()
1342            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1343            .unwrap();
1344        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1345
1346        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1347        let batch_stream = reader
1348            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1349            .unwrap();
1350        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1351
1352        assert_eq!(batch_1, batch_2);
1353    }
1354
1355    #[tokio::test]
1356    async fn test_open_parquet_with_projection() {
1357        let mut fixture = TableTestFixture::new();
1358        fixture.setup_manifest_files().await;
1359
1360        // Create a table scan with a column projection and read it into Arrow record batches
1361        let table_scan = fixture
1362            .table
1363            .scan()
1364            .select(["x", "z"])
1365            .with_row_selection_enabled(true)
1366            .build()
1367            .unwrap();
1368
1369        let batch_stream = table_scan.to_arrow().await.unwrap();
1370
1371        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1372
1373        assert_eq!(batches[0].num_columns(), 2);
1374
1375        let col1 = batches[0].column_by_name("x").unwrap();
1376        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1377        assert_eq!(int64_arr.value(0), 1);
1378
1379        let col2 = batches[0].column_by_name("z").unwrap();
1380        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1381        assert_eq!(int64_arr.value(0), 3);
1382
1383        // Test an empty projection: no columns are selected, but the row count is preserved
1384        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1385        let batch_stream = table_scan.to_arrow().await.unwrap();
1386        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1387
1388        assert_eq!(batches[0].num_columns(), 0);
1389        assert_eq!(batches[0].num_rows(), 1024);
1390    }
1391
1392    #[tokio::test]
1393    async fn test_filter_on_arrow_lt() {
1394        let mut fixture = TableTestFixture::new();
1395        fixture.setup_manifest_files().await;
1396
1397        // Filter: y < 3
1398        let mut builder = fixture.table.scan();
1399        let predicate = Reference::new("y").less_than(Datum::long(3));
1400        builder = builder
1401            .with_filter(predicate)
1402            .with_row_selection_enabled(true);
1403        let table_scan = builder.build().unwrap();
1404
1405        let batch_stream = table_scan.to_arrow().await.unwrap();
1406
1407        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1408
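        // Only the first 512 rows of the file have y == 2 (< 3); the remaining rows
        // have y >= 3 and are filtered out.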
1409        assert_eq!(batches[0].num_rows(), 512);
1410
1411        let col = batches[0].column_by_name("x").unwrap();
1412        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1413        assert_eq!(int64_arr.value(0), 1);
1414
1415        let col = batches[0].column_by_name("y").unwrap();
1416        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1417        assert_eq!(int64_arr.value(0), 2);
1418    }
1419
1420    #[tokio::test]
1421    async fn test_filter_on_arrow_gt_eq() {
1422        let mut fixture = TableTestFixture::new();
1423        fixture.setup_manifest_files().await;
1424
1425        // Filter: y >= 5
1426        let mut builder = fixture.table.scan();
1427        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1428        builder = builder
1429            .with_filter(predicate)
1430            .with_row_selection_enabled(true);
1431        let table_scan = builder.build().unwrap();
1432
1433        let batch_stream = table_scan.to_arrow().await.unwrap();
1434
1435        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1436
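        // Only the last 12 rows of the file have y == 5; every other row has y < 5.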
1437        assert_eq!(batches[0].num_rows(), 12);
1438
1439        let col = batches[0].column_by_name("x").unwrap();
1440        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1441        assert_eq!(int64_arr.value(0), 1);
1442
1443        let col = batches[0].column_by_name("y").unwrap();
1444        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1445        assert_eq!(int64_arr.value(0), 5);
1446    }
1447
1448    #[tokio::test]
1449    async fn test_filter_double_eq() {
1450        let mut fixture = TableTestFixture::new();
1451        fixture.setup_manifest_files().await;
1452
1453        // Filter: dbl == 150.0
1454        let mut builder = fixture.table.scan();
1455        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1456        builder = builder
1457            .with_filter(predicate)
1458            .with_row_selection_enabled(true);
1459        let table_scan = builder.build().unwrap();
1460
1461        let batch_stream = table_scan.to_arrow().await.unwrap();
1462
1463        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1464
1465        assert_eq!(batches.len(), 2);
1466        assert_eq!(batches[0].num_rows(), 12);
1467
1468        let col = batches[0].column_by_name("dbl").unwrap();
1469        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1470        assert_eq!(f64_arr.value(1), 150.0f64);
1471    }
1472
1473    #[tokio::test]
1474    async fn test_filter_int_eq() {
1475        let mut fixture = TableTestFixture::new();
1476        fixture.setup_manifest_files().await;
1477
1478        // Filter: i32 == 150
1479        let mut builder = fixture.table.scan();
1480        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1481        builder = builder
1482            .with_filter(predicate)
1483            .with_row_selection_enabled(true);
1484        let table_scan = builder.build().unwrap();
1485
1486        let batch_stream = table_scan.to_arrow().await.unwrap();
1487
1488        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1489
1490        assert_eq!(batches.len(), 2);
1491        assert_eq!(batches[0].num_rows(), 12);
1492
1493        let col = batches[0].column_by_name("i32").unwrap();
1494        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1495        assert_eq!(i32_arr.value(1), 150i32);
1496    }
1497
1498    #[tokio::test]
1499    async fn test_filter_long_eq() {
1500        let mut fixture = TableTestFixture::new();
1501        fixture.setup_manifest_files().await;
1502
1503        // Filter: i64 == 150
1504        let mut builder = fixture.table.scan();
1505        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1506        builder = builder
1507            .with_filter(predicate)
1508            .with_row_selection_enabled(true);
1509        let table_scan = builder.build().unwrap();
1510
1511        let batch_stream = table_scan.to_arrow().await.unwrap();
1512
1513        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1514
1515        assert_eq!(batches.len(), 2);
1516        assert_eq!(batches[0].num_rows(), 12);
1517
1518        let col = batches[0].column_by_name("i64").unwrap();
1519        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1520        assert_eq!(i64_arr.value(1), 150i64);
1521    }
1522
1523    #[tokio::test]
1524    async fn test_filter_bool_eq() {
1525        let mut fixture = TableTestFixture::new();
1526        fixture.setup_manifest_files().await;
1527
1528        // Filter: bool == true
1529        let mut builder = fixture.table.scan();
1530        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1531        builder = builder
1532            .with_filter(predicate)
1533            .with_row_selection_enabled(true);
1534        let table_scan = builder.build().unwrap();
1535
1536        let batch_stream = table_scan.to_arrow().await.unwrap();
1537
1538        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1539
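        // 512 of the file's 1024 rows have bool == true.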
1540        assert_eq!(batches.len(), 2);
1541        assert_eq!(batches[0].num_rows(), 512);
1542
1543        let col = batches[0].column_by_name("bool").unwrap();
1544        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1545        assert!(bool_arr.value(1));
1546    }
1547
1548    #[tokio::test]
1549    async fn test_filter_on_arrow_is_null() {
1550        let mut fixture = TableTestFixture::new();
1551        fixture.setup_manifest_files().await;
1552
1553        // Filter: y is null
1554        let mut builder = fixture.table.scan();
1555        let predicate = Reference::new("y").is_null();
1556        builder = builder
1557            .with_filter(predicate)
1558            .with_row_selection_enabled(true);
1559        let table_scan = builder.build().unwrap();
1560
1561        let batch_stream = table_scan.to_arrow().await.unwrap();
1562
1563        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
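        // y contains no null values in the fixture, so nothing matches and no batches are emitted.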
1564        assert_eq!(batches.len(), 0);
1565    }
1566
1567    #[tokio::test]
1568    async fn test_filter_on_arrow_is_not_null() {
1569        let mut fixture = TableTestFixture::new();
1570        fixture.setup_manifest_files().await;
1571
1572        // Filter: y is not null
1573        let mut builder = fixture.table.scan();
1574        let predicate = Reference::new("y").is_not_null();
1575        builder = builder
1576            .with_filter(predicate)
1577            .with_row_selection_enabled(true);
1578        let table_scan = builder.build().unwrap();
1579
1580        let batch_stream = table_scan.to_arrow().await.unwrap();
1581
1582        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1583        assert_eq!(batches[0].num_rows(), 1024);
1584    }
1585
1586    #[tokio::test]
1587    async fn test_filter_on_arrow_lt_and_gt() {
1588        let mut fixture = TableTestFixture::new();
1589        fixture.setup_manifest_files().await;
1590
1591        // Filter: y < 5 AND z >= 4
1592        let mut builder = fixture.table.scan();
1593        let predicate = Reference::new("y")
1594            .less_than(Datum::long(5))
1595            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1596        builder = builder
1597            .with_filter(predicate)
1598            .with_row_selection_enabled(true);
1599        let table_scan = builder.build().unwrap();
1600
1601        let batch_stream = table_scan.to_arrow().await.unwrap();
1602
1603        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
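        // Per file, the 200 rows with y == 3 and the 300 rows with y == 4 also have
        // z == 4, so both sides of the AND hold for exactly 500 rows.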
1604        assert_eq!(batches[0].num_rows(), 500);
1605
1606        let col = batches[0].column_by_name("x").unwrap();
1607        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1608        assert_eq!(col, &expected_x);
1609
1610        let col = batches[0].column_by_name("y").unwrap();
1611        let mut values = vec![];
1612        values.append(vec![3; 200].as_mut());
1613        values.append(vec![4; 300].as_mut());
1614        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1615        assert_eq!(col, &expected_y);
1616
1617        let col = batches[0].column_by_name("z").unwrap();
1618        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1619        assert_eq!(col, &expected_z);
1620    }
1621
1622    #[tokio::test]
1623    async fn test_filter_on_arrow_lt_or_gt() {
1624        let mut fixture = TableTestFixture::new();
1625        fixture.setup_manifest_files().await;
1626
1627        // Filter: y < 5 OR z >= 4
1628        let mut builder = fixture.table.scan();
1629        let predicate = Reference::new("y")
1630            .less_than(Datum::long(5))
1631            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1632        builder = builder
1633            .with_filter(predicate)
1634            .with_row_selection_enabled(true);
1635        let table_scan = builder.build().unwrap();
1636
1637        let batch_stream = table_scan.to_arrow().await.unwrap();
1638
1639        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
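        // Every row satisfies at least one side of the OR: y < 5 holds for the first
        // 1012 rows and z >= 4 for the last 512, so the whole 1024-row file is returned.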
1640        assert_eq!(batches[0].num_rows(), 1024);
1641
1642        let col = batches[0].column_by_name("x").unwrap();
1643        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1644        assert_eq!(col, &expected_x);
1645
1646        let col = batches[0].column_by_name("y").unwrap();
1647        let mut values = vec![2; 512];
1648        values.append(vec![3; 200].as_mut());
1649        values.append(vec![4; 300].as_mut());
1650        values.append(vec![5; 12].as_mut());
1651        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1652        assert_eq!(col, &expected_y);
1653
1654        let col = batches[0].column_by_name("z").unwrap();
1655        let mut values = vec![3; 512];
1656        values.append(vec![4; 512].as_mut());
1657        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1658        assert_eq!(col, &expected_z);
1659    }
1660
1661    #[tokio::test]
1662    async fn test_filter_on_arrow_startswith() {
1663        let mut fixture = TableTestFixture::new();
1664        fixture.setup_manifest_files().await;
1665
1666        // Filter: a STARTSWITH "Ice"
1667        let mut builder = fixture.table.scan();
1668        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1669        builder = builder
1670            .with_filter(predicate)
1671            .with_row_selection_enabled(true);
1672        let table_scan = builder.build().unwrap();
1673
1674        let batch_stream = table_scan.to_arrow().await.unwrap();
1675
1676        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1677
1678        assert_eq!(batches[0].num_rows(), 512);
1679
1680        let col = batches[0].column_by_name("a").unwrap();
1681        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1682        assert_eq!(string_arr.value(0), "Iceberg");
1683    }
1684
1685    #[tokio::test]
1686    async fn test_filter_on_arrow_not_startswith() {
1687        let mut fixture = TableTestFixture::new();
1688        fixture.setup_manifest_files().await;
1689
1690        // Filter: a NOT STARTSWITH "Ice"
1691        let mut builder = fixture.table.scan();
1692        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1693        builder = builder
1694            .with_filter(predicate)
1695            .with_row_selection_enabled(true);
1696        let table_scan = builder.build().unwrap();
1697
1698        let batch_stream = table_scan.to_arrow().await.unwrap();
1699
1700        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1701
1702        assert_eq!(batches[0].num_rows(), 512);
1703
1704        let col = batches[0].column_by_name("a").unwrap();
1705        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1706        assert_eq!(string_arr.value(0), "Apache");
1707    }
1708
1709    #[tokio::test]
1710    async fn test_filter_on_arrow_in() {
1711        let mut fixture = TableTestFixture::new();
1712        fixture.setup_manifest_files().await;
1713
1714        // Filter: a IN ("Sioux", "Iceberg")
1715        let mut builder = fixture.table.scan();
1716        let predicate =
1717            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1718        builder = builder
1719            .with_filter(predicate)
1720            .with_row_selection_enabled(true);
1721        let table_scan = builder.build().unwrap();
1722
1723        let batch_stream = table_scan.to_arrow().await.unwrap();
1724
1725        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1726
1727        assert_eq!(batches[0].num_rows(), 512);
1728
1729        let col = batches[0].column_by_name("a").unwrap();
1730        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1731        assert_eq!(string_arr.value(0), "Iceberg");
1732    }
1733
1734    #[tokio::test]
1735    async fn test_filter_on_arrow_not_in() {
1736        let mut fixture = TableTestFixture::new();
1737        fixture.setup_manifest_files().await;
1738
1739        // Filter: a NOT IN ("Sioux", "Iceberg")
1740        let mut builder = fixture.table.scan();
1741        let predicate =
1742            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1743        builder = builder
1744            .with_filter(predicate)
1745            .with_row_selection_enabled(true);
1746        let table_scan = builder.build().unwrap();
1747
1748        let batch_stream = table_scan.to_arrow().await.unwrap();
1749
1750        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1751
1752        assert_eq!(batches[0].num_rows(), 512);
1753
1754        let col = batches[0].column_by_name("a").unwrap();
1755        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1756        assert_eq!(string_arr.value(0), "Apache");
1757    }
1758
1759    #[test]
1760    fn test_file_scan_task_serialize_deserialize() {
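        // FileScanTask is serde-serializable so that, for example, a planner can hand
        // individual tasks off to separate workers; round-trip one through JSON and
        // check that the fields survive.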
1761        let test_fn = |task: FileScanTask| {
1762            let serialized = serde_json::to_string(&task).unwrap();
1763            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1764
1765            assert_eq!(task.data_file_path, deserialized.data_file_path);
1766            assert_eq!(task.start, deserialized.start);
1767            assert_eq!(task.length, deserialized.length);
1768            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1769            assert_eq!(task.predicate, deserialized.predicate);
1770            assert_eq!(task.schema, deserialized.schema);
1771        };
1772
1773        // without predicate
1774        let schema = Arc::new(
1775            Schema::builder()
1776                .with_fields(vec![Arc::new(NestedField::required(
1777                    1,
1778                    "x",
1779                    Type::Primitive(PrimitiveType::Binary),
1780                ))])
1781                .build()
1782                .unwrap(),
1783        );
1784        let task = FileScanTask {
1785            data_file_path: "data_file_path".to_string(),
1786            start: 0,
1787            length: 100,
1788            project_field_ids: vec![1, 2, 3],
1789            predicate: None,
1790            schema: schema.clone(),
1791            record_count: Some(100),
1792            data_file_format: DataFileFormat::Parquet,
1793            deletes: vec![],
1794            partition: None,
1795            partition_spec: None,
1796            name_mapping: None,
1797        };
1798        test_fn(task);
1799
1800        // with predicate
1801        let task = FileScanTask {
1802            data_file_path: "data_file_path".to_string(),
1803            start: 0,
1804            length: 100,
1805            project_field_ids: vec![1, 2, 3],
1806            predicate: Some(BoundPredicate::AlwaysTrue),
1807            schema,
1808            record_count: None,
1809            data_file_format: DataFileFormat::Avro,
1810            deletes: vec![],
1811            partition: None,
1812            partition_spec: None,
1813            name_mapping: None,
1814        };
1815        test_fn(task);
1816    }
1817
1818    #[tokio::test]
1819    async fn test_select_with_file_column() {
1820        use arrow_array::cast::AsArray;
1821
1822        let mut fixture = TableTestFixture::new();
1823        fixture.setup_manifest_files().await;
1824
1825        // Select regular columns plus the _file column
1826        let table_scan = fixture
1827            .table
1828            .scan()
1829            .select(["x", RESERVED_COL_NAME_FILE])
1830            .with_row_selection_enabled(true)
1831            .build()
1832            .unwrap();
1833
1834        let batch_stream = table_scan.to_arrow().await.unwrap();
1835        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1836
1837        // Verify we have 2 columns: x and _file
1838        assert_eq!(batches[0].num_columns(), 2);
1839
1840        // Verify the x column exists and has correct data
1841        let x_col = batches[0].column_by_name("x").unwrap();
1842        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
1843        assert_eq!(x_arr.value(0), 1);
1844
1845        // Verify the _file column exists
1846        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
1847        assert!(
1848            file_col.is_some(),
1849            "_file column should be present in the batch"
1850        );
1851
1852        // Verify the _file column contains a file path
1853        let file_col = file_col.unwrap();
1854        assert!(
1855            matches!(
1856                file_col.data_type(),
1857                arrow_schema::DataType::RunEndEncoded(_, _)
1858            ),
1859            "_file column should use RunEndEncoded type"
1860        );
1861
1862        // Decode the RunArray to verify it contains the file path
1863        let run_array = file_col
1864            .as_any()
1865            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1866            .expect("_file column should be a RunArray");
1867
1868        let values = run_array.values();
1869        let string_values = values.as_string::<i32>();
1870        assert_eq!(string_values.len(), 1, "Should have a single file path");
1871
1872        let file_path = string_values.value(0);
1873        assert!(
1874            file_path.ends_with(".parquet"),
1875            "File path should end with .parquet, got: {file_path}"
1876        );
1877    }
1878
1879    #[tokio::test]
1880    async fn test_select_file_column_position() {
1881        let mut fixture = TableTestFixture::new();
1882        fixture.setup_manifest_files().await;
1883
1884        // Select columns in specific order: x, _file, z
1885        let table_scan = fixture
1886            .table
1887            .scan()
1888            .select(["x", RESERVED_COL_NAME_FILE, "z"])
1889            .with_row_selection_enabled(true)
1890            .build()
1891            .unwrap();
1892
1893        let batch_stream = table_scan.to_arrow().await.unwrap();
1894        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1895
1896        assert_eq!(batches[0].num_columns(), 3);
1897
1898        // Verify column order: x at position 0, _file at position 1, z at position 2
1899        let schema = batches[0].schema();
1900        assert_eq!(schema.field(0).name(), "x");
1901        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1902        assert_eq!(schema.field(2).name(), "z");
1903
1904        // Verify columns by name also works
1905        assert!(batches[0].column_by_name("x").is_some());
1906        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1907        assert!(batches[0].column_by_name("z").is_some());
1908    }
1909
1910    #[tokio::test]
1911    async fn test_select_file_column_only() {
1912        let mut fixture = TableTestFixture::new();
1913        fixture.setup_manifest_files().await;
1914
1915        // Select only the _file column
1916        let table_scan = fixture
1917            .table
1918            .scan()
1919            .select([RESERVED_COL_NAME_FILE])
1920            .with_row_selection_enabled(true)
1921            .build()
1922            .unwrap();
1923
1924        let batch_stream = table_scan.to_arrow().await.unwrap();
1925        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1926
1927        // Should have exactly 1 column
1928        assert_eq!(batches[0].num_columns(), 1);
1929
1930        // Verify it's the _file column
1931        let schema = batches[0].schema();
1932        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
1933
1934        // Verify the batch has the correct number of rows
1935        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
1936        // Each file has 1024 rows, so total is 2048 rows
1937        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1938        assert_eq!(total_rows, 2048);
1939    }
1940
1941    #[tokio::test]
1942    async fn test_file_column_with_multiple_files() {
1943        use std::collections::HashSet;
1944
1945        let mut fixture = TableTestFixture::new();
1946        fixture.setup_manifest_files().await;
1947
1948        // Select x and _file columns
1949        let table_scan = fixture
1950            .table
1951            .scan()
1952            .select(["x", RESERVED_COL_NAME_FILE])
1953            .with_row_selection_enabled(true)
1954            .build()
1955            .unwrap();
1956
1957        let batch_stream = table_scan.to_arrow().await.unwrap();
1958        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1959
1960        // Collect all unique file paths from the batches
1961        let mut file_paths = HashSet::new();
1962        for batch in &batches {
1963            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
1964            let run_array = file_col
1965                .as_any()
1966                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1967                .expect("_file column should be a RunArray");
1968
1969            let values = run_array.values();
1970            let string_values = values.as_string::<i32>();
1971            for i in 0..string_values.len() {
1972                file_paths.insert(string_values.value(i).to_string());
1973            }
1974        }
1975
1976        // The scan reads 1.parquet and 3.parquet, so the set of distinct file paths must not be empty
1977        assert!(!file_paths.is_empty(), "Should have at least one file path");
1978
1979        // All paths should end with .parquet
1980        for path in &file_paths {
1981            assert!(
1982                path.ends_with(".parquet"),
1983                "All file paths should end with .parquet, got: {path}"
1984            );
1985        }
1986    }
1987
1988    #[tokio::test]
1989    async fn test_file_column_at_start() {
1990        let mut fixture = TableTestFixture::new();
1991        fixture.setup_manifest_files().await;
1992
1993        // Select _file at the start
1994        let table_scan = fixture
1995            .table
1996            .scan()
1997            .select([RESERVED_COL_NAME_FILE, "x", "y"])
1998            .with_row_selection_enabled(true)
1999            .build()
2000            .unwrap();
2001
2002        let batch_stream = table_scan.to_arrow().await.unwrap();
2003        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2004
2005        assert_eq!(batches[0].num_columns(), 3);
2006
2007        // Verify _file is at position 0
2008        let schema = batches[0].schema();
2009        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2010        assert_eq!(schema.field(1).name(), "x");
2011        assert_eq!(schema.field(2).name(), "y");
2012    }
2013
2014    #[tokio::test]
2015    async fn test_file_column_at_end() {
2016        let mut fixture = TableTestFixture::new();
2017        fixture.setup_manifest_files().await;
2018
2019        // Select _file at the end
2020        let table_scan = fixture
2021            .table
2022            .scan()
2023            .select(["x", "y", RESERVED_COL_NAME_FILE])
2024            .with_row_selection_enabled(true)
2025            .build()
2026            .unwrap();
2027
2028        let batch_stream = table_scan.to_arrow().await.unwrap();
2029        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2030
2031        assert_eq!(batches[0].num_columns(), 3);
2032
2033        // Verify _file is at position 2 (the end)
2034        let schema = batches[0].schema();
2035        assert_eq!(schema.field(0).name(), "x");
2036        assert_eq!(schema.field(1).name(), "y");
2037        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2038    }
2039
2040    #[tokio::test]
2041    async fn test_select_with_repeated_column_names() {
2042        let mut fixture = TableTestFixture::new();
2043        fixture.setup_manifest_files().await;
2044
2045        // Select with repeated column names - both regular columns and virtual columns
2046        // Repeated columns should appear multiple times in the result (duplicates are allowed)
2047        let table_scan = fixture
2048            .table
2049            .scan()
2050            .select([
2051                "x",
2052                RESERVED_COL_NAME_FILE,
2053                "x", // x repeated
2054                "y",
2055                RESERVED_COL_NAME_FILE, // _file repeated
2056                "y",                    // y repeated
2057            ])
2058            .with_row_selection_enabled(true)
2059            .build()
2060            .unwrap();
2061
2062        let batch_stream = table_scan.to_arrow().await.unwrap();
2063        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2064
2065        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
2066        assert_eq!(
2067            batches[0].num_columns(),
2068            6,
2069            "Should have exactly 6 columns with duplicates"
2070        );
2071
2072        let schema = batches[0].schema();
2073
2074        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
2075        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2076        assert_eq!(
2077            schema.field(1).name(),
2078            RESERVED_COL_NAME_FILE,
2079            "Column 1 should be _file"
2080        );
2081        assert_eq!(
2082            schema.field(2).name(),
2083            "x",
2084            "Column 2 should be x (duplicate)"
2085        );
2086        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2087        assert_eq!(
2088            schema.field(4).name(),
2089            RESERVED_COL_NAME_FILE,
2090            "Column 4 should be _file (duplicate)"
2091        );
2092        assert_eq!(
2093            schema.field(5).name(),
2094            "y",
2095            "Column 5 should be y (duplicate)"
2096        );
2097
2098        // Verify all columns have correct data types
2099        assert!(
2100            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2101            "Column x should be Int64"
2102        );
2103        assert!(
2104            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2105            "Column x (duplicate) should be Int64"
2106        );
2107        assert!(
2108            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2109            "Column y should be Int64"
2110        );
2111        assert!(
2112            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2113            "Column y (duplicate) should be Int64"
2114        );
2115        assert!(
2116            matches!(
2117                schema.field(1).data_type(),
2118                arrow_schema::DataType::RunEndEncoded(_, _)
2119            ),
2120            "_file column should use RunEndEncoded type"
2121        );
2122        assert!(
2123            matches!(
2124                schema.field(4).data_type(),
2125                arrow_schema::DataType::RunEndEncoded(_, _)
2126            ),
2127            "_file column (duplicate) should use RunEndEncoded type"
2128        );
2129    }
2130}