// iceberg/scan/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Table scan api.
19
20mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35use crate::delete_file_index::DeleteFileIndex;
36use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
37use crate::expr::{Bind, BoundPredicate, Predicate};
38use crate::io::FileIO;
39use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
40use crate::runtime::spawn;
41use crate::spec::{DataContentType, SnapshotRef};
42use crate::table::Table;
43use crate::utils::available_parallelism;
44use crate::{Error, ErrorKind, Result};
45
46/// A stream of arrow [`RecordBatch`]es.
47pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
48
49/// Builder to create table scan.
50pub struct TableScanBuilder<'a> {
51    table: &'a Table,
52    // Defaults to none which means select all columns
53    column_names: Option<Vec<String>>,
54    snapshot_id: Option<i64>,
55    batch_size: Option<usize>,
56    case_sensitive: bool,
57    filter: Option<Predicate>,
58    concurrency_limit_data_files: usize,
59    concurrency_limit_manifest_entries: usize,
60    concurrency_limit_manifest_files: usize,
61    row_group_filtering_enabled: bool,
62    row_selection_enabled: bool,
63}
64
65impl<'a> TableScanBuilder<'a> {
66    pub(crate) fn new(table: &'a Table) -> Self {
67        let num_cpus = available_parallelism().get();
68
69        Self {
70            table,
71            column_names: None,
72            snapshot_id: None,
73            batch_size: None,
74            case_sensitive: true,
75            filter: None,
76            concurrency_limit_data_files: num_cpus,
77            concurrency_limit_manifest_entries: num_cpus,
78            concurrency_limit_manifest_files: num_cpus,
79            row_group_filtering_enabled: true,
80            row_selection_enabled: false,
81        }
82    }
83
84    /// Sets the desired size of batches in the response
85    /// to something other than the default
86    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
87        self.batch_size = batch_size;
88        self
89    }
90
91    /// Sets the scan's case sensitivity
92    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
93        self.case_sensitive = case_sensitive;
94        self
95    }
96
97    /// Specifies a predicate to use as a filter
98    pub fn with_filter(mut self, predicate: Predicate) -> Self {
99        // calls rewrite_not to remove Not nodes, which must be absent
100        // when applying the manifest evaluator
101        self.filter = Some(predicate.rewrite_not());
102        self
103    }
104
105    /// Select all columns.
106    pub fn select_all(mut self) -> Self {
107        self.column_names = None;
108        self
109    }
110
111    /// Select empty columns.
112    pub fn select_empty(mut self) -> Self {
113        self.column_names = Some(vec![]);
114        self
115    }
116
117    /// Select some columns of the table.
118    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
119        self.column_names = Some(
120            column_names
121                .into_iter()
122                .map(|item| item.to_string())
123                .collect(),
124        );
125        self
126    }
127
128    /// Set the snapshot to scan. When not set, it uses current snapshot.
129    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
130        self.snapshot_id = Some(snapshot_id);
131        self
132    }
133
134    /// Sets the concurrency limit for both manifest files and manifest
135    /// entries for this scan
136    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
137        self.concurrency_limit_manifest_files = limit;
138        self.concurrency_limit_manifest_entries = limit;
139        self.concurrency_limit_data_files = limit;
140        self
141    }
142
143    /// Sets the data file concurrency limit for this scan
144    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
145        self.concurrency_limit_data_files = limit;
146        self
147    }
148
149    /// Sets the manifest entry concurrency limit for this scan
150    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
151        self.concurrency_limit_manifest_entries = limit;
152        self
153    }
154
155    /// Determines whether to enable row group filtering.
156    /// When enabled, if a read is performed with a filter predicate,
157    /// then the metadata for each row group in the parquet file is
158    /// evaluated against the filter predicate and row groups
159    /// that cant contain matching rows will be skipped entirely.
160    ///
161    /// Defaults to enabled, as it generally improves performance or
162    /// keeps it the same, with performance degradation unlikely.
163    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
164        self.row_group_filtering_enabled = row_group_filtering_enabled;
165        self
166    }
167
168    /// Determines whether to enable row selection.
169    /// When enabled, if a read is performed with a filter predicate,
170    /// then (for row groups that have not been skipped) the page index
171    /// for each row group in a parquet file is parsed and evaluated
172    /// against the filter predicate to determine if ranges of rows
173    /// within a row group can be skipped, based upon the page-level
174    /// statistics for each column.
175    ///
176    /// Defaults to being disabled. Enabling requires parsing the parquet page
177    /// index, which can be slow enough that parsing the page index outweighs any
178    /// gains from the reduced number of rows that need scanning.
179    /// It is recommended to experiment with partitioning, sorting, row group size,
180    /// page size, and page row limit Iceberg settings on the table being scanned in
181    /// order to get the best performance from using row selection.
182    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
183        self.row_selection_enabled = row_selection_enabled;
184        self
185    }
186
187    /// Build the table scan.
188    pub fn build(self) -> Result<TableScan> {
189        let snapshot = match self.snapshot_id {
190            Some(snapshot_id) => self
191                .table
192                .metadata()
193                .snapshot_by_id(snapshot_id)
194                .ok_or_else(|| {
195                    Error::new(
196                        ErrorKind::DataInvalid,
197                        format!("Snapshot with id {snapshot_id} not found"),
198                    )
199                })?
200                .clone(),
201            None => {
202                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
203                    return Ok(TableScan {
204                        batch_size: self.batch_size,
205                        column_names: self.column_names,
206                        file_io: self.table.file_io().clone(),
207                        plan_context: None,
208                        concurrency_limit_data_files: self.concurrency_limit_data_files,
209                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
210                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
211                        row_group_filtering_enabled: self.row_group_filtering_enabled,
212                        row_selection_enabled: self.row_selection_enabled,
213                    });
214                };
215                current_snapshot_id.clone()
216            }
217        };
218
219        let schema = snapshot.schema(self.table.metadata())?;
220
221        // Check that all column names exist in the schema (skip reserved columns).
222        if let Some(column_names) = self.column_names.as_ref() {
223            for column_name in column_names {
224                // Skip reserved columns that don't exist in the schema
225                if is_metadata_column_name(column_name) {
226                    continue;
227                }
228                if schema.field_by_name(column_name).is_none() {
229                    return Err(Error::new(
230                        ErrorKind::DataInvalid,
231                        format!("Column {column_name} not found in table. Schema: {schema}"),
232                    ));
233                }
234            }
235        }
236
237        let mut field_ids = vec![];
238        let column_names = self.column_names.clone().unwrap_or_else(|| {
239            schema
240                .as_struct()
241                .fields()
242                .iter()
243                .map(|f| f.name.clone())
244                .collect()
245        });
246
247        for column_name in column_names.iter() {
248            // Handle metadata columns (like "_file")
249            if is_metadata_column_name(column_name) {
250                field_ids.push(get_metadata_field_id(column_name)?);
251                continue;
252            }
253
254            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
255                Error::new(
256                    ErrorKind::DataInvalid,
257                    format!("Column {column_name} not found in table. Schema: {schema}"),
258                )
259            })?;
260
261            schema
262                .as_struct()
263                .field_by_id(field_id)
264                .ok_or_else(|| {
265                    Error::new(
266                        ErrorKind::FeatureUnsupported,
267                        format!(
268                        "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
269                    ),
270                )
271            })?;
272
273            field_ids.push(field_id);
274        }
275
276        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
277            Some(predicates.bind(schema.clone(), true)?)
278        } else {
279            None
280        };
281
282        let plan_context = PlanContext {
283            snapshot,
284            table_metadata: self.table.metadata_ref(),
285            snapshot_schema: schema,
286            case_sensitive: self.case_sensitive,
287            predicate: self.filter.map(Arc::new),
288            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
289            object_cache: self.table.object_cache(),
290            field_ids: Arc::new(field_ids),
291            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
292            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
293            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
294        };
295
296        Ok(TableScan {
297            batch_size: self.batch_size,
298            column_names: self.column_names,
299            file_io: self.table.file_io().clone(),
300            plan_context: Some(plan_context),
301            concurrency_limit_data_files: self.concurrency_limit_data_files,
302            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
303            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
304            row_group_filtering_enabled: self.row_group_filtering_enabled,
305            row_selection_enabled: self.row_selection_enabled,
306        })
307    }
308}
309
310/// Table scan.
/// Table scan.
#[derive(Debug)]
pub struct TableScan {
    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
    ///
    /// If this is None, then the scan contains no rows.
    plan_context: Option<PlanContext>,
    /// Desired arrow record batch size; `None` uses the reader's default.
    batch_size: Option<usize>,
    /// File IO used to read manifests and data files.
    file_io: FileIO,
    /// Columns selected for the scan; `None` means all columns.
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently
    concurrency_limit_manifest_files: usize,

