iceberg/scan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Table scan api.
19
20mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35use crate::delete_file_index::DeleteFileIndex;
36use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
37use crate::expr::{Bind, BoundPredicate, Predicate};
38use crate::io::FileIO;
39use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
40use crate::runtime::spawn;
41use crate::spec::{DataContentType, SnapshotRef};
42use crate::table::Table;
43use crate::util::available_parallelism;
44use crate::{Error, ErrorKind, Result};
45
/// A stream of arrow [`RecordBatch`]es.
///
/// Boxed and `'static` so it can be returned from async fns and moved across
/// tasks; each item is a [`Result`] so read errors surface in-stream.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
48
/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    // Defaults to none which means select all columns
    column_names: Option<Vec<String>>,
    // Snapshot to scan; None means the table's current snapshot.
    snapshot_id: Option<i64>,
    // Record-batch size for the Arrow reader; None uses the reader's default.
    batch_size: Option<usize>,
    case_sensitive: bool,
    // Scan filter; stored with `Not` nodes already rewritten away.
    filter: Option<Predicate>,
    // Max number of data files read concurrently.
    concurrency_limit_data_files: usize,
    // Max number of manifest entries processed concurrently.
    concurrency_limit_manifest_entries: usize,
    // Max number of manifest files fetched concurrently.
    concurrency_limit_manifest_files: usize,
    // See [`TableScanBuilder::with_row_group_filtering_enabled`].
    row_group_filtering_enabled: bool,
    // See [`TableScanBuilder::with_row_selection_enabled`].
    row_selection_enabled: bool,
}
64
65impl<'a> TableScanBuilder<'a> {
66    pub(crate) fn new(table: &'a Table) -> Self {
67        let num_cpus = available_parallelism().get();
68
69        Self {
70            table,
71            column_names: None,
72            snapshot_id: None,
73            batch_size: None,
74            case_sensitive: true,
75            filter: None,
76            concurrency_limit_data_files: num_cpus,
77            concurrency_limit_manifest_entries: num_cpus,
78            concurrency_limit_manifest_files: num_cpus,
79            row_group_filtering_enabled: true,
80            row_selection_enabled: false,
81        }
82    }
83
84    /// Sets the desired size of batches in the response
85    /// to something other than the default
86    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
87        self.batch_size = batch_size;
88        self
89    }
90
91    /// Sets the scan's case sensitivity
92    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
93        self.case_sensitive = case_sensitive;
94        self
95    }
96
97    /// Specifies a predicate to use as a filter
98    pub fn with_filter(mut self, predicate: Predicate) -> Self {
99        // calls rewrite_not to remove Not nodes, which must be absent
100        // when applying the manifest evaluator
101        self.filter = Some(predicate.rewrite_not());
102        self
103    }
104
105    /// Select all columns.
106    pub fn select_all(mut self) -> Self {
107        self.column_names = None;
108        self
109    }
110
111    /// Select empty columns.
112    pub fn select_empty(mut self) -> Self {
113        self.column_names = Some(vec![]);
114        self
115    }
116
117    /// Select some columns of the table.
118    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
119        self.column_names = Some(
120            column_names
121                .into_iter()
122                .map(|item| item.to_string())
123                .collect(),
124        );
125        self
126    }
127
128    /// Set the snapshot to scan. When not set, it uses current snapshot.
129    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
130        self.snapshot_id = Some(snapshot_id);
131        self
132    }
133
134    /// Sets the concurrency limit for both manifest files and manifest
135    /// entries for this scan
136    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
137        self.concurrency_limit_manifest_files = limit;
138        self.concurrency_limit_manifest_entries = limit;
139        self.concurrency_limit_data_files = limit;
140        self
141    }
142
143    /// Sets the data file concurrency limit for this scan
144    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
145        self.concurrency_limit_data_files = limit;
146        self
147    }
148
149    /// Sets the manifest entry concurrency limit for this scan
150    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
151        self.concurrency_limit_manifest_entries = limit;
152        self
153    }
154
155    /// Determines whether to enable row group filtering.
156    /// When enabled, if a read is performed with a filter predicate,
157    /// then the metadata for each row group in the parquet file is
158    /// evaluated against the filter predicate and row groups
159    /// that cant contain matching rows will be skipped entirely.
160    ///
161    /// Defaults to enabled, as it generally improves performance or
162    /// keeps it the same, with performance degradation unlikely.
163    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
164        self.row_group_filtering_enabled = row_group_filtering_enabled;
165        self
166    }
167
168    /// Determines whether to enable row selection.
169    /// When enabled, if a read is performed with a filter predicate,
170    /// then (for row groups that have not been skipped) the page index
171    /// for each row group in a parquet file is parsed and evaluated
172    /// against the filter predicate to determine if ranges of rows
173    /// within a row group can be skipped, based upon the page-level
174    /// statistics for each column.
175    ///
176    /// Defaults to being disabled. Enabling requires parsing the parquet page
177    /// index, which can be slow enough that parsing the page index outweighs any
178    /// gains from the reduced number of rows that need scanning.
179    /// It is recommended to experiment with partitioning, sorting, row group size,
180    /// page size, and page row limit Iceberg settings on the table being scanned in
181    /// order to get the best performance from using row selection.
182    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
183        self.row_selection_enabled = row_selection_enabled;
184        self
185    }
186
    /// Build the table scan.
    ///
    /// Resolves the snapshot to scan, validates the selected columns against
    /// that snapshot's schema, collects their field ids, binds the filter
    /// predicate, and assembles the [`PlanContext`].
    ///
    /// # Errors
    /// - `DataInvalid` if the requested snapshot id does not exist, or a
    ///   selected column is missing from the schema.
    /// - `FeatureUnsupported` if a selected column is a nested field.
    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            // Explicit snapshot id: it must exist in the table metadata.
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                // No current snapshot means the table has no data: return a
                // scan with no plan context, which yields an empty stream.
                // NOTE(review): `current_snapshot_id` actually binds a snapshot
                // reference, not an id — a clearer name would be `current_snapshot`.
                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot_id.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        // Check that all column names exist in the schema (skip reserved columns).
        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                // Skip reserved columns that don't exist in the schema
                if is_metadata_column_name(column_name) {
                    continue;
                }
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        // Collect the field id of every projected column; a None projection
        // expands to every top-level field of the schema.
        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            // Handle metadata columns (like "_file")
            if is_metadata_column_name(column_name) {
                field_ids.push(get_metadata_field_id(column_name)?);
                continue;
            }

            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            // Only direct children of the schema struct can be projected;
            // nested fields are rejected.
            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                        "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                    ),
                )
            })?;

            field_ids.push(field_id);
        }

        // Bind the filter against the snapshot schema.
        // NOTE(review): the second argument hardcodes case sensitivity to
        // `true` instead of using `self.case_sensitive` — confirm intended.
        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
