iceberg/scan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Table scan api.
19
20mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35pub use crate::arrow::{ScanMetrics, ScanResult};
36use crate::delete_file_index::DeleteFileIndex;
37use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
38use crate::expr::{Bind, BoundPredicate, Predicate};
39use crate::io::FileIO;
40use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
41use crate::runtime::Runtime;
42use crate::spec::{DataContentType, SnapshotRef};
43use crate::table::Table;
44use crate::util::available_parallelism;
45use crate::{Error, ErrorKind, Result};
46
/// A boxed, `'static` stream of arrow [`RecordBatch`]es.
///
/// Each item is a [`Result`], so a failure is reported as an item of the stream.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
49
/// Builder to create table scan.
///
/// Collects the projection, snapshot, filter, concurrency limits and parquet
/// read options of a scan before it is turned into a [`TableScan`].
pub struct TableScanBuilder<'a> {
    /// The table that will be scanned.
    table: &'a Table,
    // Defaults to none which means select all columns
    column_names: Option<Vec<String>>,
    /// Id of the snapshot to scan; `None` means the table's current snapshot.
    snapshot_id: Option<i64>,
    /// Requested record batch size; `None` leaves the reader's default untouched.
    batch_size: Option<usize>,
    /// Case sensitivity of the scan. Defaults to `true`.
    case_sensitive: bool,
    /// Optional filter predicate, stored with `Not` nodes already rewritten away.
    filter: Option<Predicate>,
    /// Concurrency limit used when reading data files.
    concurrency_limit_data_files: usize,
    /// Concurrency limit used when processing manifest entries.
    concurrency_limit_manifest_entries: usize,
    /// Concurrency limit used when fetching manifest files.
    concurrency_limit_manifest_files: usize,
    /// Whether parquet row group filtering is enabled. Defaults to `true`.
    row_group_filtering_enabled: bool,
    /// Whether parquet row selection is enabled. Defaults to `false`.
    row_selection_enabled: bool,
}
65
66impl<'a> TableScanBuilder<'a> {
67    pub(crate) fn new(table: &'a Table) -> Self {
68        let num_cpus = available_parallelism().get();
69
70        Self {
71            table,
72            column_names: None,
73            snapshot_id: None,
74            batch_size: None,
75            case_sensitive: true,
76            filter: None,
77            concurrency_limit_data_files: num_cpus,
78            concurrency_limit_manifest_entries: num_cpus,
79            concurrency_limit_manifest_files: num_cpus,
80            row_group_filtering_enabled: true,
81            row_selection_enabled: false,
82        }
83    }
84
85    /// Sets the desired size of batches in the response
86    /// to something other than the default
87    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
88        self.batch_size = batch_size;
89        self
90    }
91
92    /// Sets the scan's case sensitivity
93    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
94        self.case_sensitive = case_sensitive;
95        self
96    }
97
98    /// Specifies a predicate to use as a filter
99    pub fn with_filter(mut self, predicate: Predicate) -> Self {
100        // calls rewrite_not to remove Not nodes, which must be absent
101        // when applying the manifest evaluator
102        self.filter = Some(predicate.rewrite_not());
103        self
104    }
105
106    /// Select all columns.
107    pub fn select_all(mut self) -> Self {
108        self.column_names = None;
109        self
110    }
111
112    /// Select empty columns.
113    pub fn select_empty(mut self) -> Self {
114        self.column_names = Some(vec![]);
115        self
116    }
117
118    /// Select some columns of the table.
119    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
120        self.column_names = Some(
121            column_names
122                .into_iter()
123                .map(|item| item.to_string())
124                .collect(),
125        );
126        self
127    }
128
129    /// Set the snapshot to scan. When not set, it uses current snapshot.
130    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
131        self.snapshot_id = Some(snapshot_id);
132        self
133    }
134
135    /// Sets the concurrency limit for both manifest files and manifest
136    /// entries for this scan
137    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
138        self.concurrency_limit_manifest_files = limit;
139        self.concurrency_limit_manifest_entries = limit;
140        self.concurrency_limit_data_files = limit;
141        self
142    }
143
144    /// Sets the data file concurrency limit for this scan
145    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
146        self.concurrency_limit_data_files = limit;
147        self
148    }
149
150    /// Sets the manifest entry concurrency limit for this scan
151    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
152        self.concurrency_limit_manifest_entries = limit;
153        self
154    }
155
156    /// Determines whether to enable row group filtering.
157    /// When enabled, if a read is performed with a filter predicate,
158    /// then the metadata for each row group in the parquet file is
159    /// evaluated against the filter predicate and row groups
160    /// that cant contain matching rows will be skipped entirely.
161    ///
162    /// Defaults to enabled, as it generally improves performance or
163    /// keeps it the same, with performance degradation unlikely.
164    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
165        self.row_group_filtering_enabled = row_group_filtering_enabled;
166        self
167    }
168
169    /// Determines whether to enable row selection.
170    /// When enabled, if a read is performed with a filter predicate,
171    /// then (for row groups that have not been skipped) the page index
172    /// for each row group in a parquet file is parsed and evaluated
173    /// against the filter predicate to determine if ranges of rows
174    /// within a row group can be skipped, based upon the page-level
175    /// statistics for each column.
176    ///
177    /// Defaults to being disabled. Enabling requires parsing the parquet page
178    /// index, which can be slow enough that parsing the page index outweighs any
179    /// gains from the reduced number of rows that need scanning.
180    /// It is recommended to experiment with partitioning, sorting, row group size,
181    /// page size, and page row limit Iceberg settings on the table being scanned in
182    /// order to get the best performance from using row selection.
183    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
184        self.row_selection_enabled = row_selection_enabled;
185        self
186    }
187
188    /// Build the table scan.
189    pub fn build(self) -> Result<TableScan> {
190        let snapshot = match self.snapshot_id {
191            Some(snapshot_id) => self
192                .table
193                .metadata()
194                .snapshot_by_id(snapshot_id)
195                .ok_or_else(|| {
196                    Error::new(
197                        ErrorKind::DataInvalid,
198                        format!("Snapshot with id {snapshot_id} not found"),
199                    )
200                })?
201                .clone(),
202            None => {
203                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
204                    return Ok(TableScan {
205                        batch_size: self.batch_size,
206                        column_names: self.column_names,
207                        file_io: self.table.file_io().clone(),
208                        plan_context: None,
209                        concurrency_limit_data_files: self.concurrency_limit_data_files,
210                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
211                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
212                        row_group_filtering_enabled: self.row_group_filtering_enabled,
213                        row_selection_enabled: self.row_selection_enabled,
214                        runtime: self.table.runtime().clone(),
215                    });
216                };
217                current_snapshot_id.clone()
218            }
219        };
220
221        let schema = snapshot.schema(self.table.metadata())?;
222
223        // Check that all column names exist in the schema (skip reserved columns).
224        if let Some(column_names) = self.column_names.as_ref() {
225            for column_name in column_names {
226                // Skip reserved columns that don't exist in the schema
227                if is_metadata_column_name(column_name) {
228                    continue;
229                }
230                if schema.field_by_name(column_name).is_none() {
231                    return Err(Error::new(
232                        ErrorKind::DataInvalid,
233                        format!("Column {column_name} not found in table. Schema: {schema}"),
234                    ));
235                }
236            }
237        }
238
239        let mut field_ids = vec![];
240        let column_names = self.column_names.clone().unwrap_or_else(|| {
241            schema
242                .as_struct()
243                .fields()
244                .iter()
245                .map(|f| f.name.clone())
246                .collect()
247        });
248
249        for column_name in column_names.iter() {
250            // Handle metadata columns (like "_file")
251            if is_metadata_column_name(column_name) {
252                field_ids.push(get_metadata_field_id(column_name)?);
253                continue;
254            }
255
256            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
257                Error::new(
258                    ErrorKind::DataInvalid,
259                    format!("Column {column_name} not found in table. Schema: {schema}"),
260                )
261            })?;
262
263            schema
264                .as_struct()
265                .field_by_id(field_id)
266                .ok_or_else(|| {
267                    Error::new(
268                        ErrorKind::FeatureUnsupported,
269                        format!(
270                        "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
271                    ),
272                )
273            })?;
274
275            field_ids.push(field_id);
276        }
277
278        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
279            Some(predicates.bind(schema.clone(), true)?)
280        } else {
281            None
282        };
283
284        let plan_context = PlanContext {
285            snapshot,
286            table_metadata: self.table.metadata_ref(),
287            snapshot_schema: schema,
288            case_sensitive: self.case_sensitive,
289            predicate: self.filter.map(Arc::new),
290            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
291            object_cache: self.table.object_cache(),
292            field_ids: Arc::new(field_ids),
293            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
294            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
295            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
296        };
297
298        Ok(TableScan {
299            batch_size: self.batch_size,
300            column_names: self.column_names,
301            file_io: self.table.file_io().clone(),
302            plan_context: Some(plan_context),
303            concurrency_limit_data_files: self.concurrency_limit_data_files,
304            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
305            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
306            row_group_filtering_enabled: self.row_group_filtering_enabled,
307            row_selection_enabled: self.row_selection_enabled,
308            runtime: self.table.runtime().clone(),
309        })
310    }
311}
312
/// Table scan.
///
/// Created by `TableScanBuilder::build`; can be planned into file scan tasks
/// or read directly as a stream of arrow record batches.
#[derive(Debug)]
pub struct TableScan {
    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
    ///
    /// If this is None, then the scan contains no rows.
    plan_context: Option<PlanContext>,
    /// Batch size passed to the arrow reader; `None` keeps the reader's default.
    batch_size: Option<usize>,
    /// [`FileIO`] handed to the arrow reader.
    file_io: FileIO,
    /// The selected column names; `None` means all columns were selected.
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently
    concurrency_limit_manifest_files: usize,

