iceberg/scan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Table scan api.
19
20mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35pub use crate::arrow::{ScanMetrics, ScanResult};
36use crate::delete_file_index::DeleteFileIndex;
37use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
38use crate::expr::{Bind, BoundPredicate, Predicate};
39use crate::io::FileIO;
40use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
41use crate::runtime::Runtime;
42use crate::spec::{DataContentType, SnapshotRef};
43use crate::table::Table;
44use crate::util::available_parallelism;
45use crate::{Error, ErrorKind, Result};
46
/// A stream of arrow [`RecordBatch`]es.
///
/// This is a boxed, `'static` stream in which every item is a [`Result`], so a
/// failure is reported as an `Err` item of the stream.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
49
/// Builder to create table scan.
///
/// Every option is set through a consuming method that returns the builder;
/// [`TableScanBuilder::build`] turns the collected options into a [`TableScan`].
pub struct TableScanBuilder<'a> {
    /// The table to scan.
    table: &'a Table,
    // Defaults to none which means select all columns
    column_names: Option<Vec<String>>,
    /// Snapshot to scan; `None` means the table's current snapshot.
    snapshot_id: Option<i64>,
    /// Requested record batch size; `None` leaves the reader's default in place.
    batch_size: Option<usize>,
    /// Case sensitivity of the scan. Defaults to `true`.
    case_sensitive: bool,
    /// Optional row filter, stored after `rewrite_not()` has been applied.
    filter: Option<Predicate>,
    /// Data file concurrency limit. Defaults to the available parallelism.
    concurrency_limit_data_files: usize,
    /// Manifest entry concurrency limit. Defaults to the available parallelism.
    concurrency_limit_manifest_entries: usize,
    /// Manifest file concurrency limit. Defaults to the available parallelism.
    concurrency_limit_manifest_files: usize,
    /// Whether row group filtering is enabled. Defaults to `true`.
    row_group_filtering_enabled: bool,
    /// Whether row selection is enabled. Defaults to `false`.
    row_selection_enabled: bool,
}
65
66impl<'a> TableScanBuilder<'a> {
67    pub(crate) fn new(table: &'a Table) -> Self {
68        let num_cpus = available_parallelism().get();
69
70        Self {
71            table,
72            column_names: None,
73            snapshot_id: None,
74            batch_size: None,
75            case_sensitive: true,
76            filter: None,
77            concurrency_limit_data_files: num_cpus,
78            concurrency_limit_manifest_entries: num_cpus,
79            concurrency_limit_manifest_files: num_cpus,
80            row_group_filtering_enabled: true,
81            row_selection_enabled: false,
82        }
83    }
84
85    /// Sets the desired size of batches in the response
86    /// to something other than the default
87    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
88        self.batch_size = batch_size;
89        self
90    }
91
92    /// Sets the scan's case sensitivity
93    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
94        self.case_sensitive = case_sensitive;
95        self
96    }
97
98    /// Specifies a predicate to use as a filter
99    pub fn with_filter(mut self, predicate: Predicate) -> Self {
100        // calls rewrite_not to remove Not nodes, which must be absent
101        // when applying the manifest evaluator
102        self.filter = Some(predicate.rewrite_not());
103        self
104    }
105
106    /// Select all columns.
107    pub fn select_all(mut self) -> Self {
108        self.column_names = None;
109        self
110    }
111
112    /// Select empty columns.
113    pub fn select_empty(mut self) -> Self {
114        self.column_names = Some(vec![]);
115        self
116    }
117
118    /// Select some columns of the table.
119    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
120        self.column_names = Some(
121            column_names
122                .into_iter()
123                .map(|item| item.to_string())
124                .collect(),
125        );
126        self
127    }
128
129    /// Set the snapshot to scan. When not set, it uses current snapshot.
130    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
131        self.snapshot_id = Some(snapshot_id);
132        self
133    }
134
135    /// Sets the concurrency limit for both manifest files and manifest
136    /// entries for this scan
137    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
138        self.concurrency_limit_manifest_files = limit;
139        self.concurrency_limit_manifest_entries = limit;
140        self.concurrency_limit_data_files = limit;
141        self
142    }
143
144    /// Sets the data file concurrency limit for this scan
145    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
146        self.concurrency_limit_data_files = limit;
147        self
148    }
149
150    /// Sets the manifest entry concurrency limit for this scan
151    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
152        self.concurrency_limit_manifest_entries = limit;
153        self
154    }
155
156    /// Determines whether to enable row group filtering.
157    /// When enabled, if a read is performed with a filter predicate,
158    /// then the metadata for each row group in the parquet file is
159    /// evaluated against the filter predicate and row groups
160    /// that cant contain matching rows will be skipped entirely.
161    ///
162    /// Defaults to enabled, as it generally improves performance or
163    /// keeps it the same, with performance degradation unlikely.
164    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
165        self.row_group_filtering_enabled = row_group_filtering_enabled;
166        self
167    }
168
169    /// Determines whether to enable row selection.
170    /// When enabled, if a read is performed with a filter predicate,
171    /// then (for row groups that have not been skipped) the page index
172    /// for each row group in a parquet file is parsed and evaluated
173    /// against the filter predicate to determine if ranges of rows
174    /// within a row group can be skipped, based upon the page-level
175    /// statistics for each column.
176    ///
177    /// Defaults to being disabled. Enabling requires parsing the parquet page
178    /// index, which can be slow enough that parsing the page index outweighs any
179    /// gains from the reduced number of rows that need scanning.
180    /// It is recommended to experiment with partitioning, sorting, row group size,
181    /// page size, and page row limit Iceberg settings on the table being scanned in
182    /// order to get the best performance from using row selection.
183    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
184        self.row_selection_enabled = row_selection_enabled;
185        self
186    }
187
188    /// Build the table scan.
189    pub fn build(self) -> Result<TableScan> {
190        let snapshot = match self.snapshot_id {
191            Some(snapshot_id) => self
192                .table
193                .metadata()
194                .snapshot_by_id(snapshot_id)
195                .ok_or_else(|| {
196                    Error::new(
197                        ErrorKind::DataInvalid,
198                        format!("Snapshot with id {snapshot_id} not found"),
199                    )
200                })?
201                .clone(),
202            None => {
203                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
204                    return Ok(TableScan {
205                        batch_size: self.batch_size,
206                        column_names: self.column_names,
207                        file_io: self.table.file_io().clone(),
208                        plan_context: None,
209                        concurrency_limit_data_files: self.concurrency_limit_data_files,
210                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
211                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
212                        row_group_filtering_enabled: self.row_group_filtering_enabled,
213                        row_selection_enabled: self.row_selection_enabled,
214                        runtime: self.table.runtime().clone(),
215                    });
216                };
217                current_snapshot_id.clone()
218            }
219        };
220
221        let schema = snapshot.schema(self.table.metadata())?;
222
223        // Check that all column names exist in the schema (skip reserved columns).
224        if let Some(column_names) = self.column_names.as_ref() {
225            for column_name in column_names {
226                // Skip reserved columns that don't exist in the schema
227                if is_metadata_column_name(column_name) {
228                    continue;
229                }
230                if schema.field_by_name(column_name).is_none() {
231                    return Err(Error::new(
232                        ErrorKind::DataInvalid,
233                        format!("Column {column_name} not found in table. Schema: {schema}"),
234                    ));
235                }
236            }
237        }
238
239        let mut field_ids = vec![];
240        let column_names = self.column_names.clone().unwrap_or_else(|| {
241            schema
242                .as_struct()
243                .fields()
244                .iter()
245                .map(|f| f.name.clone())
246                .collect()
247        });
248
249        for column_name in column_names.iter() {
250            // Handle metadata columns (like "_file")
251            if is_metadata_column_name(column_name) {
252                field_ids.push(get_metadata_field_id(column_name)?);
253                continue;
254            }
255
256            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
257                Error::new(
258                    ErrorKind::DataInvalid,
259                    format!("Column {column_name} not found in table. Schema: {schema}"),
260                )
261            })?;
262
263            schema
264                .as_struct()
265                .field_by_id(field_id)
266                .ok_or_else(|| {
267                    Error::new(
268                        ErrorKind::FeatureUnsupported,
269                        format!(
270                        "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
271                    ),
272                )
273            })?;
274
275            field_ids.push(field_id);
276        }
277
278        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
279            Some(predicates.bind(schema.clone(), true)?)
280        } else {
281            None
282        };
283
284        let plan_context = PlanContext {
285            snapshot,
286            table_metadata: self.table.metadata_ref(),
287            snapshot_schema: schema,
288            case_sensitive: self.case_sensitive,
289            predicate: self.filter.map(Arc::new),
290            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
291            object_cache: self.table.object_cache(),
292            field_ids: Arc::new(field_ids),
293            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
294            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
295            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
296        };
297
298        Ok(TableScan {
299            batch_size: self.batch_size,
300            column_names: self.column_names,
301            file_io: self.table.file_io().clone(),
302            plan_context: Some(plan_context),
303            concurrency_limit_data_files: self.concurrency_limit_data_files,
304            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
305            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
306            row_group_filtering_enabled: self.row_group_filtering_enabled,
307            row_selection_enabled: self.row_selection_enabled,
308            runtime: self.table.runtime().clone(),
309        })
310    }
311}
312
/// Table scan.
///
/// Created by [`TableScanBuilder::build`]. A scan can be planned into file
/// scan tasks with [`TableScan::plan_files`] or read as arrow record batches
/// with [`TableScan::to_arrow`].
#[derive(Debug)]
pub struct TableScan {
    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
    ///
    /// If this is None, then the scan contains no rows.
    plan_context: Option<PlanContext>,
    /// Batch size handed to the arrow reader when set; `None` leaves the
    /// reader's default untouched.
    batch_size: Option<usize>,
    /// The [`FileIO`] handed to the arrow reader.
    file_io: FileIO,
    /// The selected column names, or `None` when all columns are selected.
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently
    concurrency_limit_manifest_files: usize,