308}
309
/// Table scan.
#[derive(Debug)]
pub struct TableScan {
    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
    ///
    /// If this is None, then the scan contains no rows.
    plan_context: Option<PlanContext>,
    /// Record-batch size for the Arrow reader; None uses the reader's default.
    batch_size: Option<usize>,
    /// File IO used to read manifests and data files.
    file_io: FileIO,
    /// Projected column names; None means all columns are selected.
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently
    concurrency_limit_manifest_files: usize,

    /// The maximum number of [`ManifestEntry`]s that will
    /// be processed in parallel
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will be
    /// read in parallel
    concurrency_limit_data_files: usize,

    /// See [`TableScanBuilder::with_row_group_filtering_enabled`].
    row_group_filtering_enabled: bool,
    /// See [`TableScanBuilder::with_row_selection_enabled`].
    row_selection_enabled: bool,
}
335
336impl TableScan {
    /// Returns a stream of [`FileScanTask`]s.
    ///
    /// Builds a small pipeline of spawned tasks connected by bounded channels:
    /// one task fetches manifests and streams their entries, one processes
    /// delete-file entries into the [`DeleteFileIndex`], and one turns
    /// surviving data-file entries into [`FileScanTask`]s. Errors from any
    /// stage are forwarded into the result channel rather than returned here.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        // No snapshot (empty table): the scan yields no tasks at all.
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // used to stream ManifestEntryContexts between stages of the file plan operation
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
        // whose partitions cannot match this
        // scan's filter
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            // Surface fetch errors to the caller through the task stream.
            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete file [`ManifestEntry`] stream in parallel
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        // NOTE(review): unlike the data task below, this task is awaited —
        // presumably so the delete file index is fully populated before data
        // entries are converted into scan tasks; confirm this is intentional
        // and cannot stall if the bounded data channel fills up meanwhile.
        .await;

        // Process the data file [`ManifestEntry`] stream in parallel
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        // The receiver half drives the pipeline lazily as the caller polls it.
        Ok(file_scan_task_rx.boxed())
    }
432
433    /// Returns an [`ArrowRecordBatchStream`].
434    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
435        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
436            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
437            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
438            .with_row_selection_enabled(self.row_selection_enabled);
439
440        if let Some(batch_size) = self.batch_size {
441            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
442        }
443
444        arrow_reader_builder.build().read(self.plan_files().await?)
445    }
446
447    /// Returns a reference to the column names of the table scan.
448    pub fn column_names(&self) -> Option<&[String]> {
449        self.column_names.as_deref()
450    }
451
452    /// Returns a reference to the snapshot of the table scan.
453    pub fn snapshot(&self) -> Option<&SnapshotRef> {
454        self.plan_context.as_ref().map(|x| &x.snapshot)
455    }
456
    /// Filters one data-file manifest entry and, if it survives every stage,
    /// emits a [`FileScanTask`] for it on `file_scan_task_tx`.
    ///
    /// Filter stages, in order: entry liveness, partition-value evaluation,
    /// then file-metrics evaluation against the scan's bound filter.
    ///
    /// # Errors
    /// Returns an error if the entry describes a delete file (data manifests
    /// must contain only data files), if predicate evaluation fails, or if
    /// sending on the result channel fails.
    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry for a delete file
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        // Predicate-based pruning only applies when the scan has a filter.
        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            // Evaluators are cached per partition spec id.
            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // skip any data file whose metrics don't match this scan's filter
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // congratulations! the manifest entry has made its way through the
        // entire plan without getting filtered out. Create a corresponding
        // FileScanTask and push it to the result stream
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }
513
    /// Filters one delete-file manifest entry and, if it survives, forwards a
    /// [`DeleteFileContext`] on `delete_file_ctx_tx` (feeding the delete file
    /// index).
    ///
    /// Unlike data entries, only partition-value pruning is applied here; no
    /// metrics evaluation is performed.
    ///
    /// # Errors
    /// Returns an error if the entry describes a data file (delete manifests
    /// must contain only delete files), if predicate evaluation fails, or if
    /// sending on the channel fails.
    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry that is not for a delete file
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            // Evaluators are cached per partition spec id.
            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