    /// The maximum number of [`ManifestEntry`]s that will
    /// be processed in parallel
    concurrency_limit_manifest_entries: usize,

    /// The data file concurrency limit that is passed on to the
    /// arrow reader when the scan is read
    concurrency_limit_data_files: usize,

    /// Whether the arrow reader is asked to apply row group filtering.
    row_group_filtering_enabled: bool,
    /// Whether the arrow reader is asked to apply row selection.
    row_selection_enabled: bool,

    /// Runtime on which the planning tasks are spawned; also handed to the arrow reader.
    runtime: Runtime,
}
340
341impl TableScan {
    /// Returns a stream of [`FileScanTask`]s.
    ///
    /// If the scan has no plan context (the table has no snapshot), the returned
    /// stream is empty. Otherwise three background tasks are spawned on the scan's
    /// runtime: one loads the manifests and streams their entries, one processes
    /// the delete file entries, and one processes the data file entries and emits
    /// the resulting tasks. An error in any of these stages is forwarded as an
    /// `Err` item of the returned stream.
    ///
    /// # Errors
    ///
    /// Returns an error directly if the manifest list cannot be loaded or the
    /// manifest file contexts cannot be built.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // used to stream ManifestEntryContexts between stages of the file plan operation
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        // the index is shared with the manifest file contexts, while the sender
        // is fed by the delete manifest entry stage below
        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(self.runtime.clone());

        let manifest_list = plan_context.get_manifest_list().await?;

        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
        // whose partitions cannot match this
        // scan's filter
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        // every stage gets its own clone of the result sender so that it can
        // report a failure to the caller through the result stream
        let mut channel_for_manifest_error = file_scan_task_tx.clone();
        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        let rt = self.runtime.clone();

        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
        rt.io().spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            // a send failure is deliberately ignored here
            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        // Process the delete file [`ManifestEntry`] stream in parallel
        {
            let rt = rt.clone();
            let rt_inner = rt.clone();
            rt.cpu().spawn(async move {
                let result = manifest_entry_delete_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                // each entry is processed in its own spawned task; the `?`
                                // propagates a failure of awaiting that task, and the inner
                                // result becomes the value of this future
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_delete_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_delete_manifest_entry_error
                        .send(Err(error))
                        .await;
                }
            });
        }

        // Process the data file [`ManifestEntry`] stream in parallel
        {
            let rt_inner = rt.clone();
            rt.cpu().spawn(async move {
                let result = manifest_entry_data_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                // same per-entry spawning scheme as for the delete entries above
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_data_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
                }
            });
        }

        Ok(file_scan_task_rx.boxed())
    }
462
463    /// Returns an [`ArrowRecordBatchStream`].
464    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
465        let mut arrow_reader_builder =
466            ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone())
467                .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
468                .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
469                .with_row_selection_enabled(self.row_selection_enabled);
470
471        if let Some(batch_size) = self.batch_size {
472            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
473        }
474
475        arrow_reader_builder
476            .build()
477            .read(self.plan_files().await?)
478            .map(|result| result.stream())
479    }
480
    /// Returns a reference to the column names of the table scan.
    ///
    /// `None` means that no explicit selection was made, i.e. all columns are selected.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }
485
486    /// Returns a reference to the snapshot of the table scan.
487    pub fn snapshot(&self) -> Option<&SnapshotRef> {
488        self.plan_context.as_ref().map(|x| &x.snapshot)
489    }
490
491    async fn process_data_manifest_entry(
492        manifest_entry_context: ManifestEntryContext,
493        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
494    ) -> Result<()> {
495        // skip processing this manifest entry if it has been marked as deleted
496        if !manifest_entry_context.manifest_entry.is_alive() {
497            return Ok(());
498        }
499
500        // abort the plan if we encounter a manifest entry for a delete file
501        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
502            return Err(Error::new(
503                ErrorKind::FeatureUnsupported,
504                "Encountered an entry for a delete file in a data file manifest",
505            ));
506        }
507
508        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
509            let BoundPredicates {
510                snapshot_bound_predicate,
511                partition_bound_predicate,
512            } = bound_predicates.as_ref();
513
514            let expression_evaluator_cache =
515                manifest_entry_context.expression_evaluator_cache.as_ref();
516
517            let expression_evaluator = expression_evaluator_cache.get(
518                manifest_entry_context.partition_spec_id,
519                partition_bound_predicate,
520            )?;
521
522            // skip any data file whose partition data indicates that it can't contain
523            // any data that matches this scan's filter
524            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
525                return Ok(());
526            }
527
528            // skip any data file whose metrics don't match this scan's filter
529            if !InclusiveMetricsEvaluator::eval(
530                snapshot_bound_predicate,
531                manifest_entry_context.manifest_entry.data_file(),
532                false,
533            )? {
534                return Ok(());
535            }
536        }
537
538        // congratulations! the manifest entry has made its way through the
539        // entire plan without getting filtered out. Create a corresponding
540        // FileScanTask and push it to the result stream
541        file_scan_task_tx
542            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
543            .await?;
544
545        Ok(())
546    }
547
548    async fn process_delete_manifest_entry(
549        manifest_entry_context: ManifestEntryContext,
550        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
551    ) -> Result<()> {
552        // skip processing this manifest entry if it has been marked as deleted
553        if !manifest_entry_context.manifest_entry.is_alive() {
554            return Ok(());
555        }
556
557        // abort the plan if we encounter a manifest entry that is not for a delete file
558        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
559            return Err(Error::new(
560                ErrorKind::FeatureUnsupported,
561                "Encountered an entry for a data file in a delete manifest",
562            ));
563        }
564
565        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
566            let expression_evaluator_cache =
567                manifest_entry_context.expression_evaluator_cache.as_ref();
568
569            let expression_evaluator = expression_evaluator_cache.get(
570                manifest_entry_context.partition_spec_id,
571                &bound_predicates.partition_bound_predicate,
572            )?;
573
574            // skip any data file whose partition data indicates that it can't contain
575            // any data that matches this scan's filter
576            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
577                return Ok(());
578            }
579        }
580
581        delete_file_ctx_tx
582            .send(DeleteFileContext {
583                manifest_entry: manifest_entry_context.manifest_entry.clone(),
584                partition_spec_id: manifest_entry_context.partition_spec_id,
585            })
586            .await?;
587
588        Ok(())
589    }
590}
591
/// The two bound forms of a scan's filter that are used while planning files.
pub(crate) struct BoundPredicates {
    /// Predicate handed to the expression evaluator that checks a file's
    /// partition data against the scan's filter.
    partition_bound_predicate: BoundPredicate,
    /// Predicate handed to the [`InclusiveMetricsEvaluator`] that checks a data
    /// file's metrics against the scan's filter.
    snapshot_bound_predicate: BoundPredicate,
}
596
597#[cfg(test)]
598pub mod tests {
599    //! shared tests for the table scan API
600    #![allow(missing_docs)]
601
602    use std::collections::HashMap;
603    use std::fs;
604    use std::fs::File;
605    use std::sync::Arc;
606
607    use arrow_array::cast::AsArray;
608    use arrow_array::{
609        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
610        StringArray,
611    };
612    use futures::{TryStreamExt, stream};
613    use minijinja::value::Value;
614    use minijinja::{AutoEscape, Environment, context};
615    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
616    use parquet::basic::Compression;
617    use parquet::file::properties::WriterProperties;
618    use tempfile::TempDir;
619    use uuid::Uuid;
620
621    use crate::TableIdent;
622    use crate::arrow::ArrowReaderBuilder;
623    use crate::expr::{BoundPredicate, Reference};
624    use crate::io::{FileIO, OutputFile};
625    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
626    use crate::scan::FileScanTask;
627    use crate::spec::{
628        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
629        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
630        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
631    };
632    use crate::table::Table;
633    use crate::test_utils::test_runtime;
634
635    fn render_template(template: &str, ctx: Value) -> String {
636        let mut env = Environment::new();
637        env.set_auto_escape_callback(|_| AutoEscape::None);
638        env.render_str(template, ctx).unwrap()
639    }
640
    /// Test fixture bundling a [`Table`] with the location it was created for.
    pub struct TableTestFixture {
        /// Location of the table as a string: a `table1` directory below a
        /// temporary directory.
        pub table_location: String,
        /// The table built from the test metadata.
        pub table: Table,
    }
645
646    impl TableTestFixture {
647        #[allow(clippy::new_without_default)]
648        pub fn new() -> Self {
649            let tmp_dir = TempDir::new().unwrap();
650            let table_location = tmp_dir.path().join("table1");
651            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
652            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
653            let table_metadata1_location = table_location.join("metadata/v1.json");
654
655            let file_io = FileIO::new_with_fs();
656
657            let table_metadata = {
658                let template_json_str = fs::read_to_string(format!(
659                    "{}/testdata/example_table_metadata_v2.json",
660                    env!("CARGO_MANIFEST_DIR")
661                ))
662                .unwrap();
663                let metadata_json = render_template(&template_json_str, context! {
664                    table_location => &table_location,
665                    manifest_list_1_location => &manifest_list1_location,
666                    manifest_list_2_location => &manifest_list2_location,
667                    table_metadata_1_location => &table_metadata1_location,
668                });
669                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
670            };
671
672            let table = Table::builder()
673                .metadata(table_metadata)
674                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
675                .file_io(file_io.clone())
676                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
677                .runtime(test_runtime())
678                .build()
679                .unwrap();
680
681            Self {
682                table_location: table_location.to_str().unwrap().to_string(),
683                table,
684            }
685        }
686
687        #[allow(clippy::new_without_default)]
688        pub fn new_empty() -> Self {
689            let tmp_dir = TempDir::new().unwrap();
690            let table_location = tmp_dir.path().join("table1");
691            let table_metadata1_location = table_location.join("metadata/v1.json");
692
693            let file_io = FileIO::new_with_fs();
694
695            let table_metadata = {
696                let template_json_str = fs::read_to_string(format!(
697                    "{}/testdata/example_empty_table_metadata_v2.json",
698                    env!("CARGO_MANIFEST_DIR")
699                ))
700                .unwrap();
701                let metadata_json = render_template(&template_json_str, context! {
702                    table_location => &table_location,
703                    table_metadata_1_location => &table_metadata1_location,
704                });
705                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
706            };
707
708            let table = Table::builder()
709                .metadata(table_metadata)
710                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
711                .file_io(file_io.clone())
712                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
713                .runtime(test_runtime())
714                .build()
715                .unwrap();
716
717            Self {
718                table_location: table_location.to_str().unwrap().to_string(),
719                table,
720            }
721        }
722
723        /// Creates a fixture with 5 snapshots chained as:
724        ///   S1 (root) -> S2 -> S3 -> S4 -> S5 (current)
725        /// Useful for testing snapshot history traversal.
726        pub fn new_with_deep_history() -> Self {
727            let tmp_dir = TempDir::new().unwrap();
728            let table_location = tmp_dir.path().join("table1");
729            let table_metadata1_location = table_location.join("metadata/v1.json");
730
731            let file_io = FileIO::new_with_fs();
732
733            let table_metadata = {
734                let json_str = fs::read_to_string(format!(
735                    "{}/testdata/example_table_metadata_v2_deep_history.json",
736                    env!("CARGO_MANIFEST_DIR")
737                ))
738                .unwrap();
739                serde_json::from_str::<TableMetadata>(&json_str).unwrap()
740            };
741
742            let table = Table::builder()
743                .metadata(table_metadata)
744                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
745                .file_io(file_io.clone())
746                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
747                .runtime(test_runtime())
748                .build()
749                .unwrap();
750
751            Self {
752                table_location: table_location.to_str().unwrap().to_string(),
753                table,
754            }
755        }
756
        /// Builds a fixture from the `testdata/example_table_metadata_v2.json`
        /// template and then replaces its partitioning with an unpartitioned spec.
        ///
        /// Panics if the template cannot be read, rendered or parsed, or if the
        /// table cannot be built.
        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            // load the metadata template and fill in the fixture's locations
            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            // Override the partitioning parsed from the template: make the
            // unpartitioned spec the default, drop all parsed specs, use an empty
            // default partition type, and register the new default spec under id 0.
            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .runtime(test_runtime())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
802
803        fn next_manifest_file(&self) -> OutputFile {
804            self.table
805                .file_io()
806                .new_output(format!(
807                    "{}/metadata/manifest_{}.avro",
808                    self.table_location,
809                    Uuid::new_v4()
810                ))
811                .unwrap()
812        }
813
814        pub async fn setup_manifest_files(&mut self) {
815            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
816            let parent_snapshot = current_snapshot
817                .parent_snapshot(self.table.metadata())
818                .unwrap();
819            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
820            let current_partition_spec = self.table.metadata().default_partition_spec();
821
822            // Write the data files first, then use the file size in the manifest entries
823            let parquet_file_size = self.write_parquet_data_files();
824
825            let mut writer = ManifestWriterBuilder::new(
826                self.next_manifest_file(),
827                Some(current_snapshot.snapshot_id()),
828                None,
829                current_schema.clone(),
830                current_partition_spec.as_ref().clone(),
831            )
832            .build_v2_data();
833            writer
834                .add_entry(
835                    ManifestEntry::builder()
836                        .status(ManifestStatus::Added)
837                        .data_file(
838                            DataFileBuilder::default()
839                                .partition_spec_id(0)
840                                .content(DataContentType::Data)
841                                .file_path(format!("{}/1.parquet", &self.table_location))
842                                .file_format(DataFileFormat::Parquet)
843                                .file_size_in_bytes(parquet_file_size)
844                                .record_count(1)
845                                .partition(Struct::from_iter([Some(Literal::long(100))]))
846                                .key_metadata(None)
847                                .build()
848                                .unwrap(),
849                        )
850                        .build(),
851                )
852                .unwrap();
853            writer
854                .add_delete_entry(
855                    ManifestEntry::builder()
856                        .status(ManifestStatus::Deleted)
857                        .snapshot_id(parent_snapshot.snapshot_id())
858                        .sequence_number(parent_snapshot.sequence_number())
859                        .file_sequence_number(parent_snapshot.sequence_number())
860                        .data_file(
861                            DataFileBuilder::default()
862                                .partition_spec_id(0)
863                                .content(DataContentType::Data)
864                                .file_path(format!("{}/2.parquet", &self.table_location))
865                                .file_format(DataFileFormat::Parquet)
866                                .file_size_in_bytes(parquet_file_size)
867                                .record_count(1)
868                                .partition(Struct::from_iter([Some(Literal::long(200))]))
869                                .build()
870                                .unwrap(),
871                        )
872                        .build(),
873                )
874                .unwrap();
875            writer
876                .add_existing_entry(
877                    ManifestEntry::builder()
878                        .status(ManifestStatus::Existing)
879                        .snapshot_id(parent_snapshot.snapshot_id())
880                        .sequence_number(parent_snapshot.sequence_number())
881                        .file_sequence_number(parent_snapshot.sequence_number())
882                        .data_file(
883                            DataFileBuilder::default()
884                                .partition_spec_id(0)
885                                .content(DataContentType::Data)
886                                .file_path(format!("{}/3.parquet", &self.table_location))
887                                .file_format(DataFileFormat::Parquet)
888                                .file_size_in_bytes(parquet_file_size)
889                                .record_count(1)
890                                .partition(Struct::from_iter([Some(Literal::long(300))]))
891                                .build()
892                                .unwrap(),
893                        )
894                        .build(),
895                )
896                .unwrap();
897            let data_file_manifest = writer.write_manifest_file().await.unwrap();
898
899            // Write to manifest list
900            let manifest_list_writer = self
901                .table
902                .file_io()
903                .new_output(current_snapshot.manifest_list())
904                .unwrap()
905                .writer()
906                .await
907                .unwrap();
908            let mut manifest_list_write = ManifestListWriter::v2(
909                manifest_list_writer,
910                current_snapshot.snapshot_id(),
911                current_snapshot.parent_snapshot_id(),
912                current_snapshot.sequence_number(),
913            );
914            manifest_list_write
915                .add_manifests(vec![data_file_manifest].into_iter())
916                .unwrap();
917            manifest_list_write.close().await.unwrap();
918        }
919
920        /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet)
921        /// and returns the file size in bytes.
922        fn write_parquet_data_files(&self) -> u64 {
923            std::fs::create_dir_all(&self.table_location).unwrap();
924
925            let schema = {
926                let fields = vec![
927                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
928                        .with_metadata(HashMap::from([(
929                            PARQUET_FIELD_ID_META_KEY.to_string(),
930                            "1".to_string(),
931                        )])),
932                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
933                        .with_metadata(HashMap::from([(
934                            PARQUET_FIELD_ID_META_KEY.to_string(),
935                            "2".to_string(),
936                        )])),
937                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
938                        .with_metadata(HashMap::from([(
939                            PARQUET_FIELD_ID_META_KEY.to_string(),
940                            "3".to_string(),
941                        )])),
942                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
943                        .with_metadata(HashMap::from([(
944                            PARQUET_FIELD_ID_META_KEY.to_string(),
945                            "4".to_string(),
946                        )])),
947                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
948                        .with_metadata(HashMap::from([(
949                            PARQUET_FIELD_ID_META_KEY.to_string(),
950                            "5".to_string(),
951                        )])),
952                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
953                        .with_metadata(HashMap::from([(
954                            PARQUET_FIELD_ID_META_KEY.to_string(),
955                            "6".to_string(),
956                        )])),
957                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
958                        .with_metadata(HashMap::from([(
959                            PARQUET_FIELD_ID_META_KEY.to_string(),
960                            "7".to_string(),
961                        )])),
962                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
963                        .with_metadata(HashMap::from([(
964                            PARQUET_FIELD_ID_META_KEY.to_string(),
965                            "8".to_string(),
966                        )])),
967                ];
968                Arc::new(arrow_schema::Schema::new(fields))
969            };
970            // x: [1, 1, 1, 1, ...]
971            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
972
973            let mut values = vec![2; 512];
974            values.append(vec![3; 200].as_mut());
975            values.append(vec![4; 300].as_mut());
976            values.append(vec![5; 12].as_mut());
977
978            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
979            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
980
981            let mut values = vec![3; 512];
982            values.append(vec![4; 512].as_mut());
983
984            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
985            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
986
987            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
988            let mut values = vec!["Apache"; 512];
989            values.append(vec!["Iceberg"; 512].as_mut());
990            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
991
992            // dbl:
993            let mut values = vec![100.0f64; 512];
994            values.append(vec![150.0f64; 12].as_mut());
995            values.append(vec![200.0f64; 500].as_mut());
996            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
997
998            // i32:
999            let mut values = vec![100i32; 512];
1000            values.append(vec![150i32; 12].as_mut());
1001            values.append(vec![200i32; 500].as_mut());
1002            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
1003
1004            // i64:
1005            let mut values = vec![100i64; 512];
1006            values.append(vec![150i64; 12].as_mut());
1007            values.append(vec![200i64; 500].as_mut());
1008            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1009
1010            // bool:
1011            let mut values = vec![false; 512];
1012            values.append(vec![true; 512].as_mut());
1013            let values: BooleanArray = values.into();
1014            let col8 = Arc::new(values) as ArrayRef;
1015
1016            let to_write = RecordBatch::try_new(schema.clone(), vec![
1017                col1, col2, col3, col4, col5, col6, col7, col8,
1018            ])
1019            .unwrap();
1020
1021            // Write the Parquet files
1022            let props = WriterProperties::builder()
1023                .set_compression(Compression::SNAPPY)
1024                .build();
1025
1026            for n in 1..=3 {
1027                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
1028                let mut writer =
1029                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1030
1031                writer.write(&to_write).expect("Writing batch");
1032
1033                // writer must be closed to write footer
1034                writer.close().unwrap();
1035            }
1036
1037            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
1038                .unwrap()
1039                .len()
1040        }
1041
1042        pub async fn setup_unpartitioned_manifest_files(&mut self) {
1043            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1044            let parent_snapshot = current_snapshot
1045                .parent_snapshot(self.table.metadata())
1046                .unwrap();
1047            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1048            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
1049
1050            // Write the data files first, then use the file size in the manifest entries
1051            let parquet_file_size = self.write_parquet_data_files();
1052
1053            // Write data files using an empty partition for unpartitioned tables.
1054            let mut writer = ManifestWriterBuilder::new(
1055                self.next_manifest_file(),
1056                Some(current_snapshot.snapshot_id()),
1057                None,
1058                current_schema.clone(),
1059                current_partition_spec.as_ref().clone(),
1060            )
1061            .build_v2_data();
1062
1063            // Create an empty partition value.
1064            let empty_partition = Struct::empty();
1065
1066            writer
1067                .add_entry(
1068                    ManifestEntry::builder()
1069                        .status(ManifestStatus::Added)
1070                        .data_file(
1071                            DataFileBuilder::default()
1072                                .partition_spec_id(0)
1073                                .content(DataContentType::Data)
1074                                .file_path(format!("{}/1.parquet", &self.table_location))
1075                                .file_format(DataFileFormat::Parquet)
1076                                .file_size_in_bytes(parquet_file_size)
1077                                .record_count(1)
1078                                .partition(empty_partition.clone())
1079                                .key_metadata(None)
1080                                .build()
1081                                .unwrap(),
1082                        )
1083                        .build(),
1084                )
1085                .unwrap();
1086
1087            writer
1088                .add_delete_entry(
1089                    ManifestEntry::builder()
1090                        .status(ManifestStatus::Deleted)
1091                        .snapshot_id(parent_snapshot.snapshot_id())
1092                        .sequence_number(parent_snapshot.sequence_number())
1093                        .file_sequence_number(parent_snapshot.sequence_number())
1094                        .data_file(
1095                            DataFileBuilder::default()
1096                                .partition_spec_id(0)
1097                                .content(DataContentType::Data)
1098                                .file_path(format!("{}/2.parquet", &self.table_location))
1099                                .file_format(DataFileFormat::Parquet)
1100                                .file_size_in_bytes(parquet_file_size)
1101                                .record_count(1)
1102                                .partition(empty_partition.clone())
1103                                .build()
1104                                .unwrap(),
1105                        )
1106                        .build(),
1107                )
1108                .unwrap();
1109
1110            writer
1111                .add_existing_entry(
1112                    ManifestEntry::builder()
1113                        .status(ManifestStatus::Existing)
1114                        .snapshot_id(parent_snapshot.snapshot_id())
1115                        .sequence_number(parent_snapshot.sequence_number())
1116                        .file_sequence_number(parent_snapshot.sequence_number())
1117                        .data_file(
1118                            DataFileBuilder::default()
1119                                .partition_spec_id(0)
1120                                .content(DataContentType::Data)
1121                                .file_path(format!("{}/3.parquet", &self.table_location))
1122                                .file_format(DataFileFormat::Parquet)
1123                                .file_size_in_bytes(parquet_file_size)
1124                                .record_count(1)
1125                                .partition(empty_partition.clone())
1126                                .build()
1127                                .unwrap(),
1128                        )
1129                        .build(),
1130                )
1131                .unwrap();
1132
1133            let data_file_manifest = writer.write_manifest_file().await.unwrap();
1134
1135            // Write to manifest list
1136            let manifest_list_writer = self
1137                .table
1138                .file_io()
1139                .new_output(current_snapshot.manifest_list())
1140                .unwrap()
1141                .writer()
1142                .await
1143                .unwrap();
1144            let mut manifest_list_write = ManifestListWriter::v2(
1145                manifest_list_writer,
1146                current_snapshot.snapshot_id(),
1147                current_snapshot.parent_snapshot_id(),
1148                current_snapshot.sequence_number(),
1149            );
1150            manifest_list_write
1151                .add_manifests(vec![data_file_manifest].into_iter())
1152                .unwrap();
1153            manifest_list_write.close().await.unwrap();
1154        }
1155
1156        pub async fn setup_deadlock_manifests(&mut self) {
1157            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1158            let _parent_snapshot = current_snapshot
1159                .parent_snapshot(self.table.metadata())
1160                .unwrap();
1161            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1162            let current_partition_spec = self.table.metadata().default_partition_spec();
1163
1164            // 1. Write DATA manifest with MULTIPLE entries to fill buffer
1165            let mut writer = ManifestWriterBuilder::new(
1166                self.next_manifest_file(),
1167                Some(current_snapshot.snapshot_id()),
1168                None,
1169                current_schema.clone(),
1170                current_partition_spec.as_ref().clone(),
1171            )
1172            .build_v2_data();
1173
1174            // Add 10 data entries
1175            for i in 0..10 {
1176                writer
1177                    .add_entry(
1178                        ManifestEntry::builder()
1179                            .status(ManifestStatus::Added)
1180                            .data_file(
1181                                DataFileBuilder::default()
1182                                    .partition_spec_id(0)
1183                                    .content(DataContentType::Data)
1184                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
1185                                    .file_format(DataFileFormat::Parquet)
1186                                    .file_size_in_bytes(100)
1187                                    .record_count(1)
1188                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
1189                                    .key_metadata(None)
1190                                    .build()
1191                                    .unwrap(),
1192                            )
1193                            .build(),
1194                    )
1195                    .unwrap();
1196            }
1197            let data_manifest = writer.write_manifest_file().await.unwrap();
1198
1199            // 2. Write DELETE manifest
1200            let mut writer = ManifestWriterBuilder::new(
1201                self.next_manifest_file(),
1202                Some(current_snapshot.snapshot_id()),
1203                None,
1204                current_schema.clone(),
1205                current_partition_spec.as_ref().clone(),
1206            )
1207            .build_v2_deletes();
1208
1209            writer
1210                .add_entry(
1211                    ManifestEntry::builder()
1212                        .status(ManifestStatus::Added)
1213                        .data_file(
1214                            DataFileBuilder::default()
1215                                .partition_spec_id(0)
1216                                .content(DataContentType::PositionDeletes)
1217                                .file_path(format!("{}/del.parquet", &self.table_location))
1218                                .file_format(DataFileFormat::Parquet)
1219                                .file_size_in_bytes(100)
1220                                .record_count(1)
1221                                .partition(Struct::from_iter([Some(Literal::long(100))]))
1222                                .build()
1223                                .unwrap(),
1224                        )
1225                        .build(),
1226                )
1227                .unwrap();
1228            let delete_manifest = writer.write_manifest_file().await.unwrap();
1229
1230            // Write to manifest list - DATA FIRST then DELETE
1231            // This order is crucial for reproduction
1232            let manifest_list_writer = self
1233                .table
1234                .file_io()
1235                .new_output(current_snapshot.manifest_list())
1236                .unwrap()
1237                .writer()
1238                .await
1239                .unwrap();
1240            let mut manifest_list_write = ManifestListWriter::v2(
1241                manifest_list_writer,
1242                current_snapshot.snapshot_id(),
1243                current_snapshot.parent_snapshot_id(),
1244                current_snapshot.sequence_number(),
1245            );
1246            manifest_list_write
1247                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
1248                .unwrap();
1249            manifest_list_write.close().await.unwrap();
1250        }
1251    }
1252
1253    #[tokio::test]
1254    async fn test_table_scan_columns() {
1255        let table = TableTestFixture::new().table;
1256
1257        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1258        assert_eq!(
1259            Some(vec!["x".to_string(), "y".to_string()]),
1260            table_scan.column_names
1261        );
1262
1263        let table_scan = table
1264            .scan()
1265            .select(["x", "y"])
1266            .select(["z"])
1267            .build()
1268            .unwrap();
1269        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1270    }
1271
1272    #[tokio::test]
1273    async fn test_select_all() {
1274        let table = TableTestFixture::new().table;
1275
1276        let table_scan = table.scan().select_all().build().unwrap();
1277        assert!(table_scan.column_names.is_none());
1278    }
1279
1280    #[test]
1281    fn test_select_no_exist_column() {
1282        let table = TableTestFixture::new().table;
1283
1284        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1285        assert!(table_scan.is_err());
1286    }
1287
1288    #[tokio::test]
1289    async fn test_table_scan_default_snapshot_id() {
1290        let table = TableTestFixture::new().table;
1291
1292        let table_scan = table.scan().build().unwrap();
1293        assert_eq!(
1294            table.metadata().current_snapshot().unwrap().snapshot_id(),
1295            table_scan.snapshot().unwrap().snapshot_id()
1296        );
1297    }
1298
1299    #[test]
1300    fn test_table_scan_non_exist_snapshot_id() {
1301        let table = TableTestFixture::new().table;
1302
1303        let table_scan = table.scan().snapshot_id(1024).build();
1304        assert!(table_scan.is_err());
1305    }
1306
1307    #[tokio::test]
1308    async fn test_table_scan_with_snapshot_id() {
1309        let table = TableTestFixture::new().table;
1310
1311        let table_scan = table
1312            .scan()
1313            .snapshot_id(3051729675574597004)
1314            .with_row_selection_enabled(true)
1315            .build()
1316            .unwrap();
1317        assert_eq!(
1318            table_scan.snapshot().unwrap().snapshot_id(),
1319            3051729675574597004
1320        );
1321    }
1322
1323    #[tokio::test]
1324    async fn test_plan_files_on_table_without_any_snapshots() {
1325        let table = TableTestFixture::new_empty().table;
1326        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1327        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1328        assert!(batches.is_empty());
1329    }
1330
1331    #[tokio::test]
1332    async fn test_plan_files_no_deletions() {
1333        let mut fixture = TableTestFixture::new();
1334        fixture.setup_manifest_files().await;
1335
1336        // Create table scan for current snapshot and plan files
1337        let table_scan = fixture
1338            .table
1339            .scan()
1340            .with_row_selection_enabled(true)
1341            .build()
1342            .unwrap();
1343
1344        let mut tasks = table_scan
1345            .plan_files()
1346            .await
1347            .unwrap()
1348            .try_fold(vec![], |mut acc, task| async move {
1349                acc.push(task);
1350                Ok(acc)
1351            })
1352            .await
1353            .unwrap();
1354
1355        assert_eq!(tasks.len(), 2);
1356
1357        tasks.sort_by_key(|t| t.data_file_path.to_string());
1358
1359        // Check first task is added data file
1360        assert_eq!(
1361            tasks[0].data_file_path,
1362            format!("{}/1.parquet", &fixture.table_location)
1363        );
1364
1365        // Check second task is existing data file
1366        assert_eq!(
1367            tasks[1].data_file_path,
1368            format!("{}/3.parquet", &fixture.table_location)
1369        );
1370    }
1371
1372    #[tokio::test]
1373    async fn test_open_parquet_no_deletions() {
1374        let mut fixture = TableTestFixture::new();
1375        fixture.setup_manifest_files().await;
1376
1377        // Create table scan for current snapshot and plan files
1378        let table_scan = fixture
1379            .table
1380            .scan()
1381            .with_row_selection_enabled(true)
1382            .build()
1383            .unwrap();
1384
1385        let batch_stream = table_scan.to_arrow().await.unwrap();
1386
1387        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1388
1389        let col = batches[0].column_by_name("x").unwrap();
1390
1391        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1392        assert_eq!(int64_arr.value(0), 1);
1393    }
1394
1395    #[tokio::test]
1396    async fn test_open_parquet_no_deletions_by_separate_reader() {
1397        let mut fixture = TableTestFixture::new();
1398        fixture.setup_manifest_files().await;
1399
1400        // Create table scan for current snapshot and plan files
1401        let table_scan = fixture
1402            .table
1403            .scan()
1404            .with_row_selection_enabled(true)
1405            .build()
1406            .unwrap();
1407
1408        let mut plan_task: Vec<_> = table_scan
1409            .plan_files()
1410            .await
1411            .unwrap()
1412            .try_collect()
1413            .await
1414            .unwrap();
1415        assert_eq!(plan_task.len(), 2);
1416
1417        let reader = ArrowReaderBuilder::new(
1418            fixture.table.file_io().clone(),
1419            fixture.table.runtime().clone(),
1420        )
1421        .build();
1422        let batch_stream = reader
1423            .clone()
1424            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1425            .unwrap()
1426            .stream();
1427        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1428
1429        let reader = ArrowReaderBuilder::new(
1430            fixture.table.file_io().clone(),
1431            fixture.table.runtime().clone(),
1432        )
1433        .build();
1434        let batch_stream = reader
1435            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1436            .unwrap()
1437            .stream();
1438        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1439
1440        assert_eq!(batch_1, batch_2);
1441    }
1442
1443    #[tokio::test]
1444    async fn test_open_parquet_with_projection() {
1445        let mut fixture = TableTestFixture::new();
1446        fixture.setup_manifest_files().await;
1447
1448        // Create table scan for current snapshot and plan files
1449        let table_scan = fixture
1450            .table
1451            .scan()
1452            .select(["x", "z"])
1453            .with_row_selection_enabled(true)
1454            .build()
1455            .unwrap();
1456
1457        let batch_stream = table_scan.to_arrow().await.unwrap();
1458
1459        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1460
1461        assert_eq!(batches[0].num_columns(), 2);
1462
1463        let col1 = batches[0].column_by_name("x").unwrap();
1464        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1465        assert_eq!(int64_arr.value(0), 1);
1466
1467        let col2 = batches[0].column_by_name("z").unwrap();
1468        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1469        assert_eq!(int64_arr.value(0), 3);
1470
1471        // test empty scan
1472        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1473        let batch_stream = table_scan.to_arrow().await.unwrap();
1474        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1475
1476        assert_eq!(batches[0].num_columns(), 0);
1477        assert_eq!(batches[0].num_rows(), 1024);
1478    }
1479
1480    #[tokio::test]
1481    async fn test_filter_on_arrow_lt() {
1482        let mut fixture = TableTestFixture::new();
1483        fixture.setup_manifest_files().await;
1484
1485        // Filter: y < 3
1486        let mut builder = fixture.table.scan();
1487        let predicate = Reference::new("y").less_than(Datum::long(3));
1488        builder = builder
1489            .with_filter(predicate)
1490            .with_row_selection_enabled(true);
1491        let table_scan = builder.build().unwrap();
1492
1493        let batch_stream = table_scan.to_arrow().await.unwrap();
1494
1495        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1496
1497        assert_eq!(batches[0].num_rows(), 512);
1498
1499        let col = batches[0].column_by_name("x").unwrap();
1500        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1501        assert_eq!(int64_arr.value(0), 1);
1502
1503        let col = batches[0].column_by_name("y").unwrap();
1504        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1505        assert_eq!(int64_arr.value(0), 2);
1506    }
1507
1508    #[tokio::test]
1509    async fn test_filter_on_arrow_gt_eq() {
1510        let mut fixture = TableTestFixture::new();
1511        fixture.setup_manifest_files().await;
1512
1513        // Filter: y >= 5
1514        let mut builder = fixture.table.scan();
1515        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1516        builder = builder
1517            .with_filter(predicate)
1518            .with_row_selection_enabled(true);
1519        let table_scan = builder.build().unwrap();
1520
1521        let batch_stream = table_scan.to_arrow().await.unwrap();
1522
1523        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1524
1525        assert_eq!(batches[0].num_rows(), 12);
1526
1527        let col = batches[0].column_by_name("x").unwrap();
1528        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1529        assert_eq!(int64_arr.value(0), 1);
1530
1531        let col = batches[0].column_by_name("y").unwrap();
1532        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1533        assert_eq!(int64_arr.value(0), 5);
1534    }
1535
1536    #[tokio::test]
1537    async fn test_filter_double_eq() {
1538        let mut fixture = TableTestFixture::new();
1539        fixture.setup_manifest_files().await;
1540
1541        // Filter: dbl == 150.0
1542        let mut builder = fixture.table.scan();
1543        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1544        builder = builder
1545            .with_filter(predicate)
1546            .with_row_selection_enabled(true);
1547        let table_scan = builder.build().unwrap();
1548
1549        let batch_stream = table_scan.to_arrow().await.unwrap();
1550
1551        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1552
1553        assert_eq!(batches.len(), 2);
1554        assert_eq!(batches[0].num_rows(), 12);
1555
1556        let col = batches[0].column_by_name("dbl").unwrap();
1557        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1558        assert_eq!(f64_arr.value(1), 150.0f64);
1559    }
1560
1561    #[tokio::test]
1562    async fn test_filter_int_eq() {
1563        let mut fixture = TableTestFixture::new();
1564        fixture.setup_manifest_files().await;
1565
1566        // Filter: i32 == 150
1567        let mut builder = fixture.table.scan();
1568        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1569        builder = builder
1570            .with_filter(predicate)
1571            .with_row_selection_enabled(true);
1572        let table_scan = builder.build().unwrap();
1573
1574        let batch_stream = table_scan.to_arrow().await.unwrap();
1575
1576        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1577
1578        assert_eq!(batches.len(), 2);
1579        assert_eq!(batches[0].num_rows(), 12);
1580
1581        let col = batches[0].column_by_name("i32").unwrap();
1582        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1583        assert_eq!(i32_arr.value(1), 150i32);
1584    }
1585
1586    #[tokio::test]
1587    async fn test_filter_long_eq() {
1588        let mut fixture = TableTestFixture::new();
1589        fixture.setup_manifest_files().await;
1590
1591        // Filter: i64 == 150
1592        let mut builder = fixture.table.scan();
1593        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1594        builder = builder
1595            .with_filter(predicate)
1596            .with_row_selection_enabled(true);
1597        let table_scan = builder.build().unwrap();
1598
1599        let batch_stream = table_scan.to_arrow().await.unwrap();
1600
1601        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1602
1603        assert_eq!(batches.len(), 2);
1604        assert_eq!(batches[0].num_rows(), 12);
1605
1606        let col = batches[0].column_by_name("i64").unwrap();
1607        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1608        assert_eq!(i64_arr.value(1), 150i64);
1609    }
1610
1611    #[tokio::test]
1612    async fn test_filter_bool_eq() {
1613        let mut fixture = TableTestFixture::new();
1614        fixture.setup_manifest_files().await;
1615
1616        // Filter: bool == true
1617        let mut builder = fixture.table.scan();
1618        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1619        builder = builder
1620            .with_filter(predicate)
1621            .with_row_selection_enabled(true);
1622        let table_scan = builder.build().unwrap();
1623
1624        let batch_stream = table_scan.to_arrow().await.unwrap();
1625
1626        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1627
1628        assert_eq!(batches.len(), 2);
1629        assert_eq!(batches[0].num_rows(), 512);
1630
1631        let col = batches[0].column_by_name("bool").unwrap();
1632        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1633        assert!(bool_arr.value(1));
1634    }
1635
1636    #[tokio::test]
1637    async fn test_filter_on_arrow_is_null() {
1638        let mut fixture = TableTestFixture::new();
1639        fixture.setup_manifest_files().await;
1640
1641        // Filter: y is null
1642        let mut builder = fixture.table.scan();
1643        let predicate = Reference::new("y").is_null();
1644        builder = builder
1645            .with_filter(predicate)
1646            .with_row_selection_enabled(true);
1647        let table_scan = builder.build().unwrap();
1648
1649        let batch_stream = table_scan.to_arrow().await.unwrap();
1650
1651        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1652        assert_eq!(batches.len(), 0);
1653    }
1654
1655    #[tokio::test]
1656    async fn test_filter_on_arrow_is_not_null() {
1657        let mut fixture = TableTestFixture::new();
1658        fixture.setup_manifest_files().await;
1659
1660        // Filter: y is not null
1661        let mut builder = fixture.table.scan();
1662        let predicate = Reference::new("y").is_not_null();
1663        builder = builder
1664            .with_filter(predicate)
1665            .with_row_selection_enabled(true);
1666        let table_scan = builder.build().unwrap();
1667
1668        let batch_stream = table_scan.to_arrow().await.unwrap();
1669
1670        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1671        assert_eq!(batches[0].num_rows(), 1024);
1672    }
1673
1674    #[tokio::test]
1675    async fn test_filter_on_arrow_lt_and_gt() {
1676        let mut fixture = TableTestFixture::new();
1677        fixture.setup_manifest_files().await;
1678
1679        // Filter: y < 5 AND z >= 4
1680        let mut builder = fixture.table.scan();
1681        let predicate = Reference::new("y")
1682            .less_than(Datum::long(5))
1683            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1684        builder = builder
1685            .with_filter(predicate)
1686            .with_row_selection_enabled(true);
1687        let table_scan = builder.build().unwrap();
1688
1689        let batch_stream = table_scan.to_arrow().await.unwrap();
1690
1691        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1692        assert_eq!(batches[0].num_rows(), 500);
1693
1694        let col = batches[0].column_by_name("x").unwrap();
1695        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1696        assert_eq!(col, &expected_x);
1697
1698        let col = batches[0].column_by_name("y").unwrap();
1699        let mut values = vec![];
1700        values.append(vec![3; 200].as_mut());
1701        values.append(vec![4; 300].as_mut());
1702        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1703        assert_eq!(col, &expected_y);
1704
1705        let col = batches[0].column_by_name("z").unwrap();
1706        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1707        assert_eq!(col, &expected_z);
1708    }
1709
1710    #[tokio::test]
1711    async fn test_filter_on_arrow_lt_or_gt() {
1712        let mut fixture = TableTestFixture::new();
1713        fixture.setup_manifest_files().await;
1714
1715        // Filter: y < 5 AND z >= 4
1716        let mut builder = fixture.table.scan();
1717        let predicate = Reference::new("y")
1718            .less_than(Datum::long(5))
1719            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1720        builder = builder
1721            .with_filter(predicate)
1722            .with_row_selection_enabled(true);
1723        let table_scan = builder.build().unwrap();
1724
1725        let batch_stream = table_scan.to_arrow().await.unwrap();
1726
1727        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1728        assert_eq!(batches[0].num_rows(), 1024);
1729
1730        let col = batches[0].column_by_name("x").unwrap();
1731        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1732        assert_eq!(col, &expected_x);
1733
1734        let col = batches[0].column_by_name("y").unwrap();
1735        let mut values = vec![2; 512];
1736        values.append(vec![3; 200].as_mut());
1737        values.append(vec![4; 300].as_mut());
1738        values.append(vec![5; 12].as_mut());
1739        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1740        assert_eq!(col, &expected_y);
1741
1742        let col = batches[0].column_by_name("z").unwrap();
1743        let mut values = vec![3; 512];
1744        values.append(vec![4; 512].as_mut());
1745        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1746        assert_eq!(col, &expected_z);
1747    }
1748
1749    #[tokio::test]
1750    async fn test_filter_on_arrow_startswith() {
1751        let mut fixture = TableTestFixture::new();
1752        fixture.setup_manifest_files().await;
1753
1754        // Filter: a STARTSWITH "Ice"
1755        let mut builder = fixture.table.scan();
1756        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1757        builder = builder
1758            .with_filter(predicate)
1759            .with_row_selection_enabled(true);
1760        let table_scan = builder.build().unwrap();
1761
1762        let batch_stream = table_scan.to_arrow().await.unwrap();
1763
1764        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1765
1766        assert_eq!(batches[0].num_rows(), 512);
1767
1768        let col = batches[0].column_by_name("a").unwrap();
1769        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1770        assert_eq!(string_arr.value(0), "Iceberg");
1771    }
1772
1773    #[tokio::test]
1774    async fn test_filter_on_arrow_not_startswith() {
1775        let mut fixture = TableTestFixture::new();
1776        fixture.setup_manifest_files().await;
1777
1778        // Filter: a NOT STARTSWITH "Ice"
1779        let mut builder = fixture.table.scan();
1780        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1781        builder = builder
1782            .with_filter(predicate)
1783            .with_row_selection_enabled(true);
1784        let table_scan = builder.build().unwrap();
1785
1786        let batch_stream = table_scan.to_arrow().await.unwrap();
1787
1788        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1789
1790        assert_eq!(batches[0].num_rows(), 512);
1791
1792        let col = batches[0].column_by_name("a").unwrap();
1793        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1794        assert_eq!(string_arr.value(0), "Apache");
1795    }
1796
1797    #[tokio::test]
1798    async fn test_filter_on_arrow_in() {
1799        let mut fixture = TableTestFixture::new();
1800        fixture.setup_manifest_files().await;
1801
1802        // Filter: a IN ("Sioux", "Iceberg")
1803        let mut builder = fixture.table.scan();
1804        let predicate =
1805            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1806        builder = builder
1807            .with_filter(predicate)
1808            .with_row_selection_enabled(true);
1809        let table_scan = builder.build().unwrap();
1810
1811        let batch_stream = table_scan.to_arrow().await.unwrap();
1812
1813        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1814
1815        assert_eq!(batches[0].num_rows(), 512);
1816
1817        let col = batches[0].column_by_name("a").unwrap();
1818        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1819        assert_eq!(string_arr.value(0), "Iceberg");
1820    }
1821
1822    #[tokio::test]
1823    async fn test_filter_on_arrow_not_in() {
1824        let mut fixture = TableTestFixture::new();
1825        fixture.setup_manifest_files().await;
1826
1827        // Filter: a NOT IN ("Sioux", "Iceberg")
1828        let mut builder = fixture.table.scan();
1829        let predicate =
1830            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1831        builder = builder
1832            .with_filter(predicate)
1833            .with_row_selection_enabled(true);
1834        let table_scan = builder.build().unwrap();
1835
1836        let batch_stream = table_scan.to_arrow().await.unwrap();
1837
1838        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1839
1840        assert_eq!(batches[0].num_rows(), 512);
1841
1842        let col = batches[0].column_by_name("a").unwrap();
1843        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1844        assert_eq!(string_arr.value(0), "Apache");
1845    }
1846
1847    #[test]
1848    fn test_file_scan_task_serialize_deserialize() {
1849        let test_fn = |task: FileScanTask| {
1850            let serialized = serde_json::to_string(&task).unwrap();
1851            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1852
1853            assert_eq!(task.data_file_path, deserialized.data_file_path);
1854            assert_eq!(task.start, deserialized.start);
1855            assert_eq!(task.length, deserialized.length);
1856            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1857            assert_eq!(task.predicate, deserialized.predicate);
1858            assert_eq!(task.schema, deserialized.schema);
1859        };
1860
1861        // without predicate
1862        let schema = Arc::new(
1863            Schema::builder()
1864                .with_fields(vec![Arc::new(NestedField::required(
1865                    1,
1866                    "x",
1867                    Type::Primitive(PrimitiveType::Binary),
1868                ))])
1869                .build()
1870                .unwrap(),
1871        );
1872        let task = FileScanTask::builder()
1873            .with_data_file_path("data_file_path".to_string())
1874            .with_file_size_in_bytes(0)
1875            .with_start(0)
1876            .with_length(100)
1877            .with_project_field_ids(vec![1, 2, 3])
1878            .with_schema(schema.clone())
1879            .with_record_count(Some(100))
1880            .with_data_file_format(DataFileFormat::Parquet)
1881            .with_case_sensitive(false)
1882            .build();
1883        test_fn(task);
1884
1885        // with predicate
1886        let task = FileScanTask::builder()
1887            .with_data_file_path("data_file_path".to_string())
1888            .with_file_size_in_bytes(0)
1889            .with_start(0)
1890            .with_length(100)
1891            .with_project_field_ids(vec![1, 2, 3])
1892            .with_predicate(Some(BoundPredicate::AlwaysTrue))
1893            .with_schema(schema)
1894            .with_data_file_format(DataFileFormat::Avro)
1895            .with_case_sensitive(false)
1896            .build();
1897        test_fn(task);
1898    }
1899
1900    #[tokio::test]
1901    async fn test_select_with_file_column() {
1902        use arrow_array::cast::AsArray;
1903
1904        let mut fixture = TableTestFixture::new();
1905        fixture.setup_manifest_files().await;
1906
1907        // Select regular columns plus the _file column
1908        let table_scan = fixture
1909            .table
1910            .scan()
1911            .select(["x", RESERVED_COL_NAME_FILE])
1912            .with_row_selection_enabled(true)
1913            .build()
1914            .unwrap();
1915
1916        let batch_stream = table_scan.to_arrow().await.unwrap();
1917        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1918
1919        // Verify we have 2 columns: x and _file
1920        assert_eq!(batches[0].num_columns(), 2);
1921
1922        // Verify the x column exists and has correct data
1923        let x_col = batches[0].column_by_name("x").unwrap();
1924        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
1925        assert_eq!(x_arr.value(0), 1);
1926
1927        // Verify the _file column exists
1928        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
1929        assert!(
1930            file_col.is_some(),
1931            "_file column should be present in the batch"
1932        );
1933
1934        // Verify the _file column contains a file path
1935        let file_col = file_col.unwrap();
1936        assert!(
1937            matches!(
1938                file_col.data_type(),
1939                arrow_schema::DataType::RunEndEncoded(_, _)
1940            ),
1941            "_file column should use RunEndEncoded type"
1942        );
1943
1944        // Decode the RunArray to verify it contains the file path
1945        let run_array = file_col
1946            .as_any()
1947            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1948            .expect("_file column should be a RunArray");
1949
1950        let values = run_array.values();
1951        let string_values = values.as_string::<i32>();
1952        assert_eq!(string_values.len(), 1, "Should have a single file path");
1953
1954        let file_path = string_values.value(0);
1955        assert!(
1956            file_path.ends_with(".parquet"),
1957            "File path should end with .parquet, got: {file_path}"
1958        );
1959    }
1960
1961    #[tokio::test]
1962    async fn test_select_file_column_position() {
1963        let mut fixture = TableTestFixture::new();
1964        fixture.setup_manifest_files().await;
1965
1966        // Select columns in specific order: x, _file, z
1967        let table_scan = fixture
1968            .table
1969            .scan()
1970            .select(["x", RESERVED_COL_NAME_FILE, "z"])
1971            .with_row_selection_enabled(true)
1972            .build()
1973            .unwrap();
1974
1975        let batch_stream = table_scan.to_arrow().await.unwrap();
1976        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1977
1978        assert_eq!(batches[0].num_columns(), 3);
1979
1980        // Verify column order: x at position 0, _file at position 1, z at position 2
1981        let schema = batches[0].schema();
1982        assert_eq!(schema.field(0).name(), "x");
1983        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1984        assert_eq!(schema.field(2).name(), "z");
1985
1986        // Verify columns by name also works
1987        assert!(batches[0].column_by_name("x").is_some());
1988        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1989        assert!(batches[0].column_by_name("z").is_some());
1990    }
1991
1992    #[tokio::test]
1993    async fn test_select_file_column_only() {
1994        let mut fixture = TableTestFixture::new();
1995        fixture.setup_manifest_files().await;
1996
1997        // Select only the _file column
1998        let table_scan = fixture
1999            .table
2000            .scan()
2001            .select([RESERVED_COL_NAME_FILE])
2002            .with_row_selection_enabled(true)
2003            .build()
2004            .unwrap();
2005
2006        let batch_stream = table_scan.to_arrow().await.unwrap();
2007        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2008
2009        // Should have exactly 1 column
2010        assert_eq!(batches[0].num_columns(), 1);
2011
2012        // Verify it's the _file column
2013        let schema = batches[0].schema();
2014        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2015
2016        // Verify the batch has the correct number of rows
2017        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
2018        // Each file has 1024 rows, so total is 2048 rows
2019        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2020        assert_eq!(total_rows, 2048);
2021    }
2022
2023    #[tokio::test]
2024    async fn test_file_column_with_multiple_files() {
2025        use std::collections::HashSet;
2026
2027        let mut fixture = TableTestFixture::new();
2028        fixture.setup_manifest_files().await;
2029
2030        // Select x and _file columns
2031        let table_scan = fixture
2032            .table
2033            .scan()
2034            .select(["x", RESERVED_COL_NAME_FILE])
2035            .with_row_selection_enabled(true)
2036            .build()
2037            .unwrap();
2038
2039        let batch_stream = table_scan.to_arrow().await.unwrap();
2040        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2041
2042        // Collect all unique file paths from the batches
2043        let mut file_paths = HashSet::new();
2044        for batch in &batches {
2045            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
2046            let run_array = file_col
2047                .as_any()
2048                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2049                .expect("_file column should be a RunArray");
2050
2051            let values = run_array.values();
2052            let string_values = values.as_string::<i32>();
2053            for i in 0..string_values.len() {
2054                file_paths.insert(string_values.value(i).to_string());
2055            }
2056        }
2057
2058        // We should have multiple files (the test creates 1.parquet and 3.parquet)
2059        assert!(!file_paths.is_empty(), "Should have at least one file path");
2060
2061        // All paths should end with .parquet
2062        for path in &file_paths {
2063            assert!(
2064                path.ends_with(".parquet"),
2065                "All file paths should end with .parquet, got: {path}"
2066            );
2067        }
2068    }
2069
2070    #[tokio::test]
2071    async fn test_file_column_at_start() {
2072        let mut fixture = TableTestFixture::new();
2073        fixture.setup_manifest_files().await;
2074
2075        // Select _file at the start
2076        let table_scan = fixture
2077            .table
2078            .scan()
2079            .select([RESERVED_COL_NAME_FILE, "x", "y"])
2080            .with_row_selection_enabled(true)
2081            .build()
2082            .unwrap();
2083
2084        let batch_stream = table_scan.to_arrow().await.unwrap();
2085        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2086
2087        assert_eq!(batches[0].num_columns(), 3);
2088
2089        // Verify _file is at position 0
2090        let schema = batches[0].schema();
2091        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2092        assert_eq!(schema.field(1).name(), "x");
2093        assert_eq!(schema.field(2).name(), "y");
2094    }
2095
2096    #[tokio::test]
2097    async fn test_file_column_at_end() {
2098        let mut fixture = TableTestFixture::new();
2099        fixture.setup_manifest_files().await;
2100
2101        // Select _file at the end
2102        let table_scan = fixture
2103            .table
2104            .scan()
2105            .select(["x", "y", RESERVED_COL_NAME_FILE])
2106            .with_row_selection_enabled(true)
2107            .build()
2108            .unwrap();
2109
2110        let batch_stream = table_scan.to_arrow().await.unwrap();
2111        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2112
2113        assert_eq!(batches[0].num_columns(), 3);
2114
2115        // Verify _file is at position 2 (the end)
2116        let schema = batches[0].schema();
2117        assert_eq!(schema.field(0).name(), "x");
2118        assert_eq!(schema.field(1).name(), "y");
2119        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2120    }
2121
2122    #[tokio::test]
2123    async fn test_select_with_repeated_column_names() {
2124        let mut fixture = TableTestFixture::new();
2125        fixture.setup_manifest_files().await;
2126
2127        // Select with repeated column names - both regular columns and virtual columns
2128        // Repeated columns should appear multiple times in the result (duplicates are allowed)
2129        let table_scan = fixture
2130            .table
2131            .scan()
2132            .select([
2133                "x",
2134                RESERVED_COL_NAME_FILE,
2135                "x", // x repeated
2136                "y",
2137                RESERVED_COL_NAME_FILE, // _file repeated
2138                "y",                    // y repeated
2139            ])
2140            .with_row_selection_enabled(true)
2141            .build()
2142            .unwrap();
2143
2144        let batch_stream = table_scan.to_arrow().await.unwrap();
2145        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2146
2147        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
2148        assert_eq!(
2149            batches[0].num_columns(),
2150            6,
2151            "Should have exactly 6 columns with duplicates"
2152        );
2153
2154        let schema = batches[0].schema();
2155
2156        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
2157        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2158        assert_eq!(
2159            schema.field(1).name(),
2160            RESERVED_COL_NAME_FILE,
2161            "Column 1 should be _file"
2162        );
2163        assert_eq!(
2164            schema.field(2).name(),
2165            "x",
2166            "Column 2 should be x (duplicate)"
2167        );
2168        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2169        assert_eq!(
2170            schema.field(4).name(),
2171            RESERVED_COL_NAME_FILE,
2172            "Column 4 should be _file (duplicate)"
2173        );
2174        assert_eq!(
2175            schema.field(5).name(),
2176            "y",
2177            "Column 5 should be y (duplicate)"
2178        );
2179
2180        // Verify all columns have correct data types
2181        assert!(
2182            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2183            "Column x should be Int64"
2184        );
2185        assert!(
2186            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2187            "Column x (duplicate) should be Int64"
2188        );
2189        assert!(
2190            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2191            "Column y should be Int64"
2192        );
2193        assert!(
2194            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2195            "Column y (duplicate) should be Int64"
2196        );
2197        assert!(
2198            matches!(
2199                schema.field(1).data_type(),
2200                arrow_schema::DataType::RunEndEncoded(_, _)
2201            ),
2202            "_file column should use RunEndEncoded type"
2203        );
2204        assert!(
2205            matches!(
2206                schema.field(4).data_type(),
2207                arrow_schema::DataType::RunEndEncoded(_, _)
2208            ),
2209            "_file column (duplicate) should use RunEndEncoded type"
2210        );
2211    }
2212
2213    #[tokio::test]
2214    async fn test_scan_deadlock() {
2215        let mut fixture = TableTestFixture::new();
2216        fixture.setup_deadlock_manifests().await;
2217
2218        // Create table scan with concurrency limit 1
2219        // This sets channel size to 1.
2220        // Data manifest has 10 entries -> will block producer.
2221        // Delete manifest is 2nd in list -> won't be processed.
2222        // Consumer 2 (Data) not started -> blocked.
2223        // Consumer 1 (Delete) waiting -> blocked.
2224        let table_scan = fixture
2225            .table
2226            .scan()
2227            .with_concurrency_limit(1)
2228            .build()
2229            .unwrap();
2230
2231        // This should timeout/hang if deadlock exists
2232        // We can use tokio::time::timeout
2233        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2234            table_scan
2235                .plan_files()
2236                .await
2237                .unwrap()
2238                .try_collect::<Vec<_>>()
2239                .await
2240        })
2241        .await;
2242
2243        // Assert it finished (didn't timeout)
2244        assert!(result.is_ok(), "Scan timed out - deadlock detected");
2245    }
2246}