// iceberg/scan/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Table scan api.

20mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35use crate::delete_file_index::DeleteFileIndex;
36use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
37use crate::expr::{Bind, BoundPredicate, Predicate};
38use crate::io::FileIO;
39use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
40use crate::runtime::spawn;
41use crate::spec::{DataContentType, SnapshotRef};
42use crate::table::Table;
43use crate::utils::available_parallelism;
44use crate::{Error, ErrorKind, Result};
45
/// A stream of arrow [`RecordBatch`]es.
///
/// Boxed and `'static` so it can be returned from async planning code and
/// driven by the caller; each item is either a batch or an error produced
/// while reading.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
48
/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    // Defaults to none which means select all columns
    column_names: Option<Vec<String>>,
    // Snapshot to scan; the table's current snapshot when None.
    snapshot_id: Option<i64>,
    // Desired rows per returned batch; the reader's default when None.
    batch_size: Option<usize>,
    // Whether column-name/predicate binding is case sensitive (default true).
    case_sensitive: bool,
    // Row filter; `with_filter` stores it with `Not` nodes rewritten away.
    filter: Option<Predicate>,
    // Per-stage concurrency limits; all default to available parallelism.
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    // See `with_row_group_filtering_enabled` / `with_row_selection_enabled`.
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}
64
impl<'a> TableScanBuilder<'a> {
    /// Creates a builder with defaults: select all columns, use the current
    /// snapshot, case-sensitive matching, no filter, row-group filtering
    /// enabled, row selection disabled, and each concurrency limit set to
    /// the available parallelism.
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    /// Sets the desired size of batches in the response
    /// to something other than the default
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the scan's case sensitivity
    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

    /// Specifies a predicate to use as a filter
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        // calls rewrite_not to remove Not nodes, which must be absent
        // when applying the manifest evaluator
        self.filter = Some(predicate.rewrite_not());
        self
    }

    /// Select all columns.
    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

    /// Select empty columns.
    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

    /// Select some columns of the table.
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

    /// Set the snapshot to scan. When not set, it uses current snapshot.
    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

    /// Sets the concurrency limit for manifest files, manifest entries,
    /// and data files for this scan in a single call.
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the data file concurrency limit for this scan
    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the manifest entry concurrency limit for this scan
    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

    /// Determines whether to enable row group filtering.
    /// When enabled, if a read is performed with a filter predicate,
    /// then the metadata for each row group in the parquet file is
    /// evaluated against the filter predicate and row groups
    /// that can't contain matching rows will be skipped entirely.
    ///
    /// Defaults to enabled, as it generally improves performance or
    /// keeps it the same, with performance degradation unlikely.
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    /// Determines whether to enable row selection.
    /// When enabled, if a read is performed with a filter predicate,
    /// then (for row groups that have not been skipped) the page index
    /// for each row group in a parquet file is parsed and evaluated
    /// against the filter predicate to determine if ranges of rows
    /// within a row group can be skipped, based upon the page-level
    /// statistics for each column.
    ///
    /// Defaults to being disabled. Enabling requires parsing the parquet page
    /// index, which can be slow enough that parsing the page index outweighs any
    /// gains from the reduced number of rows that need scanning.
    /// It is recommended to experiment with partitioning, sorting, row group size,
    /// page size, and page row limit Iceberg settings on the table being scanned in
    /// order to get the best performance from using row selection.
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    /// Build the table scan.
    ///
    /// # Errors
    /// Fails when the requested snapshot id does not exist, when a selected
    /// column is not present in the snapshot schema, when a selected column
    /// is a nested field (unsupported), or when the filter cannot be bound
    /// to the schema.
    pub fn build(self) -> Result<TableScan> {
        // Resolve the snapshot to scan: an explicit id must exist; otherwise
        // fall back to the current snapshot.
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                // A table with no snapshots yields a scan with no rows:
                // plan_context is None and plan_files returns an empty stream.
                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot_id.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        // Check that all column names exist in the schema (skip reserved columns).
        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                // Skip reserved columns that don't exist in the schema
                if is_metadata_column_name(column_name) {
                    continue;
                }
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        // Resolve the projection to field ids. When no explicit selection was
        // made, project every top-level field of the snapshot schema.
        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            // Handle metadata columns (like "_file")
            if is_metadata_column_name(column_name) {
                field_ids.push(get_metadata_field_id(column_name)?);
                continue;
            }

            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            // Only direct children of the schema struct may be projected;
            // nested fields resolve to an id but not to a top-level field.
            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                        "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                    ),
                )
            })?;

            field_ids.push(field_id);
        }

        // Bind the (already Not-rewritten) filter to the snapshot schema.
        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}
309
/// Table scan.
#[derive(Debug)]
pub struct TableScan {
    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
    ///
    /// If this is None, then the scan contains no rows.
    plan_context: Option<PlanContext>,
    /// Desired rows per returned batch; only applied to the reader when Some.
    batch_size: Option<usize>,
    /// File IO used to read manifests and data files.
    file_io: FileIO,
    /// Projected column names; None means all columns were selected.
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently
    concurrency_limit_manifest_files: usize,

    /// The maximum number of [`ManifestEntry`]s that will
    /// be processed in parallel
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will
    /// be read in parallel
    concurrency_limit_data_files: usize,