556}
557
/// The scan filter bound in the two forms needed during planning.
pub(crate) struct BoundPredicates {
    /// Filter bound for partition-value evaluation (used to prune files by
    /// their partition data).
    partition_bound_predicate: BoundPredicate,
    /// Filter bound against the snapshot schema (used for file-metrics
    /// evaluation).
    snapshot_bound_predicate: BoundPredicate,
}
562
563#[cfg(test)]
564pub mod tests {
565    //! shared tests for the table scan API
566    #![allow(missing_docs)]
567
568    use std::collections::HashMap;
569    use std::fs;
570    use std::fs::File;
571    use std::sync::Arc;
572
573    use arrow_array::cast::AsArray;
574    use arrow_array::{
575        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
576        StringArray,
577    };
578    use futures::{TryStreamExt, stream};
579    use minijinja::value::Value;
580    use minijinja::{AutoEscape, Environment, context};
581    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
582    use parquet::basic::Compression;
583    use parquet::file::properties::WriterProperties;
584    use tempfile::TempDir;
585    use uuid::Uuid;
586
587    use crate::TableIdent;
588    use crate::arrow::ArrowReaderBuilder;
589    use crate::expr::{BoundPredicate, Reference};
590    use crate::io::{FileIO, OutputFile};
591    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
592    use crate::scan::FileScanTask;
593    use crate::spec::{
594        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
595        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
596        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
597    };
598    use crate::table::Table;
599
600    fn render_template(template: &str, ctx: Value) -> String {
601        let mut env = Environment::new();
602        env.set_auto_escape_callback(|_| AutoEscape::None);
603        env.render_str(template, ctx).unwrap()
604    }
605
    /// Shared test fixture: a [`Table`] rooted in a temporary directory, plus
    /// that root path as a string for building file locations.
    pub struct TableTestFixture {
        /// Root directory of the table on the local filesystem.
        pub table_location: String,
        /// The table under test.
        pub table: Table,
    }
610
611    impl TableTestFixture {
        /// Builds a fixture from the v2 example metadata template, with the
        /// table, manifest-list, and metadata paths rendered to point into a
        /// fresh temp directory.
        ///
        /// NOTE(review): `tmp_dir` (a `TempDir`) is dropped when this function
        /// returns, which removes the directory; only the path string is kept.
        /// Presumably later setup recreates the needed files — confirm.
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            // Render the metadata template with this fixture's paths, then parse it.
            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
650
        /// Like [`TableTestFixture::new`], but from the *empty* table metadata
        /// template — a table with no snapshots.
        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            // Render the empty-table metadata template with this fixture's paths.
            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
685
        /// Creates a fixture with 5 snapshots chained as:
        ///   S1 (root) -> S2 -> S3 -> S4 -> S5 (current)
        /// Useful for testing snapshot history traversal.
        ///
        /// Unlike the other fixtures, the metadata JSON is read verbatim —
        /// no template rendering is applied.
        pub fn new_with_deep_history() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            let table_metadata = {
                let json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2_deep_history.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                serde_json::from_str::<TableMetadata>(&json_str).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
718
        /// Like [`TableTestFixture::new`], but rewrites the loaded metadata so
        /// the table's default partition spec is unpartitioned.
        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::new_with_fs();

            // Render the same v2 metadata template as `new`, then patch it below.
            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            // Replace the default spec with the unpartitioned spec and make it
            // the only registered spec (id 0), with an empty partition type.
            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }
763
764        fn next_manifest_file(&self) -> OutputFile {
765            self.table
766                .file_io()
767                .new_output(format!(
768                    "{}/metadata/manifest_{}.avro",
769                    self.table_location,
770                    Uuid::new_v4()
771                ))
772                .unwrap()
773        }
774
        /// Populates the current snapshot with one v2 data manifest and a
        /// matching manifest list.
        ///
        /// The manifest carries three entries referencing the Parquet files
        /// written by `write_parquet_data_files`:
        /// - `1.parquet`: status `Added` (partition value 100)
        /// - `2.parquet`: status `Deleted`, attributed to the parent snapshot
        ///   (partition value 200)
        /// - `3.parquet`: status `Existing`, attributed to the parent snapshot
        ///   (partition value 300)
        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write the data files first, then use the file size in the manifest entries
            let parquet_file_size = self.write_parquet_data_files();

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            // Entry 1: 1.parquet, newly added in the current snapshot.
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            // Entry 2: 2.parquet, marked Deleted under the parent snapshot's
            // sequence numbers — scans of the current snapshot should skip it.
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            // Entry 3: 3.parquet, carried over (Existing) from the parent
            // snapshot — still live, so scans should include it.
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
875
876        /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet)
877        /// and returns the file size in bytes.
878        fn write_parquet_data_files(&self) -> u64 {
879            std::fs::create_dir_all(&self.table_location).unwrap();
880
881            let schema = {
882                let fields = vec![
883                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
884                        .with_metadata(HashMap::from([(
885                            PARQUET_FIELD_ID_META_KEY.to_string(),
886                            "1".to_string(),
887                        )])),
888                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
889                        .with_metadata(HashMap::from([(
890                            PARQUET_FIELD_ID_META_KEY.to_string(),
891                            "2".to_string(),
892                        )])),
893                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
894                        .with_metadata(HashMap::from([(
895                            PARQUET_FIELD_ID_META_KEY.to_string(),
896                            "3".to_string(),
897                        )])),
898                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
899                        .with_metadata(HashMap::from([(
900                            PARQUET_FIELD_ID_META_KEY.to_string(),
901                            "4".to_string(),
902                        )])),
903                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
904                        .with_metadata(HashMap::from([(
905                            PARQUET_FIELD_ID_META_KEY.to_string(),
906                            "5".to_string(),
907                        )])),
908                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
909                        .with_metadata(HashMap::from([(
910                            PARQUET_FIELD_ID_META_KEY.to_string(),
911                            "6".to_string(),
912                        )])),
913                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
914                        .with_metadata(HashMap::from([(
915                            PARQUET_FIELD_ID_META_KEY.to_string(),
916                            "7".to_string(),
917                        )])),
918                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
919                        .with_metadata(HashMap::from([(
920                            PARQUET_FIELD_ID_META_KEY.to_string(),
921                            "8".to_string(),
922                        )])),
923                ];
924                Arc::new(arrow_schema::Schema::new(fields))
925            };
926            // x: [1, 1, 1, 1, ...]
927            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
928
929            let mut values = vec![2; 512];
930            values.append(vec![3; 200].as_mut());
931            values.append(vec![4; 300].as_mut());
932            values.append(vec![5; 12].as_mut());
933
934            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
935            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
936
937            let mut values = vec![3; 512];
938            values.append(vec![4; 512].as_mut());
939
940            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
941            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
942
943            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
944            let mut values = vec!["Apache"; 512];
945            values.append(vec!["Iceberg"; 512].as_mut());
946            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
947
948            // dbl:
949            let mut values = vec![100.0f64; 512];
950            values.append(vec![150.0f64; 12].as_mut());
951            values.append(vec![200.0f64; 500].as_mut());
952            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
953
954            // i32:
955            let mut values = vec![100i32; 512];
956            values.append(vec![150i32; 12].as_mut());
957            values.append(vec![200i32; 500].as_mut());
958            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
959
960            // i64:
961            let mut values = vec![100i64; 512];
962            values.append(vec![150i64; 12].as_mut());
963            values.append(vec![200i64; 500].as_mut());
964            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
965
966            // bool:
967            let mut values = vec![false; 512];
968            values.append(vec![true; 512].as_mut());
969            let values: BooleanArray = values.into();
970            let col8 = Arc::new(values) as ArrayRef;
971
972            let to_write = RecordBatch::try_new(schema.clone(), vec![
973                col1, col2, col3, col4, col5, col6, col7, col8,
974            ])
975            .unwrap();
976
977            // Write the Parquet files
978            let props = WriterProperties::builder()
979                .set_compression(Compression::SNAPPY)
980                .build();
981
982            for n in 1..=3 {
983                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
984                let mut writer =
985                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
986
987                writer.write(&to_write).expect("Writing batch");
988
989                // writer must be closed to write footer
990                writer.close().unwrap();
991            }
992
993            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
994                .unwrap()
995                .len()
996        }
997
        /// Same snapshot layout as `setup_manifest_files` (Added 1.parquet,
        /// Deleted 2.parquet, Existing 3.parquet), but using an unpartitioned
        /// spec: every manifest entry carries an empty partition `Struct`.
        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            // NOTE: uses an unpartitioned spec directly rather than the table's
            // default spec.
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            // Write the data files first, then use the file size in the manifest entries
            let parquet_file_size = self.write_parquet_data_files();

            // Write data files using an empty partition for unpartitioned tables.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Create an empty partition value.
            let empty_partition = Struct::empty();

            // Entry 1: 1.parquet, newly added in the current snapshot.
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            // Entry 2: 2.parquet, marked Deleted under the parent snapshot.
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            // Entry 3: 3.parquet, carried over (Existing) from the parent snapshot.
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(parquet_file_size)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
1106
        /// Builds a snapshot designed to reproduce a planner deadlock scenario:
        /// a data manifest with 10 entries (enough to fill the task channel
        /// buffer) followed by a position-delete manifest. The manifest-list
        /// ordering (data before deletes) is what triggers the reproduction.
        ///
        /// NOTE(review): unlike `setup_manifest_files`, this does not write the
        /// referenced parquet files — it only fabricates manifest metadata.
        pub async fn setup_deadlock_manifests(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let _parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // 1. Write DATA manifest with MULTIPLE entries to fill buffer
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Add 10 data entries
            for i in 0..10 {
                writer
                    .add_entry(
                        ManifestEntry::builder()
                            .status(ManifestStatus::Added)
                            .data_file(
                                DataFileBuilder::default()
                                    .partition_spec_id(0)
                                    .content(DataContentType::Data)
                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
                                    .file_format(DataFileFormat::Parquet)
                                    .file_size_in_bytes(100)
                                    .record_count(1)
                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
                                    .key_metadata(None)
                                    .build()
                                    .unwrap(),
                            )
                            .build(),
                    )
                    .unwrap();
            }
            let data_manifest = writer.write_manifest_file().await.unwrap();

            // 2. Write DELETE manifest
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_deletes();

            // Single position-delete file entry in the same partition as the data.
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::PositionDeletes)
                                .file_path(format!("{}/del.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let delete_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list - DATA FIRST then DELETE
            // This order is crucial for reproduction
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
1197    }
1198
1199    #[test]
1200    fn test_table_scan_columns() {
1201        let table = TableTestFixture::new().table;
1202
1203        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1204        assert_eq!(
1205            Some(vec!["x".to_string(), "y".to_string()]),
1206            table_scan.column_names
1207        );
1208
1209        let table_scan = table
1210            .scan()
1211            .select(["x", "y"])
1212            .select(["z"])
1213            .build()
1214            .unwrap();
1215        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1216    }
1217
1218    #[test]
1219    fn test_select_all() {
1220        let table = TableTestFixture::new().table;
1221
1222        let table_scan = table.scan().select_all().build().unwrap();
1223        assert!(table_scan.column_names.is_none());
1224    }
1225
1226    #[test]
1227    fn test_select_no_exist_column() {
1228        let table = TableTestFixture::new().table;
1229
1230        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1231        assert!(table_scan.is_err());
1232    }
1233
1234    #[test]
1235    fn test_table_scan_default_snapshot_id() {
1236        let table = TableTestFixture::new().table;
1237
1238        let table_scan = table.scan().build().unwrap();
1239        assert_eq!(
1240            table.metadata().current_snapshot().unwrap().snapshot_id(),
1241            table_scan.snapshot().unwrap().snapshot_id()
1242        );
1243    }
1244
1245    #[test]
1246    fn test_table_scan_non_exist_snapshot_id() {
1247        let table = TableTestFixture::new().table;
1248
1249        let table_scan = table.scan().snapshot_id(1024).build();
1250        assert!(table_scan.is_err());
1251    }
1252
1253    #[test]
1254    fn test_table_scan_with_snapshot_id() {
1255        let table = TableTestFixture::new().table;
1256
1257        let table_scan = table
1258            .scan()
1259            .snapshot_id(3051729675574597004)
1260            .with_row_selection_enabled(true)
1261            .build()
1262            .unwrap();
1263        assert_eq!(
1264            table_scan.snapshot().unwrap().snapshot_id(),
1265            3051729675574597004
1266        );
1267    }
1268
1269    #[tokio::test]
1270    async fn test_plan_files_on_table_without_any_snapshots() {
1271        let table = TableTestFixture::new_empty().table;
1272        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1273        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1274        assert!(batches.is_empty());
1275    }
1276
1277    #[tokio::test]
1278    async fn test_plan_files_no_deletions() {
1279        let mut fixture = TableTestFixture::new();
1280        fixture.setup_manifest_files().await;
1281
1282        // Create table scan for current snapshot and plan files
1283        let table_scan = fixture
1284            .table
1285            .scan()
1286            .with_row_selection_enabled(true)
1287            .build()
1288            .unwrap();
1289
1290        let mut tasks = table_scan
1291            .plan_files()
1292            .await
1293            .unwrap()
1294            .try_fold(vec![], |mut acc, task| async move {
1295                acc.push(task);
1296                Ok(acc)
1297            })
1298            .await
1299            .unwrap();
1300
1301        assert_eq!(tasks.len(), 2);
1302
1303        tasks.sort_by_key(|t| t.data_file_path.to_string());
1304
1305        // Check first task is added data file
1306        assert_eq!(
1307            tasks[0].data_file_path,
1308            format!("{}/1.parquet", &fixture.table_location)
1309        );
1310
1311        // Check second task is existing data file
1312        assert_eq!(
1313            tasks[1].data_file_path,
1314            format!("{}/3.parquet", &fixture.table_location)
1315        );
1316    }
1317
1318    #[tokio::test]
1319    async fn test_open_parquet_no_deletions() {
1320        let mut fixture = TableTestFixture::new();
1321        fixture.setup_manifest_files().await;
1322
1323        // Create table scan for current snapshot and plan files
1324        let table_scan = fixture
1325            .table
1326            .scan()
1327            .with_row_selection_enabled(true)
1328            .build()
1329            .unwrap();
1330
1331        let batch_stream = table_scan.to_arrow().await.unwrap();
1332
1333        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1334
1335        let col = batches[0].column_by_name("x").unwrap();
1336
1337        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1338        assert_eq!(int64_arr.value(0), 1);
1339    }
1340
1341    #[tokio::test]
1342    async fn test_open_parquet_no_deletions_by_separate_reader() {
1343        let mut fixture = TableTestFixture::new();
1344        fixture.setup_manifest_files().await;
1345
1346        // Create table scan for current snapshot and plan files
1347        let table_scan = fixture
1348            .table
1349            .scan()
1350            .with_row_selection_enabled(true)
1351            .build()
1352            .unwrap();
1353
1354        let mut plan_task: Vec<_> = table_scan
1355            .plan_files()
1356            .await
1357            .unwrap()
1358            .try_collect()
1359            .await
1360            .unwrap();
1361        assert_eq!(plan_task.len(), 2);
1362
1363        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1364        let batch_stream = reader
1365            .clone()
1366            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1367            .unwrap();
1368        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1369
1370        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1371        let batch_stream = reader
1372            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1373            .unwrap();
1374        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1375
1376        assert_eq!(batch_1, batch_2);
1377    }
1378
1379    #[tokio::test]
1380    async fn test_open_parquet_with_projection() {
1381        let mut fixture = TableTestFixture::new();
1382        fixture.setup_manifest_files().await;
1383
1384        // Create table scan for current snapshot and plan files
1385        let table_scan = fixture
1386            .table
1387            .scan()
1388            .select(["x", "z"])
1389            .with_row_selection_enabled(true)
1390            .build()
1391            .unwrap();
1392
1393        let batch_stream = table_scan.to_arrow().await.unwrap();
1394
1395        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1396
1397        assert_eq!(batches[0].num_columns(), 2);
1398
1399        let col1 = batches[0].column_by_name("x").unwrap();
1400        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1401        assert_eq!(int64_arr.value(0), 1);
1402
1403        let col2 = batches[0].column_by_name("z").unwrap();
1404        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1405        assert_eq!(int64_arr.value(0), 3);
1406
1407        // test empty scan
1408        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1409        let batch_stream = table_scan.to_arrow().await.unwrap();
1410        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1411
1412        assert_eq!(batches[0].num_columns(), 0);
1413        assert_eq!(batches[0].num_rows(), 1024);
1414    }
1415
1416    #[tokio::test]
1417    async fn test_filter_on_arrow_lt() {
1418        let mut fixture = TableTestFixture::new();
1419        fixture.setup_manifest_files().await;
1420
1421        // Filter: y < 3
1422        let mut builder = fixture.table.scan();
1423        let predicate = Reference::new("y").less_than(Datum::long(3));
1424        builder = builder
1425            .with_filter(predicate)
1426            .with_row_selection_enabled(true);
1427        let table_scan = builder.build().unwrap();
1428
1429        let batch_stream = table_scan.to_arrow().await.unwrap();
1430
1431        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1432
1433        assert_eq!(batches[0].num_rows(), 512);
1434
1435        let col = batches[0].column_by_name("x").unwrap();
1436        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1437        assert_eq!(int64_arr.value(0), 1);
1438
1439        let col = batches[0].column_by_name("y").unwrap();
1440        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1441        assert_eq!(int64_arr.value(0), 2);
1442    }
1443
1444    #[tokio::test]
1445    async fn test_filter_on_arrow_gt_eq() {
1446        let mut fixture = TableTestFixture::new();
1447        fixture.setup_manifest_files().await;
1448
1449        // Filter: y >= 5
1450        let mut builder = fixture.table.scan();
1451        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1452        builder = builder
1453            .with_filter(predicate)
1454            .with_row_selection_enabled(true);
1455        let table_scan = builder.build().unwrap();
1456
1457        let batch_stream = table_scan.to_arrow().await.unwrap();
1458
1459        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1460
1461        assert_eq!(batches[0].num_rows(), 12);
1462
1463        let col = batches[0].column_by_name("x").unwrap();
1464        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1465        assert_eq!(int64_arr.value(0), 1);
1466
1467        let col = batches[0].column_by_name("y").unwrap();
1468        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1469        assert_eq!(int64_arr.value(0), 5);
1470    }
1471
1472    #[tokio::test]
1473    async fn test_filter_double_eq() {
1474        let mut fixture = TableTestFixture::new();
1475        fixture.setup_manifest_files().await;
1476
1477        // Filter: dbl == 150.0
1478        let mut builder = fixture.table.scan();
1479        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1480        builder = builder
1481            .with_filter(predicate)
1482            .with_row_selection_enabled(true);
1483        let table_scan = builder.build().unwrap();
1484
1485        let batch_stream = table_scan.to_arrow().await.unwrap();
1486
1487        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1488
1489        assert_eq!(batches.len(), 2);
1490        assert_eq!(batches[0].num_rows(), 12);
1491
1492        let col = batches[0].column_by_name("dbl").unwrap();
1493        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1494        assert_eq!(f64_arr.value(1), 150.0f64);
1495    }
1496
1497    #[tokio::test]
1498    async fn test_filter_int_eq() {
1499        let mut fixture = TableTestFixture::new();
1500        fixture.setup_manifest_files().await;
1501
1502        // Filter: i32 == 150
1503        let mut builder = fixture.table.scan();
1504        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1505        builder = builder
1506            .with_filter(predicate)
1507            .with_row_selection_enabled(true);
1508        let table_scan = builder.build().unwrap();
1509
1510        let batch_stream = table_scan.to_arrow().await.unwrap();
1511
1512        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1513
1514        assert_eq!(batches.len(), 2);
1515        assert_eq!(batches[0].num_rows(), 12);
1516
1517        let col = batches[0].column_by_name("i32").unwrap();
1518        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1519        assert_eq!(i32_arr.value(1), 150i32);
1520    }
1521
1522    #[tokio::test]
1523    async fn test_filter_long_eq() {
1524        let mut fixture = TableTestFixture::new();
1525        fixture.setup_manifest_files().await;
1526
1527        // Filter: i64 == 150
1528        let mut builder = fixture.table.scan();
1529        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1530        builder = builder
1531            .with_filter(predicate)
1532            .with_row_selection_enabled(true);
1533        let table_scan = builder.build().unwrap();
1534
1535        let batch_stream = table_scan.to_arrow().await.unwrap();
1536
1537        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1538
1539        assert_eq!(batches.len(), 2);
1540        assert_eq!(batches[0].num_rows(), 12);
1541
1542        let col = batches[0].column_by_name("i64").unwrap();
1543        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1544        assert_eq!(i64_arr.value(1), 150i64);
1545    }
1546
1547    #[tokio::test]
1548    async fn test_filter_bool_eq() {
1549        let mut fixture = TableTestFixture::new();
1550        fixture.setup_manifest_files().await;
1551
1552        // Filter: bool == true
1553        let mut builder = fixture.table.scan();
1554        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1555        builder = builder
1556            .with_filter(predicate)
1557            .with_row_selection_enabled(true);
1558        let table_scan = builder.build().unwrap();
1559
1560        let batch_stream = table_scan.to_arrow().await.unwrap();
1561
1562        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1563
1564        assert_eq!(batches.len(), 2);
1565        assert_eq!(batches[0].num_rows(), 512);
1566
1567        let col = batches[0].column_by_name("bool").unwrap();
1568        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1569        assert!(bool_arr.value(1));
1570    }
1571
1572    #[tokio::test]
1573    async fn test_filter_on_arrow_is_null() {
1574        let mut fixture = TableTestFixture::new();
1575        fixture.setup_manifest_files().await;
1576
1577        // Filter: y is null
1578        let mut builder = fixture.table.scan();
1579        let predicate = Reference::new("y").is_null();
1580        builder = builder
1581            .with_filter(predicate)
1582            .with_row_selection_enabled(true);
1583        let table_scan = builder.build().unwrap();
1584
1585        let batch_stream = table_scan.to_arrow().await.unwrap();
1586
1587        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1588        assert_eq!(batches.len(), 0);
1589    }
1590
1591    #[tokio::test]
1592    async fn test_filter_on_arrow_is_not_null() {
1593        let mut fixture = TableTestFixture::new();
1594        fixture.setup_manifest_files().await;
1595
1596        // Filter: y is not null
1597        let mut builder = fixture.table.scan();
1598        let predicate = Reference::new("y").is_not_null();
1599        builder = builder
1600            .with_filter(predicate)
1601            .with_row_selection_enabled(true);
1602        let table_scan = builder.build().unwrap();
1603
1604        let batch_stream = table_scan.to_arrow().await.unwrap();
1605
1606        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1607        assert_eq!(batches[0].num_rows(), 1024);
1608    }
1609
1610    #[tokio::test]
1611    async fn test_filter_on_arrow_lt_and_gt() {
1612        let mut fixture = TableTestFixture::new();
1613        fixture.setup_manifest_files().await;
1614
1615        // Filter: y < 5 AND z >= 4
1616        let mut builder = fixture.table.scan();
1617        let predicate = Reference::new("y")
1618            .less_than(Datum::long(5))
1619            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1620        builder = builder
1621            .with_filter(predicate)
1622            .with_row_selection_enabled(true);
1623        let table_scan = builder.build().unwrap();
1624
1625        let batch_stream = table_scan.to_arrow().await.unwrap();
1626
1627        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1628        assert_eq!(batches[0].num_rows(), 500);
1629
1630        let col = batches[0].column_by_name("x").unwrap();
1631        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1632        assert_eq!(col, &expected_x);
1633
1634        let col = batches[0].column_by_name("y").unwrap();
1635        let mut values = vec![];
1636        values.append(vec![3; 200].as_mut());
1637        values.append(vec![4; 300].as_mut());
1638        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1639        assert_eq!(col, &expected_y);
1640
1641        let col = batches[0].column_by_name("z").unwrap();
1642        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1643        assert_eq!(col, &expected_z);
1644    }
1645
1646    #[tokio::test]
1647    async fn test_filter_on_arrow_lt_or_gt() {
1648        let mut fixture = TableTestFixture::new();
1649        fixture.setup_manifest_files().await;
1650
1651        // Filter: y < 5 AND z >= 4
1652        let mut builder = fixture.table.scan();
1653        let predicate = Reference::new("y")
1654            .less_than(Datum::long(5))
1655            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1656        builder = builder
1657            .with_filter(predicate)
1658            .with_row_selection_enabled(true);
1659        let table_scan = builder.build().unwrap();
1660
1661        let batch_stream = table_scan.to_arrow().await.unwrap();
1662
1663        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1664        assert_eq!(batches[0].num_rows(), 1024);
1665
1666        let col = batches[0].column_by_name("x").unwrap();
1667        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1668        assert_eq!(col, &expected_x);
1669
1670        let col = batches[0].column_by_name("y").unwrap();
1671        let mut values = vec![2; 512];
1672        values.append(vec![3; 200].as_mut());
1673        values.append(vec![4; 300].as_mut());
1674        values.append(vec![5; 12].as_mut());
1675        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1676        assert_eq!(col, &expected_y);
1677
1678        let col = batches[0].column_by_name("z").unwrap();
1679        let mut values = vec![3; 512];
1680        values.append(vec![4; 512].as_mut());
1681        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1682        assert_eq!(col, &expected_z);
1683    }
1684
1685    #[tokio::test]
1686    async fn test_filter_on_arrow_startswith() {
1687        let mut fixture = TableTestFixture::new();
1688        fixture.setup_manifest_files().await;
1689
1690        // Filter: a STARTSWITH "Ice"
1691        let mut builder = fixture.table.scan();
1692        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1693        builder = builder
1694            .with_filter(predicate)
1695            .with_row_selection_enabled(true);
1696        let table_scan = builder.build().unwrap();
1697
1698        let batch_stream = table_scan.to_arrow().await.unwrap();
1699
1700        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1701
1702        assert_eq!(batches[0].num_rows(), 512);
1703
1704        let col = batches[0].column_by_name("a").unwrap();
1705        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1706        assert_eq!(string_arr.value(0), "Iceberg");
1707    }
1708
1709    #[tokio::test]
1710    async fn test_filter_on_arrow_not_startswith() {
1711        let mut fixture = TableTestFixture::new();
1712        fixture.setup_manifest_files().await;
1713
1714        // Filter: a NOT STARTSWITH "Ice"
1715        let mut builder = fixture.table.scan();
1716        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1717        builder = builder
1718            .with_filter(predicate)
1719            .with_row_selection_enabled(true);
1720        let table_scan = builder.build().unwrap();
1721
1722        let batch_stream = table_scan.to_arrow().await.unwrap();
1723
1724        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1725
1726        assert_eq!(batches[0].num_rows(), 512);
1727
1728        let col = batches[0].column_by_name("a").unwrap();
1729        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1730        assert_eq!(string_arr.value(0), "Apache");
1731    }
1732
1733    #[tokio::test]
1734    async fn test_filter_on_arrow_in() {
1735        let mut fixture = TableTestFixture::new();
1736        fixture.setup_manifest_files().await;
1737
1738        // Filter: a IN ("Sioux", "Iceberg")
1739        let mut builder = fixture.table.scan();
1740        let predicate =
1741            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1742        builder = builder
1743            .with_filter(predicate)
1744            .with_row_selection_enabled(true);
1745        let table_scan = builder.build().unwrap();
1746
1747        let batch_stream = table_scan.to_arrow().await.unwrap();
1748
1749        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1750
1751        assert_eq!(batches[0].num_rows(), 512);
1752
1753        let col = batches[0].column_by_name("a").unwrap();
1754        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1755        assert_eq!(string_arr.value(0), "Iceberg");
1756    }
1757
1758    #[tokio::test]
1759    async fn test_filter_on_arrow_not_in() {
1760        let mut fixture = TableTestFixture::new();
1761        fixture.setup_manifest_files().await;
1762
1763        // Filter: a NOT IN ("Sioux", "Iceberg")
1764        let mut builder = fixture.table.scan();
1765        let predicate =
1766            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1767        builder = builder
1768            .with_filter(predicate)
1769            .with_row_selection_enabled(true);
1770        let table_scan = builder.build().unwrap();
1771
1772        let batch_stream = table_scan.to_arrow().await.unwrap();
1773
1774        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1775
1776        assert_eq!(batches[0].num_rows(), 512);
1777
1778        let col = batches[0].column_by_name("a").unwrap();
1779        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1780        assert_eq!(string_arr.value(0), "Apache");
1781    }
1782
1783    #[test]
1784    fn test_file_scan_task_serialize_deserialize() {
1785        let test_fn = |task: FileScanTask| {
1786            let serialized = serde_json::to_string(&task).unwrap();
1787            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1788
1789            assert_eq!(task.data_file_path, deserialized.data_file_path);
1790            assert_eq!(task.start, deserialized.start);
1791            assert_eq!(task.length, deserialized.length);
1792            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1793            assert_eq!(task.predicate, deserialized.predicate);
1794            assert_eq!(task.schema, deserialized.schema);
1795        };
1796
1797        // without predicate
1798        let schema = Arc::new(
1799            Schema::builder()
1800                .with_fields(vec![Arc::new(NestedField::required(
1801                    1,
1802                    "x",
1803                    Type::Primitive(PrimitiveType::Binary),
1804                ))])
1805                .build()
1806                .unwrap(),
1807        );
1808        let task = FileScanTask {
1809            data_file_path: "data_file_path".to_string(),
1810            file_size_in_bytes: 0,
1811            start: 0,
1812            length: 100,
1813            project_field_ids: vec![1, 2, 3],
1814            predicate: None,
1815            schema: schema.clone(),
1816            record_count: Some(100),
1817            data_file_format: DataFileFormat::Parquet,
1818            deletes: vec![],
1819            partition: None,
1820            partition_spec: None,
1821            name_mapping: None,
1822            case_sensitive: false,
1823        };
1824        test_fn(task);
1825
1826        // with predicate
1827        let task = FileScanTask {
1828            data_file_path: "data_file_path".to_string(),
1829            file_size_in_bytes: 0,
1830            start: 0,
1831            length: 100,
1832            project_field_ids: vec![1, 2, 3],
1833            predicate: Some(BoundPredicate::AlwaysTrue),
1834            schema,
1835            record_count: None,
1836            data_file_format: DataFileFormat::Avro,
1837            deletes: vec![],
1838            partition: None,
1839            partition_spec: None,
1840            name_mapping: None,
1841            case_sensitive: false,
1842        };
1843        test_fn(task);
1844    }
1845
1846    #[tokio::test]
1847    async fn test_select_with_file_column() {
1848        use arrow_array::cast::AsArray;
1849
1850        let mut fixture = TableTestFixture::new();
1851        fixture.setup_manifest_files().await;
1852
1853        // Select regular columns plus the _file column
1854        let table_scan = fixture
1855            .table
1856            .scan()
1857            .select(["x", RESERVED_COL_NAME_FILE])
1858            .with_row_selection_enabled(true)
1859            .build()
1860            .unwrap();
1861
1862        let batch_stream = table_scan.to_arrow().await.unwrap();
1863        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1864
1865        // Verify we have 2 columns: x and _file
1866        assert_eq!(batches[0].num_columns(), 2);
1867
1868        // Verify the x column exists and has correct data
1869        let x_col = batches[0].column_by_name("x").unwrap();
1870        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
1871        assert_eq!(x_arr.value(0), 1);
1872
1873        // Verify the _file column exists
1874        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
1875        assert!(
1876            file_col.is_some(),
1877            "_file column should be present in the batch"
1878        );
1879
1880        // Verify the _file column contains a file path
1881        let file_col = file_col.unwrap();
1882        assert!(
1883            matches!(
1884                file_col.data_type(),
1885                arrow_schema::DataType::RunEndEncoded(_, _)
1886            ),
1887            "_file column should use RunEndEncoded type"
1888        );
1889
1890        // Decode the RunArray to verify it contains the file path
1891        let run_array = file_col
1892            .as_any()
1893            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1894            .expect("_file column should be a RunArray");
1895
1896        let values = run_array.values();
1897        let string_values = values.as_string::<i32>();
1898        assert_eq!(string_values.len(), 1, "Should have a single file path");
1899
1900        let file_path = string_values.value(0);
1901        assert!(
1902            file_path.ends_with(".parquet"),
1903            "File path should end with .parquet, got: {file_path}"
1904        );
1905    }
1906
1907    #[tokio::test]
1908    async fn test_select_file_column_position() {
1909        let mut fixture = TableTestFixture::new();
1910        fixture.setup_manifest_files().await;
1911
1912        // Select columns in specific order: x, _file, z
1913        let table_scan = fixture
1914            .table
1915            .scan()
1916            .select(["x", RESERVED_COL_NAME_FILE, "z"])
1917            .with_row_selection_enabled(true)
1918            .build()
1919            .unwrap();
1920
1921        let batch_stream = table_scan.to_arrow().await.unwrap();
1922        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1923
1924        assert_eq!(batches[0].num_columns(), 3);
1925
1926        // Verify column order: x at position 0, _file at position 1, z at position 2
1927        let schema = batches[0].schema();
1928        assert_eq!(schema.field(0).name(), "x");
1929        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1930        assert_eq!(schema.field(2).name(), "z");
1931
1932        // Verify columns by name also works
1933        assert!(batches[0].column_by_name("x").is_some());
1934        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1935        assert!(batches[0].column_by_name("z").is_some());
1936    }
1937
1938    #[tokio::test]
1939    async fn test_select_file_column_only() {
1940        let mut fixture = TableTestFixture::new();
1941        fixture.setup_manifest_files().await;
1942
1943        // Select only the _file column
1944        let table_scan = fixture
1945            .table
1946            .scan()
1947            .select([RESERVED_COL_NAME_FILE])
1948            .with_row_selection_enabled(true)
1949            .build()
1950            .unwrap();
1951
1952        let batch_stream = table_scan.to_arrow().await.unwrap();
1953        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1954
1955        // Should have exactly 1 column
1956        assert_eq!(batches[0].num_columns(), 1);
1957
1958        // Verify it's the _file column
1959        let schema = batches[0].schema();
1960        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
1961
1962        // Verify the batch has the correct number of rows
1963        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
1964        // Each file has 1024 rows, so total is 2048 rows
1965        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1966        assert_eq!(total_rows, 2048);
1967    }
1968
1969    #[tokio::test]
1970    async fn test_file_column_with_multiple_files() {
1971        use std::collections::HashSet;
1972
1973        let mut fixture = TableTestFixture::new();
1974        fixture.setup_manifest_files().await;
1975
1976        // Select x and _file columns
1977        let table_scan = fixture
1978            .table
1979            .scan()
1980            .select(["x", RESERVED_COL_NAME_FILE])
1981            .with_row_selection_enabled(true)
1982            .build()
1983            .unwrap();
1984
1985        let batch_stream = table_scan.to_arrow().await.unwrap();
1986        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1987
1988        // Collect all unique file paths from the batches
1989        let mut file_paths = HashSet::new();
1990        for batch in &batches {
1991            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
1992            let run_array = file_col
1993                .as_any()
1994                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1995                .expect("_file column should be a RunArray");
1996
1997            let values = run_array.values();
1998            let string_values = values.as_string::<i32>();
1999            for i in 0..string_values.len() {
2000                file_paths.insert(string_values.value(i).to_string());
2001            }
2002        }
2003
2004        // We should have multiple files (the test creates 1.parquet and 3.parquet)
2005        assert!(!file_paths.is_empty(), "Should have at least one file path");
2006
2007        // All paths should end with .parquet
2008        for path in &file_paths {
2009            assert!(
2010                path.ends_with(".parquet"),
2011                "All file paths should end with .parquet, got: {path}"
2012            );
2013        }
2014    }
2015
2016    #[tokio::test]
2017    async fn test_file_column_at_start() {
2018        let mut fixture = TableTestFixture::new();
2019        fixture.setup_manifest_files().await;
2020
2021        // Select _file at the start
2022        let table_scan = fixture
2023            .table
2024            .scan()
2025            .select([RESERVED_COL_NAME_FILE, "x", "y"])
2026            .with_row_selection_enabled(true)
2027            .build()
2028            .unwrap();
2029
2030        let batch_stream = table_scan.to_arrow().await.unwrap();
2031        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2032
2033        assert_eq!(batches[0].num_columns(), 3);
2034
2035        // Verify _file is at position 0
2036        let schema = batches[0].schema();
2037        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2038        assert_eq!(schema.field(1).name(), "x");
2039        assert_eq!(schema.field(2).name(), "y");
2040    }
2041
2042    #[tokio::test]
2043    async fn test_file_column_at_end() {
2044        let mut fixture = TableTestFixture::new();
2045        fixture.setup_manifest_files().await;
2046
2047        // Select _file at the end
2048        let table_scan = fixture
2049            .table
2050            .scan()
2051            .select(["x", "y", RESERVED_COL_NAME_FILE])
2052            .with_row_selection_enabled(true)
2053            .build()
2054            .unwrap();
2055
2056        let batch_stream = table_scan.to_arrow().await.unwrap();
2057        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2058
2059        assert_eq!(batches[0].num_columns(), 3);
2060
2061        // Verify _file is at position 2 (the end)
2062        let schema = batches[0].schema();
2063        assert_eq!(schema.field(0).name(), "x");
2064        assert_eq!(schema.field(1).name(), "y");
2065        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2066    }
2067
2068    #[tokio::test]
2069    async fn test_select_with_repeated_column_names() {
2070        let mut fixture = TableTestFixture::new();
2071        fixture.setup_manifest_files().await;
2072
2073        // Select with repeated column names - both regular columns and virtual columns
2074        // Repeated columns should appear multiple times in the result (duplicates are allowed)
2075        let table_scan = fixture
2076            .table
2077            .scan()
2078            .select([
2079                "x",
2080                RESERVED_COL_NAME_FILE,
2081                "x", // x repeated
2082                "y",
2083                RESERVED_COL_NAME_FILE, // _file repeated
2084                "y",                    // y repeated
2085            ])
2086            .with_row_selection_enabled(true)
2087            .build()
2088            .unwrap();
2089
2090        let batch_stream = table_scan.to_arrow().await.unwrap();
2091        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2092
2093        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
2094        assert_eq!(
2095            batches[0].num_columns(),
2096            6,
2097            "Should have exactly 6 columns with duplicates"
2098        );
2099
2100        let schema = batches[0].schema();
2101
2102        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
2103        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2104        assert_eq!(
2105            schema.field(1).name(),
2106            RESERVED_COL_NAME_FILE,
2107            "Column 1 should be _file"
2108        );
2109        assert_eq!(
2110            schema.field(2).name(),
2111            "x",
2112            "Column 2 should be x (duplicate)"
2113        );
2114        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2115        assert_eq!(
2116            schema.field(4).name(),
2117            RESERVED_COL_NAME_FILE,
2118            "Column 4 should be _file (duplicate)"
2119        );
2120        assert_eq!(
2121            schema.field(5).name(),
2122            "y",
2123            "Column 5 should be y (duplicate)"
2124        );
2125
2126        // Verify all columns have correct data types
2127        assert!(
2128            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2129            "Column x should be Int64"
2130        );
2131        assert!(
2132            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2133            "Column x (duplicate) should be Int64"
2134        );
2135        assert!(
2136            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2137            "Column y should be Int64"
2138        );
2139        assert!(
2140            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2141            "Column y (duplicate) should be Int64"
2142        );
2143        assert!(
2144            matches!(
2145                schema.field(1).data_type(),
2146                arrow_schema::DataType::RunEndEncoded(_, _)
2147            ),
2148            "_file column should use RunEndEncoded type"
2149        );
2150        assert!(
2151            matches!(
2152                schema.field(4).data_type(),
2153                arrow_schema::DataType::RunEndEncoded(_, _)
2154            ),
2155            "_file column (duplicate) should use RunEndEncoded type"
2156        );
2157    }
2158
2159    #[tokio::test]
2160    async fn test_scan_deadlock() {
2161        let mut fixture = TableTestFixture::new();
2162        fixture.setup_deadlock_manifests().await;
2163
2164        // Create table scan with concurrency limit 1
2165        // This sets channel size to 1.
2166        // Data manifest has 10 entries -> will block producer.
2167        // Delete manifest is 2nd in list -> won't be processed.
2168        // Consumer 2 (Data) not started -> blocked.
2169        // Consumer 1 (Delete) waiting -> blocked.
2170        let table_scan = fixture
2171            .table
2172            .scan()
2173            .with_concurrency_limit(1)
2174            .build()
2175            .unwrap();
2176
2177        // This should timeout/hang if deadlock exists
2178        // We can use tokio::time::timeout
2179        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2180            table_scan
2181                .plan_files()
2182                .await
2183                .unwrap()
2184                .try_collect::<Vec<_>>()
2185                .await
2186        })
2187        .await;
2188
2189        // Assert it finished (didn't timeout)
2190        assert!(result.is_ok(), "Scan timed out - deadlock detected");
2191    }
2192}