    /// The maximum number of [`ManifestEntry`]s that will
    /// be processed in parallel
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will
    /// be read in parallel
    concurrency_limit_data_files: usize,

    /// See [`TableScanBuilder::with_row_group_filtering_enabled`].
    row_group_filtering_enabled: bool,
    /// See [`TableScanBuilder::with_row_selection_enabled`].
    row_selection_enabled: bool,
}
335
impl TableScan {
    /// Returns a stream of [`FileScanTask`]s.
    ///
    /// Planning is pipelined over channels: manifest files are fetched
    /// concurrently, their entries are routed into a delete-file stream and a
    /// data-file stream, and data entries that survive filtering are emitted
    /// as [`FileScanTask`]s on the returned stream. Errors from any stage are
    /// forwarded onto the result channel rather than lost.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        // No plan context means the table has no snapshot: empty scan.
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // used to stream ManifestEntryContexts between stages of the file plan operation
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
        // whose partitions cannot match this
        // scan's filter
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            // Forward the first error to the caller's result stream.
            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete file [`ManifestEntry`] stream in parallel.
        // NOTE(review): unlike the data-file stage below, this spawn is
        // `.await`ed, so all delete entries are processed before data-file
        // processing is spawned — presumably so the DeleteFileIndex is fully
        // populated before data tasks consult it; confirm before changing.
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data file [`ManifestEntry`] stream in parallel
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        // The receiver side of the results channel is handed back as the
        // task stream; the spawned stages keep the sender sides alive.
        Ok(file_scan_task_rx.boxed())
    }

    /// Returns an [`ArrowRecordBatchStream`].
    ///
    /// Builds an arrow reader from this scan's settings and feeds it the
    /// stream of planned [`FileScanTask`]s.
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        // Only override the reader's batch size when one was configured.
        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }

    /// Returns a reference to the column names of the table scan.
    ///
    /// `None` means the scan selects all columns.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns a reference to the snapshot of the table scan.
    ///
    /// `None` if the table had no snapshots when the scan was built.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    /// Filters one data-file manifest entry against the scan's bound
    /// predicates (partition pruning, then column metrics) and, if it
    /// survives, sends a [`FileScanTask`] for it on `file_scan_task_tx`.
    ///
    /// Errors if the entry describes a delete file or if sending fails.
    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry for a delete file
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            // Evaluators are cached per partition spec id.
            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // skip any data file whose metrics don't match this scan's filter
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // congratulations! the manifest entry has made its way through the
        // entire plan without getting filtered out. Create a corresponding
        // FileScanTask and push it to the result stream
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    /// Filters one delete-file manifest entry by partition pruning and, if it
    /// survives, sends a [`DeleteFileContext`] on `delete_file_ctx_tx` so the
    /// delete file index can record it.
    ///
    /// Errors if the entry describes a data file or if sending fails.
    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry that is not for a delete file
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}
557
/// The two bound forms of the scan filter used when evaluating a manifest
/// entry: one bound for partition pruning and one bound against the snapshot
/// schema for metrics evaluation.
pub(crate) struct BoundPredicates {
    /// Filter used for partition-level pruning of files.
    partition_bound_predicate: BoundPredicate,
    /// Filter bound to the snapshot schema, used for metrics evaluation.
    snapshot_bound_predicate: BoundPredicate,
}
562
563#[cfg(test)]
564pub mod tests {
565    //! shared tests for the table scan API
566    #![allow(missing_docs)]
567
568    use std::collections::HashMap;
569    use std::fs;
570    use std::fs::File;
571    use std::sync::Arc;
572
573    use arrow_array::cast::AsArray;
574    use arrow_array::{
575        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
576        StringArray,
577    };
578    use futures::{TryStreamExt, stream};
579    use minijinja::value::Value;
580    use minijinja::{AutoEscape, Environment, context};
581    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
582    use parquet::basic::Compression;
583    use parquet::file::properties::WriterProperties;
584    use tempfile::TempDir;
585    use uuid::Uuid;
586
587    use crate::TableIdent;
588    use crate::arrow::ArrowReaderBuilder;
589    use crate::expr::{BoundPredicate, Reference};
590    use crate::io::{FileIO, OutputFile};
591    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
592    use crate::scan::FileScanTask;
593    use crate::spec::{
594        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
595        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
596        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
597    };
598    use crate::table::Table;
599
600    fn render_template(template: &str, ctx: Value) -> String {
601        let mut env = Environment::new();
602        env.set_auto_escape_callback(|_| AutoEscape::None);
603        env.render_str(template, ctx).unwrap()
604    }
605
    /// Shared test fixture: a [`Table`] rooted in a temporary directory,
    /// used by the table-scan tests in this module.
    pub struct TableTestFixture {
        /// Root directory of the table on the local filesystem.
        pub table_location: String,
        /// The table under test.
        pub table: Table,
    }
610
611    impl TableTestFixture {
        /// Builds a fixture from the two-snapshot example metadata template,
        /// substituting temp-dir paths into the rendered JSON.
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            // NOTE(review): `tmp_dir` is dropped when this function returns,
            // which removes the temporary directory; later writes to
            // `table_location` appear to rely on the writer recreating the
            // path — confirm this is intended.
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            // Render the template with the fixture's paths, then parse the
            // result into TableMetadata.
            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
650
        /// Builds a fixture from the snapshot-less example metadata template
        /// (a table with no snapshots, so scans plan zero files).
        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            // NOTE(review): as in `new()`, `tmp_dir` is dropped on return,
            // deleting the directory — confirm downstream writes recreate it.
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            // Render the empty-table template and parse it into TableMetadata.
            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
685
        /// Like [`Self::new`], but rewrites the loaded metadata to use an
        /// unpartitioned default spec before building the table.
        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            // Render the standard template first; the partition spec is
            // overridden below.
            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            // Replace the default partition spec with the unpartitioned spec
            // and re-register it under spec id 0.
            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
730
731        fn next_manifest_file(&self) -> OutputFile {
732            self.table
733                .file_io()
734                .new_output(format!(
735                    "{}/metadata/manifest_{}.avro",
736                    self.table_location,
737                    Uuid::new_v4()
738                ))
739                .unwrap()
740        }
741
        /// Populates the fixture's current snapshot on disk: writes the three
        /// Parquet data files, then one data manifest containing an ADDED
        /// entry (1.parquet), a DELETED entry (2.parquet) and an EXISTING
        /// entry (3.parquet), and finally the manifest list the snapshot's
        /// `manifest_list()` path points at.
        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write the data files first, then use the file size in the manifest entries
            let parquet_file_size = self.write_parquet_data_files();

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            // Entry 1: 1.parquet, newly ADDED in the current snapshot
            // (partition key 100).
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            // Entry 2: 2.parquet, DELETED — attributed to the parent snapshot;
            // scan planning is expected to prune this file.
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            // Entry 3: 3.parquet, EXISTING — carried over from the parent
            // snapshot; it should still be scanned.
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
842
843        /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet)
844        /// and returns the file size in bytes.
845        fn write_parquet_data_files(&self) -> u64 {
846            std::fs::create_dir_all(&self.table_location).unwrap();
847
848            let schema = {
849                let fields = vec![
850                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
851                        .with_metadata(HashMap::from([(
852                            PARQUET_FIELD_ID_META_KEY.to_string(),
853                            "1".to_string(),
854                        )])),
855                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
856                        .with_metadata(HashMap::from([(
857                            PARQUET_FIELD_ID_META_KEY.to_string(),
858                            "2".to_string(),
859                        )])),
860                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
861                        .with_metadata(HashMap::from([(
862                            PARQUET_FIELD_ID_META_KEY.to_string(),
863                            "3".to_string(),
864                        )])),
865                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
866                        .with_metadata(HashMap::from([(
867                            PARQUET_FIELD_ID_META_KEY.to_string(),
868                            "4".to_string(),
869                        )])),
870                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
871                        .with_metadata(HashMap::from([(
872                            PARQUET_FIELD_ID_META_KEY.to_string(),
873                            "5".to_string(),
874                        )])),
875                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
876                        .with_metadata(HashMap::from([(
877                            PARQUET_FIELD_ID_META_KEY.to_string(),
878                            "6".to_string(),
879                        )])),
880                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
881                        .with_metadata(HashMap::from([(
882                            PARQUET_FIELD_ID_META_KEY.to_string(),
883                            "7".to_string(),
884                        )])),
885                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
886                        .with_metadata(HashMap::from([(
887                            PARQUET_FIELD_ID_META_KEY.to_string(),
888                            "8".to_string(),
889                        )])),
890                ];
891                Arc::new(arrow_schema::Schema::new(fields))
892            };
893            // x: [1, 1, 1, 1, ...]
894            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
895
896            let mut values = vec![2; 512];
897            values.append(vec![3; 200].as_mut());
898            values.append(vec![4; 300].as_mut());
899            values.append(vec![5; 12].as_mut());
900
901            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
902            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
903
904            let mut values = vec![3; 512];
905            values.append(vec![4; 512].as_mut());
906
907            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
908            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
909
910            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
911            let mut values = vec!["Apache"; 512];
912            values.append(vec!["Iceberg"; 512].as_mut());
913            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
914
915            // dbl:
916            let mut values = vec![100.0f64; 512];
917            values.append(vec![150.0f64; 12].as_mut());
918            values.append(vec![200.0f64; 500].as_mut());
919            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
920
921            // i32:
922            let mut values = vec![100i32; 512];
923            values.append(vec![150i32; 12].as_mut());
924            values.append(vec![200i32; 500].as_mut());
925            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
926
927            // i64:
928            let mut values = vec![100i64; 512];
929            values.append(vec![150i64; 12].as_mut());
930            values.append(vec![200i64; 500].as_mut());
931            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
932
933            // bool:
934            let mut values = vec![false; 512];
935            values.append(vec![true; 512].as_mut());
936            let values: BooleanArray = values.into();
937            let col8 = Arc::new(values) as ArrayRef;
938
939            let to_write = RecordBatch::try_new(schema.clone(), vec![
940                col1, col2, col3, col4, col5, col6, col7, col8,
941            ])
942            .unwrap();
943
944            // Write the Parquet files
945            let props = WriterProperties::builder()
946                .set_compression(Compression::SNAPPY)
947                .build();
948
949            for n in 1..=3 {
950                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
951                let mut writer =
952                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
953
954                writer.write(&to_write).expect("Writing batch");
955
956                // writer must be closed to write footer
957                writer.close().unwrap();
958            }
959
960            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
961                .unwrap()
962                .len()
963        }
964
        /// Same on-disk layout as `setup_manifest_files` (ADDED 1.parquet,
        /// DELETED 2.parquet, EXISTING 3.parquet plus the manifest list),
        /// but built against an unpartitioned spec: every manifest entry
        /// carries an empty partition value instead of a long partition key.
        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            // Use a fresh unpartitioned spec, not the table's default spec.
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            // Write the data files first, then use the file size in the manifest entries
            let parquet_file_size = self.write_parquet_data_files();

            // Write data files using an empty partition for unpartitioned tables.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Create an empty partition value.
            let empty_partition = Struct::empty();

            // Entry 1: 1.parquet, ADDED in the current snapshot.
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            // Entry 2: 2.parquet, DELETED — attributed to the parent snapshot.
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            // Entry 3: 3.parquet, EXISTING — carried over from the parent snapshot.
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
1073
        /// Builds a snapshot crafted to reproduce a plan-files deadlock:
        /// a data manifest with 10 entries (enough to fill internal channel
        /// buffers) followed by a positional-delete manifest. The
        /// data-manifest-before-delete-manifest ordering in the manifest
        /// list is essential for the reproduction.
        pub async fn setup_deadlock_manifests(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let _parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // 1. Write DATA manifest with MULTIPLE entries to fill buffer
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Add 10 data entries
            for i in 0..10 {
                writer
                    .add_entry(
                        ManifestEntry::builder()
                            .status(ManifestStatus::Added)
                            .data_file(
                                DataFileBuilder::default()
                                    .partition_spec_id(0)
                                    .content(DataContentType::Data)
                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
                                    .file_format(DataFileFormat::Parquet)
                                    .file_size_in_bytes(100)
                                    .record_count(1)
                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
                                    .key_metadata(None)
                                    .build()
                                    .unwrap(),
                            )
                            .build(),
                    )
                    .unwrap();
            }
            let data_manifest = writer.write_manifest_file().await.unwrap();

            // 2. Write DELETE manifest
            // NOTE: the referenced files are never written to disk; planning
            // alone is exercised, so only the manifest metadata matters here.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_deletes();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::PositionDeletes)
                                .file_path(format!("{}/del.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let delete_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list - DATA FIRST then DELETE
            // This order is crucial for reproduction
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
1164    }
1165
1166    #[test]
1167    fn test_table_scan_columns() {
1168        let table = TableTestFixture::new().table;
1169
1170        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1171        assert_eq!(
1172            Some(vec!["x".to_string(), "y".to_string()]),
1173            table_scan.column_names
1174        );
1175
1176        let table_scan = table
1177            .scan()
1178            .select(["x", "y"])
1179            .select(["z"])
1180            .build()
1181            .unwrap();
1182        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1183    }
1184
1185    #[test]
1186    fn test_select_all() {
1187        let table = TableTestFixture::new().table;
1188
1189        let table_scan = table.scan().select_all().build().unwrap();
1190        assert!(table_scan.column_names.is_none());
1191    }
1192
1193    #[test]
1194    fn test_select_no_exist_column() {
1195        let table = TableTestFixture::new().table;
1196
1197        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1198        assert!(table_scan.is_err());
1199    }
1200
1201    #[test]
1202    fn test_table_scan_default_snapshot_id() {
1203        let table = TableTestFixture::new().table;
1204
1205        let table_scan = table.scan().build().unwrap();
1206        assert_eq!(
1207            table.metadata().current_snapshot().unwrap().snapshot_id(),
1208            table_scan.snapshot().unwrap().snapshot_id()
1209        );
1210    }
1211
1212    #[test]
1213    fn test_table_scan_non_exist_snapshot_id() {
1214        let table = TableTestFixture::new().table;
1215
1216        let table_scan = table.scan().snapshot_id(1024).build();
1217        assert!(table_scan.is_err());
1218    }
1219
1220    #[test]
1221    fn test_table_scan_with_snapshot_id() {
1222        let table = TableTestFixture::new().table;
1223
1224        let table_scan = table
1225            .scan()
1226            .snapshot_id(3051729675574597004)
1227            .with_row_selection_enabled(true)
1228            .build()
1229            .unwrap();
1230        assert_eq!(
1231            table_scan.snapshot().unwrap().snapshot_id(),
1232            3051729675574597004
1233        );
1234    }
1235
1236    #[tokio::test]
1237    async fn test_plan_files_on_table_without_any_snapshots() {
1238        let table = TableTestFixture::new_empty().table;
1239        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1240        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1241        assert!(batches.is_empty());
1242    }
1243
1244    #[tokio::test]
1245    async fn test_plan_files_no_deletions() {
1246        let mut fixture = TableTestFixture::new();
1247        fixture.setup_manifest_files().await;
1248
1249        // Create table scan for current snapshot and plan files
1250        let table_scan = fixture
1251            .table
1252            .scan()
1253            .with_row_selection_enabled(true)
1254            .build()
1255            .unwrap();
1256
1257        let mut tasks = table_scan
1258            .plan_files()
1259            .await
1260            .unwrap()
1261            .try_fold(vec![], |mut acc, task| async move {
1262                acc.push(task);
1263                Ok(acc)
1264            })
1265            .await
1266            .unwrap();
1267
1268        assert_eq!(tasks.len(), 2);
1269
1270        tasks.sort_by_key(|t| t.data_file_path.to_string());
1271
1272        // Check first task is added data file
1273        assert_eq!(
1274            tasks[0].data_file_path,
1275            format!("{}/1.parquet", &fixture.table_location)
1276        );
1277
1278        // Check second task is existing data file
1279        assert_eq!(
1280            tasks[1].data_file_path,
1281            format!("{}/3.parquet", &fixture.table_location)
1282        );
1283    }
1284
1285    #[tokio::test]
1286    async fn test_open_parquet_no_deletions() {
1287        let mut fixture = TableTestFixture::new();
1288        fixture.setup_manifest_files().await;
1289
1290        // Create table scan for current snapshot and plan files
1291        let table_scan = fixture
1292            .table
1293            .scan()
1294            .with_row_selection_enabled(true)
1295            .build()
1296            .unwrap();
1297
1298        let batch_stream = table_scan.to_arrow().await.unwrap();
1299
1300        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1301
1302        let col = batches[0].column_by_name("x").unwrap();
1303
1304        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1305        assert_eq!(int64_arr.value(0), 1);
1306    }
1307
1308    #[tokio::test]
1309    async fn test_open_parquet_no_deletions_by_separate_reader() {
1310        let mut fixture = TableTestFixture::new();
1311        fixture.setup_manifest_files().await;
1312
1313        // Create table scan for current snapshot and plan files
1314        let table_scan = fixture
1315            .table
1316            .scan()
1317            .with_row_selection_enabled(true)
1318            .build()
1319            .unwrap();
1320
1321        let mut plan_task: Vec<_> = table_scan
1322            .plan_files()
1323            .await
1324            .unwrap()
1325            .try_collect()
1326            .await
1327            .unwrap();
1328        assert_eq!(plan_task.len(), 2);
1329
1330        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1331        let batch_stream = reader
1332            .clone()
1333            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1334            .unwrap();
1335        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1336
1337        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1338        let batch_stream = reader
1339            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1340            .unwrap();
1341        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1342
1343        assert_eq!(batch_1, batch_2);
1344    }
1345
1346    #[tokio::test]
1347    async fn test_open_parquet_with_projection() {
1348        let mut fixture = TableTestFixture::new();
1349        fixture.setup_manifest_files().await;
1350
1351        // Create table scan for current snapshot and plan files
1352        let table_scan = fixture
1353            .table
1354            .scan()
1355            .select(["x", "z"])
1356            .with_row_selection_enabled(true)
1357            .build()
1358            .unwrap();
1359
1360        let batch_stream = table_scan.to_arrow().await.unwrap();
1361
1362        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1363
1364        assert_eq!(batches[0].num_columns(), 2);
1365
1366        let col1 = batches[0].column_by_name("x").unwrap();
1367        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1368        assert_eq!(int64_arr.value(0), 1);
1369
1370        let col2 = batches[0].column_by_name("z").unwrap();
1371        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1372        assert_eq!(int64_arr.value(0), 3);
1373
1374        // test empty scan
1375        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1376        let batch_stream = table_scan.to_arrow().await.unwrap();
1377        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1378
1379        assert_eq!(batches[0].num_columns(), 0);
1380        assert_eq!(batches[0].num_rows(), 1024);
1381    }
1382
1383    #[tokio::test]
1384    async fn test_filter_on_arrow_lt() {
1385        let mut fixture = TableTestFixture::new();
1386        fixture.setup_manifest_files().await;
1387
1388        // Filter: y < 3
1389        let mut builder = fixture.table.scan();
1390        let predicate = Reference::new("y").less_than(Datum::long(3));
1391        builder = builder
1392            .with_filter(predicate)
1393            .with_row_selection_enabled(true);
1394        let table_scan = builder.build().unwrap();
1395
1396        let batch_stream = table_scan.to_arrow().await.unwrap();
1397
1398        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1399
1400        assert_eq!(batches[0].num_rows(), 512);
1401
1402        let col = batches[0].column_by_name("x").unwrap();
1403        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1404        assert_eq!(int64_arr.value(0), 1);
1405
1406        let col = batches[0].column_by_name("y").unwrap();
1407        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1408        assert_eq!(int64_arr.value(0), 2);
1409    }
1410
1411    #[tokio::test]
1412    async fn test_filter_on_arrow_gt_eq() {
1413        let mut fixture = TableTestFixture::new();
1414        fixture.setup_manifest_files().await;
1415
1416        // Filter: y >= 5
1417        let mut builder = fixture.table.scan();
1418        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1419        builder = builder
1420            .with_filter(predicate)
1421            .with_row_selection_enabled(true);
1422        let table_scan = builder.build().unwrap();
1423
1424        let batch_stream = table_scan.to_arrow().await.unwrap();
1425
1426        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1427
1428        assert_eq!(batches[0].num_rows(), 12);
1429
1430        let col = batches[0].column_by_name("x").unwrap();
1431        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1432        assert_eq!(int64_arr.value(0), 1);
1433
1434        let col = batches[0].column_by_name("y").unwrap();
1435        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1436        assert_eq!(int64_arr.value(0), 5);
1437    }
1438
1439    #[tokio::test]
1440    async fn test_filter_double_eq() {
1441        let mut fixture = TableTestFixture::new();
1442        fixture.setup_manifest_files().await;
1443
1444        // Filter: dbl == 150.0
1445        let mut builder = fixture.table.scan();
1446        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1447        builder = builder
1448            .with_filter(predicate)
1449            .with_row_selection_enabled(true);
1450        let table_scan = builder.build().unwrap();
1451
1452        let batch_stream = table_scan.to_arrow().await.unwrap();
1453
1454        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1455
1456        assert_eq!(batches.len(), 2);
1457        assert_eq!(batches[0].num_rows(), 12);
1458
1459        let col = batches[0].column_by_name("dbl").unwrap();
1460        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1461        assert_eq!(f64_arr.value(1), 150.0f64);
1462    }
1463
1464    #[tokio::test]
1465    async fn test_filter_int_eq() {
1466        let mut fixture = TableTestFixture::new();
1467        fixture.setup_manifest_files().await;
1468
1469        // Filter: i32 == 150
1470        let mut builder = fixture.table.scan();
1471        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1472        builder = builder
1473            .with_filter(predicate)
1474            .with_row_selection_enabled(true);
1475        let table_scan = builder.build().unwrap();
1476
1477        let batch_stream = table_scan.to_arrow().await.unwrap();
1478
1479        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1480
1481        assert_eq!(batches.len(), 2);
1482        assert_eq!(batches[0].num_rows(), 12);
1483
1484        let col = batches[0].column_by_name("i32").unwrap();
1485        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1486        assert_eq!(i32_arr.value(1), 150i32);
1487    }
1488
1489    #[tokio::test]
1490    async fn test_filter_long_eq() {
1491        let mut fixture = TableTestFixture::new();
1492        fixture.setup_manifest_files().await;
1493
1494        // Filter: i64 == 150
1495        let mut builder = fixture.table.scan();
1496        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1497        builder = builder
1498            .with_filter(predicate)
1499            .with_row_selection_enabled(true);
1500        let table_scan = builder.build().unwrap();
1501
1502        let batch_stream = table_scan.to_arrow().await.unwrap();
1503
1504        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1505
1506        assert_eq!(batches.len(), 2);
1507        assert_eq!(batches[0].num_rows(), 12);
1508
1509        let col = batches[0].column_by_name("i64").unwrap();
1510        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1511        assert_eq!(i64_arr.value(1), 150i64);
1512    }
1513
1514    #[tokio::test]
1515    async fn test_filter_bool_eq() {
1516        let mut fixture = TableTestFixture::new();
1517        fixture.setup_manifest_files().await;
1518
1519        // Filter: bool == true
1520        let mut builder = fixture.table.scan();
1521        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1522        builder = builder
1523            .with_filter(predicate)
1524            .with_row_selection_enabled(true);
1525        let table_scan = builder.build().unwrap();
1526
1527        let batch_stream = table_scan.to_arrow().await.unwrap();
1528
1529        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1530
1531        assert_eq!(batches.len(), 2);
1532        assert_eq!(batches[0].num_rows(), 512);
1533
1534        let col = batches[0].column_by_name("bool").unwrap();
1535        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1536        assert!(bool_arr.value(1));
1537    }
1538
1539    #[tokio::test]
1540    async fn test_filter_on_arrow_is_null() {
1541        let mut fixture = TableTestFixture::new();
1542        fixture.setup_manifest_files().await;
1543
1544        // Filter: y is null
1545        let mut builder = fixture.table.scan();
1546        let predicate = Reference::new("y").is_null();
1547        builder = builder
1548            .with_filter(predicate)
1549            .with_row_selection_enabled(true);
1550        let table_scan = builder.build().unwrap();
1551
1552        let batch_stream = table_scan.to_arrow().await.unwrap();
1553
1554        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1555        assert_eq!(batches.len(), 0);
1556    }
1557
1558    #[tokio::test]
1559    async fn test_filter_on_arrow_is_not_null() {
1560        let mut fixture = TableTestFixture::new();
1561        fixture.setup_manifest_files().await;
1562
1563        // Filter: y is not null
1564        let mut builder = fixture.table.scan();
1565        let predicate = Reference::new("y").is_not_null();
1566        builder = builder
1567            .with_filter(predicate)
1568            .with_row_selection_enabled(true);
1569        let table_scan = builder.build().unwrap();
1570
1571        let batch_stream = table_scan.to_arrow().await.unwrap();
1572
1573        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1574        assert_eq!(batches[0].num_rows(), 1024);
1575    }
1576
1577    #[tokio::test]
1578    async fn test_filter_on_arrow_lt_and_gt() {
1579        let mut fixture = TableTestFixture::new();
1580        fixture.setup_manifest_files().await;
1581
1582        // Filter: y < 5 AND z >= 4
1583        let mut builder = fixture.table.scan();
1584        let predicate = Reference::new("y")
1585            .less_than(Datum::long(5))
1586            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1587        builder = builder
1588            .with_filter(predicate)
1589            .with_row_selection_enabled(true);
1590        let table_scan = builder.build().unwrap();
1591
1592        let batch_stream = table_scan.to_arrow().await.unwrap();
1593
1594        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1595        assert_eq!(batches[0].num_rows(), 500);
1596
1597        let col = batches[0].column_by_name("x").unwrap();
1598        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1599        assert_eq!(col, &expected_x);
1600
1601        let col = batches[0].column_by_name("y").unwrap();
1602        let mut values = vec![];
1603        values.append(vec![3; 200].as_mut());
1604        values.append(vec![4; 300].as_mut());
1605        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1606        assert_eq!(col, &expected_y);
1607
1608        let col = batches[0].column_by_name("z").unwrap();
1609        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1610        assert_eq!(col, &expected_z);
1611    }
1612
    #[tokio::test]
    async fn test_filter_on_arrow_lt_or_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 5 OR z >= 4 — a disjunction, so every row of the first
        // file satisfies at least one side and nothing is filtered out.
        // (NOTE: a previous comment here said "AND"; the predicate is `.or`.)
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        // All 1024 rows of the first file survive the disjunction.
        assert_eq!(batches[0].num_rows(), 1024);

        // x is the constant 1 across the batch.
        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        assert_eq!(col, &expected_x);

        // y: 512 twos, then 200 threes, 300 fours, and 12 fives.
        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![2; 512];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        values.append(vec![5; 12].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        // z: 512 threes followed by 512 fours.
        let col = batches[0].column_by_name("z").unwrap();
        let mut values = vec![3; 512];
        values.append(vec![4; 512].as_mut());
        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_z);
    }
1651
1652    #[tokio::test]
1653    async fn test_filter_on_arrow_startswith() {
1654        let mut fixture = TableTestFixture::new();
1655        fixture.setup_manifest_files().await;
1656
1657        // Filter: a STARTSWITH "Ice"
1658        let mut builder = fixture.table.scan();
1659        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1660        builder = builder
1661            .with_filter(predicate)
1662            .with_row_selection_enabled(true);
1663        let table_scan = builder.build().unwrap();
1664
1665        let batch_stream = table_scan.to_arrow().await.unwrap();
1666
1667        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1668
1669        assert_eq!(batches[0].num_rows(), 512);
1670
1671        let col = batches[0].column_by_name("a").unwrap();
1672        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1673        assert_eq!(string_arr.value(0), "Iceberg");
1674    }
1675
1676    #[tokio::test]
1677    async fn test_filter_on_arrow_not_startswith() {
1678        let mut fixture = TableTestFixture::new();
1679        fixture.setup_manifest_files().await;
1680
1681        // Filter: a NOT STARTSWITH "Ice"
1682        let mut builder = fixture.table.scan();
1683        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1684        builder = builder
1685            .with_filter(predicate)
1686            .with_row_selection_enabled(true);
1687        let table_scan = builder.build().unwrap();
1688
1689        let batch_stream = table_scan.to_arrow().await.unwrap();
1690
1691        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1692
1693        assert_eq!(batches[0].num_rows(), 512);
1694
1695        let col = batches[0].column_by_name("a").unwrap();
1696        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1697        assert_eq!(string_arr.value(0), "Apache");
1698    }
1699
1700    #[tokio::test]
1701    async fn test_filter_on_arrow_in() {
1702        let mut fixture = TableTestFixture::new();
1703        fixture.setup_manifest_files().await;
1704
1705        // Filter: a IN ("Sioux", "Iceberg")
1706        let mut builder = fixture.table.scan();
1707        let predicate =
1708            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1709        builder = builder
1710            .with_filter(predicate)
1711            .with_row_selection_enabled(true);
1712        let table_scan = builder.build().unwrap();
1713
1714        let batch_stream = table_scan.to_arrow().await.unwrap();
1715
1716        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1717
1718        assert_eq!(batches[0].num_rows(), 512);
1719
1720        let col = batches[0].column_by_name("a").unwrap();
1721        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1722        assert_eq!(string_arr.value(0), "Iceberg");
1723    }
1724
1725    #[tokio::test]
1726    async fn test_filter_on_arrow_not_in() {
1727        let mut fixture = TableTestFixture::new();
1728        fixture.setup_manifest_files().await;
1729
1730        // Filter: a NOT IN ("Sioux", "Iceberg")
1731        let mut builder = fixture.table.scan();
1732        let predicate =
1733            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1734        builder = builder
1735            .with_filter(predicate)
1736            .with_row_selection_enabled(true);
1737        let table_scan = builder.build().unwrap();
1738
1739        let batch_stream = table_scan.to_arrow().await.unwrap();
1740
1741        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1742
1743        assert_eq!(batches[0].num_rows(), 512);
1744
1745        let col = batches[0].column_by_name("a").unwrap();
1746        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1747        assert_eq!(string_arr.value(0), "Apache");
1748    }
1749
1750    #[test]
1751    fn test_file_scan_task_serialize_deserialize() {
1752        let test_fn = |task: FileScanTask| {
1753            let serialized = serde_json::to_string(&task).unwrap();
1754            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1755
1756            assert_eq!(task.data_file_path, deserialized.data_file_path);
1757            assert_eq!(task.start, deserialized.start);
1758            assert_eq!(task.length, deserialized.length);
1759            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1760            assert_eq!(task.predicate, deserialized.predicate);
1761            assert_eq!(task.schema, deserialized.schema);
1762        };
1763
1764        // without predicate
1765        let schema = Arc::new(
1766            Schema::builder()
1767                .with_fields(vec![Arc::new(NestedField::required(
1768                    1,
1769                    "x",
1770                    Type::Primitive(PrimitiveType::Binary),
1771                ))])
1772                .build()
1773                .unwrap(),
1774        );
1775        let task = FileScanTask {
1776            data_file_path: "data_file_path".to_string(),
1777            file_size_in_bytes: 0,
1778            start: 0,
1779            length: 100,
1780            project_field_ids: vec![1, 2, 3],
1781            predicate: None,
1782            schema: schema.clone(),
1783            record_count: Some(100),
1784            data_file_format: DataFileFormat::Parquet,
1785            deletes: vec![],
1786            partition: None,
1787            partition_spec: None,
1788            name_mapping: None,
1789            case_sensitive: false,
1790        };
1791        test_fn(task);
1792
1793        // with predicate
1794        let task = FileScanTask {
1795            data_file_path: "data_file_path".to_string(),
1796            file_size_in_bytes: 0,
1797            start: 0,
1798            length: 100,
1799            project_field_ids: vec![1, 2, 3],
1800            predicate: Some(BoundPredicate::AlwaysTrue),
1801            schema,
1802            record_count: None,
1803            data_file_format: DataFileFormat::Avro,
1804            deletes: vec![],
1805            partition: None,
1806            partition_spec: None,
1807            name_mapping: None,
1808            case_sensitive: false,
1809        };
1810        test_fn(task);
1811    }
1812
1813    #[tokio::test]
1814    async fn test_select_with_file_column() {
1815        use arrow_array::cast::AsArray;
1816
1817        let mut fixture = TableTestFixture::new();
1818        fixture.setup_manifest_files().await;
1819
1820        // Select regular columns plus the _file column
1821        let table_scan = fixture
1822            .table
1823            .scan()
1824            .select(["x", RESERVED_COL_NAME_FILE])
1825            .with_row_selection_enabled(true)
1826            .build()
1827            .unwrap();
1828
1829        let batch_stream = table_scan.to_arrow().await.unwrap();
1830        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1831
1832        // Verify we have 2 columns: x and _file
1833        assert_eq!(batches[0].num_columns(), 2);
1834
1835        // Verify the x column exists and has correct data
1836        let x_col = batches[0].column_by_name("x").unwrap();
1837        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
1838        assert_eq!(x_arr.value(0), 1);
1839
1840        // Verify the _file column exists
1841        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
1842        assert!(
1843            file_col.is_some(),
1844            "_file column should be present in the batch"
1845        );
1846
1847        // Verify the _file column contains a file path
1848        let file_col = file_col.unwrap();
1849        assert!(
1850            matches!(
1851                file_col.data_type(),
1852                arrow_schema::DataType::RunEndEncoded(_, _)
1853            ),
1854            "_file column should use RunEndEncoded type"
1855        );
1856
1857        // Decode the RunArray to verify it contains the file path
1858        let run_array = file_col
1859            .as_any()
1860            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1861            .expect("_file column should be a RunArray");
1862
1863        let values = run_array.values();
1864        let string_values = values.as_string::<i32>();
1865        assert_eq!(string_values.len(), 1, "Should have a single file path");
1866
1867        let file_path = string_values.value(0);
1868        assert!(
1869            file_path.ends_with(".parquet"),
1870            "File path should end with .parquet, got: {file_path}"
1871        );
1872    }
1873
1874    #[tokio::test]
1875    async fn test_select_file_column_position() {
1876        let mut fixture = TableTestFixture::new();
1877        fixture.setup_manifest_files().await;
1878
1879        // Select columns in specific order: x, _file, z
1880        let table_scan = fixture
1881            .table
1882            .scan()
1883            .select(["x", RESERVED_COL_NAME_FILE, "z"])
1884            .with_row_selection_enabled(true)
1885            .build()
1886            .unwrap();
1887
1888        let batch_stream = table_scan.to_arrow().await.unwrap();
1889        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1890
1891        assert_eq!(batches[0].num_columns(), 3);
1892
1893        // Verify column order: x at position 0, _file at position 1, z at position 2
1894        let schema = batches[0].schema();
1895        assert_eq!(schema.field(0).name(), "x");
1896        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1897        assert_eq!(schema.field(2).name(), "z");
1898
1899        // Verify columns by name also works
1900        assert!(batches[0].column_by_name("x").is_some());
1901        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1902        assert!(batches[0].column_by_name("z").is_some());
1903    }
1904
1905    #[tokio::test]
1906    async fn test_select_file_column_only() {
1907        let mut fixture = TableTestFixture::new();
1908        fixture.setup_manifest_files().await;
1909
1910        // Select only the _file column
1911        let table_scan = fixture
1912            .table
1913            .scan()
1914            .select([RESERVED_COL_NAME_FILE])
1915            .with_row_selection_enabled(true)
1916            .build()
1917            .unwrap();
1918
1919        let batch_stream = table_scan.to_arrow().await.unwrap();
1920        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1921
1922        // Should have exactly 1 column
1923        assert_eq!(batches[0].num_columns(), 1);
1924
1925        // Verify it's the _file column
1926        let schema = batches[0].schema();
1927        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
1928
1929        // Verify the batch has the correct number of rows
1930        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
1931        // Each file has 1024 rows, so total is 2048 rows
1932        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1933        assert_eq!(total_rows, 2048);
1934    }
1935
1936    #[tokio::test]
1937    async fn test_file_column_with_multiple_files() {
1938        use std::collections::HashSet;
1939
1940        let mut fixture = TableTestFixture::new();
1941        fixture.setup_manifest_files().await;
1942
1943        // Select x and _file columns
1944        let table_scan = fixture
1945            .table
1946            .scan()
1947            .select(["x", RESERVED_COL_NAME_FILE])
1948            .with_row_selection_enabled(true)
1949            .build()
1950            .unwrap();
1951
1952        let batch_stream = table_scan.to_arrow().await.unwrap();
1953        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1954
1955        // Collect all unique file paths from the batches
1956        let mut file_paths = HashSet::new();
1957        for batch in &batches {
1958            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
1959            let run_array = file_col
1960                .as_any()
1961                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1962                .expect("_file column should be a RunArray");
1963
1964            let values = run_array.values();
1965            let string_values = values.as_string::<i32>();
1966            for i in 0..string_values.len() {
1967                file_paths.insert(string_values.value(i).to_string());
1968            }
1969        }
1970
1971        // We should have multiple files (the test creates 1.parquet and 3.parquet)
1972        assert!(!file_paths.is_empty(), "Should have at least one file path");
1973
1974        // All paths should end with .parquet
1975        for path in &file_paths {
1976            assert!(
1977                path.ends_with(".parquet"),
1978                "All file paths should end with .parquet, got: {path}"
1979            );
1980        }
1981    }
1982
1983    #[tokio::test]
1984    async fn test_file_column_at_start() {
1985        let mut fixture = TableTestFixture::new();
1986        fixture.setup_manifest_files().await;
1987
1988        // Select _file at the start
1989        let table_scan = fixture
1990            .table
1991            .scan()
1992            .select([RESERVED_COL_NAME_FILE, "x", "y"])
1993            .with_row_selection_enabled(true)
1994            .build()
1995            .unwrap();
1996
1997        let batch_stream = table_scan.to_arrow().await.unwrap();
1998        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1999
2000        assert_eq!(batches[0].num_columns(), 3);
2001
2002        // Verify _file is at position 0
2003        let schema = batches[0].schema();
2004        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2005        assert_eq!(schema.field(1).name(), "x");
2006        assert_eq!(schema.field(2).name(), "y");
2007    }
2008
2009    #[tokio::test]
2010    async fn test_file_column_at_end() {
2011        let mut fixture = TableTestFixture::new();
2012        fixture.setup_manifest_files().await;
2013
2014        // Select _file at the end
2015        let table_scan = fixture
2016            .table
2017            .scan()
2018            .select(["x", "y", RESERVED_COL_NAME_FILE])
2019            .with_row_selection_enabled(true)
2020            .build()
2021            .unwrap();
2022
2023        let batch_stream = table_scan.to_arrow().await.unwrap();
2024        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2025
2026        assert_eq!(batches[0].num_columns(), 3);
2027
2028        // Verify _file is at position 2 (the end)
2029        let schema = batches[0].schema();
2030        assert_eq!(schema.field(0).name(), "x");
2031        assert_eq!(schema.field(1).name(), "y");
2032        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2033    }
2034
    #[tokio::test]
    async fn test_select_with_repeated_column_names() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select with repeated column names - both regular columns and virtual columns
        // Repeated columns should appear multiple times in the result (duplicates are allowed)
        let table_scan = fixture
            .table
            .scan()
            .select([
                "x",
                RESERVED_COL_NAME_FILE,
                "x", // x repeated
                "y",
                RESERVED_COL_NAME_FILE, // _file repeated
                "y",                    // y repeated
            ])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
        assert_eq!(
            batches[0].num_columns(),
            6,
            "Should have exactly 6 columns with duplicates"
        );

        let schema = batches[0].schema();

        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
        // (duplicate selections must not be deduplicated or reordered)
        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
        assert_eq!(
            schema.field(1).name(),
            RESERVED_COL_NAME_FILE,
            "Column 1 should be _file"
        );
        assert_eq!(
            schema.field(2).name(),
            "x",
            "Column 2 should be x (duplicate)"
        );
        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
        assert_eq!(
            schema.field(4).name(),
            RESERVED_COL_NAME_FILE,
            "Column 4 should be _file (duplicate)"
        );
        assert_eq!(
            schema.field(5).name(),
            "y",
            "Column 5 should be y (duplicate)"
        );

        // Verify all columns have correct data types: duplicates must keep the
        // same Arrow type as their first occurrence.
        assert!(
            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
            "Column x should be Int64"
        );
        assert!(
            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
            "Column x (duplicate) should be Int64"
        );
        assert!(
            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
            "Column y should be Int64"
        );
        assert!(
            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
            "Column y (duplicate) should be Int64"
        );
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );
        assert!(
            matches!(
                schema.field(4).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column (duplicate) should use RunEndEncoded type"
        );
    }
2125
2126    #[tokio::test]
2127    async fn test_scan_deadlock() {
2128        let mut fixture = TableTestFixture::new();
2129        fixture.setup_deadlock_manifests().await;
2130
2131        // Create table scan with concurrency limit 1
2132        // This sets channel size to 1.
2133        // Data manifest has 10 entries -> will block producer.
2134        // Delete manifest is 2nd in list -> won't be processed.
2135        // Consumer 2 (Data) not started -> blocked.
2136        // Consumer 1 (Delete) waiting -> blocked.
2137        let table_scan = fixture
2138            .table
2139            .scan()
2140            .with_concurrency_limit(1)
2141            .build()
2142            .unwrap();
2143
2144        // This should timeout/hang if deadlock exists
2145        // We can use tokio::time::timeout
2146        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2147            table_scan
2148                .plan_files()
2149                .await
2150                .unwrap()
2151                .try_collect::<Vec<_>>()
2152                .await
2153        })
2154        .await;
2155
2156        // Assert it finished (didn't timeout)
2157        assert!(result.is_ok(), "Scan timed out - deadlock detected");
2158    }
2159}