    /// See [`TableScanBuilder::with_row_group_filtering_enabled`].
    row_group_filtering_enabled: bool,
    /// See [`TableScanBuilder::with_row_selection_enabled`].
    row_selection_enabled: bool,
}
335
impl TableScan {
    /// Returns a stream of [`FileScanTask`]s.
    ///
    /// Planning is a pipeline of spawned tasks connected by bounded channels:
    /// manifest files are fetched concurrently, their entries are split into
    /// delete-file and data-file streams, and surviving data files are sent
    /// to the caller as [`FileScanTask`]s. Errors from any stage are
    /// forwarded into the result stream.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        // A table with no snapshots has no rows: return an empty stream.
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // used to stream ManifestEntryContexts between stages of the file plan operation
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
        // whose partitions cannot match this
        // scan's filter
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            // Forward any fetch failure to the caller via the result stream;
            // a send error here means the receiver was dropped, so ignore it.
            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete file [`ManifestEntry`] stream in parallel.
        //
        // NOTE(review): unlike the data-file task below, this spawn is
        // `.await`ed, so delete-entry processing completes before data-file
        // processing begins — presumably so the DeleteFileIndex is fully
        // populated before data files consult it; confirm before changing.
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data file [`ManifestEntry`] stream in parallel
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }

    /// Returns an [`ArrowRecordBatchStream`].
    ///
    /// Plans the scan's files and reads them through an [`ArrowReaderBuilder`]
    /// configured with this scan's concurrency, filtering, and batch-size
    /// settings.
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        // Only override the reader's batch size when one was explicitly set.
        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }

    /// Returns a reference to the column names of the table scan.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns a reference to the snapshot of the table scan.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    /// Filters one data-file manifest entry against the scan's partition and
    /// metrics predicates; entries that survive are converted into a
    /// [`FileScanTask`] and sent on `file_scan_task_tx`.
    ///
    /// # Errors
    /// Returns an error if the entry describes a delete file (data manifests
    /// must contain only data files), if predicate evaluation fails, or if
    /// sending on the channel fails.
    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry for a delete file
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // skip any data file whose metrics don't match this scan's filter
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // congratulations! the manifest entry has made its way through the
        // entire plan without getting filtered out. Create a corresponding
        // FileScanTask and push it to the result stream
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    /// Filters one delete-file manifest entry against the scan's partition
    /// predicate; entries that survive are sent on `delete_file_ctx_tx` to be
    /// added to the [`DeleteFileIndex`].
    ///
    /// # Errors
    /// Returns an error if the entry describes a data file (delete manifests
    /// must contain only delete files), if predicate evaluation fails, or if
    /// sending on the channel fails.
    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry that is not for a delete file
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}
557
/// The scan filter bound for the two pruning steps used during planning.
pub(crate) struct BoundPredicates {
    // Used via `ExpressionEvaluatorCache::get` (keyed by partition spec id)
    // to prune files by their partition values.
    partition_bound_predicate: BoundPredicate,
    // Used with `InclusiveMetricsEvaluator::eval` to prune data files by
    // their column metrics.
    snapshot_bound_predicate: BoundPredicate,
}
562
563#[cfg(test)]
564pub mod tests {
565    //! shared tests for the table scan API
566    #![allow(missing_docs)]
567
568    use std::collections::HashMap;
569    use std::fs;
570    use std::fs::File;
571    use std::sync::Arc;
572
573    use arrow_array::cast::AsArray;
574    use arrow_array::{
575        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
576        StringArray,
577    };
578    use futures::{TryStreamExt, stream};
579    use minijinja::value::Value;
580    use minijinja::{AutoEscape, Environment, context};
581    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
582    use parquet::basic::Compression;
583    use parquet::file::properties::WriterProperties;
584    use tempfile::TempDir;
585    use uuid::Uuid;
586
587    use crate::TableIdent;
588    use crate::arrow::ArrowReaderBuilder;
589    use crate::expr::{BoundPredicate, Reference};
590    use crate::io::{FileIO, OutputFile};
591    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
592    use crate::scan::FileScanTask;
593    use crate::spec::{
594        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
595        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
596        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
597    };
598    use crate::table::Table;
599
600    fn render_template(template: &str, ctx: Value) -> String {
601        let mut env = Environment::new();
602        env.set_auto_escape_callback(|_| AutoEscape::None);
603        env.render_str(template, ctx).unwrap()
604    }
605
    /// A temporary on-disk table used as a shared fixture by the
    /// table-scan tests.
    pub struct TableTestFixture {
        /// Root directory path of the fixture table.
        pub table_location: String,
        /// The table loaded from rendered test metadata.
        pub table: Table,
    }
610
611    impl TableTestFixture {
        /// Builds a fixture from the `example_table_metadata_v2.json`
        /// template, with manifest-list and metadata paths rendered to
        /// point inside a fresh temporary directory.
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            // Render the metadata template with this fixture's concrete
            // paths, then deserialize the result into TableMetadata.
            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            // NOTE(review): `tmp_dir` is dropped at the end of this function,
            // which removes the directory; only the path string is retained —
            // confirm callers (e.g. setup_manifest_files) recreate files
            // beneath it as needed.
            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
653
654        #[allow(clippy::new_without_default)]
655        pub fn new_empty() -> Self {
656            let tmp_dir = TempDir::new().unwrap();
657            let table_location = tmp_dir.path().join("table1");
658            let table_metadata1_location = table_location.join("metadata/v1.json");
659
660            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
661                .unwrap()
662                .build()
663                .unwrap();
664
665            let table_metadata = {
666                let template_json_str = fs::read_to_string(format!(
667                    "{}/testdata/example_empty_table_metadata_v2.json",
668                    env!("CARGO_MANIFEST_DIR")
669                ))
670                .unwrap();
671                let metadata_json = render_template(&template_json_str, context! {
672                    table_location => &table_location,
673                    table_metadata_1_location => &table_metadata1_location,
674                });
675                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
676            };
677
678            let table = Table::builder()
679                .metadata(table_metadata)
680                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
681                .file_io(file_io.clone())
682                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
683                .build()
684                .unwrap();
685
686            Self {
687                table_location: table_location.to_str().unwrap().to_string(),
688                table,
689            }
690        }
691
        /// Builds a fixture like [`TableTestFixture::new`], then overrides
        /// the table metadata so the table is unpartitioned.
        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            // Render the same partitioned-table template as `new`, then
            // mutate the metadata below to remove the partitioning.
            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            // Replace the template's partition spec with the unpartitioned
            // spec: clear all existing specs, then register the new default
            // under spec id 0.
            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
739
740        fn next_manifest_file(&self) -> OutputFile {
741            self.table
742                .file_io()
743                .new_output(format!(
744                    "{}/metadata/manifest_{}.avro",
745                    self.table_location,
746                    Uuid::new_v4()
747                ))
748                .unwrap()
749        }
750
        /// Populates the fixture's current snapshot with one data manifest
        /// containing three entries — 1.parquet (Added), 2.parquet (Deleted),
        /// 3.parquet (Existing) — then writes the v2 manifest list for it.
        /// Scan planning over this snapshot should therefore surface only
        /// 1.parquet and 3.parquet.
        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write the data files first, then use the file size in the manifest entries
            let parquet_file_size = self.write_parquet_data_files();

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            // Entry 1: freshly Added data file, attributed to the current snapshot.
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            // Entry 2: Deleted as of the parent snapshot — scans of the current
            // snapshot must NOT plan this file.
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            // Entry 3: Existing file carried forward from the parent snapshot;
            // still live, so planners should include it.
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            // The list is written at the snapshot's declared manifest_list
            // location so planning can resolve it from the table metadata.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
851
852        /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet)
853        /// and returns the file size in bytes.
854        fn write_parquet_data_files(&self) -> u64 {
855            std::fs::create_dir_all(&self.table_location).unwrap();
856
857            let schema = {
858                let fields = vec![
859                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
860                        .with_metadata(HashMap::from([(
861                            PARQUET_FIELD_ID_META_KEY.to_string(),
862                            "1".to_string(),
863                        )])),
864                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
865                        .with_metadata(HashMap::from([(
866                            PARQUET_FIELD_ID_META_KEY.to_string(),
867                            "2".to_string(),
868                        )])),
869                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
870                        .with_metadata(HashMap::from([(
871                            PARQUET_FIELD_ID_META_KEY.to_string(),
872                            "3".to_string(),
873                        )])),
874                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
875                        .with_metadata(HashMap::from([(
876                            PARQUET_FIELD_ID_META_KEY.to_string(),
877                            "4".to_string(),
878                        )])),
879                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
880                        .with_metadata(HashMap::from([(
881                            PARQUET_FIELD_ID_META_KEY.to_string(),
882                            "5".to_string(),
883                        )])),
884                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
885                        .with_metadata(HashMap::from([(
886                            PARQUET_FIELD_ID_META_KEY.to_string(),
887                            "6".to_string(),
888                        )])),
889                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
890                        .with_metadata(HashMap::from([(
891                            PARQUET_FIELD_ID_META_KEY.to_string(),
892                            "7".to_string(),
893                        )])),
894                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
895                        .with_metadata(HashMap::from([(
896                            PARQUET_FIELD_ID_META_KEY.to_string(),
897                            "8".to_string(),
898                        )])),
899                ];
900                Arc::new(arrow_schema::Schema::new(fields))
901            };
902            // x: [1, 1, 1, 1, ...]
903            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
904
905            let mut values = vec![2; 512];
906            values.append(vec![3; 200].as_mut());
907            values.append(vec![4; 300].as_mut());
908            values.append(vec![5; 12].as_mut());
909
910            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
911            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
912
913            let mut values = vec![3; 512];
914            values.append(vec![4; 512].as_mut());
915
916            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
917            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
918
919            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
920            let mut values = vec!["Apache"; 512];
921            values.append(vec!["Iceberg"; 512].as_mut());
922            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
923
924            // dbl:
925            let mut values = vec![100.0f64; 512];
926            values.append(vec![150.0f64; 12].as_mut());
927            values.append(vec![200.0f64; 500].as_mut());
928            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
929
930            // i32:
931            let mut values = vec![100i32; 512];
932            values.append(vec![150i32; 12].as_mut());
933            values.append(vec![200i32; 500].as_mut());
934            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
935
936            // i64:
937            let mut values = vec![100i64; 512];
938            values.append(vec![150i64; 12].as_mut());
939            values.append(vec![200i64; 500].as_mut());
940            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
941
942            // bool:
943            let mut values = vec![false; 512];
944            values.append(vec![true; 512].as_mut());
945            let values: BooleanArray = values.into();
946            let col8 = Arc::new(values) as ArrayRef;
947
948            let to_write = RecordBatch::try_new(schema.clone(), vec![
949                col1, col2, col3, col4, col5, col6, col7, col8,
950            ])
951            .unwrap();
952
953            // Write the Parquet files
954            let props = WriterProperties::builder()
955                .set_compression(Compression::SNAPPY)
956                .build();
957
958            for n in 1..=3 {
959                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
960                let mut writer =
961                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
962
963                writer.write(&to_write).expect("Writing batch");
964
965                // writer must be closed to write footer
966                writer.close().unwrap();
967            }
968
969            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
970                .unwrap()
971                .len()
972        }
973
        /// Like `setup_manifest_files`, but writes the manifest using an
        /// unpartitioned spec and an empty partition value for every entry,
        /// exercising scan planning over unpartitioned tables.
        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            // Use the canonical unpartitioned spec rather than the table's default.
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            // Write the data files first, then use the file size in the manifest entries
            let parquet_file_size = self.write_parquet_data_files();

            // Write data files using an empty partition for unpartitioned tables.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Create an empty partition value.
            let empty_partition = Struct::empty();

            // Entry 1: Added data file (1.parquet).
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            // Entry 2: Deleted data file (2.parquet) — must be skipped by planning.
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            // Entry 3: Existing data file (3.parquet) carried over from the parent.
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
1082
        /// Sets up manifests that reproduce a planning deadlock scenario: a
        /// data manifest with many entries (to fill the planner's channel
        /// buffer) followed by a delete manifest, added to the manifest list
        /// in exactly that order.
        ///
        /// NOTE(review): the referenced parquet files are never written to
        /// disk and file sizes are dummy values — this fixture appears to
        /// target planning-only tests; confirm no caller reads the files.
        pub async fn setup_deadlock_manifests(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let _parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // 1. Write DATA manifest with MULTIPLE entries to fill buffer
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Add 10 data entries
            for i in 0..10 {
                writer
                    .add_entry(
                        ManifestEntry::builder()
                            .status(ManifestStatus::Added)
                            .data_file(
                                DataFileBuilder::default()
                                    .partition_spec_id(0)
                                    .content(DataContentType::Data)
                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
                                    .file_format(DataFileFormat::Parquet)
                                    .file_size_in_bytes(100)
                                    .record_count(1)
                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
                                    .key_metadata(None)
                                    .build()
                                    .unwrap(),
                            )
                            .build(),
                    )
                    .unwrap();
            }
            let data_manifest = writer.write_manifest_file().await.unwrap();

            // 2. Write DELETE manifest
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_deletes();

            // Single position-delete file entry.
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::PositionDeletes)
                                .file_path(format!("{}/del.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let delete_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list - DATA FIRST then DELETE
            // This order is crucial for reproduction
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
1173    }
1174
1175    #[test]
1176    fn test_table_scan_columns() {
1177        let table = TableTestFixture::new().table;
1178
1179        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1180        assert_eq!(
1181            Some(vec!["x".to_string(), "y".to_string()]),
1182            table_scan.column_names
1183        );
1184
1185        let table_scan = table
1186            .scan()
1187            .select(["x", "y"])
1188            .select(["z"])
1189            .build()
1190            .unwrap();
1191        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1192    }
1193
1194    #[test]
1195    fn test_select_all() {
1196        let table = TableTestFixture::new().table;
1197
1198        let table_scan = table.scan().select_all().build().unwrap();
1199        assert!(table_scan.column_names.is_none());
1200    }
1201
1202    #[test]
1203    fn test_select_no_exist_column() {
1204        let table = TableTestFixture::new().table;
1205
1206        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1207        assert!(table_scan.is_err());
1208    }
1209
1210    #[test]
1211    fn test_table_scan_default_snapshot_id() {
1212        let table = TableTestFixture::new().table;
1213
1214        let table_scan = table.scan().build().unwrap();
1215        assert_eq!(
1216            table.metadata().current_snapshot().unwrap().snapshot_id(),
1217            table_scan.snapshot().unwrap().snapshot_id()
1218        );
1219    }
1220
1221    #[test]
1222    fn test_table_scan_non_exist_snapshot_id() {
1223        let table = TableTestFixture::new().table;
1224
1225        let table_scan = table.scan().snapshot_id(1024).build();
1226        assert!(table_scan.is_err());
1227    }
1228
1229    #[test]
1230    fn test_table_scan_with_snapshot_id() {
1231        let table = TableTestFixture::new().table;
1232
1233        let table_scan = table
1234            .scan()
1235            .snapshot_id(3051729675574597004)
1236            .with_row_selection_enabled(true)
1237            .build()
1238            .unwrap();
1239        assert_eq!(
1240            table_scan.snapshot().unwrap().snapshot_id(),
1241            3051729675574597004
1242        );
1243    }
1244
1245    #[tokio::test]
1246    async fn test_plan_files_on_table_without_any_snapshots() {
1247        let table = TableTestFixture::new_empty().table;
1248        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1249        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1250        assert!(batches.is_empty());
1251    }
1252
1253    #[tokio::test]
1254    async fn test_plan_files_no_deletions() {
1255        let mut fixture = TableTestFixture::new();
1256        fixture.setup_manifest_files().await;
1257
1258        // Create table scan for current snapshot and plan files
1259        let table_scan = fixture
1260            .table
1261            .scan()
1262            .with_row_selection_enabled(true)
1263            .build()
1264            .unwrap();
1265
1266        let mut tasks = table_scan
1267            .plan_files()
1268            .await
1269            .unwrap()
1270            .try_fold(vec![], |mut acc, task| async move {
1271                acc.push(task);
1272                Ok(acc)
1273            })
1274            .await
1275            .unwrap();
1276
1277        assert_eq!(tasks.len(), 2);
1278
1279        tasks.sort_by_key(|t| t.data_file_path.to_string());
1280
1281        // Check first task is added data file
1282        assert_eq!(
1283            tasks[0].data_file_path,
1284            format!("{}/1.parquet", &fixture.table_location)
1285        );
1286
1287        // Check second task is existing data file
1288        assert_eq!(
1289            tasks[1].data_file_path,
1290            format!("{}/3.parquet", &fixture.table_location)
1291        );
1292    }
1293
1294    #[tokio::test]
1295    async fn test_open_parquet_no_deletions() {
1296        let mut fixture = TableTestFixture::new();
1297        fixture.setup_manifest_files().await;
1298
1299        // Create table scan for current snapshot and plan files
1300        let table_scan = fixture
1301            .table
1302            .scan()
1303            .with_row_selection_enabled(true)
1304            .build()
1305            .unwrap();
1306
1307        let batch_stream = table_scan.to_arrow().await.unwrap();
1308
1309        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1310
1311        let col = batches[0].column_by_name("x").unwrap();
1312
1313        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1314        assert_eq!(int64_arr.value(0), 1);
1315    }
1316
1317    #[tokio::test]
1318    async fn test_open_parquet_no_deletions_by_separate_reader() {
1319        let mut fixture = TableTestFixture::new();
1320        fixture.setup_manifest_files().await;
1321
1322        // Create table scan for current snapshot and plan files
1323        let table_scan = fixture
1324            .table
1325            .scan()
1326            .with_row_selection_enabled(true)
1327            .build()
1328            .unwrap();
1329
1330        let mut plan_task: Vec<_> = table_scan
1331            .plan_files()
1332            .await
1333            .unwrap()
1334            .try_collect()
1335            .await
1336            .unwrap();
1337        assert_eq!(plan_task.len(), 2);
1338
1339        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1340        let batch_stream = reader
1341            .clone()
1342            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1343            .unwrap();
1344        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1345
1346        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1347        let batch_stream = reader
1348            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1349            .unwrap();
1350        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1351
1352        assert_eq!(batch_1, batch_2);
1353    }
1354
1355    #[tokio::test]
1356    async fn test_open_parquet_with_projection() {
1357        let mut fixture = TableTestFixture::new();
1358        fixture.setup_manifest_files().await;
1359
1360        // Create table scan for current snapshot and plan files
1361        let table_scan = fixture
1362            .table
1363            .scan()
1364            .select(["x", "z"])
1365            .with_row_selection_enabled(true)
1366            .build()
1367            .unwrap();
1368
1369        let batch_stream = table_scan.to_arrow().await.unwrap();
1370
1371        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1372
1373        assert_eq!(batches[0].num_columns(), 2);
1374
1375        let col1 = batches[0].column_by_name("x").unwrap();
1376        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1377        assert_eq!(int64_arr.value(0), 1);
1378
1379        let col2 = batches[0].column_by_name("z").unwrap();
1380        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1381        assert_eq!(int64_arr.value(0), 3);
1382
1383        // test empty scan
1384        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1385        let batch_stream = table_scan.to_arrow().await.unwrap();
1386        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1387
1388        assert_eq!(batches[0].num_columns(), 0);
1389        assert_eq!(batches[0].num_rows(), 1024);
1390    }
1391
1392    #[tokio::test]
1393    async fn test_filter_on_arrow_lt() {
1394        let mut fixture = TableTestFixture::new();
1395        fixture.setup_manifest_files().await;
1396
1397        // Filter: y < 3
1398        let mut builder = fixture.table.scan();
1399        let predicate = Reference::new("y").less_than(Datum::long(3));
1400        builder = builder
1401            .with_filter(predicate)
1402            .with_row_selection_enabled(true);
1403        let table_scan = builder.build().unwrap();
1404
1405        let batch_stream = table_scan.to_arrow().await.unwrap();
1406
1407        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1408
1409        assert_eq!(batches[0].num_rows(), 512);
1410
1411        let col = batches[0].column_by_name("x").unwrap();
1412        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1413        assert_eq!(int64_arr.value(0), 1);
1414
1415        let col = batches[0].column_by_name("y").unwrap();
1416        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1417        assert_eq!(int64_arr.value(0), 2);
1418    }
1419
1420    #[tokio::test]
1421    async fn test_filter_on_arrow_gt_eq() {
1422        let mut fixture = TableTestFixture::new();
1423        fixture.setup_manifest_files().await;
1424
1425        // Filter: y >= 5
1426        let mut builder = fixture.table.scan();
1427        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1428        builder = builder
1429            .with_filter(predicate)
1430            .with_row_selection_enabled(true);
1431        let table_scan = builder.build().unwrap();
1432
1433        let batch_stream = table_scan.to_arrow().await.unwrap();
1434
1435        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1436
1437        assert_eq!(batches[0].num_rows(), 12);
1438
1439        let col = batches[0].column_by_name("x").unwrap();
1440        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1441        assert_eq!(int64_arr.value(0), 1);
1442
1443        let col = batches[0].column_by_name("y").unwrap();
1444        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1445        assert_eq!(int64_arr.value(0), 5);
1446    }
1447
1448    #[tokio::test]
1449    async fn test_filter_double_eq() {
1450        let mut fixture = TableTestFixture::new();
1451        fixture.setup_manifest_files().await;
1452
1453        // Filter: dbl == 150.0
1454        let mut builder = fixture.table.scan();
1455        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1456        builder = builder
1457            .with_filter(predicate)
1458            .with_row_selection_enabled(true);
1459        let table_scan = builder.build().unwrap();
1460
1461        let batch_stream = table_scan.to_arrow().await.unwrap();
1462
1463        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1464
1465        assert_eq!(batches.len(), 2);
1466        assert_eq!(batches[0].num_rows(), 12);
1467
1468        let col = batches[0].column_by_name("dbl").unwrap();
1469        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1470        assert_eq!(f64_arr.value(1), 150.0f64);
1471    }
1472
1473    #[tokio::test]
1474    async fn test_filter_int_eq() {
1475        let mut fixture = TableTestFixture::new();
1476        fixture.setup_manifest_files().await;
1477
1478        // Filter: i32 == 150
1479        let mut builder = fixture.table.scan();
1480        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1481        builder = builder
1482            .with_filter(predicate)
1483            .with_row_selection_enabled(true);
1484        let table_scan = builder.build().unwrap();
1485
1486        let batch_stream = table_scan.to_arrow().await.unwrap();
1487
1488        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1489
1490        assert_eq!(batches.len(), 2);
1491        assert_eq!(batches[0].num_rows(), 12);
1492
1493        let col = batches[0].column_by_name("i32").unwrap();
1494        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1495        assert_eq!(i32_arr.value(1), 150i32);
1496    }
1497
1498    #[tokio::test]
1499    async fn test_filter_long_eq() {
1500        let mut fixture = TableTestFixture::new();
1501        fixture.setup_manifest_files().await;
1502
1503        // Filter: i64 == 150
1504        let mut builder = fixture.table.scan();
1505        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1506        builder = builder
1507            .with_filter(predicate)
1508            .with_row_selection_enabled(true);
1509        let table_scan = builder.build().unwrap();
1510
1511        let batch_stream = table_scan.to_arrow().await.unwrap();
1512
1513        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1514
1515        assert_eq!(batches.len(), 2);
1516        assert_eq!(batches[0].num_rows(), 12);
1517
1518        let col = batches[0].column_by_name("i64").unwrap();
1519        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1520        assert_eq!(i64_arr.value(1), 150i64);
1521    }
1522
1523    #[tokio::test]
1524    async fn test_filter_bool_eq() {
1525        let mut fixture = TableTestFixture::new();
1526        fixture.setup_manifest_files().await;
1527
1528        // Filter: bool == true
1529        let mut builder = fixture.table.scan();
1530        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1531        builder = builder
1532            .with_filter(predicate)
1533            .with_row_selection_enabled(true);
1534        let table_scan = builder.build().unwrap();
1535
1536        let batch_stream = table_scan.to_arrow().await.unwrap();
1537
1538        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1539
1540        assert_eq!(batches.len(), 2);
1541        assert_eq!(batches[0].num_rows(), 512);
1542
1543        let col = batches[0].column_by_name("bool").unwrap();
1544        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1545        assert!(bool_arr.value(1));
1546    }
1547
1548    #[tokio::test]
1549    async fn test_filter_on_arrow_is_null() {
1550        let mut fixture = TableTestFixture::new();
1551        fixture.setup_manifest_files().await;
1552
1553        // Filter: y is null
1554        let mut builder = fixture.table.scan();
1555        let predicate = Reference::new("y").is_null();
1556        builder = builder
1557            .with_filter(predicate)
1558            .with_row_selection_enabled(true);
1559        let table_scan = builder.build().unwrap();
1560
1561        let batch_stream = table_scan.to_arrow().await.unwrap();
1562
1563        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1564        assert_eq!(batches.len(), 0);
1565    }
1566
1567    #[tokio::test]
1568    async fn test_filter_on_arrow_is_not_null() {
1569        let mut fixture = TableTestFixture::new();
1570        fixture.setup_manifest_files().await;
1571
1572        // Filter: y is not null
1573        let mut builder = fixture.table.scan();
1574        let predicate = Reference::new("y").is_not_null();
1575        builder = builder
1576            .with_filter(predicate)
1577            .with_row_selection_enabled(true);
1578        let table_scan = builder.build().unwrap();
1579
1580        let batch_stream = table_scan.to_arrow().await.unwrap();
1581
1582        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1583        assert_eq!(batches[0].num_rows(), 1024);
1584    }
1585
1586    #[tokio::test]
1587    async fn test_filter_on_arrow_lt_and_gt() {
1588        let mut fixture = TableTestFixture::new();
1589        fixture.setup_manifest_files().await;
1590
1591        // Filter: y < 5 AND z >= 4
1592        let mut builder = fixture.table.scan();
1593        let predicate = Reference::new("y")
1594            .less_than(Datum::long(5))
1595            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1596        builder = builder
1597            .with_filter(predicate)
1598            .with_row_selection_enabled(true);
1599        let table_scan = builder.build().unwrap();
1600
1601        let batch_stream = table_scan.to_arrow().await.unwrap();
1602
1603        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1604        assert_eq!(batches[0].num_rows(), 500);
1605
1606        let col = batches[0].column_by_name("x").unwrap();
1607        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1608        assert_eq!(col, &expected_x);
1609
1610        let col = batches[0].column_by_name("y").unwrap();
1611        let mut values = vec![];
1612        values.append(vec![3; 200].as_mut());
1613        values.append(vec![4; 300].as_mut());
1614        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1615        assert_eq!(col, &expected_y);
1616
1617        let col = batches[0].column_by_name("z").unwrap();
1618        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1619        assert_eq!(col, &expected_z);
1620    }
1621
1622    #[tokio::test]
1623    async fn test_filter_on_arrow_lt_or_gt() {
1624        let mut fixture = TableTestFixture::new();
1625        fixture.setup_manifest_files().await;
1626
1627        // Filter: y < 5 AND z >= 4
1628        let mut builder = fixture.table.scan();
1629        let predicate = Reference::new("y")
1630            .less_than(Datum::long(5))
1631            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1632        builder = builder
1633            .with_filter(predicate)
1634            .with_row_selection_enabled(true);
1635        let table_scan = builder.build().unwrap();
1636
1637        let batch_stream = table_scan.to_arrow().await.unwrap();
1638
1639        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1640        assert_eq!(batches[0].num_rows(), 1024);
1641
1642        let col = batches[0].column_by_name("x").unwrap();
1643        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1644        assert_eq!(col, &expected_x);
1645
1646        let col = batches[0].column_by_name("y").unwrap();
1647        let mut values = vec![2; 512];
1648        values.append(vec![3; 200].as_mut());
1649        values.append(vec![4; 300].as_mut());
1650        values.append(vec![5; 12].as_mut());
1651        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1652        assert_eq!(col, &expected_y);
1653
1654        let col = batches[0].column_by_name("z").unwrap();
1655        let mut values = vec![3; 512];
1656        values.append(vec![4; 512].as_mut());
1657        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1658        assert_eq!(col, &expected_z);
1659    }
1660
1661    #[tokio::test]
1662    async fn test_filter_on_arrow_startswith() {
1663        let mut fixture = TableTestFixture::new();
1664        fixture.setup_manifest_files().await;
1665
1666        // Filter: a STARTSWITH "Ice"
1667        let mut builder = fixture.table.scan();
1668        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1669        builder = builder
1670            .with_filter(predicate)
1671            .with_row_selection_enabled(true);
1672        let table_scan = builder.build().unwrap();
1673
1674        let batch_stream = table_scan.to_arrow().await.unwrap();
1675
1676        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1677
1678        assert_eq!(batches[0].num_rows(), 512);
1679
1680        let col = batches[0].column_by_name("a").unwrap();
1681        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1682        assert_eq!(string_arr.value(0), "Iceberg");
1683    }
1684
1685    #[tokio::test]
1686    async fn test_filter_on_arrow_not_startswith() {
1687        let mut fixture = TableTestFixture::new();
1688        fixture.setup_manifest_files().await;
1689
1690        // Filter: a NOT STARTSWITH "Ice"
1691        let mut builder = fixture.table.scan();
1692        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1693        builder = builder
1694            .with_filter(predicate)
1695            .with_row_selection_enabled(true);
1696        let table_scan = builder.build().unwrap();
1697
1698        let batch_stream = table_scan.to_arrow().await.unwrap();
1699
1700        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1701
1702        assert_eq!(batches[0].num_rows(), 512);
1703
1704        let col = batches[0].column_by_name("a").unwrap();
1705        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1706        assert_eq!(string_arr.value(0), "Apache");
1707    }
1708
1709    #[tokio::test]
1710    async fn test_filter_on_arrow_in() {
1711        let mut fixture = TableTestFixture::new();
1712        fixture.setup_manifest_files().await;
1713
1714        // Filter: a IN ("Sioux", "Iceberg")
1715        let mut builder = fixture.table.scan();
1716        let predicate =
1717            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1718        builder = builder
1719            .with_filter(predicate)
1720            .with_row_selection_enabled(true);
1721        let table_scan = builder.build().unwrap();
1722
1723        let batch_stream = table_scan.to_arrow().await.unwrap();
1724
1725        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1726
1727        assert_eq!(batches[0].num_rows(), 512);
1728
1729        let col = batches[0].column_by_name("a").unwrap();
1730        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1731        assert_eq!(string_arr.value(0), "Iceberg");
1732    }
1733
1734    #[tokio::test]
1735    async fn test_filter_on_arrow_not_in() {
1736        let mut fixture = TableTestFixture::new();
1737        fixture.setup_manifest_files().await;
1738
1739        // Filter: a NOT IN ("Sioux", "Iceberg")
1740        let mut builder = fixture.table.scan();
1741        let predicate =
1742            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1743        builder = builder
1744            .with_filter(predicate)
1745            .with_row_selection_enabled(true);
1746        let table_scan = builder.build().unwrap();
1747
1748        let batch_stream = table_scan.to_arrow().await.unwrap();
1749
1750        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1751
1752        assert_eq!(batches[0].num_rows(), 512);
1753
1754        let col = batches[0].column_by_name("a").unwrap();
1755        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1756        assert_eq!(string_arr.value(0), "Apache");
1757    }
1758
    #[test]
    fn test_file_scan_task_serialize_deserialize() {
        // Round-trips a `FileScanTask` through serde_json and checks that
        // the scan-relevant fields survive unchanged. Note that only the
        // fields asserted below are verified to round-trip; the remaining
        // fields are set but not compared.
        let test_fn = |task: FileScanTask| {
            let serialized = serde_json::to_string(&task).unwrap();
            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();

            assert_eq!(task.data_file_path, deserialized.data_file_path);
            assert_eq!(task.start, deserialized.start);
            assert_eq!(task.length, deserialized.length);
            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
            assert_eq!(task.predicate, deserialized.predicate);
            assert_eq!(task.schema, deserialized.schema);
        };

        // without predicate
        // Minimal single-field schema shared by both test cases.
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Binary),
                ))])
                .build()
                .unwrap(),
        );
        // Case 1: no predicate, Parquet format, known record count.
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: None,
            schema: schema.clone(),
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);

        // with predicate
        // Case 2: trivially-true predicate, Avro format, unknown record count.
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            file_size_in_bytes: 0,
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: Some(BoundPredicate::AlwaysTrue),
            schema,
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };
        test_fn(task);
    }