    /// The maximum number of [`ManifestEntry`]s that will
    /// be processed in parallel
    concurrency_limit_manifest_entries: usize,

    /// The data file concurrency limit handed to the arrow reader
    /// when the scan is read with [`TableScan::to_arrow`]
    concurrency_limit_data_files: usize,

    /// Whether row group filtering is enabled on the arrow reader.
    row_group_filtering_enabled: bool,
    /// Whether row selection is enabled on the arrow reader.
    row_selection_enabled: bool,

    /// Runtime on which planning tasks are spawned; also handed to the
    /// arrow reader and the delete file index.
    runtime: Runtime,
}
340
341impl TableScan {
342    /// Returns a stream of [`FileScanTask`]s.
343    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
344        let Some(plan_context) = self.plan_context.as_ref() else {
345            return Ok(Box::pin(futures::stream::empty()));
346        };
347
348        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
349        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
350
351        // used to stream ManifestEntryContexts between stages of the file plan operation
352        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
353            channel(concurrency_limit_manifest_files);
354        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
355            channel(concurrency_limit_manifest_files);
356
357        // used to stream the results back to the caller
358        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
359
360        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(self.runtime.clone());
361
362        let manifest_list = plan_context.get_manifest_list().await?;
363
364        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
365        // whose partitions cannot match this
366        // scan's filter
367        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
368            manifest_list,
369            manifest_entry_data_ctx_tx,
370            delete_file_idx.clone(),
371            manifest_entry_delete_ctx_tx,
372        )?;
373
374        let mut channel_for_manifest_error = file_scan_task_tx.clone();
375        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
376        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
377
378        let rt = self.runtime.clone();
379
380        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
381        rt.io().spawn(async move {
382            let result = futures::stream::iter(manifest_file_contexts)
383                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
384                    ctx.fetch_manifest_and_stream_manifest_entries().await
385                })
386                .await;
387
388            if let Err(error) = result {
389                let _ = channel_for_manifest_error.send(Err(error)).await;
390            }
391        });
392
393        // Process the delete file [`ManifestEntry`] stream in parallel
394        {
395            let rt = rt.clone();
396            let rt_inner = rt.clone();
397            rt.cpu().spawn(async move {
398                let result = manifest_entry_delete_ctx_rx
399                    .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
400                    .try_for_each_concurrent(
401                        concurrency_limit_manifest_entries,
402                        |(manifest_entry_context, tx)| {
403                            let rt_inner = rt_inner.clone();
404                            async move {
405                                rt_inner
406                                    .cpu()
407                                    .spawn(async move {
408                                        Self::process_delete_manifest_entry(
409                                            manifest_entry_context,
410                                            tx,
411                                        )
412                                        .await
413                                    })
414                                    .await?
415                            }
416                        },
417                    )
418                    .await;
419
420                if let Err(error) = result {
421                    let _ = channel_for_delete_manifest_entry_error
422                        .send(Err(error))
423                        .await;
424                }
425            });
426        }
427
428        // Process the data file [`ManifestEntry`] stream in parallel
429        {
430            let rt_inner = rt.clone();
431            rt.cpu().spawn(async move {
432                let result = manifest_entry_data_ctx_rx
433                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
434                    .try_for_each_concurrent(
435                        concurrency_limit_manifest_entries,
436                        |(manifest_entry_context, tx)| {
437                            let rt_inner = rt_inner.clone();
438                            async move {
439                                rt_inner
440                                    .cpu()
441                                    .spawn(async move {
442                                        Self::process_data_manifest_entry(
443                                            manifest_entry_context,
444                                            tx,
445                                        )
446                                        .await
447                                    })
448                                    .await?
449                            }
450                        },
451                    )
452                    .await;
453
454                if let Err(error) = result {
455                    let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
456                }
457            });
458        }
459
460        Ok(file_scan_task_rx.boxed())
461    }
462
463    /// Returns an [`ArrowRecordBatchStream`].
464    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
465        let mut arrow_reader_builder =
466            ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone())
467                .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
468                .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
469                .with_row_selection_enabled(self.row_selection_enabled);
470
471        if let Some(batch_size) = self.batch_size {
472            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
473        }
474
475        arrow_reader_builder
476            .build()
477            .read(self.plan_files().await?)
478            .map(|result| result.stream())
479    }
480
481    /// Returns a reference to the column names of the table scan.
482    pub fn column_names(&self) -> Option<&[String]> {
483        self.column_names.as_deref()
484    }
485
486    /// Returns a reference to the snapshot of the table scan.
487    pub fn snapshot(&self) -> Option<&SnapshotRef> {
488        self.plan_context.as_ref().map(|x| &x.snapshot)
489    }
490
491    async fn process_data_manifest_entry(
492        manifest_entry_context: ManifestEntryContext,
493        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
494    ) -> Result<()> {
495        // skip processing this manifest entry if it has been marked as deleted
496        if !manifest_entry_context.manifest_entry.is_alive() {
497            return Ok(());
498        }
499
500        // abort the plan if we encounter a manifest entry for a delete file
501        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
502            return Err(Error::new(
503                ErrorKind::FeatureUnsupported,
504                "Encountered an entry for a delete file in a data file manifest",
505            ));
506        }
507
508        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
509            let BoundPredicates {
510                snapshot_bound_predicate,
511                partition_bound_predicate,
512            } = bound_predicates.as_ref();
513
514            let expression_evaluator_cache =
515                manifest_entry_context.expression_evaluator_cache.as_ref();
516
517            let expression_evaluator = expression_evaluator_cache.get(
518                manifest_entry_context.partition_spec_id,
519                partition_bound_predicate,
520            )?;
521
522            // skip any data file whose partition data indicates that it can't contain
523            // any data that matches this scan's filter
524            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
525                return Ok(());
526            }
527
528            // skip any data file whose metrics don't match this scan's filter
529            if !InclusiveMetricsEvaluator::eval(
530                snapshot_bound_predicate,
531                manifest_entry_context.manifest_entry.data_file(),
532                false,
533            )? {
534                return Ok(());
535            }
536        }
537
538        // congratulations! the manifest entry has made its way through the
539        // entire plan without getting filtered out. Create a corresponding
540        // FileScanTask and push it to the result stream
541        file_scan_task_tx
542            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
543            .await?;
544
545        Ok(())
546    }
547
548    async fn process_delete_manifest_entry(
549        manifest_entry_context: ManifestEntryContext,
550        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
551    ) -> Result<()> {
552        // skip processing this manifest entry if it has been marked as deleted
553        if !manifest_entry_context.manifest_entry.is_alive() {
554            return Ok(());
555        }
556
557        // abort the plan if we encounter a manifest entry that is not for a delete file
558        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
559            return Err(Error::new(
560                ErrorKind::FeatureUnsupported,
561                "Encountered an entry for a data file in a delete manifest",
562            ));
563        }
564
565        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
566            let expression_evaluator_cache =
567                manifest_entry_context.expression_evaluator_cache.as_ref();
568
569            let expression_evaluator = expression_evaluator_cache.get(
570                manifest_entry_context.partition_spec_id,
571                &bound_predicates.partition_bound_predicate,
572            )?;
573
574            // skip any data file whose partition data indicates that it can't contain
575            // any data that matches this scan's filter
576            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
577                return Ok(());
578            }
579        }
580
581        delete_file_ctx_tx
582            .send(DeleteFileContext {
583                manifest_entry: manifest_entry_context.manifest_entry.clone(),
584                partition_spec_id: manifest_entry_context.partition_spec_id,
585            })
586            .await?;
587
588        Ok(())
589    }
590}
591
/// The two bound forms of a scan filter that are consulted while a manifest
/// entry is being filtered.
pub(crate) struct BoundPredicates {
    /// Used to obtain the expression evaluator that tests a file's partition data.
    partition_bound_predicate: BoundPredicate,
    /// Evaluated against a data file's metrics by the inclusive metrics evaluator.
    snapshot_bound_predicate: BoundPredicate,
}
596
597#[cfg(test)]
598pub mod tests {
599    //! shared tests for the table scan API
600    #![allow(missing_docs)]
601
602    use std::collections::HashMap;
603    use std::fs;
604    use std::fs::File;
605    use std::sync::Arc;
606
607    use arrow_array::cast::AsArray;
608    use arrow_array::{
609        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
610        StringArray,
611    };
612    use futures::{TryStreamExt, stream};
613    use minijinja::value::Value;
614    use minijinja::{AutoEscape, Environment, context};
615    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
616    use parquet::basic::Compression;
617    use parquet::file::properties::WriterProperties;
618    use tempfile::TempDir;
619    use uuid::Uuid;
620
621    use crate::TableIdent;
622    use crate::arrow::ArrowReaderBuilder;
623    use crate::expr::{BoundPredicate, Reference};
624    use crate::io::{FileIO, OutputFile};
625    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
626    use crate::scan::FileScanTask;
627    use crate::spec::{
628        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
629        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
630        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
631    };
632    use crate::table::Table;
633    use crate::test_utils::test_runtime;
634
635    fn render_template(template: &str, ctx: Value) -> String {
636        let mut env = Environment::new();
637        env.set_auto_escape_callback(|_| AutoEscape::None);
638        env.render_str(template, ctx).unwrap()
639    }
640
    /// A [`Table`] prepared for tests, together with the location it was built for.
    pub struct TableTestFixture {
        /// The table's root location (a path below a temporary directory), as a string.
        pub table_location: String,
        /// The table built from the test metadata.
        pub table: Table,
    }
645
646    impl TableTestFixture {
647        #[allow(clippy::new_without_default)]
648        pub fn new() -> Self {
649            let tmp_dir = TempDir::new().unwrap();
650            let table_location = tmp_dir.path().join("table1");
651            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
652            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
653            let table_metadata1_location = table_location.join("metadata/v1.json");
654
655            let file_io = FileIO::new_with_fs();
656
657            let table_metadata = {
658                let template_json_str = fs::read_to_string(format!(
659                    "{}/testdata/example_table_metadata_v2.json",
660                    env!("CARGO_MANIFEST_DIR")
661                ))
662                .unwrap();
663                let metadata_json = render_template(&template_json_str, context! {
664                    table_location => &table_location,
665                    manifest_list_1_location => &manifest_list1_location,
666                    manifest_list_2_location => &manifest_list2_location,
667                    table_metadata_1_location => &table_metadata1_location,
668                });
669                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
670            };
671
672            let table = Table::builder()
673                .metadata(table_metadata)
674                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
675                .file_io(file_io.clone())
676                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
677                .runtime(test_runtime())
678                .build()
679                .unwrap();
680
681            Self {
682                table_location: table_location.to_str().unwrap().to_string(),
683                table,
684            }
685        }
686
687        #[allow(clippy::new_without_default)]
688        pub fn new_empty() -> Self {
689            let tmp_dir = TempDir::new().unwrap();
690            let table_location = tmp_dir.path().join("table1");
691            let table_metadata1_location = table_location.join("metadata/v1.json");
692
693            let file_io = FileIO::new_with_fs();
694
695            let table_metadata = {
696                let template_json_str = fs::read_to_string(format!(
697                    "{}/testdata/example_empty_table_metadata_v2.json",
698                    env!("CARGO_MANIFEST_DIR")
699                ))
700                .unwrap();
701                let metadata_json = render_template(&template_json_str, context! {
702                    table_location => &table_location,
703                    table_metadata_1_location => &table_metadata1_location,
704                });
705                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
706            };
707
708            let table = Table::builder()
709                .metadata(table_metadata)
710                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
711                .file_io(file_io.clone())
712                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
713                .runtime(test_runtime())
714                .build()
715                .unwrap();
716
717            Self {
718                table_location: table_location.to_str().unwrap().to_string(),
719                table,
720            }
721        }
722
723        /// Creates a fixture with 5 snapshots chained as:
724        ///   S1 (root) -> S2 -> S3 -> S4 -> S5 (current)
725        /// Useful for testing snapshot history traversal.
726        pub fn new_with_deep_history() -> Self {
727            let tmp_dir = TempDir::new().unwrap();
728            let table_location = tmp_dir.path().join("table1");
729            let table_metadata1_location = table_location.join("metadata/v1.json");
730
731            let file_io = FileIO::new_with_fs();
732
733            let table_metadata = {
734                let json_str = fs::read_to_string(format!(
735                    "{}/testdata/example_table_metadata_v2_deep_history.json",
736                    env!("CARGO_MANIFEST_DIR")
737                ))
738                .unwrap();
739                serde_json::from_str::<TableMetadata>(&json_str).unwrap()
740            };
741
742            let table = Table::builder()
743                .metadata(table_metadata)
744                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
745                .file_io(file_io.clone())
746                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
747                .runtime(test_runtime())
748                .build()
749                .unwrap();
750
751            Self {
752                table_location: table_location.to_str().unwrap().to_string(),
753                table,
754            }
755        }
756
757        pub fn new_unpartitioned() -> Self {
758            let tmp_dir = TempDir::new().unwrap();
759            let table_location = tmp_dir.path().join("table1");
760            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
761            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
762            let table_metadata1_location = table_location.join("metadata/v1.json");
763
764            let file_io = FileIO::new_with_fs();
765
766            let mut table_metadata = {
767                let template_json_str = fs::read_to_string(format!(
768                    "{}/testdata/example_table_metadata_v2.json",
769                    env!("CARGO_MANIFEST_DIR")
770                ))
771                .unwrap();
772                let metadata_json = render_template(&template_json_str, context! {
773                    table_location => &table_location,
774                    manifest_list_1_location => &manifest_list1_location,
775                    manifest_list_2_location => &manifest_list2_location,
776                    table_metadata_1_location => &table_metadata1_location,
777                });
778                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
779            };
780
781            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
782            table_metadata.partition_specs.clear();
783            table_metadata.default_partition_type = StructType::new(vec![]);
784            table_metadata
785                .partition_specs
786                .insert(0, table_metadata.default_spec.clone());
787
788            let table = Table::builder()
789                .metadata(table_metadata)
790                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
791                .file_io(file_io.clone())
792                .metadata_location(table_metadata1_location.to_str().unwrap())
793                .runtime(test_runtime())
794                .build()
795                .unwrap();
796
797            Self {
798                table_location: table_location.to_str().unwrap().to_string(),
799                table,
800            }
801        }
802
803        fn next_manifest_file(&self) -> OutputFile {
804            self.table
805                .file_io()
806                .new_output(format!(
807                    "{}/metadata/manifest_{}.avro",
808                    self.table_location,
809                    Uuid::new_v4()
810                ))
811                .unwrap()
812        }
813
814        pub async fn setup_manifest_files(&mut self) {
815            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
816            let parent_snapshot = current_snapshot
817                .parent_snapshot(self.table.metadata())
818                .unwrap();
819            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
820            let current_partition_spec = self.table.metadata().default_partition_spec();
821
822            // Write the data files first, then use the file size in the manifest entries
823            let parquet_file_size = self.write_parquet_data_files();
824
825            let mut writer = ManifestWriterBuilder::new(
826                self.next_manifest_file(),
827                Some(current_snapshot.snapshot_id()),
828                None,
829                current_schema.clone(),
830                current_partition_spec.as_ref().clone(),
831            )
832            .build_v2_data();
833            writer
834                .add_entry(
835                    ManifestEntry::builder()
836                        .status(ManifestStatus::Added)
837                        .data_file(
838                            DataFileBuilder::default()
839                                .partition_spec_id(0)
840                                .content(DataContentType::Data)
841                                .file_path(format!("{}/1.parquet", &self.table_location))
842                                .file_format(DataFileFormat::Parquet)
843                                .file_size_in_bytes(parquet_file_size)
844                                .record_count(1)
845                                .partition(Struct::from_iter([Some(Literal::long(100))]))
846                                .key_metadata(None)
847                                .build()
848                                .unwrap(),
849                        )
850                        .build(),
851                )
852                .unwrap();
853            writer
854                .add_delete_entry(
855                    ManifestEntry::builder()
856                        .status(ManifestStatus::Deleted)
857                        .snapshot_id(parent_snapshot.snapshot_id())
858                        .sequence_number(parent_snapshot.sequence_number())
859                        .file_sequence_number(parent_snapshot.sequence_number())
860                        .data_file(
861                            DataFileBuilder::default()
862                                .partition_spec_id(0)
863                                .content(DataContentType::Data)
864                                .file_path(format!("{}/2.parquet", &self.table_location))
865                                .file_format(DataFileFormat::Parquet)
866                                .file_size_in_bytes(parquet_file_size)
867                                .record_count(1)
868                                .partition(Struct::from_iter([Some(Literal::long(200))]))
869                                .build()
870                                .unwrap(),
871                        )
872                        .build(),
873                )
874                .unwrap();
875            writer
876                .add_existing_entry(
877                    ManifestEntry::builder()
878                        .status(ManifestStatus::Existing)
879                        .snapshot_id(parent_snapshot.snapshot_id())
880                        .sequence_number(parent_snapshot.sequence_number())
881                        .file_sequence_number(parent_snapshot.sequence_number())
882                        .data_file(
883                            DataFileBuilder::default()
884                                .partition_spec_id(0)
885                                .content(DataContentType::Data)
886                                .file_path(format!("{}/3.parquet", &self.table_location))
887                                .file_format(DataFileFormat::Parquet)
888                                .file_size_in_bytes(parquet_file_size)
889                                .record_count(1)
890                                .partition(Struct::from_iter([Some(Literal::long(300))]))
891                                .build()
892                                .unwrap(),
893                        )
894                        .build(),
895                )
896                .unwrap();
897            let data_file_manifest = writer.write_manifest_file().await.unwrap();
898
899            // Write to manifest list
900            let mut manifest_list_write = ManifestListWriter::v2(
901                self.table
902                    .file_io()
903                    .new_output(current_snapshot.manifest_list())
904                    .unwrap(),
905                current_snapshot.snapshot_id(),
906                current_snapshot.parent_snapshot_id(),
907                current_snapshot.sequence_number(),
908            );
909            manifest_list_write
910                .add_manifests(vec![data_file_manifest].into_iter())
911                .unwrap();
912            manifest_list_write.close().await.unwrap();
913        }
914
915        /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet)
916        /// and returns the file size in bytes.
917        fn write_parquet_data_files(&self) -> u64 {
918            std::fs::create_dir_all(&self.table_location).unwrap();
919
920            let schema = {
921                let fields = vec![
922                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
923                        .with_metadata(HashMap::from([(
924                            PARQUET_FIELD_ID_META_KEY.to_string(),
925                            "1".to_string(),
926                        )])),
927                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
928                        .with_metadata(HashMap::from([(
929                            PARQUET_FIELD_ID_META_KEY.to_string(),
930                            "2".to_string(),
931                        )])),
932                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
933                        .with_metadata(HashMap::from([(
934                            PARQUET_FIELD_ID_META_KEY.to_string(),
935                            "3".to_string(),
936                        )])),
937                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
938                        .with_metadata(HashMap::from([(
939                            PARQUET_FIELD_ID_META_KEY.to_string(),
940                            "4".to_string(),
941                        )])),
942                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
943                        .with_metadata(HashMap::from([(
944                            PARQUET_FIELD_ID_META_KEY.to_string(),
945                            "5".to_string(),
946                        )])),
947                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
948                        .with_metadata(HashMap::from([(
949                            PARQUET_FIELD_ID_META_KEY.to_string(),
950                            "6".to_string(),
951                        )])),
952                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
953                        .with_metadata(HashMap::from([(
954                            PARQUET_FIELD_ID_META_KEY.to_string(),
955                            "7".to_string(),
956                        )])),
957                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
958                        .with_metadata(HashMap::from([(
959                            PARQUET_FIELD_ID_META_KEY.to_string(),
960                            "8".to_string(),
961                        )])),
962                ];
963                Arc::new(arrow_schema::Schema::new(fields))
964            };
965            // x: [1, 1, 1, 1, ...]
966            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
967
968            let mut values = vec![2; 512];
969            values.append(vec![3; 200].as_mut());
970            values.append(vec![4; 300].as_mut());
971            values.append(vec![5; 12].as_mut());
972
973            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
974            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
975
976            let mut values = vec![3; 512];
977            values.append(vec![4; 512].as_mut());
978
979            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
980            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
981
982            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
983            let mut values = vec!["Apache"; 512];
984            values.append(vec!["Iceberg"; 512].as_mut());
985            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
986
987            // dbl:
988            let mut values = vec![100.0f64; 512];
989            values.append(vec![150.0f64; 12].as_mut());
990            values.append(vec![200.0f64; 500].as_mut());
991            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
992
993            // i32:
994            let mut values = vec![100i32; 512];
995            values.append(vec![150i32; 12].as_mut());
996            values.append(vec![200i32; 500].as_mut());
997            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
998
999            // i64:
1000            let mut values = vec![100i64; 512];
1001            values.append(vec![150i64; 12].as_mut());
1002            values.append(vec![200i64; 500].as_mut());
1003            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1004
1005            // bool:
1006            let mut values = vec![false; 512];
1007            values.append(vec![true; 512].as_mut());
1008            let values: BooleanArray = values.into();
1009            let col8 = Arc::new(values) as ArrayRef;
1010
1011            let to_write = RecordBatch::try_new(schema.clone(), vec![
1012                col1, col2, col3, col4, col5, col6, col7, col8,
1013            ])
1014            .unwrap();
1015
1016            // Write the Parquet files
1017            let props = WriterProperties::builder()
1018                .set_compression(Compression::SNAPPY)
1019                .build();
1020
1021            for n in 1..=3 {
1022                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
1023                let mut writer =
1024                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1025
1026                writer.write(&to_write).expect("Writing batch");
1027
1028                // writer must be closed to write footer
1029                writer.close().unwrap();
1030            }
1031
1032            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
1033                .unwrap()
1034                .len()
1035        }
1036
1037        pub async fn setup_unpartitioned_manifest_files(&mut self) {
1038            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1039            let parent_snapshot = current_snapshot
1040                .parent_snapshot(self.table.metadata())
1041                .unwrap();
1042            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1043            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
1044
1045            // Write the data files first, then use the file size in the manifest entries
1046            let parquet_file_size = self.write_parquet_data_files();
1047
1048            // Write data files using an empty partition for unpartitioned tables.
1049            let mut writer = ManifestWriterBuilder::new(
1050                self.next_manifest_file(),
1051                Some(current_snapshot.snapshot_id()),
1052                None,
1053                current_schema.clone(),
1054                current_partition_spec.as_ref().clone(),
1055            )
1056            .build_v2_data();
1057
1058            // Create an empty partition value.
1059            let empty_partition = Struct::empty();
1060
1061            writer
1062                .add_entry(
1063                    ManifestEntry::builder()
1064                        .status(ManifestStatus::Added)
1065                        .data_file(
1066                            DataFileBuilder::default()
1067                                .partition_spec_id(0)
1068                                .content(DataContentType::Data)
1069                                .file_path(format!("{}/1.parquet", &self.table_location))
1070                                .file_format(DataFileFormat::Parquet)
1071                                .file_size_in_bytes(parquet_file_size)
1072                                .record_count(1)
1073                                .partition(empty_partition.clone())
1074                                .key_metadata(None)
1075                                .build()
1076                                .unwrap(),
1077                        )
1078                        .build(),
1079                )
1080                .unwrap();
1081
1082            writer
1083                .add_delete_entry(
1084                    ManifestEntry::builder()
1085                        .status(ManifestStatus::Deleted)
1086                        .snapshot_id(parent_snapshot.snapshot_id())
1087                        .sequence_number(parent_snapshot.sequence_number())
1088                        .file_sequence_number(parent_snapshot.sequence_number())
1089                        .data_file(
1090                            DataFileBuilder::default()
1091                                .partition_spec_id(0)
1092                                .content(DataContentType::Data)
1093                                .file_path(format!("{}/2.parquet", &self.table_location))
1094                                .file_format(DataFileFormat::Parquet)
1095                                .file_size_in_bytes(parquet_file_size)
1096                                .record_count(1)
1097                                .partition(empty_partition.clone())
1098                                .build()
1099                                .unwrap(),
1100                        )
1101                        .build(),
1102                )
1103                .unwrap();
1104
1105            writer
1106                .add_existing_entry(
1107                    ManifestEntry::builder()
1108                        .status(ManifestStatus::Existing)
1109                        .snapshot_id(parent_snapshot.snapshot_id())
1110                        .sequence_number(parent_snapshot.sequence_number())
1111                        .file_sequence_number(parent_snapshot.sequence_number())
1112                        .data_file(
1113                            DataFileBuilder::default()
1114                                .partition_spec_id(0)
1115                                .content(DataContentType::Data)
1116                                .file_path(format!("{}/3.parquet", &self.table_location))
1117                                .file_format(DataFileFormat::Parquet)
1118                                .file_size_in_bytes(parquet_file_size)
1119                                .record_count(1)
1120                                .partition(empty_partition.clone())
1121                                .build()
1122                                .unwrap(),
1123                        )
1124                        .build(),
1125                )
1126                .unwrap();
1127
1128            let data_file_manifest = writer.write_manifest_file().await.unwrap();
1129
1130            // Write to manifest list
1131            let mut manifest_list_write = ManifestListWriter::v2(
1132                self.table
1133                    .file_io()
1134                    .new_output(current_snapshot.manifest_list())
1135                    .unwrap(),
1136                current_snapshot.snapshot_id(),
1137                current_snapshot.parent_snapshot_id(),
1138                current_snapshot.sequence_number(),
1139            );
1140            manifest_list_write
1141                .add_manifests(vec![data_file_manifest].into_iter())
1142                .unwrap();
1143            manifest_list_write.close().await.unwrap();
1144        }
1145
1146        pub async fn setup_deadlock_manifests(&mut self) {
1147            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1148            let _parent_snapshot = current_snapshot
1149                .parent_snapshot(self.table.metadata())
1150                .unwrap();
1151            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1152            let current_partition_spec = self.table.metadata().default_partition_spec();
1153
1154            // 1. Write DATA manifest with MULTIPLE entries to fill buffer
1155            let mut writer = ManifestWriterBuilder::new(
1156                self.next_manifest_file(),
1157                Some(current_snapshot.snapshot_id()),
1158                None,
1159                current_schema.clone(),
1160                current_partition_spec.as_ref().clone(),
1161            )
1162            .build_v2_data();
1163
1164            // Add 10 data entries
1165            for i in 0..10 {
1166                writer
1167                    .add_entry(
1168                        ManifestEntry::builder()
1169                            .status(ManifestStatus::Added)
1170                            .data_file(
1171                                DataFileBuilder::default()
1172                                    .partition_spec_id(0)
1173                                    .content(DataContentType::Data)
1174                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
1175                                    .file_format(DataFileFormat::Parquet)
1176                                    .file_size_in_bytes(100)
1177                                    .record_count(1)
1178                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
1179                                    .key_metadata(None)
1180                                    .build()
1181                                    .unwrap(),
1182                            )
1183                            .build(),
1184                    )
1185                    .unwrap();
1186            }
1187            let data_manifest = writer.write_manifest_file().await.unwrap();
1188
1189            // 2. Write DELETE manifest
1190            let mut writer = ManifestWriterBuilder::new(
1191                self.next_manifest_file(),
1192                Some(current_snapshot.snapshot_id()),
1193                None,
1194                current_schema.clone(),
1195                current_partition_spec.as_ref().clone(),
1196            )
1197            .build_v2_deletes();
1198
1199            writer
1200                .add_entry(
1201                    ManifestEntry::builder()
1202                        .status(ManifestStatus::Added)
1203                        .data_file(
1204                            DataFileBuilder::default()
1205                                .partition_spec_id(0)
1206                                .content(DataContentType::PositionDeletes)
1207                                .file_path(format!("{}/del.parquet", &self.table_location))
1208                                .file_format(DataFileFormat::Parquet)
1209                                .file_size_in_bytes(100)
1210                                .record_count(1)
1211                                .partition(Struct::from_iter([Some(Literal::long(100))]))
1212                                .build()
1213                                .unwrap(),
1214                        )
1215                        .build(),
1216                )
1217                .unwrap();
1218            let delete_manifest = writer.write_manifest_file().await.unwrap();
1219
1220            // Write to manifest list - DATA FIRST then DELETE
1221            // This order is crucial for reproduction
1222            let mut manifest_list_write = ManifestListWriter::v2(
1223                self.table
1224                    .file_io()
1225                    .new_output(current_snapshot.manifest_list())
1226                    .unwrap(),
1227                current_snapshot.snapshot_id(),
1228                current_snapshot.parent_snapshot_id(),
1229                current_snapshot.sequence_number(),
1230            );
1231            manifest_list_write
1232                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
1233                .unwrap();
1234            manifest_list_write.close().await.unwrap();
1235        }
1236    }
1237
1238    #[tokio::test]
1239    async fn test_table_scan_columns() {
1240        let table = TableTestFixture::new().table;
1241
1242        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1243        assert_eq!(
1244            Some(vec!["x".to_string(), "y".to_string()]),
1245            table_scan.column_names
1246        );
1247
1248        let table_scan = table
1249            .scan()
1250            .select(["x", "y"])
1251            .select(["z"])
1252            .build()
1253            .unwrap();
1254        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1255    }
1256
1257    #[tokio::test]
1258    async fn test_select_all() {
1259        let table = TableTestFixture::new().table;
1260
1261        let table_scan = table.scan().select_all().build().unwrap();
1262        assert!(table_scan.column_names.is_none());
1263    }
1264
1265    #[test]
1266    fn test_select_no_exist_column() {
1267        let table = TableTestFixture::new().table;
1268
1269        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1270        assert!(table_scan.is_err());
1271    }
1272
1273    #[tokio::test]
1274    async fn test_table_scan_default_snapshot_id() {
1275        let table = TableTestFixture::new().table;
1276
1277        let table_scan = table.scan().build().unwrap();
1278        assert_eq!(
1279            table.metadata().current_snapshot().unwrap().snapshot_id(),
1280            table_scan.snapshot().unwrap().snapshot_id()
1281        );
1282    }
1283
1284    #[test]
1285    fn test_table_scan_non_exist_snapshot_id() {
1286        let table = TableTestFixture::new().table;
1287
1288        let table_scan = table.scan().snapshot_id(1024).build();
1289        assert!(table_scan.is_err());
1290    }
1291
1292    #[tokio::test]
1293    async fn test_table_scan_with_snapshot_id() {
1294        let table = TableTestFixture::new().table;
1295
1296        let table_scan = table
1297            .scan()
1298            .snapshot_id(3051729675574597004)
1299            .with_row_selection_enabled(true)
1300            .build()
1301            .unwrap();
1302        assert_eq!(
1303            table_scan.snapshot().unwrap().snapshot_id(),
1304            3051729675574597004
1305        );
1306    }
1307
1308    #[tokio::test]
1309    async fn test_plan_files_on_table_without_any_snapshots() {
1310        let table = TableTestFixture::new_empty().table;
1311        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1312        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1313        assert!(batches.is_empty());
1314    }
1315
1316    #[tokio::test]
1317    async fn test_plan_files_no_deletions() {
1318        let mut fixture = TableTestFixture::new();
1319        fixture.setup_manifest_files().await;
1320
1321        // Create table scan for current snapshot and plan files
1322        let table_scan = fixture
1323            .table
1324            .scan()
1325            .with_row_selection_enabled(true)
1326            .build()
1327            .unwrap();
1328
1329        let mut tasks = table_scan
1330            .plan_files()
1331            .await
1332            .unwrap()
1333            .try_fold(vec![], |mut acc, task| async move {
1334                acc.push(task);
1335                Ok(acc)
1336            })
1337            .await
1338            .unwrap();
1339
1340        assert_eq!(tasks.len(), 2);
1341
1342        tasks.sort_by_key(|t| t.data_file_path.to_string());
1343
1344        // Check first task is added data file
1345        assert_eq!(
1346            tasks[0].data_file_path,
1347            format!("{}/1.parquet", &fixture.table_location)
1348        );
1349
1350        // Check second task is existing data file
1351        assert_eq!(
1352            tasks[1].data_file_path,
1353            format!("{}/3.parquet", &fixture.table_location)
1354        );
1355    }
1356
1357    #[tokio::test]
1358    async fn test_open_parquet_no_deletions() {
1359        let mut fixture = TableTestFixture::new();
1360        fixture.setup_manifest_files().await;
1361
1362        // Create table scan for current snapshot and plan files
1363        let table_scan = fixture
1364            .table
1365            .scan()
1366            .with_row_selection_enabled(true)
1367            .build()
1368            .unwrap();
1369
1370        let batch_stream = table_scan.to_arrow().await.unwrap();
1371
1372        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1373
1374        let col = batches[0].column_by_name("x").unwrap();
1375
1376        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1377        assert_eq!(int64_arr.value(0), 1);
1378    }
1379
1380    #[tokio::test]
1381    async fn test_open_parquet_no_deletions_by_separate_reader() {
1382        let mut fixture = TableTestFixture::new();
1383        fixture.setup_manifest_files().await;
1384
1385        // Create table scan for current snapshot and plan files
1386        let table_scan = fixture
1387            .table
1388            .scan()
1389            .with_row_selection_enabled(true)
1390            .build()
1391            .unwrap();
1392
1393        let mut plan_task: Vec<_> = table_scan
1394            .plan_files()
1395            .await
1396            .unwrap()
1397            .try_collect()
1398            .await
1399            .unwrap();
1400        assert_eq!(plan_task.len(), 2);
1401
1402        let reader = ArrowReaderBuilder::new(
1403            fixture.table.file_io().clone(),
1404            fixture.table.runtime().clone(),
1405        )
1406        .build();
1407        let batch_stream = reader
1408            .clone()
1409            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1410            .unwrap()
1411            .stream();
1412        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1413
1414        let reader = ArrowReaderBuilder::new(
1415            fixture.table.file_io().clone(),
1416            fixture.table.runtime().clone(),
1417        )
1418        .build();
1419        let batch_stream = reader
1420            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1421            .unwrap()
1422            .stream();
1423        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1424
1425        assert_eq!(batch_1, batch_2);
1426    }
1427
1428    #[tokio::test]
1429    async fn test_open_parquet_with_projection() {
1430        let mut fixture = TableTestFixture::new();
1431        fixture.setup_manifest_files().await;
1432
1433        // Create table scan for current snapshot and plan files
1434        let table_scan = fixture
1435            .table
1436            .scan()
1437            .select(["x", "z"])
1438            .with_row_selection_enabled(true)
1439            .build()
1440            .unwrap();
1441
1442        let batch_stream = table_scan.to_arrow().await.unwrap();
1443
1444        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1445
1446        assert_eq!(batches[0].num_columns(), 2);
1447
1448        let col1 = batches[0].column_by_name("x").unwrap();
1449        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1450        assert_eq!(int64_arr.value(0), 1);
1451
1452        let col2 = batches[0].column_by_name("z").unwrap();
1453        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1454        assert_eq!(int64_arr.value(0), 3);
1455
1456        // test empty scan
1457        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1458        let batch_stream = table_scan.to_arrow().await.unwrap();
1459        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1460
1461        assert_eq!(batches[0].num_columns(), 0);
1462        assert_eq!(batches[0].num_rows(), 1024);
1463    }
1464
1465    #[tokio::test]
1466    async fn test_filter_on_arrow_lt() {
1467        let mut fixture = TableTestFixture::new();
1468        fixture.setup_manifest_files().await;
1469
1470        // Filter: y < 3
1471        let mut builder = fixture.table.scan();
1472        let predicate = Reference::new("y").less_than(Datum::long(3));
1473        builder = builder
1474            .with_filter(predicate)
1475            .with_row_selection_enabled(true);
1476        let table_scan = builder.build().unwrap();
1477
1478        let batch_stream = table_scan.to_arrow().await.unwrap();
1479
1480        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1481
1482        assert_eq!(batches[0].num_rows(), 512);
1483
1484        let col = batches[0].column_by_name("x").unwrap();
1485        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1486        assert_eq!(int64_arr.value(0), 1);
1487
1488        let col = batches[0].column_by_name("y").unwrap();
1489        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1490        assert_eq!(int64_arr.value(0), 2);
1491    }
1492
1493    #[tokio::test]
1494    async fn test_filter_on_arrow_gt_eq() {
1495        let mut fixture = TableTestFixture::new();
1496        fixture.setup_manifest_files().await;
1497
1498        // Filter: y >= 5
1499        let mut builder = fixture.table.scan();
1500        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1501        builder = builder
1502            .with_filter(predicate)
1503            .with_row_selection_enabled(true);
1504        let table_scan = builder.build().unwrap();
1505
1506        let batch_stream = table_scan.to_arrow().await.unwrap();
1507
1508        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1509
1510        assert_eq!(batches[0].num_rows(), 12);
1511
1512        let col = batches[0].column_by_name("x").unwrap();
1513        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1514        assert_eq!(int64_arr.value(0), 1);
1515
1516        let col = batches[0].column_by_name("y").unwrap();
1517        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1518        assert_eq!(int64_arr.value(0), 5);
1519    }
1520
1521    #[tokio::test]
1522    async fn test_filter_double_eq() {
1523        let mut fixture = TableTestFixture::new();
1524        fixture.setup_manifest_files().await;
1525
1526        // Filter: dbl == 150.0
1527        let mut builder = fixture.table.scan();
1528        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1529        builder = builder
1530            .with_filter(predicate)
1531            .with_row_selection_enabled(true);
1532        let table_scan = builder.build().unwrap();
1533
1534        let batch_stream = table_scan.to_arrow().await.unwrap();
1535
1536        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1537
1538        assert_eq!(batches.len(), 2);
1539        assert_eq!(batches[0].num_rows(), 12);
1540
1541        let col = batches[0].column_by_name("dbl").unwrap();
1542        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1543        assert_eq!(f64_arr.value(1), 150.0f64);
1544    }
1545
1546    #[tokio::test]
1547    async fn test_filter_int_eq() {
1548        let mut fixture = TableTestFixture::new();
1549        fixture.setup_manifest_files().await;
1550
1551        // Filter: i32 == 150
1552        let mut builder = fixture.table.scan();
1553        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1554        builder = builder
1555            .with_filter(predicate)
1556            .with_row_selection_enabled(true);
1557        let table_scan = builder.build().unwrap();
1558
1559        let batch_stream = table_scan.to_arrow().await.unwrap();
1560
1561        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1562
1563        assert_eq!(batches.len(), 2);
1564        assert_eq!(batches[0].num_rows(), 12);
1565
1566        let col = batches[0].column_by_name("i32").unwrap();
1567        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1568        assert_eq!(i32_arr.value(1), 150i32);
1569    }
1570
1571    #[tokio::test]
1572    async fn test_filter_long_eq() {
1573        let mut fixture = TableTestFixture::new();
1574        fixture.setup_manifest_files().await;
1575
1576        // Filter: i64 == 150
1577        let mut builder = fixture.table.scan();
1578        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1579        builder = builder
1580            .with_filter(predicate)
1581            .with_row_selection_enabled(true);
1582        let table_scan = builder.build().unwrap();
1583
1584        let batch_stream = table_scan.to_arrow().await.unwrap();
1585
1586        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1587
1588        assert_eq!(batches.len(), 2);
1589        assert_eq!(batches[0].num_rows(), 12);
1590
1591        let col = batches[0].column_by_name("i64").unwrap();
1592        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1593        assert_eq!(i64_arr.value(1), 150i64);
1594    }
1595
1596    #[tokio::test]
1597    async fn test_filter_bool_eq() {
1598        let mut fixture = TableTestFixture::new();
1599        fixture.setup_manifest_files().await;
1600
1601        // Filter: bool == true
1602        let mut builder = fixture.table.scan();
1603        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1604        builder = builder
1605            .with_filter(predicate)
1606            .with_row_selection_enabled(true);
1607        let table_scan = builder.build().unwrap();
1608
1609        let batch_stream = table_scan.to_arrow().await.unwrap();
1610
1611        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1612
1613        assert_eq!(batches.len(), 2);
1614        assert_eq!(batches[0].num_rows(), 512);
1615
1616        let col = batches[0].column_by_name("bool").unwrap();
1617        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1618        assert!(bool_arr.value(1));
1619    }
1620
1621    #[tokio::test]
1622    async fn test_filter_on_arrow_is_null() {
1623        let mut fixture = TableTestFixture::new();
1624        fixture.setup_manifest_files().await;
1625
1626        // Filter: y is null
1627        let mut builder = fixture.table.scan();
1628        let predicate = Reference::new("y").is_null();
1629        builder = builder
1630            .with_filter(predicate)
1631            .with_row_selection_enabled(true);
1632        let table_scan = builder.build().unwrap();
1633
1634        let batch_stream = table_scan.to_arrow().await.unwrap();
1635
1636        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1637        assert_eq!(batches.len(), 0);
1638    }
1639
1640    #[tokio::test]
1641    async fn test_filter_on_arrow_is_not_null() {
1642        let mut fixture = TableTestFixture::new();
1643        fixture.setup_manifest_files().await;
1644
1645        // Filter: y is not null
1646        let mut builder = fixture.table.scan();
1647        let predicate = Reference::new("y").is_not_null();
1648        builder = builder
1649            .with_filter(predicate)
1650            .with_row_selection_enabled(true);
1651        let table_scan = builder.build().unwrap();
1652
1653        let batch_stream = table_scan.to_arrow().await.unwrap();
1654
1655        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1656        assert_eq!(batches[0].num_rows(), 1024);
1657    }
1658
1659    #[tokio::test]
1660    async fn test_filter_on_arrow_lt_and_gt() {
1661        let mut fixture = TableTestFixture::new();
1662        fixture.setup_manifest_files().await;
1663
1664        // Filter: y < 5 AND z >= 4
1665        let mut builder = fixture.table.scan();
1666        let predicate = Reference::new("y")
1667            .less_than(Datum::long(5))
1668            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1669        builder = builder
1670            .with_filter(predicate)
1671            .with_row_selection_enabled(true);
1672        let table_scan = builder.build().unwrap();
1673
1674        let batch_stream = table_scan.to_arrow().await.unwrap();
1675
1676        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1677        assert_eq!(batches[0].num_rows(), 500);
1678
1679        let col = batches[0].column_by_name("x").unwrap();
1680        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1681        assert_eq!(col, &expected_x);
1682
1683        let col = batches[0].column_by_name("y").unwrap();
1684        let mut values = vec![];
1685        values.append(vec![3; 200].as_mut());
1686        values.append(vec![4; 300].as_mut());
1687        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1688        assert_eq!(col, &expected_y);
1689
1690        let col = batches[0].column_by_name("z").unwrap();
1691        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1692        assert_eq!(col, &expected_z);
1693    }
1694
1695    #[tokio::test]
1696    async fn test_filter_on_arrow_lt_or_gt() {
1697        let mut fixture = TableTestFixture::new();
1698        fixture.setup_manifest_files().await;
1699
1700        // Filter: y < 5 AND z >= 4
1701        let mut builder = fixture.table.scan();
1702        let predicate = Reference::new("y")
1703            .less_than(Datum::long(5))
1704            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1705        builder = builder
1706            .with_filter(predicate)
1707            .with_row_selection_enabled(true);
1708        let table_scan = builder.build().unwrap();
1709
1710        let batch_stream = table_scan.to_arrow().await.unwrap();
1711
1712        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1713        assert_eq!(batches[0].num_rows(), 1024);
1714
1715        let col = batches[0].column_by_name("x").unwrap();
1716        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1717        assert_eq!(col, &expected_x);
1718
1719        let col = batches[0].column_by_name("y").unwrap();
1720        let mut values = vec![2; 512];
1721        values.append(vec![3; 200].as_mut());
1722        values.append(vec![4; 300].as_mut());
1723        values.append(vec![5; 12].as_mut());
1724        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1725        assert_eq!(col, &expected_y);
1726
1727        let col = batches[0].column_by_name("z").unwrap();
1728        let mut values = vec![3; 512];
1729        values.append(vec![4; 512].as_mut());
1730        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1731        assert_eq!(col, &expected_z);
1732    }
1733
1734    #[tokio::test]
1735    async fn test_filter_on_arrow_startswith() {
1736        let mut fixture = TableTestFixture::new();
1737        fixture.setup_manifest_files().await;
1738
1739        // Filter: a STARTSWITH "Ice"
1740        let mut builder = fixture.table.scan();
1741        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1742        builder = builder
1743            .with_filter(predicate)
1744            .with_row_selection_enabled(true);
1745        let table_scan = builder.build().unwrap();
1746
1747        let batch_stream = table_scan.to_arrow().await.unwrap();
1748
1749        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1750
1751        assert_eq!(batches[0].num_rows(), 512);
1752
1753        let col = batches[0].column_by_name("a").unwrap();
1754        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1755        assert_eq!(string_arr.value(0), "Iceberg");
1756    }
1757
1758    #[tokio::test]
1759    async fn test_filter_on_arrow_not_startswith() {
1760        let mut fixture = TableTestFixture::new();
1761        fixture.setup_manifest_files().await;
1762
1763        // Filter: a NOT STARTSWITH "Ice"
1764        let mut builder = fixture.table.scan();
1765        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1766        builder = builder
1767            .with_filter(predicate)
1768            .with_row_selection_enabled(true);
1769        let table_scan = builder.build().unwrap();
1770
1771        let batch_stream = table_scan.to_arrow().await.unwrap();
1772
1773        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1774
1775        assert_eq!(batches[0].num_rows(), 512);
1776
1777        let col = batches[0].column_by_name("a").unwrap();
1778        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1779        assert_eq!(string_arr.value(0), "Apache");
1780    }
1781
1782    #[tokio::test]
1783    async fn test_filter_on_arrow_in() {
1784        let mut fixture = TableTestFixture::new();
1785        fixture.setup_manifest_files().await;
1786
1787        // Filter: a IN ("Sioux", "Iceberg")
1788        let mut builder = fixture.table.scan();
1789        let predicate =
1790            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1791        builder = builder
1792            .with_filter(predicate)
1793            .with_row_selection_enabled(true);
1794        let table_scan = builder.build().unwrap();
1795
1796        let batch_stream = table_scan.to_arrow().await.unwrap();
1797
1798        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1799
1800        assert_eq!(batches[0].num_rows(), 512);
1801
1802        let col = batches[0].column_by_name("a").unwrap();
1803        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1804        assert_eq!(string_arr.value(0), "Iceberg");
1805    }
1806
1807    #[tokio::test]
1808    async fn test_filter_on_arrow_not_in() {
1809        let mut fixture = TableTestFixture::new();
1810        fixture.setup_manifest_files().await;
1811
1812        // Filter: a NOT IN ("Sioux", "Iceberg")
1813        let mut builder = fixture.table.scan();
1814        let predicate =
1815            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1816        builder = builder
1817            .with_filter(predicate)
1818            .with_row_selection_enabled(true);
1819        let table_scan = builder.build().unwrap();
1820
1821        let batch_stream = table_scan.to_arrow().await.unwrap();
1822
1823        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1824
1825        assert_eq!(batches[0].num_rows(), 512);
1826
1827        let col = batches[0].column_by_name("a").unwrap();
1828        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1829        assert_eq!(string_arr.value(0), "Apache");
1830    }
1831
1832    #[test]
1833    fn test_file_scan_task_serialize_deserialize() {
1834        let test_fn = |task: FileScanTask| {
1835            let serialized = serde_json::to_string(&task).unwrap();
1836            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1837
1838            assert_eq!(task.data_file_path, deserialized.data_file_path);
1839            assert_eq!(task.start, deserialized.start);
1840            assert_eq!(task.length, deserialized.length);
1841            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1842            assert_eq!(task.predicate, deserialized.predicate);
1843            assert_eq!(task.schema, deserialized.schema);
1844        };
1845
1846        // without predicate
1847        let schema = Arc::new(
1848            Schema::builder()
1849                .with_fields(vec![Arc::new(NestedField::required(
1850                    1,
1851                    "x",
1852                    Type::Primitive(PrimitiveType::Binary),
1853                ))])
1854                .build()
1855                .unwrap(),
1856        );
1857        let task = FileScanTask {
1858            data_file_path: "data_file_path".to_string(),
1859            file_size_in_bytes: 0,
1860            start: 0,
1861            length: 100,
1862            project_field_ids: vec![1, 2, 3],
1863            predicate: None,
1864            schema: schema.clone(),
1865            record_count: Some(100),
1866            data_file_format: DataFileFormat::Parquet,
1867            deletes: vec![],
1868            partition: None,
1869            partition_spec: None,
1870            name_mapping: None,
1871            case_sensitive: false,
1872        };
1873        test_fn(task);
1874
1875        // with predicate
1876        let task = FileScanTask {
1877            data_file_path: "data_file_path".to_string(),
1878            file_size_in_bytes: 0,
1879            start: 0,
1880            length: 100,
1881            project_field_ids: vec![1, 2, 3],
1882            predicate: Some(BoundPredicate::AlwaysTrue),
1883            schema,
1884            record_count: None,
1885            data_file_format: DataFileFormat::Avro,
1886            deletes: vec![],
1887            partition: None,
1888            partition_spec: None,
1889            name_mapping: None,
1890            case_sensitive: false,
1891        };
1892        test_fn(task);
1893    }
1894
1895    #[tokio::test]
1896    async fn test_select_with_file_column() {
1897        use arrow_array::cast::AsArray;
1898
1899        let mut fixture = TableTestFixture::new();
1900        fixture.setup_manifest_files().await;
1901
1902        // Select regular columns plus the _file column
1903        let table_scan = fixture
1904            .table
1905            .scan()
1906            .select(["x", RESERVED_COL_NAME_FILE])
1907            .with_row_selection_enabled(true)
1908            .build()
1909            .unwrap();
1910
1911        let batch_stream = table_scan.to_arrow().await.unwrap();
1912        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1913
1914        // Verify we have 2 columns: x and _file
1915        assert_eq!(batches[0].num_columns(), 2);
1916
1917        // Verify the x column exists and has correct data
1918        let x_col = batches[0].column_by_name("x").unwrap();
1919        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
1920        assert_eq!(x_arr.value(0), 1);
1921
1922        // Verify the _file column exists
1923        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
1924        assert!(
1925            file_col.is_some(),
1926            "_file column should be present in the batch"
1927        );
1928
1929        // Verify the _file column contains a file path
1930        let file_col = file_col.unwrap();
1931        assert!(
1932            matches!(
1933                file_col.data_type(),
1934                arrow_schema::DataType::RunEndEncoded(_, _)
1935            ),
1936            "_file column should use RunEndEncoded type"
1937        );
1938
1939        // Decode the RunArray to verify it contains the file path
1940        let run_array = file_col
1941            .as_any()
1942            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1943            .expect("_file column should be a RunArray");
1944
1945        let values = run_array.values();
1946        let string_values = values.as_string::<i32>();
1947        assert_eq!(string_values.len(), 1, "Should have a single file path");
1948
1949        let file_path = string_values.value(0);
1950        assert!(
1951            file_path.ends_with(".parquet"),
1952            "File path should end with .parquet, got: {file_path}"
1953        );
1954    }
1955
1956    #[tokio::test]
1957    async fn test_select_file_column_position() {
1958        let mut fixture = TableTestFixture::new();
1959        fixture.setup_manifest_files().await;
1960
1961        // Select columns in specific order: x, _file, z
1962        let table_scan = fixture
1963            .table
1964            .scan()
1965            .select(["x", RESERVED_COL_NAME_FILE, "z"])
1966            .with_row_selection_enabled(true)
1967            .build()
1968            .unwrap();
1969
1970        let batch_stream = table_scan.to_arrow().await.unwrap();
1971        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1972
1973        assert_eq!(batches[0].num_columns(), 3);
1974
1975        // Verify column order: x at position 0, _file at position 1, z at position 2
1976        let schema = batches[0].schema();
1977        assert_eq!(schema.field(0).name(), "x");
1978        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1979        assert_eq!(schema.field(2).name(), "z");
1980
1981        // Verify columns by name also works
1982        assert!(batches[0].column_by_name("x").is_some());
1983        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1984        assert!(batches[0].column_by_name("z").is_some());
1985    }
1986
1987    #[tokio::test]
1988    async fn test_select_file_column_only() {
1989        let mut fixture = TableTestFixture::new();
1990        fixture.setup_manifest_files().await;
1991
1992        // Select only the _file column
1993        let table_scan = fixture
1994            .table
1995            .scan()
1996            .select([RESERVED_COL_NAME_FILE])
1997            .with_row_selection_enabled(true)
1998            .build()
1999            .unwrap();
2000
2001        let batch_stream = table_scan.to_arrow().await.unwrap();
2002        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2003
2004        // Should have exactly 1 column
2005        assert_eq!(batches[0].num_columns(), 1);
2006
2007        // Verify it's the _file column
2008        let schema = batches[0].schema();
2009        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2010
2011        // Verify the batch has the correct number of rows
2012        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
2013        // Each file has 1024 rows, so total is 2048 rows
2014        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2015        assert_eq!(total_rows, 2048);
2016    }
2017
2018    #[tokio::test]
2019    async fn test_file_column_with_multiple_files() {
2020        use std::collections::HashSet;
2021
2022        let mut fixture = TableTestFixture::new();
2023        fixture.setup_manifest_files().await;
2024
2025        // Select x and _file columns
2026        let table_scan = fixture
2027            .table
2028            .scan()
2029            .select(["x", RESERVED_COL_NAME_FILE])
2030            .with_row_selection_enabled(true)
2031            .build()
2032            .unwrap();
2033
2034        let batch_stream = table_scan.to_arrow().await.unwrap();
2035        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2036
2037        // Collect all unique file paths from the batches
2038        let mut file_paths = HashSet::new();
2039        for batch in &batches {
2040            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
2041            let run_array = file_col
2042                .as_any()
2043                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2044                .expect("_file column should be a RunArray");
2045
2046            let values = run_array.values();
2047            let string_values = values.as_string::<i32>();
2048            for i in 0..string_values.len() {
2049                file_paths.insert(string_values.value(i).to_string());
2050            }
2051        }
2052
2053        // We should have multiple files (the test creates 1.parquet and 3.parquet)
2054        assert!(!file_paths.is_empty(), "Should have at least one file path");
2055
2056        // All paths should end with .parquet
2057        for path in &file_paths {
2058            assert!(
2059                path.ends_with(".parquet"),
2060                "All file paths should end with .parquet, got: {path}"
2061            );
2062        }
2063    }
2064
2065    #[tokio::test]
2066    async fn test_file_column_at_start() {
2067        let mut fixture = TableTestFixture::new();
2068        fixture.setup_manifest_files().await;
2069
2070        // Select _file at the start
2071        let table_scan = fixture
2072            .table
2073            .scan()
2074            .select([RESERVED_COL_NAME_FILE, "x", "y"])
2075            .with_row_selection_enabled(true)
2076            .build()
2077            .unwrap();
2078
2079        let batch_stream = table_scan.to_arrow().await.unwrap();
2080        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2081
2082        assert_eq!(batches[0].num_columns(), 3);
2083
2084        // Verify _file is at position 0
2085        let schema = batches[0].schema();
2086        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2087        assert_eq!(schema.field(1).name(), "x");
2088        assert_eq!(schema.field(2).name(), "y");
2089    }
2090
2091    #[tokio::test]
2092    async fn test_file_column_at_end() {
2093        let mut fixture = TableTestFixture::new();
2094        fixture.setup_manifest_files().await;
2095
2096        // Select _file at the end
2097        let table_scan = fixture
2098            .table
2099            .scan()
2100            .select(["x", "y", RESERVED_COL_NAME_FILE])
2101            .with_row_selection_enabled(true)
2102            .build()
2103            .unwrap();
2104
2105        let batch_stream = table_scan.to_arrow().await.unwrap();
2106        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2107
2108        assert_eq!(batches[0].num_columns(), 3);
2109
2110        // Verify _file is at position 2 (the end)
2111        let schema = batches[0].schema();
2112        assert_eq!(schema.field(0).name(), "x");
2113        assert_eq!(schema.field(1).name(), "y");
2114        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2115    }
2116
2117    #[tokio::test]
2118    async fn test_select_with_repeated_column_names() {
2119        let mut fixture = TableTestFixture::new();
2120        fixture.setup_manifest_files().await;
2121
2122        // Select with repeated column names - both regular columns and virtual columns
2123        // Repeated columns should appear multiple times in the result (duplicates are allowed)
2124        let table_scan = fixture
2125            .table
2126            .scan()
2127            .select([
2128                "x",
2129                RESERVED_COL_NAME_FILE,
2130                "x", // x repeated
2131                "y",
2132                RESERVED_COL_NAME_FILE, // _file repeated
2133                "y",                    // y repeated
2134            ])
2135            .with_row_selection_enabled(true)
2136            .build()
2137            .unwrap();
2138
2139        let batch_stream = table_scan.to_arrow().await.unwrap();
2140        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2141
2142        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
2143        assert_eq!(
2144            batches[0].num_columns(),
2145            6,
2146            "Should have exactly 6 columns with duplicates"
2147        );
2148
2149        let schema = batches[0].schema();
2150
2151        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
2152        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2153        assert_eq!(
2154            schema.field(1).name(),
2155            RESERVED_COL_NAME_FILE,
2156            "Column 1 should be _file"
2157        );
2158        assert_eq!(
2159            schema.field(2).name(),
2160            "x",
2161            "Column 2 should be x (duplicate)"
2162        );
2163        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2164        assert_eq!(
2165            schema.field(4).name(),
2166            RESERVED_COL_NAME_FILE,
2167            "Column 4 should be _file (duplicate)"
2168        );
2169        assert_eq!(
2170            schema.field(5).name(),
2171            "y",
2172            "Column 5 should be y (duplicate)"
2173        );
2174
2175        // Verify all columns have correct data types
2176        assert!(
2177            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2178            "Column x should be Int64"
2179        );
2180        assert!(
2181            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2182            "Column x (duplicate) should be Int64"
2183        );
2184        assert!(
2185            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2186            "Column y should be Int64"
2187        );
2188        assert!(
2189            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2190            "Column y (duplicate) should be Int64"
2191        );
2192        assert!(
2193            matches!(
2194                schema.field(1).data_type(),
2195                arrow_schema::DataType::RunEndEncoded(_, _)
2196            ),
2197            "_file column should use RunEndEncoded type"
2198        );
2199        assert!(
2200            matches!(
2201                schema.field(4).data_type(),
2202                arrow_schema::DataType::RunEndEncoded(_, _)
2203            ),
2204            "_file column (duplicate) should use RunEndEncoded type"
2205        );
2206    }
2207
2208    #[tokio::test]
2209    async fn test_scan_deadlock() {
2210        let mut fixture = TableTestFixture::new();
2211        fixture.setup_deadlock_manifests().await;
2212
2213        // Create table scan with concurrency limit 1
2214        // This sets channel size to 1.
2215        // Data manifest has 10 entries -> will block producer.
2216        // Delete manifest is 2nd in list -> won't be processed.
2217        // Consumer 2 (Data) not started -> blocked.
2218        // Consumer 1 (Delete) waiting -> blocked.
2219        let table_scan = fixture
2220            .table
2221            .scan()
2222            .with_concurrency_limit(1)
2223            .build()
2224            .unwrap();
2225
2226        // This should timeout/hang if deadlock exists
2227        // We can use tokio::time::timeout
2228        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2229            table_scan
2230                .plan_files()
2231                .await
2232                .unwrap()
2233                .try_collect::<Vec<_>>()
2234                .await
2235        })
2236        .await;
2237
2238        // Assert it finished (didn't timeout)
2239        assert!(result.is_ok(), "Scan timed out - deadlock detected");
2240    }
2241}