iceberg/scan/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Table scan API.

mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
pub use crate::arrow::{ScanMetrics, ScanResult};
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::util::available_parallelism;
use crate::{Error, ErrorKind, Result};

/// A stream of arrow [`RecordBatch`]es.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

/// Builder to create table scan.
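///
/// A minimal usage sketch, assuming a [`Table`] has already been loaded
/// elsewhere (all of the builder methods used here are defined below):
///
/// ```no_run
/// # use iceberg::table::Table;
/// # fn example(table: Table) -> iceberg::Result<()> {
/// let scan = table
///     .scan()
///     .select(["x", "y"])
///     .with_batch_size(Some(1024))
///     .build()?;
/// # Ok(())
/// # }
/// ```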
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    // Defaults to `None`, which means all columns are selected.
    column_names: Option<Vec<String>>,
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    /// Sets the desired size of batches in the response
    /// to something other than the default.
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the scan's case sensitivity.
    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

    /// Specifies a predicate to use as a filter.
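    ///
    /// A short sketch with a hypothetical column `x`, using this crate's
    /// `Reference` and `Datum` types to build the predicate:
    ///
    /// ```no_run
    /// # use iceberg::table::Table;
    /// use iceberg::expr::Reference;
    /// use iceberg::spec::Datum;
    ///
    /// # fn example(table: Table) -> iceberg::Result<()> {
    /// // Only data that can satisfy `x < 100` will be scanned.
    /// let scan = table
    ///     .scan()
    ///     .with_filter(Reference::new("x").less_than(Datum::long(100)))
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```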
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        // calls rewrite_not to remove Not nodes, which must be absent
        // when applying the manifest evaluator
        self.filter = Some(predicate.rewrite_not());
        self
    }

    /// Select all columns.
    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

    /// Select empty columns.
    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

    /// Select some columns of the table.
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

    /// Set the snapshot to scan. When not set, the current snapshot is used.
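    ///
    /// A time-travel sketch (the snapshot id here is the one used by this
    /// module's tests, purely for illustration):
    ///
    /// ```no_run
    /// # use iceberg::table::Table;
    /// # fn example(table: Table) -> iceberg::Result<()> {
    /// let scan = table.scan().snapshot_id(3051729675574597004).build()?;
    /// # Ok(())
    /// # }
    /// ```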
    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

    /// Sets the concurrency limit for manifest files, manifest entries,
    /// and data files for this scan.
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the data file concurrency limit for this scan.
    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the manifest entry concurrency limit for this scan.
    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

    /// Determines whether to enable row group filtering.
    /// When enabled, if a read is performed with a filter predicate,
    /// then the metadata for each row group in the parquet file is
    /// evaluated against the filter predicate, and row groups
    /// that can't contain matching rows are skipped entirely.
    ///
    /// Defaults to enabled, as it generally improves performance or
    /// leaves it unchanged; degradation is unlikely.
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    /// Determines whether to enable row selection.
    /// When enabled, if a read is performed with a filter predicate,
    /// then (for row groups that have not been skipped) the page index
    /// for each row group in a parquet file is parsed and evaluated
    /// against the filter predicate to determine if ranges of rows
    /// within a row group can be skipped, based upon the page-level
    /// statistics for each column.
    ///
    /// Defaults to disabled. Enabling it requires parsing the parquet page
    /// index, which can be slow enough that the cost outweighs any gains
    /// from scanning fewer rows.
    /// It is recommended to experiment with partitioning, sorting, row group size,
    /// page size, and page row limit Iceberg settings on the table being scanned in
    /// order to get the best performance from using row selection.
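    ///
    /// A sketch enabling both row-based optimizations together (whether row
    /// selection pays off depends on the table layout, as noted above):
    ///
    /// ```no_run
    /// # use iceberg::table::Table;
    /// # fn example(table: Table) -> iceberg::Result<()> {
    /// let scan = table
    ///     .scan()
    ///     .with_row_group_filtering_enabled(true)
    ///     .with_row_selection_enabled(true)
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```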
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    /// Build the table scan.
    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        // Check that all column names exist in the schema (skip reserved columns).
        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                // Skip reserved columns that don't exist in the schema
                if is_metadata_column_name(column_name) {
                    continue;
                }
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            // Handle metadata columns (like "_file")
            if is_metadata_column_name(column_name) {
                field_ids.push(get_metadata_field_id(column_name)?);
                continue;
            }

            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                            "Column {column_name} is not a direct child of the schema but a nested field, which is not supported yet. Schema: {schema}"
                        ),
                    )
                })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}

/// Table scan.
#[derive(Debug)]
pub struct TableScan {
    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
    ///
    /// If this is None, then the scan contains no rows.
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently
    concurrency_limit_manifest_files: usize,

    /// The maximum number of [`ManifestEntry`]s that will
    /// be processed in parallel
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will be
    /// read in parallel
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
    /// Returns a stream of [`FileScanTask`]s.
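    ///
    /// A sketch of draining the task stream (assuming a `TableScan` built as
    /// shown above):
    ///
    /// ```no_run
    /// # use iceberg::scan::TableScan;
    /// use futures::TryStreamExt;
    ///
    /// # async fn example(scan: TableScan) -> iceberg::Result<()> {
    /// let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    /// println!("planned {} file scan tasks", tasks.len());
    /// # Ok(())
    /// # }
    /// ```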
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // used to stream ManifestEntryContexts between stages of the file plan operation
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out
        // any whose partitions cannot match this scan's filter
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete file [`ManifestEntry`] stream in parallel. This
        // task is awaited before data file processing is spawned, so that the
        // delete file index is fully populated first.
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data file [`ManifestEntry`] stream in parallel
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }

    /// Returns an [`ArrowRecordBatchStream`].
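    ///
    /// A sketch of consuming the stream batch by batch (assuming a built
    /// `TableScan`):
    ///
    /// ```no_run
    /// # use iceberg::scan::TableScan;
    /// use futures::TryStreamExt;
    ///
    /// # async fn example(scan: TableScan) -> iceberg::Result<()> {
    /// let mut batches = scan.to_arrow().await?;
    /// while let Some(batch) = batches.try_next().await? {
    ///     println!("read a batch of {} rows", batch.num_rows());
    /// }
    /// # Ok(())
    /// # }
    /// ```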
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder
            .build()
            .read(self.plan_files().await?)
            .map(|result| result.stream())
    }

    /// Returns a reference to the column names of the table scan.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns a reference to the snapshot of the table scan.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry for a delete file
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // skip any data file whose metrics don't match this scan's filter
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // congratulations! the manifest entry has made its way through the
        // entire plan without getting filtered out. Create a corresponding
        // FileScanTask and push it to the result stream
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry that is not for a delete file
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // skip any delete file whose partition data indicates that it can't contain
            // any rows that match this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

567#[cfg(test)]
568pub mod tests {
569    //! shared tests for the table scan API
570    #![allow(missing_docs)]
571
572    use std::collections::HashMap;
573    use std::fs;
574    use std::fs::File;
575    use std::sync::Arc;
576
577    use arrow_array::cast::AsArray;
578    use arrow_array::{
579        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
580        StringArray,
581    };
582    use futures::{TryStreamExt, stream};
583    use minijinja::value::Value;
584    use minijinja::{AutoEscape, Environment, context};
585    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
586    use parquet::basic::Compression;
587    use parquet::file::properties::WriterProperties;
588    use tempfile::TempDir;
589    use uuid::Uuid;
590
591    use crate::TableIdent;
592    use crate::arrow::ArrowReaderBuilder;
593    use crate::expr::{BoundPredicate, Reference};
594    use crate::io::{FileIO, OutputFile};
595    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
596    use crate::scan::FileScanTask;
597    use crate::spec::{
598        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
599        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
600        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
601    };
602    use crate::table::Table;
603
604    fn render_template(template: &str, ctx: Value) -> String {
605        let mut env = Environment::new();
606        env.set_auto_escape_callback(|_| AutoEscape::None);
607        env.render_str(template, ctx).unwrap()
608    }
609
610    pub struct TableTestFixture {
611        pub table_location: String,
612        pub table: Table,
613    }
614
615    impl TableTestFixture {
616        #[allow(clippy::new_without_default)]
617        pub fn new() -> Self {
618            let tmp_dir = TempDir::new().unwrap();
619            let table_location = tmp_dir.path().join("table1");
620            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
621            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
622            let table_metadata1_location = table_location.join("metadata/v1.json");
623
624            let file_io = FileIO::new_with_fs();
625
626            let table_metadata = {
627                let template_json_str = fs::read_to_string(format!(
628                    "{}/testdata/example_table_metadata_v2.json",
629                    env!("CARGO_MANIFEST_DIR")
630                ))
631                .unwrap();
632                let metadata_json = render_template(&template_json_str, context! {
633                    table_location => &table_location,
634                    manifest_list_1_location => &manifest_list1_location,
635                    manifest_list_2_location => &manifest_list2_location,
636                    table_metadata_1_location => &table_metadata1_location,
637                });
638                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
639            };
640
641            let table = Table::builder()
642                .metadata(table_metadata)
643                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
644                .file_io(file_io.clone())
645                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
646                .build()
647                .unwrap();
648
649            Self {
650                table_location: table_location.to_str().unwrap().to_string(),
651                table,
652            }
653        }
654
655        #[allow(clippy::new_without_default)]
656        pub fn new_empty() -> Self {
657            let tmp_dir = TempDir::new().unwrap();
658            let table_location = tmp_dir.path().join("table1");
659            let table_metadata1_location = table_location.join("metadata/v1.json");
660
661            let file_io = FileIO::new_with_fs();
662
663            let table_metadata = {
664                let template_json_str = fs::read_to_string(format!(
665                    "{}/testdata/example_empty_table_metadata_v2.json",
666                    env!("CARGO_MANIFEST_DIR")
667                ))
668                .unwrap();
669                let metadata_json = render_template(&template_json_str, context! {
670                    table_location => &table_location,
671                    table_metadata_1_location => &table_metadata1_location,
672                });
673                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
674            };
675
676            let table = Table::builder()
677                .metadata(table_metadata)
678                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
679                .file_io(file_io.clone())
680                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
681                .build()
682                .unwrap();
683
684            Self {
685                table_location: table_location.to_str().unwrap().to_string(),
686                table,
687            }
688        }
689
690        /// Creates a fixture with 5 snapshots chained as:
691        ///   S1 (root) -> S2 -> S3 -> S4 -> S5 (current)
692        /// Useful for testing snapshot history traversal.
693        pub fn new_with_deep_history() -> Self {
694            let tmp_dir = TempDir::new().unwrap();
695            let table_location = tmp_dir.path().join("table1");
696            let table_metadata1_location = table_location.join("metadata/v1.json");
697
698            let file_io = FileIO::new_with_fs();
699
700            let table_metadata = {
701                let json_str = fs::read_to_string(format!(
702                    "{}/testdata/example_table_metadata_v2_deep_history.json",
703                    env!("CARGO_MANIFEST_DIR")
704                ))
705                .unwrap();
706                serde_json::from_str::<TableMetadata>(&json_str).unwrap()
707            };
708
709            let table = Table::builder()
710                .metadata(table_metadata)
711                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
712                .file_io(file_io.clone())
713                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
714                .build()
715                .unwrap();
716
717            Self {
718                table_location: table_location.to_str().unwrap().to_string(),
719                table,
720            }
721        }
722
723        pub fn new_unpartitioned() -> Self {
724            let tmp_dir = TempDir::new().unwrap();
725            let table_location = tmp_dir.path().join("table1");
726            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
727            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
728            let table_metadata1_location = table_location.join("metadata/v1.json");
729
730            let file_io = FileIO::new_with_fs();
731
732            let mut table_metadata = {
733                let template_json_str = fs::read_to_string(format!(
734                    "{}/testdata/example_table_metadata_v2.json",
735                    env!("CARGO_MANIFEST_DIR")
736                ))
737                .unwrap();
738                let metadata_json = render_template(&template_json_str, context! {
739                    table_location => &table_location,
740                    manifest_list_1_location => &manifest_list1_location,
741                    manifest_list_2_location => &manifest_list2_location,
742                    table_metadata_1_location => &table_metadata1_location,
743                });
744                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
745            };
746
747            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
748            table_metadata.partition_specs.clear();
749            table_metadata.default_partition_type = StructType::new(vec![]);
750            table_metadata
751                .partition_specs
752                .insert(0, table_metadata.default_spec.clone());
753
754            let table = Table::builder()
755                .metadata(table_metadata)
756                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
757                .file_io(file_io.clone())
758                .metadata_location(table_metadata1_location.to_str().unwrap())
759                .build()
760                .unwrap();
761
762            Self {
763                table_location: table_location.to_str().unwrap().to_string(),
764                table,
765            }
766        }
767
768        fn next_manifest_file(&self) -> OutputFile {
769            self.table
770                .file_io()
771                .new_output(format!(
772                    "{}/metadata/manifest_{}.avro",
773                    self.table_location,
774                    Uuid::new_v4()
775                ))
776                .unwrap()
777        }
778
779        pub async fn setup_manifest_files(&mut self) {
780            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
781            let parent_snapshot = current_snapshot
782                .parent_snapshot(self.table.metadata())
783                .unwrap();
784            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
785            let current_partition_spec = self.table.metadata().default_partition_spec();
786
787            // Write the data files first, then use the file size in the manifest entries
788            let parquet_file_size = self.write_parquet_data_files();
789
790            let mut writer = ManifestWriterBuilder::new(
791                self.next_manifest_file(),
792                Some(current_snapshot.snapshot_id()),
793                None,
794                current_schema.clone(),
795                current_partition_spec.as_ref().clone(),
796            )
797            .build_v2_data();
798            writer
799                .add_entry(
800                    ManifestEntry::builder()
801                        .status(ManifestStatus::Added)
802                        .data_file(
803                            DataFileBuilder::default()
804                                .partition_spec_id(0)
805                                .content(DataContentType::Data)
806                                .file_path(format!("{}/1.parquet", &self.table_location))
807                                .file_format(DataFileFormat::Parquet)
808                                .file_size_in_bytes(parquet_file_size)
809                                .record_count(1)
810                                .partition(Struct::from_iter([Some(Literal::long(100))]))
811                                .key_metadata(None)
812                                .build()
813                                .unwrap(),
814                        )
815                        .build(),
816                )
817                .unwrap();
818            writer
819                .add_delete_entry(
820                    ManifestEntry::builder()
821                        .status(ManifestStatus::Deleted)
822                        .snapshot_id(parent_snapshot.snapshot_id())
823                        .sequence_number(parent_snapshot.sequence_number())
824                        .file_sequence_number(parent_snapshot.sequence_number())
825                        .data_file(
826                            DataFileBuilder::default()
827                                .partition_spec_id(0)
828                                .content(DataContentType::Data)
829                                .file_path(format!("{}/2.parquet", &self.table_location))
830                                .file_format(DataFileFormat::Parquet)
831                                .file_size_in_bytes(parquet_file_size)
832                                .record_count(1)
833                                .partition(Struct::from_iter([Some(Literal::long(200))]))
834                                .build()
835                                .unwrap(),
836                        )
837                        .build(),
838                )
839                .unwrap();
840            writer
841                .add_existing_entry(
842                    ManifestEntry::builder()
843                        .status(ManifestStatus::Existing)
844                        .snapshot_id(parent_snapshot.snapshot_id())
845                        .sequence_number(parent_snapshot.sequence_number())
846                        .file_sequence_number(parent_snapshot.sequence_number())
847                        .data_file(
848                            DataFileBuilder::default()
849                                .partition_spec_id(0)
850                                .content(DataContentType::Data)
851                                .file_path(format!("{}/3.parquet", &self.table_location))
852                                .file_format(DataFileFormat::Parquet)
853                                .file_size_in_bytes(parquet_file_size)
854                                .record_count(1)
855                                .partition(Struct::from_iter([Some(Literal::long(300))]))
856                                .build()
857                                .unwrap(),
858                        )
859                        .build(),
860                )
861                .unwrap();
862            let data_file_manifest = writer.write_manifest_file().await.unwrap();
863
864            // Write to manifest list
865            let mut manifest_list_write = ManifestListWriter::v2(
866                self.table
867                    .file_io()
868                    .new_output(current_snapshot.manifest_list())
869                    .unwrap(),
870                current_snapshot.snapshot_id(),
871                current_snapshot.parent_snapshot_id(),
872                current_snapshot.sequence_number(),
873            );
874            manifest_list_write
875                .add_manifests(vec![data_file_manifest].into_iter())
876                .unwrap();
877            manifest_list_write.close().await.unwrap();
878        }
879
880        /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet)
881        /// and returns the file size in bytes.
882        fn write_parquet_data_files(&self) -> u64 {
883            std::fs::create_dir_all(&self.table_location).unwrap();
884
885            let schema = {
886                let fields = vec![
887                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
888                        .with_metadata(HashMap::from([(
889                            PARQUET_FIELD_ID_META_KEY.to_string(),
890                            "1".to_string(),
891                        )])),
892                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
893                        .with_metadata(HashMap::from([(
894                            PARQUET_FIELD_ID_META_KEY.to_string(),
895                            "2".to_string(),
896                        )])),
897                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
898                        .with_metadata(HashMap::from([(
899                            PARQUET_FIELD_ID_META_KEY.to_string(),
900                            "3".to_string(),
901                        )])),
902                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
903                        .with_metadata(HashMap::from([(
904                            PARQUET_FIELD_ID_META_KEY.to_string(),
905                            "4".to_string(),
906                        )])),
907                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
908                        .with_metadata(HashMap::from([(
909                            PARQUET_FIELD_ID_META_KEY.to_string(),
910                            "5".to_string(),
911                        )])),
912                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
913                        .with_metadata(HashMap::from([(
914                            PARQUET_FIELD_ID_META_KEY.to_string(),
915                            "6".to_string(),
916                        )])),
917                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
918                        .with_metadata(HashMap::from([(
919                            PARQUET_FIELD_ID_META_KEY.to_string(),
920                            "7".to_string(),
921                        )])),
922                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
923                        .with_metadata(HashMap::from([(
924                            PARQUET_FIELD_ID_META_KEY.to_string(),
925                            "8".to_string(),
926                        )])),
927                ];
928                Arc::new(arrow_schema::Schema::new(fields))
929            };
930            // x: [1, 1, 1, 1, ...]
931            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
932
933            let mut values = vec![2; 512];
934            values.append(vec![3; 200].as_mut());
935            values.append(vec![4; 300].as_mut());
936            values.append(vec![5; 12].as_mut());
937
938            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
939            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
940
941            let mut values = vec![3; 512];
942            values.append(vec![4; 512].as_mut());
943
944            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
945            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
946
947            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
948            let mut values = vec!["Apache"; 512];
949            values.append(vec!["Iceberg"; 512].as_mut());
950            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
951
952            // dbl:
953            let mut values = vec![100.0f64; 512];
954            values.append(vec![150.0f64; 12].as_mut());
955            values.append(vec![200.0f64; 500].as_mut());
956            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
957
958            // i32:
959            let mut values = vec![100i32; 512];
960            values.append(vec![150i32; 12].as_mut());
961            values.append(vec![200i32; 500].as_mut());
962            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
963
964            // i64:
965            let mut values = vec![100i64; 512];
966            values.append(vec![150i64; 12].as_mut());
967            values.append(vec![200i64; 500].as_mut());
968            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
969
970            // bool:
971            let mut values = vec![false; 512];
972            values.append(vec![true; 512].as_mut());
973            let values: BooleanArray = values.into();
974            let col8 = Arc::new(values) as ArrayRef;
975
976            let to_write = RecordBatch::try_new(schema.clone(), vec![
977                col1, col2, col3, col4, col5, col6, col7, col8,
978            ])
979            .unwrap();
980
981            // Write the Parquet files
982            let props = WriterProperties::builder()
983                .set_compression(Compression::SNAPPY)
984                .build();
985
986            for n in 1..=3 {
987                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
988                let mut writer =
989                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
990
991                writer.write(&to_write).expect("Writing batch");
992
993                // writer must be closed to write footer
994                writer.close().unwrap();
995            }
996
997            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
998                .unwrap()
999                .len()
1000        }
1001
1002        pub async fn setup_unpartitioned_manifest_files(&mut self) {
1003            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1004            let parent_snapshot = current_snapshot
1005                .parent_snapshot(self.table.metadata())
1006                .unwrap();
1007            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1008            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
1009
1010            // Write the data files first, then use the file size in the manifest entries
1011            let parquet_file_size = self.write_parquet_data_files();
1012
1013            // Write data files using an empty partition for unpartitioned tables.
1014            let mut writer = ManifestWriterBuilder::new(
1015                self.next_manifest_file(),
1016                Some(current_snapshot.snapshot_id()),
1017                None,
1018                current_schema.clone(),
1019                current_partition_spec.as_ref().clone(),
1020            )
1021            .build_v2_data();
1022
1023            // Create an empty partition value.
1024            let empty_partition = Struct::empty();
1025
1026            writer
1027                .add_entry(
1028                    ManifestEntry::builder()
1029                        .status(ManifestStatus::Added)
1030                        .data_file(
1031                            DataFileBuilder::default()
1032                                .partition_spec_id(0)
1033                                .content(DataContentType::Data)
1034                                .file_path(format!("{}/1.parquet", &self.table_location))
1035                                .file_format(DataFileFormat::Parquet)
1036                                .file_size_in_bytes(parquet_file_size)
1037                                .record_count(1)
1038                                .partition(empty_partition.clone())
1039                                .key_metadata(None)
1040                                .build()
1041                                .unwrap(),
1042                        )
1043                        .build(),
1044                )
1045                .unwrap();
1046
1047            writer
1048                .add_delete_entry(
1049                    ManifestEntry::builder()
1050                        .status(ManifestStatus::Deleted)
1051                        .snapshot_id(parent_snapshot.snapshot_id())
1052                        .sequence_number(parent_snapshot.sequence_number())
1053                        .file_sequence_number(parent_snapshot.sequence_number())
1054                        .data_file(
1055                            DataFileBuilder::default()
1056                                .partition_spec_id(0)
1057                                .content(DataContentType::Data)
1058                                .file_path(format!("{}/2.parquet", &self.table_location))
1059                                .file_format(DataFileFormat::Parquet)
1060                                .file_size_in_bytes(parquet_file_size)
1061                                .record_count(1)
1062                                .partition(empty_partition.clone())
1063                                .build()
1064                                .unwrap(),
1065                        )
1066                        .build(),
1067                )
1068                .unwrap();
1069
1070            writer
1071                .add_existing_entry(
1072                    ManifestEntry::builder()
1073                        .status(ManifestStatus::Existing)
1074                        .snapshot_id(parent_snapshot.snapshot_id())
1075                        .sequence_number(parent_snapshot.sequence_number())
1076                        .file_sequence_number(parent_snapshot.sequence_number())
1077                        .data_file(
1078                            DataFileBuilder::default()
1079                                .partition_spec_id(0)
1080                                .content(DataContentType::Data)
1081                                .file_path(format!("{}/3.parquet", &self.table_location))
1082                                .file_format(DataFileFormat::Parquet)
1083                                .file_size_in_bytes(parquet_file_size)
1084                                .record_count(1)
1085                                .partition(empty_partition.clone())
1086                                .build()
1087                                .unwrap(),
1088                        )
1089                        .build(),
1090                )
1091                .unwrap();
1092
1093            let data_file_manifest = writer.write_manifest_file().await.unwrap();
1094
1095            // Write to manifest list
1096            let mut manifest_list_write = ManifestListWriter::v2(
1097                self.table
1098                    .file_io()
1099                    .new_output(current_snapshot.manifest_list())
1100                    .unwrap(),
1101                current_snapshot.snapshot_id(),
1102                current_snapshot.parent_snapshot_id(),
1103                current_snapshot.sequence_number(),
1104            );
1105            manifest_list_write
1106                .add_manifests(vec![data_file_manifest].into_iter())
1107                .unwrap();
1108            manifest_list_write.close().await.unwrap();
1109        }
1110
1111        pub async fn setup_deadlock_manifests(&mut self) {
1112            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1113            let _parent_snapshot = current_snapshot
1114                .parent_snapshot(self.table.metadata())
1115                .unwrap();
1116            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1117            let current_partition_spec = self.table.metadata().default_partition_spec();
1118
1119            // 1. Write DATA manifest with MULTIPLE entries to fill buffer
1120            let mut writer = ManifestWriterBuilder::new(
1121                self.next_manifest_file(),
1122                Some(current_snapshot.snapshot_id()),
1123                None,
1124                current_schema.clone(),
1125                current_partition_spec.as_ref().clone(),
1126            )
1127            .build_v2_data();
1128
1129            // Add 10 data entries
1130            for i in 0..10 {
1131                writer
1132                    .add_entry(
1133                        ManifestEntry::builder()
1134                            .status(ManifestStatus::Added)
1135                            .data_file(
1136                                DataFileBuilder::default()
1137                                    .partition_spec_id(0)
1138                                    .content(DataContentType::Data)
1139                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
1140                                    .file_format(DataFileFormat::Parquet)
1141                                    .file_size_in_bytes(100)
1142                                    .record_count(1)
1143                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
1144                                    .key_metadata(None)
1145                                    .build()
1146                                    .unwrap(),
1147                            )
1148                            .build(),
1149                    )
1150                    .unwrap();
1151            }
1152            let data_manifest = writer.write_manifest_file().await.unwrap();
1153
1154            // 2. Write DELETE manifest
1155            let mut writer = ManifestWriterBuilder::new(
1156                self.next_manifest_file(),
1157                Some(current_snapshot.snapshot_id()),
1158                None,
1159                current_schema.clone(),
1160                current_partition_spec.as_ref().clone(),
1161            )
1162            .build_v2_deletes();
1163
1164            writer
1165                .add_entry(
1166                    ManifestEntry::builder()
1167                        .status(ManifestStatus::Added)
1168                        .data_file(
1169                            DataFileBuilder::default()
1170                                .partition_spec_id(0)
1171                                .content(DataContentType::PositionDeletes)
1172                                .file_path(format!("{}/del.parquet", &self.table_location))
1173                                .file_format(DataFileFormat::Parquet)
1174                                .file_size_in_bytes(100)
1175                                .record_count(1)
1176                                .partition(Struct::from_iter([Some(Literal::long(100))]))
1177                                .build()
1178                                .unwrap(),
1179                        )
1180                        .build(),
1181                )
1182                .unwrap();
1183            let delete_manifest = writer.write_manifest_file().await.unwrap();
1184
1185            // Write to manifest list - DATA FIRST then DELETE
1186            // This order is crucial for reproduction
1187            let mut manifest_list_write = ManifestListWriter::v2(
1188                self.table
1189                    .file_io()
1190                    .new_output(current_snapshot.manifest_list())
1191                    .unwrap(),
1192                current_snapshot.snapshot_id(),
1193                current_snapshot.parent_snapshot_id(),
1194                current_snapshot.sequence_number(),
1195            );
1196            manifest_list_write
1197                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
1198                .unwrap();
1199            manifest_list_write.close().await.unwrap();
1200        }
1201    }
1202
1203    #[test]
1204    fn test_table_scan_columns() {
1205        let table = TableTestFixture::new().table;
1206
1207        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1208        assert_eq!(
1209            Some(vec!["x".to_string(), "y".to_string()]),
1210            table_scan.column_names
1211        );
1212
1213        let table_scan = table
1214            .scan()
1215            .select(["x", "y"])
1216            .select(["z"])
1217            .build()
1218            .unwrap();
1219        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1220    }
1221
1222    #[test]
1223    fn test_select_all() {
1224        let table = TableTestFixture::new().table;
1225
1226        let table_scan = table.scan().select_all().build().unwrap();
1227        assert!(table_scan.column_names.is_none());
1228    }
1229
1230    #[test]
1231    fn test_select_no_exist_column() {
1232        let table = TableTestFixture::new().table;
1233
1234        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1235        assert!(table_scan.is_err());
1236    }
1237
1238    #[test]
1239    fn test_table_scan_default_snapshot_id() {
1240        let table = TableTestFixture::new().table;
1241
1242        let table_scan = table.scan().build().unwrap();
1243        assert_eq!(
1244            table.metadata().current_snapshot().unwrap().snapshot_id(),
1245            table_scan.snapshot().unwrap().snapshot_id()
1246        );
1247    }
1248
1249    #[test]
1250    fn test_table_scan_non_exist_snapshot_id() {
1251        let table = TableTestFixture::new().table;
1252
1253        let table_scan = table.scan().snapshot_id(1024).build();
1254        assert!(table_scan.is_err());
1255    }
1256
1257    #[test]
1258    fn test_table_scan_with_snapshot_id() {
1259        let table = TableTestFixture::new().table;
1260
1261        let table_scan = table
1262            .scan()
1263            .snapshot_id(3051729675574597004)
1264            .with_row_selection_enabled(true)
1265            .build()
1266            .unwrap();
1267        assert_eq!(
1268            table_scan.snapshot().unwrap().snapshot_id(),
1269            3051729675574597004
1270        );
1271    }
1272
1273    #[tokio::test]
1274    async fn test_plan_files_on_table_without_any_snapshots() {
1275        let table = TableTestFixture::new_empty().table;
1276        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1277        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1278        assert!(batches.is_empty());
1279    }
1280
1281    #[tokio::test]
1282    async fn test_plan_files_no_deletions() {
1283        let mut fixture = TableTestFixture::new();
1284        fixture.setup_manifest_files().await;
1285
1286        // Create table scan for current snapshot and plan files
1287        let table_scan = fixture
1288            .table
1289            .scan()
1290            .with_row_selection_enabled(true)
1291            .build()
1292            .unwrap();
1293
1294        let mut tasks = table_scan
1295            .plan_files()
1296            .await
1297            .unwrap()
1298            .try_fold(vec![], |mut acc, task| async move {
1299                acc.push(task);
1300                Ok(acc)
1301            })
1302            .await
1303            .unwrap();
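        // Note: this try_fold is just an explicit way to accumulate the
        // stream; `try_collect::<Vec<_>>()`, used by later tests, is the
        // equivalent shorthand.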
1304
1305        assert_eq!(tasks.len(), 2);
1306
1307        tasks.sort_by_key(|t| t.data_file_path.to_string());
1308
1309        // Check that the first task is the added data file
1310        assert_eq!(
1311            tasks[0].data_file_path,
1312            format!("{}/1.parquet", &fixture.table_location)
1313        );
1314
1315        // Check that the second task is the existing data file
1316        assert_eq!(
1317            tasks[1].data_file_path,
1318            format!("{}/3.parquet", &fixture.table_location)
1319        );
1320    }
1321
1322    #[tokio::test]
1323    async fn test_open_parquet_no_deletions() {
1324        let mut fixture = TableTestFixture::new();
1325        fixture.setup_manifest_files().await;
1326
1327        // Create a table scan for the current snapshot and read it to Arrow
1328        let table_scan = fixture
1329            .table
1330            .scan()
1331            .with_row_selection_enabled(true)
1332            .build()
1333            .unwrap();
1334
1335        let batch_stream = table_scan.to_arrow().await.unwrap();
1336
1337        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1338
1339        let col = batches[0].column_by_name("x").unwrap();
1340
1341        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1342        assert_eq!(int64_arr.value(0), 1);
1343    }
1344
1345    #[tokio::test]
1346    async fn test_open_parquet_no_deletions_by_separate_reader() {
1347        let mut fixture = TableTestFixture::new();
1348        fixture.setup_manifest_files().await;
1349
1350        // Create table scan for current snapshot and plan files
1351        let table_scan = fixture
1352            .table
1353            .scan()
1354            .with_row_selection_enabled(true)
1355            .build()
1356            .unwrap();
1357
1358        let mut plan_task: Vec<_> = table_scan
1359            .plan_files()
1360            .await
1361            .unwrap()
1362            .try_collect()
1363            .await
1364            .unwrap();
1365        assert_eq!(plan_task.len(), 2);
1366
1367        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1368        let batch_stream = reader
1369            .clone()
1370            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1371            .unwrap()
1372            .stream();
1373        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1374
1375        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1376        let batch_stream = reader
1377            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1378            .unwrap()
1379            .stream();
1380        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1381
1382        assert_eq!(batch_1, batch_2);
1383    }
1384
1385    #[tokio::test]
1386    async fn test_open_parquet_with_projection() {
1387        let mut fixture = TableTestFixture::new();
1388        fixture.setup_manifest_files().await;
1389
1390        // Create a table scan for the current snapshot and read it to Arrow
1391        let table_scan = fixture
1392            .table
1393            .scan()
1394            .select(["x", "z"])
1395            .with_row_selection_enabled(true)
1396            .build()
1397            .unwrap();
1398
1399        let batch_stream = table_scan.to_arrow().await.unwrap();
1400
1401        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1402
1403        assert_eq!(batches[0].num_columns(), 2);
1404
1405        let col1 = batches[0].column_by_name("x").unwrap();
1406        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1407        assert_eq!(int64_arr.value(0), 1);
1408
1409        let col2 = batches[0].column_by_name("z").unwrap();
1410        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1411        assert_eq!(int64_arr.value(0), 3);
1412
1413        // Test an empty scan where no columns are selected
1414        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1415        let batch_stream = table_scan.to_arrow().await.unwrap();
1416        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1417
1418        assert_eq!(batches[0].num_columns(), 0);
1419        assert_eq!(batches[0].num_rows(), 1024);
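
        // With an empty projection the reader still reports row counts, so a
        // select_empty() scan can serve as a cheap row-count path: summing
        // num_rows() across the batches yields the total row count without
        // materializing any columns.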
1420    }
1421
1422    #[tokio::test]
1423    async fn test_filter_on_arrow_lt() {
1424        let mut fixture = TableTestFixture::new();
1425        fixture.setup_manifest_files().await;
1426
1427        // Filter: y < 3
1428        let mut builder = fixture.table.scan();
1429        let predicate = Reference::new("y").less_than(Datum::long(3));
1430        builder = builder
1431            .with_filter(predicate)
1432            .with_row_selection_enabled(true);
1433        let table_scan = builder.build().unwrap();
1434
1435        let batch_stream = table_scan.to_arrow().await.unwrap();
1436
1437        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1438
1439        assert_eq!(batches[0].num_rows(), 512);
1440
1441        let col = batches[0].column_by_name("x").unwrap();
1442        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1443        assert_eq!(int64_arr.value(0), 1);
1444
1445        let col = batches[0].column_by_name("y").unwrap();
1446        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1447        assert_eq!(int64_arr.value(0), 2);
1448    }
1449
1450    #[tokio::test]
1451    async fn test_filter_on_arrow_gt_eq() {
1452        let mut fixture = TableTestFixture::new();
1453        fixture.setup_manifest_files().await;
1454
1455        // Filter: y >= 5
1456        let mut builder = fixture.table.scan();
1457        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1458        builder = builder
1459            .with_filter(predicate)
1460            .with_row_selection_enabled(true);
1461        let table_scan = builder.build().unwrap();
1462
1463        let batch_stream = table_scan.to_arrow().await.unwrap();
1464
1465        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1466
1467        assert_eq!(batches[0].num_rows(), 12);
1468
1469        let col = batches[0].column_by_name("x").unwrap();
1470        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1471        assert_eq!(int64_arr.value(0), 1);
1472
1473        let col = batches[0].column_by_name("y").unwrap();
1474        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1475        assert_eq!(int64_arr.value(0), 5);
1476    }
1477
1478    #[tokio::test]
1479    async fn test_filter_double_eq() {
1480        let mut fixture = TableTestFixture::new();
1481        fixture.setup_manifest_files().await;
1482
1483        // Filter: dbl == 150.0
1484        let mut builder = fixture.table.scan();
1485        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1486        builder = builder
1487            .with_filter(predicate)
1488            .with_row_selection_enabled(true);
1489        let table_scan = builder.build().unwrap();
1490
1491        let batch_stream = table_scan.to_arrow().await.unwrap();
1492
1493        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1494
1495        assert_eq!(batches.len(), 2);
1496        assert_eq!(batches[0].num_rows(), 12);
1497
1498        let col = batches[0].column_by_name("dbl").unwrap();
1499        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1500        assert_eq!(f64_arr.value(1), 150.0f64);
1501    }
1502
1503    #[tokio::test]
1504    async fn test_filter_int_eq() {
1505        let mut fixture = TableTestFixture::new();
1506        fixture.setup_manifest_files().await;
1507
1508        // Filter: i32 == 150
1509        let mut builder = fixture.table.scan();
1510        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1511        builder = builder
1512            .with_filter(predicate)
1513            .with_row_selection_enabled(true);
1514        let table_scan = builder.build().unwrap();
1515
1516        let batch_stream = table_scan.to_arrow().await.unwrap();
1517
1518        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1519
1520        assert_eq!(batches.len(), 2);
1521        assert_eq!(batches[0].num_rows(), 12);
1522
1523        let col = batches[0].column_by_name("i32").unwrap();
1524        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1525        assert_eq!(i32_arr.value(1), 150i32);
1526    }
1527
1528    #[tokio::test]
1529    async fn test_filter_long_eq() {
1530        let mut fixture = TableTestFixture::new();
1531        fixture.setup_manifest_files().await;
1532
1533        // Filter: i64 == 150
1534        let mut builder = fixture.table.scan();
1535        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1536        builder = builder
1537            .with_filter(predicate)
1538            .with_row_selection_enabled(true);
1539        let table_scan = builder.build().unwrap();
1540
1541        let batch_stream = table_scan.to_arrow().await.unwrap();
1542
1543        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1544
1545        assert_eq!(batches.len(), 2);
1546        assert_eq!(batches[0].num_rows(), 12);
1547
1548        let col = batches[0].column_by_name("i64").unwrap();
1549        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1550        assert_eq!(i64_arr.value(1), 150i64);
1551    }
1552
1553    #[tokio::test]
1554    async fn test_filter_bool_eq() {
1555        let mut fixture = TableTestFixture::new();
1556        fixture.setup_manifest_files().await;
1557
1558        // Filter: bool == true
1559        let mut builder = fixture.table.scan();
1560        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1561        builder = builder
1562            .with_filter(predicate)
1563            .with_row_selection_enabled(true);
1564        let table_scan = builder.build().unwrap();
1565
1566        let batch_stream = table_scan.to_arrow().await.unwrap();
1567
1568        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1569
1570        assert_eq!(batches.len(), 2);
1571        assert_eq!(batches[0].num_rows(), 512);
1572
1573        let col = batches[0].column_by_name("bool").unwrap();
1574        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1575        assert!(bool_arr.value(1));
1576    }
1577
1578    #[tokio::test]
1579    async fn test_filter_on_arrow_is_null() {
1580        let mut fixture = TableTestFixture::new();
1581        fixture.setup_manifest_files().await;
1582
1583        // Filter: y is null
1584        let mut builder = fixture.table.scan();
1585        let predicate = Reference::new("y").is_null();
1586        builder = builder
1587            .with_filter(predicate)
1588            .with_row_selection_enabled(true);
1589        let table_scan = builder.build().unwrap();
1590
1591        let batch_stream = table_scan.to_arrow().await.unwrap();
1592
1593        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1594        assert_eq!(batches.len(), 0);
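        // Zero batches, not empty batches: the file statistics record no null
        // values for y, so planning can prune every file up front instead of
        // filtering row-by-row.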
1595    }
1596
1597    #[tokio::test]
1598    async fn test_filter_on_arrow_is_not_null() {
1599        let mut fixture = TableTestFixture::new();
1600        fixture.setup_manifest_files().await;
1601
1602        // Filter: y is not null
1603        let mut builder = fixture.table.scan();
1604        let predicate = Reference::new("y").is_not_null();
1605        builder = builder
1606            .with_filter(predicate)
1607            .with_row_selection_enabled(true);
1608        let table_scan = builder.build().unwrap();
1609
1610        let batch_stream = table_scan.to_arrow().await.unwrap();
1611
1612        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1613        assert_eq!(batches[0].num_rows(), 1024);
1614    }
1615
1616    #[tokio::test]
1617    async fn test_filter_on_arrow_lt_and_gt() {
1618        let mut fixture = TableTestFixture::new();
1619        fixture.setup_manifest_files().await;
1620
1621        // Filter: y < 5 AND z >= 4
1622        let mut builder = fixture.table.scan();
1623        let predicate = Reference::new("y")
1624            .less_than(Datum::long(5))
1625            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1626        builder = builder
1627            .with_filter(predicate)
1628            .with_row_selection_enabled(true);
1629        let table_scan = builder.build().unwrap();
1630
1631        let batch_stream = table_scan.to_arrow().await.unwrap();
1632
1633        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1634        assert_eq!(batches[0].num_rows(), 500);
1635
1636        let col = batches[0].column_by_name("x").unwrap();
1637        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1638        assert_eq!(col, &expected_x);
1639
1640        let col = batches[0].column_by_name("y").unwrap();
1641        let mut values = vec![];
1642        values.extend_from_slice(&[3; 200]);
1643        values.extend_from_slice(&[4; 300]);
1644        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1645        assert_eq!(col, &expected_y);
1646
1647        let col = batches[0].column_by_name("z").unwrap();
1648        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1649        assert_eq!(col, &expected_z);
1650    }
1651
1652    #[tokio::test]
1653    async fn test_filter_on_arrow_lt_or_gt() {
1654        let mut fixture = TableTestFixture::new();
1655        fixture.setup_manifest_files().await;
1656
1657        // Filter: y < 5 OR z >= 4
1658        let mut builder = fixture.table.scan();
1659        let predicate = Reference::new("y")
1660            .less_than(Datum::long(5))
1661            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1662        builder = builder
1663            .with_filter(predicate)
1664            .with_row_selection_enabled(true);
1665        let table_scan = builder.build().unwrap();
1666
1667        let batch_stream = table_scan.to_arrow().await.unwrap();
1668
1669        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1670        assert_eq!(batches[0].num_rows(), 1024);
1671
1672        let col = batches[0].column_by_name("x").unwrap();
1673        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1674        assert_eq!(col, &expected_x);
1675
1676        let col = batches[0].column_by_name("y").unwrap();
1677        let mut values = vec![2; 512];
1678        values.extend_from_slice(&[3; 200]);
1679        values.extend_from_slice(&[4; 300]);
1680        values.extend_from_slice(&[5; 12]);
1681        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1682        assert_eq!(col, &expected_y);
1683
1684        let col = batches[0].column_by_name("z").unwrap();
1685        let mut values = vec![3; 512];
1686        values.extend_from_slice(&[4; 512]);
1687        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1688        assert_eq!(col, &expected_z);
1689    }
1690
1691    #[tokio::test]
1692    async fn test_filter_on_arrow_startswith() {
1693        let mut fixture = TableTestFixture::new();
1694        fixture.setup_manifest_files().await;
1695
1696        // Filter: a STARTSWITH "Ice"
1697        let mut builder = fixture.table.scan();
1698        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1699        builder = builder
1700            .with_filter(predicate)
1701            .with_row_selection_enabled(true);
1702        let table_scan = builder.build().unwrap();
1703
1704        let batch_stream = table_scan.to_arrow().await.unwrap();
1705
1706        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1707
1708        assert_eq!(batches[0].num_rows(), 512);
1709
1710        let col = batches[0].column_by_name("a").unwrap();
1711        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1712        assert_eq!(string_arr.value(0), "Iceberg");
1713    }
1714
1715    #[tokio::test]
1716    async fn test_filter_on_arrow_not_startswith() {
1717        let mut fixture = TableTestFixture::new();
1718        fixture.setup_manifest_files().await;
1719
1720        // Filter: a NOT STARTSWITH "Ice"
1721        let mut builder = fixture.table.scan();
1722        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1723        builder = builder
1724            .with_filter(predicate)
1725            .with_row_selection_enabled(true);
1726        let table_scan = builder.build().unwrap();
1727
1728        let batch_stream = table_scan.to_arrow().await.unwrap();
1729
1730        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1731
1732        assert_eq!(batches[0].num_rows(), 512);
1733
1734        let col = batches[0].column_by_name("a").unwrap();
1735        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1736        assert_eq!(string_arr.value(0), "Apache");
1737    }
1738
1739    #[tokio::test]
1740    async fn test_filter_on_arrow_in() {
1741        let mut fixture = TableTestFixture::new();
1742        fixture.setup_manifest_files().await;
1743
1744        // Filter: a IN ("Sioux", "Iceberg")
1745        let mut builder = fixture.table.scan();
1746        let predicate =
1747            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1748        builder = builder
1749            .with_filter(predicate)
1750            .with_row_selection_enabled(true);
1751        let table_scan = builder.build().unwrap();
1752
1753        let batch_stream = table_scan.to_arrow().await.unwrap();
1754
1755        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1756
1757        assert_eq!(batches[0].num_rows(), 512);
1758
1759        let col = batches[0].column_by_name("a").unwrap();
1760        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1761        assert_eq!(string_arr.value(0), "Iceberg");
1762    }
1763
1764    #[tokio::test]
1765    async fn test_filter_on_arrow_not_in() {
1766        let mut fixture = TableTestFixture::new();
1767        fixture.setup_manifest_files().await;
1768
1769        // Filter: a NOT IN ("Sioux", "Iceberg")
1770        let mut builder = fixture.table.scan();
1771        let predicate =
1772            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1773        builder = builder
1774            .with_filter(predicate)
1775            .with_row_selection_enabled(true);
1776        let table_scan = builder.build().unwrap();
1777
1778        let batch_stream = table_scan.to_arrow().await.unwrap();
1779
1780        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1781
1782        assert_eq!(batches[0].num_rows(), 512);
1783
1784        let col = batches[0].column_by_name("a").unwrap();
1785        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1786        assert_eq!(string_arr.value(0), "Apache");
1787    }
1788
1789    #[test]
1790    fn test_file_scan_task_serialize_deserialize() {
1791        let test_fn = |task: FileScanTask| {
1792            let serialized = serde_json::to_string(&task).unwrap();
1793            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1794
1795            assert_eq!(task.data_file_path, deserialized.data_file_path);
1796            assert_eq!(task.start, deserialized.start);
1797            assert_eq!(task.length, deserialized.length);
1798            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1799            assert_eq!(task.predicate, deserialized.predicate);
1800            assert_eq!(task.schema, deserialized.schema);
1801        };
1802
1803        // without predicate
1804        let schema = Arc::new(
1805            Schema::builder()
1806                .with_fields(vec![Arc::new(NestedField::required(
1807                    1,
1808                    "x",
1809                    Type::Primitive(PrimitiveType::Binary),
1810                ))])
1811                .build()
1812                .unwrap(),
1813        );
1814        let task = FileScanTask {
1815            data_file_path: "data_file_path".to_string(),
1816            file_size_in_bytes: 0,
1817            start: 0,
1818            length: 100,
1819            project_field_ids: vec![1, 2, 3],
1820            predicate: None,
1821            schema: schema.clone(),
1822            record_count: Some(100),
1823            data_file_format: DataFileFormat::Parquet,
1824            deletes: vec![],
1825            partition: None,
1826            partition_spec: None,
1827            name_mapping: None,
1828            case_sensitive: false,
1829        };
1830        test_fn(task);
1831
1832        // with predicate
1833        let task = FileScanTask {
1834            data_file_path: "data_file_path".to_string(),
1835            file_size_in_bytes: 0,
1836            start: 0,
1837            length: 100,
1838            project_field_ids: vec![1, 2, 3],
1839            predicate: Some(BoundPredicate::AlwaysTrue),
1840            schema,
1841            record_count: None,
1842            data_file_format: DataFileFormat::Avro,
1843            deletes: vec![],
1844            partition: None,
1845            partition_spec: None,
1846            name_mapping: None,
1847            case_sensitive: false,
1848        };
1849        test_fn(task);
1850    }
1851
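    // A minimal sketch (hypothetical helper, not used by these tests) of how
    // planned tasks could be shipped to another process as JSON lines, relying
    // on the same serde round-trip exercised above.
    #[allow(dead_code)]
    fn tasks_to_json_lines(tasks: &[FileScanTask]) -> serde_json::Result<String> {
        tasks
            .iter()
            .map(serde_json::to_string)
            // One JSON document per task; any line can be deserialized back
            // into a FileScanTask independently.
            .collect::<serde_json::Result<Vec<_>>>()
            .map(|lines| lines.join("\n"))
    }
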
1852    #[tokio::test]
1853    async fn test_select_with_file_column() {
1854        use arrow_array::cast::AsArray;
1855
1856        let mut fixture = TableTestFixture::new();
1857        fixture.setup_manifest_files().await;
1858
1859        // Select regular columns plus the _file column
1860        let table_scan = fixture
1861            .table
1862            .scan()
1863            .select(["x", RESERVED_COL_NAME_FILE])
1864            .with_row_selection_enabled(true)
1865            .build()
1866            .unwrap();
1867
1868        let batch_stream = table_scan.to_arrow().await.unwrap();
1869        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1870
1871        // Verify we have 2 columns: x and _file
1872        assert_eq!(batches[0].num_columns(), 2);
1873
1874        // Verify the x column exists and has correct data
1875        let x_col = batches[0].column_by_name("x").unwrap();
1876        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
1877        assert_eq!(x_arr.value(0), 1);
1878
1879        // Verify the _file column exists
1880        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
1881        assert!(
1882            file_col.is_some(),
1883            "_file column should be present in the batch"
1884        );
1885
1886        // Verify the _file column contains a file path
1887        let file_col = file_col.unwrap();
1888        assert!(
1889            matches!(
1890                file_col.data_type(),
1891                arrow_schema::DataType::RunEndEncoded(_, _)
1892            ),
1893            "_file column should use RunEndEncoded type"
1894        );
1895
1896        // Decode the RunArray to verify it contains the file path
1897        let run_array = file_col
1898            .as_any()
1899            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1900            .expect("_file column should be a RunArray");
1901
1902        let values = run_array.values();
1903        let string_values = values.as_string::<i32>();
1904        assert_eq!(string_values.len(), 1, "Should have a single file path");
1905
1906        let file_path = string_values.value(0);
1907        assert!(
1908            file_path.ends_with(".parquet"),
1909            "File path should end with .parquet, got: {file_path}"
1910        );
1911    }
1912
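    // A minimal sketch (hypothetical helper, assuming the _file column is
    // run-end encoded with Int32 run ends, as the tests above assert) of
    // pulling the single backing file path out of a batch.
    #[allow(dead_code)]
    fn file_path_of(batch: &arrow_array::RecordBatch) -> Option<String> {
        use arrow_array::cast::AsArray;

        let col = batch.column_by_name(RESERVED_COL_NAME_FILE)?;
        let run_array = col
            .as_any()
            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()?;
        // Each batch comes from a single data file, so the run values should
        // hold exactly one string.
        let values = run_array.values();
        let strings = values.as_string::<i32>();
        (strings.len() == 1).then(|| strings.value(0).to_string())
    }
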
1913    #[tokio::test]
1914    async fn test_select_file_column_position() {
1915        let mut fixture = TableTestFixture::new();
1916        fixture.setup_manifest_files().await;
1917
1918        // Select columns in specific order: x, _file, z
1919        let table_scan = fixture
1920            .table
1921            .scan()
1922            .select(["x", RESERVED_COL_NAME_FILE, "z"])
1923            .with_row_selection_enabled(true)
1924            .build()
1925            .unwrap();
1926
1927        let batch_stream = table_scan.to_arrow().await.unwrap();
1928        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1929
1930        assert_eq!(batches[0].num_columns(), 3);
1931
1932        // Verify column order: x at position 0, _file at position 1, z at position 2
1933        let schema = batches[0].schema();
1934        assert_eq!(schema.field(0).name(), "x");
1935        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1936        assert_eq!(schema.field(2).name(), "z");
1937
1938        // Verify that lookup by column name also works
1939        assert!(batches[0].column_by_name("x").is_some());
1940        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1941        assert!(batches[0].column_by_name("z").is_some());
1942    }
1943
1944    #[tokio::test]
1945    async fn test_select_file_column_only() {
1946        let mut fixture = TableTestFixture::new();
1947        fixture.setup_manifest_files().await;
1948
1949        // Select only the _file column
1950        let table_scan = fixture
1951            .table
1952            .scan()
1953            .select([RESERVED_COL_NAME_FILE])
1954            .with_row_selection_enabled(true)
1955            .build()
1956            .unwrap();
1957
1958        let batch_stream = table_scan.to_arrow().await.unwrap();
1959        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1960
1961        // Should have exactly 1 column
1962        assert_eq!(batches[0].num_columns(), 1);
1963
1964        // Verify it's the _file column
1965        let schema = batches[0].schema();
1966        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
1967
1968        // Verify the batch has the correct number of rows
1969        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
1970        // Each file has 1024 rows, so total is 2048 rows
1971        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1972        assert_eq!(total_rows, 2048);
1973    }
1974
1975    #[tokio::test]
1976    async fn test_file_column_with_multiple_files() {
1977        use std::collections::HashSet;
1978
1979        let mut fixture = TableTestFixture::new();
1980        fixture.setup_manifest_files().await;
1981
1982        // Select x and _file columns
1983        let table_scan = fixture
1984            .table
1985            .scan()
1986            .select(["x", RESERVED_COL_NAME_FILE])
1987            .with_row_selection_enabled(true)
1988            .build()
1989            .unwrap();
1990
1991        let batch_stream = table_scan.to_arrow().await.unwrap();
1992        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1993
1994        // Collect all unique file paths from the batches
1995        let mut file_paths = HashSet::new();
1996        for batch in &batches {
1997            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
1998            let run_array = file_col
1999                .as_any()
2000                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2001                .expect("_file column should be a RunArray");
2002
2003            let values = run_array.values();
2004            let string_values = values.as_string::<i32>();
2005            for i in 0..string_values.len() {
2006                file_paths.insert(string_values.value(i).to_string());
2007            }
2008        }
2009
2010        // The scan reads exactly two data files (1.parquet and 3.parquet)
2011        assert_eq!(file_paths.len(), 2, "Should surface both data file paths");
2012
2013        // All paths should end with .parquet
2014        for path in &file_paths {
2015            assert!(
2016                path.ends_with(".parquet"),
2017                "All file paths should end with .parquet, got: {path}"
2018            );
2019        }
2020    }
2021
2022    #[tokio::test]
2023    async fn test_file_column_at_start() {
2024        let mut fixture = TableTestFixture::new();
2025        fixture.setup_manifest_files().await;
2026
2027        // Select _file at the start
2028        let table_scan = fixture
2029            .table
2030            .scan()
2031            .select([RESERVED_COL_NAME_FILE, "x", "y"])
2032            .with_row_selection_enabled(true)
2033            .build()
2034            .unwrap();
2035
2036        let batch_stream = table_scan.to_arrow().await.unwrap();
2037        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2038
2039        assert_eq!(batches[0].num_columns(), 3);
2040
2041        // Verify _file is at position 0
2042        let schema = batches[0].schema();
2043        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2044        assert_eq!(schema.field(1).name(), "x");
2045        assert_eq!(schema.field(2).name(), "y");
2046    }
2047
2048    #[tokio::test]
2049    async fn test_file_column_at_end() {
2050        let mut fixture = TableTestFixture::new();
2051        fixture.setup_manifest_files().await;
2052
2053        // Select _file at the end
2054        let table_scan = fixture
2055            .table
2056            .scan()
2057            .select(["x", "y", RESERVED_COL_NAME_FILE])
2058            .with_row_selection_enabled(true)
2059            .build()
2060            .unwrap();
2061
2062        let batch_stream = table_scan.to_arrow().await.unwrap();
2063        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2064
2065        assert_eq!(batches[0].num_columns(), 3);
2066
2067        // Verify _file is at position 2 (the end)
2068        let schema = batches[0].schema();
2069        assert_eq!(schema.field(0).name(), "x");
2070        assert_eq!(schema.field(1).name(), "y");
2071        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2072    }
2073
2074    #[tokio::test]
2075    async fn test_select_with_repeated_column_names() {
2076        let mut fixture = TableTestFixture::new();
2077        fixture.setup_manifest_files().await;
2078
2079        // Select with repeated column names, covering both regular and virtual
2080        // columns. Duplicates are allowed and appear multiple times in the result.
2081        let table_scan = fixture
2082            .table
2083            .scan()
2084            .select([
2085                "x",
2086                RESERVED_COL_NAME_FILE,
2087                "x", // x repeated
2088                "y",
2089                RESERVED_COL_NAME_FILE, // _file repeated
2090                "y",                    // y repeated
2091            ])
2092            .with_row_selection_enabled(true)
2093            .build()
2094            .unwrap();
2095
2096        let batch_stream = table_scan.to_arrow().await.unwrap();
2097        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2098
2099        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
2100        assert_eq!(
2101            batches[0].num_columns(),
2102            6,
2103            "Should have exactly 6 columns with duplicates"
2104        );
2105
2106        let schema = batches[0].schema();
2107
2108        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
2109        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2110        assert_eq!(
2111            schema.field(1).name(),
2112            RESERVED_COL_NAME_FILE,
2113            "Column 1 should be _file"
2114        );
2115        assert_eq!(
2116            schema.field(2).name(),
2117            "x",
2118            "Column 2 should be x (duplicate)"
2119        );
2120        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2121        assert_eq!(
2122            schema.field(4).name(),
2123            RESERVED_COL_NAME_FILE,
2124            "Column 4 should be _file (duplicate)"
2125        );
2126        assert_eq!(
2127            schema.field(5).name(),
2128            "y",
2129            "Column 5 should be y (duplicate)"
2130        );
2131
2132        // Verify all columns have correct data types
2133        assert!(
2134            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2135            "Column x should be Int64"
2136        );
2137        assert!(
2138            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2139            "Column x (duplicate) should be Int64"
2140        );
2141        assert!(
2142            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2143            "Column y should be Int64"
2144        );
2145        assert!(
2146            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2147            "Column y (duplicate) should be Int64"
2148        );
2149        assert!(
2150            matches!(
2151                schema.field(1).data_type(),
2152                arrow_schema::DataType::RunEndEncoded(_, _)
2153            ),
2154            "_file column should use RunEndEncoded type"
2155        );
2156        assert!(
2157            matches!(
2158                schema.field(4).data_type(),
2159                arrow_schema::DataType::RunEndEncoded(_, _)
2160            ),
2161            "_file column (duplicate) should use RunEndEncoded type"
2162        );
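
        // RunEndEncoded is a natural fit for _file: within a batch the path is
        // constant, so the array stores the string once plus a single run end.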
2163    }
2164
2165    #[tokio::test]
2166    async fn test_scan_deadlock() {
2167        let mut fixture = TableTestFixture::new();
2168        fixture.setup_deadlock_manifests().await;
2169
2170        // Create a table scan with a concurrency limit of 1, which also sets
2171        // the internal channel size to 1. The data manifest has 10 entries, so
2172        // the producer fills the channel and blocks. The delete manifest is
2173        // second in the list and never gets processed; the data consumer has
2174        // not started and the delete consumer is still waiting, so every party
2175        // is blocked: exactly the deadlock this test guards against.
2176        let table_scan = fixture
2177            .table
2178            .scan()
2179            .with_concurrency_limit(1)
2180            .build()
2181            .unwrap();
2182
2183        // If the deadlock exists, planning hangs forever, so bound the wait
2184        // with tokio::time::timeout.
2185        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2186            table_scan
2187                .plan_files()
2188                .await
2189                .unwrap()
2190                .try_collect::<Vec<_>>()
2191                .await
2192        })
2193        .await;
2194
2195        // Assert that planning finished (did not time out)
2196        assert!(result.is_ok(), "Scan timed out - deadlock detected");
2197    }
2198}