1821
    #[tokio::test]
    async fn test_select_with_file_column() {
        // Verifies that the reserved `_file` metadata column can be selected
        // alongside regular columns, is materialized as a run-end encoded
        // string column, and carries the source file path.
        use arrow_array::cast::AsArray;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select regular columns plus the _file column
        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Verify we have 2 columns: x and _file
        assert_eq!(batches[0].num_columns(), 2);

        // Verify the x column exists and has correct data
        let x_col = batches[0].column_by_name("x").unwrap();
        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(x_arr.value(0), 1);

        // Verify the _file column exists
        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
        assert!(
            file_col.is_some(),
            "_file column should be present in the batch"
        );

        // Verify the _file column contains a file path
        // RunEndEncoded is used because the path repeats for every row of a
        // batch, so a single run covers the whole batch.
        let file_col = file_col.unwrap();
        assert!(
            matches!(
                file_col.data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );

        // Decode the RunArray to verify it contains the file path
        let run_array = file_col
            .as_any()
            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
            .expect("_file column should be a RunArray");

        let values = run_array.values();
        let string_values = values.as_string::<i32>();
        assert_eq!(string_values.len(), 1, "Should have a single file path");

        let file_path = string_values.value(0);
        assert!(
            file_path.ends_with(".parquet"),
            "File path should end with .parquet, got: {file_path}"
        );
    }
1882
1883    #[tokio::test]
1884    async fn test_select_file_column_position() {
1885        let mut fixture = TableTestFixture::new();
1886        fixture.setup_manifest_files().await;
1887
1888        // Select columns in specific order: x, _file, z
1889        let table_scan = fixture
1890            .table
1891            .scan()
1892            .select(["x", RESERVED_COL_NAME_FILE, "z"])
1893            .with_row_selection_enabled(true)
1894            .build()
1895            .unwrap();
1896
1897        let batch_stream = table_scan.to_arrow().await.unwrap();
1898        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1899
1900        assert_eq!(batches[0].num_columns(), 3);
1901
1902        // Verify column order: x at position 0, _file at position 1, z at position 2
1903        let schema = batches[0].schema();
1904        assert_eq!(schema.field(0).name(), "x");
1905        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1906        assert_eq!(schema.field(2).name(), "z");
1907
1908        // Verify columns by name also works
1909        assert!(batches[0].column_by_name("x").is_some());
1910        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1911        assert!(batches[0].column_by_name("z").is_some());
1912    }
1913
1914    #[tokio::test]
1915    async fn test_select_file_column_only() {
1916        let mut fixture = TableTestFixture::new();
1917        fixture.setup_manifest_files().await;
1918
1919        // Select only the _file column
1920        let table_scan = fixture
1921            .table
1922            .scan()
1923            .select([RESERVED_COL_NAME_FILE])
1924            .with_row_selection_enabled(true)
1925            .build()
1926            .unwrap();
1927
1928        let batch_stream = table_scan.to_arrow().await.unwrap();
1929        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1930
1931        // Should have exactly 1 column
1932        assert_eq!(batches[0].num_columns(), 1);
1933
1934        // Verify it's the _file column
1935        let schema = batches[0].schema();
1936        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
1937
1938        // Verify the batch has the correct number of rows
1939        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
1940        // Each file has 1024 rows, so total is 2048 rows
1941        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1942        assert_eq!(total_rows, 2048);
1943    }
1944
    #[tokio::test]
    async fn test_file_column_with_multiple_files() {
        // Verifies that when the scan spans multiple data files, the `_file`
        // column in every batch decodes to a valid parquet path. Paths are
        // gathered into a set so each distinct file is checked once.
        use std::collections::HashSet;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select x and _file columns
        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Collect all unique file paths from the batches
        let mut file_paths = HashSet::new();
        for batch in &batches {
            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
            // `_file` is run-end encoded; decode the underlying string values.
            let run_array = file_col
                .as_any()
                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
                .expect("_file column should be a RunArray");

            let values = run_array.values();
            let string_values = values.as_string::<i32>();
            for i in 0..string_values.len() {
                file_paths.insert(string_values.value(i).to_string());
            }
        }

        // We should have multiple files (the test creates 1.parquet and 3.parquet)
        assert!(!file_paths.is_empty(), "Should have at least one file path");

        // All paths should end with .parquet
        for path in &file_paths {
            assert!(
                path.ends_with(".parquet"),
                "All file paths should end with .parquet, got: {path}"
            );
        }
    }
1991
1992    #[tokio::test]
1993    async fn test_file_column_at_start() {
1994        let mut fixture = TableTestFixture::new();
1995        fixture.setup_manifest_files().await;
1996
1997        // Select _file at the start
1998        let table_scan = fixture
1999            .table
2000            .scan()
2001            .select([RESERVED_COL_NAME_FILE, "x", "y"])
2002            .with_row_selection_enabled(true)
2003            .build()
2004            .unwrap();
2005
2006        let batch_stream = table_scan.to_arrow().await.unwrap();
2007        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2008
2009        assert_eq!(batches[0].num_columns(), 3);
2010
2011        // Verify _file is at position 0
2012        let schema = batches[0].schema();
2013        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2014        assert_eq!(schema.field(1).name(), "x");
2015        assert_eq!(schema.field(2).name(), "y");
2016    }
2017
2018    #[tokio::test]
2019    async fn test_file_column_at_end() {
2020        let mut fixture = TableTestFixture::new();
2021        fixture.setup_manifest_files().await;
2022
2023        // Select _file at the end
2024        let table_scan = fixture
2025            .table
2026            .scan()
2027            .select(["x", "y", RESERVED_COL_NAME_FILE])
2028            .with_row_selection_enabled(true)
2029            .build()
2030            .unwrap();
2031
2032        let batch_stream = table_scan.to_arrow().await.unwrap();
2033        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2034
2035        assert_eq!(batches[0].num_columns(), 3);
2036
2037        // Verify _file is at position 2 (the end)
2038        let schema = batches[0].schema();
2039        assert_eq!(schema.field(0).name(), "x");
2040        assert_eq!(schema.field(1).name(), "y");
2041        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2042    }
2043
    #[tokio::test]
    async fn test_select_with_repeated_column_names() {
        // Verifies that duplicate names in a `select` projection — both for
        // regular columns and for the virtual `_file` column — are preserved
        // verbatim: each repetition produces its own output column, in the
        // exact order requested, with the correct data type.
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select with repeated column names - both regular columns and virtual columns
        // Repeated columns should appear multiple times in the result (duplicates are allowed)
        let table_scan = fixture
            .table
            .scan()
            .select([
                "x",
                RESERVED_COL_NAME_FILE,
                "x", // x repeated
                "y",
                RESERVED_COL_NAME_FILE, // _file repeated
                "y",                    // y repeated
            ])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
        assert_eq!(
            batches[0].num_columns(),
            6,
            "Should have exactly 6 columns with duplicates"
        );

        let schema = batches[0].schema();

        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
        assert_eq!(
            schema.field(1).name(),
            RESERVED_COL_NAME_FILE,
            "Column 1 should be _file"
        );
        assert_eq!(
            schema.field(2).name(),
            "x",
            "Column 2 should be x (duplicate)"
        );
        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
        assert_eq!(
            schema.field(4).name(),
            RESERVED_COL_NAME_FILE,
            "Column 4 should be _file (duplicate)"
        );
        assert_eq!(
            schema.field(5).name(),
            "y",
            "Column 5 should be y (duplicate)"
        );

        // Verify all columns have correct data types
        assert!(
            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
            "Column x should be Int64"
        );
        assert!(
            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
            "Column x (duplicate) should be Int64"
        );
        assert!(
            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
            "Column y should be Int64"
        );
        assert!(
            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
            "Column y (duplicate) should be Int64"
        );
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );
        assert!(
            matches!(
                schema.field(4).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column (duplicate) should use RunEndEncoded type"
        );
    }
2134
2135    #[tokio::test]
2136    async fn test_scan_deadlock() {
2137        let mut fixture = TableTestFixture::new();
2138        fixture.setup_deadlock_manifests().await;
2139
2140        // Create table scan with concurrency limit 1
2141        // This sets channel size to 1.
2142        // Data manifest has 10 entries -> will block producer.
2143        // Delete manifest is 2nd in list -> won't be processed.
2144        // Consumer 2 (Data) not started -> blocked.
2145        // Consumer 1 (Delete) waiting -> blocked.
2146        let table_scan = fixture
2147            .table
2148            .scan()
2149            .with_concurrency_limit(1)
2150            .build()
2151            .unwrap();
2152
2153        // This should timeout/hang if deadlock exists
2154        // We can use tokio::time::timeout
2155        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2156            table_scan
2157                .plan_files()
2158                .await
2159                .unwrap()
2160                .try_collect::<Vec<_>>()
2161                .await
2162        })
2163        .await;
2164
2165        // Assert it finished (didn't timeout)
2166        assert!(result.is_ok(), "Scan timed out - deadlock detected");
2167    }
2168}