Skip to main content

iceberg/scan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Table scan api.
19
20mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35pub use crate::arrow::{ScanMetrics, ScanResult};
36use crate::delete_file_index::DeleteFileIndex;
37use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
38use crate::expr::{Bind, BoundPredicate, Predicate};
39use crate::io::FileIO;
40use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
41use crate::runtime::Runtime;
42use crate::spec::{DEFAULT_SCHEMA_NAME_MAPPING, DataContentType, NameMapping, SnapshotRef};
43use crate::table::Table;
44use crate::util::available_parallelism;
45use crate::{Error, ErrorKind, Result};
46
47/// A stream of arrow [`RecordBatch`]es.
48pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
49
50/// Builder to create table scan.
51pub struct TableScanBuilder<'a> {
52    table: &'a Table,
53    // Defaults to none which means select all columns
54    column_names: Option<Vec<String>>,
55    snapshot_id: Option<i64>,
56    batch_size: Option<usize>,
57    case_sensitive: bool,
58    filter: Option<Predicate>,
59    concurrency_limit_data_files: usize,
60    concurrency_limit_manifest_entries: usize,
61    concurrency_limit_manifest_files: usize,
62    row_group_filtering_enabled: bool,
63    row_selection_enabled: bool,
64}
65
66impl<'a> TableScanBuilder<'a> {
67    pub(crate) fn new(table: &'a Table) -> Self {
68        let num_cpus = available_parallelism().get();
69
70        Self {
71            table,
72            column_names: None,
73            snapshot_id: None,
74            batch_size: None,
75            case_sensitive: true,
76            filter: None,
77            concurrency_limit_data_files: num_cpus,
78            concurrency_limit_manifest_entries: num_cpus,
79            concurrency_limit_manifest_files: num_cpus,
80            row_group_filtering_enabled: true,
81            row_selection_enabled: false,
82        }
83    }
84
85    /// Sets the desired size of batches in the response
86    /// to something other than the default
87    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
88        self.batch_size = batch_size;
89        self
90    }
91
92    /// Sets the scan's case sensitivity
93    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
94        self.case_sensitive = case_sensitive;
95        self
96    }
97
98    /// Specifies a predicate to use as a filter
99    pub fn with_filter(mut self, predicate: Predicate) -> Self {
100        // calls rewrite_not to remove Not nodes, which must be absent
101        // when applying the manifest evaluator
102        self.filter = Some(predicate.rewrite_not());
103        self
104    }
105
106    /// Select all columns.
107    pub fn select_all(mut self) -> Self {
108        self.column_names = None;
109        self
110    }
111
112    /// Select empty columns.
113    pub fn select_empty(mut self) -> Self {
114        self.column_names = Some(vec![]);
115        self
116    }
117
118    /// Select some columns of the table.
119    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
120        self.column_names = Some(
121            column_names
122                .into_iter()
123                .map(|item| item.to_string())
124                .collect(),
125        );
126        self
127    }
128
129    /// Set the snapshot to scan. When not set, it uses current snapshot.
130    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
131        self.snapshot_id = Some(snapshot_id);
132        self
133    }
134
135    /// Sets the concurrency limit for both manifest files and manifest
136    /// entries for this scan
137    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
138        self.concurrency_limit_manifest_files = limit;
139        self.concurrency_limit_manifest_entries = limit;
140        self.concurrency_limit_data_files = limit;
141        self
142    }
143
144    /// Sets the data file concurrency limit for this scan
145    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
146        self.concurrency_limit_data_files = limit;
147        self
148    }
149
150    /// Sets the manifest entry concurrency limit for this scan
151    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
152        self.concurrency_limit_manifest_entries = limit;
153        self
154    }
155
156    /// Determines whether to enable row group filtering.
157    /// When enabled, if a read is performed with a filter predicate,
158    /// then the metadata for each row group in the parquet file is
159    /// evaluated against the filter predicate and row groups
160    /// that cant contain matching rows will be skipped entirely.
161    ///
162    /// Defaults to enabled, as it generally improves performance or
163    /// keeps it the same, with performance degradation unlikely.
164    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
165        self.row_group_filtering_enabled = row_group_filtering_enabled;
166        self
167    }
168
169    /// Determines whether to enable row selection.
170    /// When enabled, if a read is performed with a filter predicate,
171    /// then (for row groups that have not been skipped) the page index
172    /// for each row group in a parquet file is parsed and evaluated
173    /// against the filter predicate to determine if ranges of rows
174    /// within a row group can be skipped, based upon the page-level
175    /// statistics for each column.
176    ///
177    /// Defaults to being disabled. Enabling requires parsing the parquet page
178    /// index, which can be slow enough that parsing the page index outweighs any
179    /// gains from the reduced number of rows that need scanning.
180    /// It is recommended to experiment with partitioning, sorting, row group size,
181    /// page size, and page row limit Iceberg settings on the table being scanned in
182    /// order to get the best performance from using row selection.
183    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
184        self.row_selection_enabled = row_selection_enabled;
185        self
186    }
187
188    /// Build the table scan.
189    pub fn build(self) -> Result<TableScan> {
190        let snapshot = match self.snapshot_id {
191            Some(snapshot_id) => self
192                .table
193                .metadata()
194                .snapshot_by_id(snapshot_id)
195                .ok_or_else(|| {
196                    Error::new(
197                        ErrorKind::DataInvalid,
198                        format!("Snapshot with id {snapshot_id} not found"),
199                    )
200                })?
201                .clone(),
202            None => {
203                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
204                    return Ok(TableScan {
205                        batch_size: self.batch_size,
206                        column_names: self.column_names,
207                        file_io: self.table.file_io().clone(),
208                        plan_context: None,
209                        concurrency_limit_data_files: self.concurrency_limit_data_files,
210                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
211                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
212                        row_group_filtering_enabled: self.row_group_filtering_enabled,
213                        row_selection_enabled: self.row_selection_enabled,
214                        runtime: self.table.runtime().clone(),
215                    });
216                };
217                current_snapshot_id.clone()
218            }
219        };
220
221        let schema = snapshot.schema(self.table.metadata())?;
222
223        // Check that all column names exist in the schema (skip reserved columns).
224        if let Some(column_names) = self.column_names.as_ref() {
225            for column_name in column_names {
226                // Skip reserved columns that don't exist in the schema
227                if is_metadata_column_name(column_name) {
228                    continue;
229                }
230                if schema.field_by_name(column_name).is_none() {
231                    return Err(Error::new(
232                        ErrorKind::DataInvalid,
233                        format!("Column {column_name} not found in table. Schema: {schema}"),
234                    ));
235                }
236            }
237        }
238
239        let mut field_ids = vec![];
240        let column_names = self.column_names.clone().unwrap_or_else(|| {
241            schema
242                .as_struct()
243                .fields()
244                .iter()
245                .map(|f| f.name.clone())
246                .collect()
247        });
248
249        for column_name in column_names.iter() {
250            // Handle metadata columns (like "_file")
251            if is_metadata_column_name(column_name) {
252                field_ids.push(get_metadata_field_id(column_name)?);
253                continue;
254            }
255
256            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
257                Error::new(
258                    ErrorKind::DataInvalid,
259                    format!("Column {column_name} not found in table. Schema: {schema}"),
260                )
261            })?;
262
263            schema
264                .as_struct()
265                .field_by_id(field_id)
266                .ok_or_else(|| {
267                    Error::new(
268                        ErrorKind::FeatureUnsupported,
269                        format!(
270                        "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
271                    ),
272                )
273            })?;
274
275            field_ids.push(field_id);
276        }
277
278        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
279            Some(predicates.bind(schema.clone(), true)?)
280        } else {
281            None
282        };
283
284        let name_mapping = self
285            .table
286            .metadata()
287            .properties()
288            .get(DEFAULT_SCHEMA_NAME_MAPPING)
289            .map(|raw| {
290                serde_json::from_str::<NameMapping>(raw).map_err(|e| {
291                    Error::new(
292                        ErrorKind::DataInvalid,
293                        format!(
294                            "Failed to parse table property {DEFAULT_SCHEMA_NAME_MAPPING} as a NameMapping"
295                        ),
296                    )
297                    .with_source(e)
298                })
299            })
300            .transpose()?
301            .map(Arc::new);
302
303        let plan_context = PlanContext {
304            snapshot,
305            table_metadata: self.table.metadata_ref(),
306            snapshot_schema: schema,
307            case_sensitive: self.case_sensitive,
308            predicate: self.filter.map(Arc::new),
309            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
310            object_cache: self.table.object_cache(),
311            field_ids: Arc::new(field_ids),
312            name_mapping,
313            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
314            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
315            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
316        };
317
318        Ok(TableScan {
319            batch_size: self.batch_size,
320            column_names: self.column_names,
321            file_io: self.table.file_io().clone(),
322            plan_context: Some(plan_context),
323            concurrency_limit_data_files: self.concurrency_limit_data_files,
324            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
325            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
326            row_group_filtering_enabled: self.row_group_filtering_enabled,
327            row_selection_enabled: self.row_selection_enabled,
328            runtime: self.table.runtime().clone(),
329        })
330    }
331}
332
333/// Table scan.
334#[derive(Debug)]
335pub struct TableScan {
336    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
337    ///
338    /// If this is None, then the scan contains no rows.
339    plan_context: Option<PlanContext>,
340    batch_size: Option<usize>,
341    file_io: FileIO,
342    column_names: Option<Vec<String>>,
343    /// The maximum number of manifest files that will be
344    /// retrieved from [`FileIO`] concurrently
345    concurrency_limit_manifest_files: usize,
346
347    /// The maximum number of [`ManifestEntry`]s that will
348    /// be processed in parallel
349    concurrency_limit_manifest_entries: usize,
350
351    /// The maximum number of [`ManifestEntry`]s that will
352    /// be processed in parallel
353    concurrency_limit_data_files: usize,
354
355    row_group_filtering_enabled: bool,
356    row_selection_enabled: bool,
357
358    runtime: Runtime,
359}
360
impl TableScan {
    /// Returns a stream of [`FileScanTask`]s.
    ///
    /// If the scan has no plan context, an empty stream is returned. This
    /// happens when the table had no current snapshot when the scan was built.
    ///
    /// Planning runs in tasks spawned on the scan's runtime:
    /// - manifests are fetched in a task spawned via `rt.io()`;
    /// - data and delete manifest entries are processed in tasks spawned via
    ///   `rt.cpu()`.
    ///
    /// A failure in any of those stages is delivered as an `Err` item on the
    /// returned stream, not as an error from this function.
    ///
    /// # Errors
    ///
    /// Returns an error if the snapshot's manifest list cannot be loaded or the
    /// manifest file contexts cannot be built.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // used to stream ManifestEntryContexts between stages of the file plan operation
        // (one channel for data-file entries, one for delete-file entries)
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        // `delete_file_tx` is handed to the delete-entry stage below; the index
        // itself is shared with the manifest file contexts.
        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(self.runtime.clone());

        let manifest_list = plan_context.get_manifest_list().await?;

        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
        // whose partitions cannot match this
        // scan's filter
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        // Each background stage owns its own sender into the result channel, so
        // it can report its failure to the caller as an `Err` item.
        let mut channel_for_manifest_error = file_scan_task_tx.clone();
        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        let rt = self.runtime.clone();

        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
        rt.io().spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                // Best effort: the outcome of this send is deliberately ignored.
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        // Process the delete file [`ManifestEntry`] stream in parallel.
        // Each entry runs in its own task spawned via `rt.cpu()`. At most
        // `concurrency_limit_manifest_entries` entries are in flight at once.
        // Entries that survive filtering are sent to the delete file index
        // through a clone of `delete_file_tx`.
        {
            let rt = rt.clone();
            let rt_inner = rt.clone();
            rt.cpu().spawn(async move {
                let result = manifest_entry_delete_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_delete_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_delete_manifest_entry_error
                        .send(Err(error))
                        .await;
                }
            });
        }

        // Process the data file [`ManifestEntry`] stream in parallel.
        // Each entry runs in its own task spawned via `rt.cpu()`. Entries that
        // survive filtering become `FileScanTask`s, which are sent to the
        // caller's result channel.
        {
            let rt_inner = rt.clone();
            rt.cpu().spawn(async move {
                let result = manifest_entry_data_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_data_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
                }
            });
        }

        Ok(file_scan_task_rx.boxed())
    }

    /// Returns an [`ArrowRecordBatchStream`].
    ///
    /// Plans the scan with [`TableScan::plan_files`], then reads the resulting
    /// tasks with an Arrow reader. The reader is configured with this scan's
    /// data file concurrency limit, its row-group filtering and row selection
    /// flags, and its batch size if one was set.
    ///
    /// # Errors
    ///
    /// Returns an error if `plan_files` fails or the reader's `read` call
    /// returns an error.
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder =
            ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone())
                .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
                .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
                .with_row_selection_enabled(self.row_selection_enabled);

        // Only override the reader's batch size when one was configured.
        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder
            .build()
            .read(self.plan_files().await?)
            // keep only the record batch stream from the read result
            .map(|result| result.stream())
    }

    /// Returns a reference to the column names of the table scan.
    ///
    /// `None` means all columns are selected.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns a reference to the snapshot of the table scan.
    ///
    /// `None` when the table had no current snapshot at build time.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    /// Filters a single data manifest entry. If the entry survives, the
    /// corresponding [`FileScanTask`] is sent on `file_scan_task_tx`.
    ///
    /// The entry is skipped, without error, when any of the following holds:
    /// - it is not alive;
    /// - its partition data shows it cannot match the scan filter;
    /// - its column metrics show it cannot match the scan filter.
    ///
    /// # Errors
    ///
    /// Fails in any of these cases:
    /// - the entry is not a data file;
    /// - looking up or evaluating the partition expression evaluator fails;
    /// - metrics evaluation fails;
    /// - building the task fails;
    /// - the send fails.
    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry for a delete file
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        // Filtering only applies when the scan has a filter predicate.
        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // skip any data file whose metrics don't match this scan's filter
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // congratulations! the manifest entry has made its way through the
        // entire plan without getting filtered out. Create a corresponding
        // FileScanTask and push it to the result stream
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    /// Filters a single delete manifest entry by partition. If the entry
    /// survives, it is forwarded as a [`DeleteFileContext`] on
    /// `delete_file_ctx_tx`.
    ///
    /// Entries that are not alive, or whose partition data shows they cannot
    /// match the scan filter, are skipped without error. Unlike data entries,
    /// no column-metrics filtering is applied here.
    ///
    /// # Errors
    ///
    /// Fails if the entry is a data file, if the partition evaluator lookup or
    /// evaluation fails, or if the send fails.
    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry that is not for a delete file
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // skip any delete file whose partition data indicates that it can't
            // match this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}
611
612pub(crate) struct BoundPredicates {
613    partition_bound_predicate: BoundPredicate,
614    snapshot_bound_predicate: BoundPredicate,
615}
616
617#[cfg(test)]
618pub mod tests {
619    //! shared tests for the table scan API
620    #![allow(missing_docs)]
621
622    use std::collections::HashMap;
623    use std::fs;
624    use std::fs::File;
625    use std::sync::Arc;
626
627    use arrow_array::cast::AsArray;
628    use arrow_array::{
629        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
630        StringArray,
631    };
632    use futures::{TryStreamExt, stream};
633    use minijinja::value::Value;
634    use minijinja::{AutoEscape, Environment, context};
635    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
636    use parquet::basic::Compression;
637    use parquet::file::properties::WriterProperties;
638    use tempfile::TempDir;
639    use uuid::Uuid;
640
641    use crate::arrow::ArrowReaderBuilder;
642    use crate::expr::{BoundPredicate, Reference};
643    use crate::io::{FileIO, OutputFile};
644    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
645    use crate::scan::FileScanTask;
646    use crate::spec::{
647        DEFAULT_SCHEMA_NAME_MAPPING, DataContentType, DataFileBuilder, DataFileFormat, Datum,
648        Literal, ManifestEntry, ManifestListWriter, ManifestStatus, ManifestWriterBuilder,
649        NestedField, PartitionSpec, PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
650    };
651    use crate::table::Table;
652    use crate::test_utils::test_runtime;
653    use crate::{ErrorKind, TableIdent};
654
655    fn render_template(template: &str, ctx: Value) -> String {
656        let mut env = Environment::new();
657        env.set_auto_escape_callback(|_| AutoEscape::None);
658        env.render_str(template, ctx).unwrap()
659    }
660
661    pub struct TableTestFixture {
662        pub table_location: String,
663        pub table: Table,
664    }
665
666    impl TableTestFixture {
667        #[allow(clippy::new_without_default)]
668        pub fn new() -> Self {
669            let tmp_dir = TempDir::new().unwrap();
670            let table_location = tmp_dir.path().join("table1");
671            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
672            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
673            let table_metadata1_location = table_location.join("metadata/v1.json");
674
675            let file_io = FileIO::new_with_fs();
676
677            let table_metadata = {
678                let template_json_str = fs::read_to_string(format!(
679                    "{}/testdata/example_table_metadata_v2.json",
680                    env!("CARGO_MANIFEST_DIR")
681                ))
682                .unwrap();
683                let metadata_json = render_template(&template_json_str, context! {
684                    table_location => &table_location,
685                    manifest_list_1_location => &manifest_list1_location,
686                    manifest_list_2_location => &manifest_list2_location,
687                    table_metadata_1_location => &table_metadata1_location,
688                });
689                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
690            };
691
692            let table = Table::builder()
693                .metadata(table_metadata)
694                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
695                .file_io(file_io.clone())
696                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
697                .runtime(test_runtime())
698                .build()
699                .unwrap();
700
701            Self {
702                table_location: table_location.to_str().unwrap().to_string(),
703                table,
704            }
705        }
706
707        #[allow(clippy::new_without_default)]
708        pub fn new_empty() -> Self {
709            let tmp_dir = TempDir::new().unwrap();
710            let table_location = tmp_dir.path().join("table1");
711            let table_metadata1_location = table_location.join("metadata/v1.json");
712
713            let file_io = FileIO::new_with_fs();
714
715            let table_metadata = {
716                let template_json_str = fs::read_to_string(format!(
717                    "{}/testdata/example_empty_table_metadata_v2.json",
718                    env!("CARGO_MANIFEST_DIR")
719                ))
720                .unwrap();
721                let metadata_json = render_template(&template_json_str, context! {
722                    table_location => &table_location,
723                    table_metadata_1_location => &table_metadata1_location,
724                });
725                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
726            };
727
728            let table = Table::builder()
729                .metadata(table_metadata)
730                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
731                .file_io(file_io.clone())
732                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
733                .runtime(test_runtime())
734                .build()
735                .unwrap();
736
737            Self {
738                table_location: table_location.to_str().unwrap().to_string(),
739                table,
740            }
741        }
742
743        /// Creates a fixture with 5 snapshots chained as:
744        ///   S1 (root) -> S2 -> S3 -> S4 -> S5 (current)
745        /// Useful for testing snapshot history traversal.
746        pub fn new_with_deep_history() -> Self {
747            let tmp_dir = TempDir::new().unwrap();
748            let table_location = tmp_dir.path().join("table1");
749            let table_metadata1_location = table_location.join("metadata/v1.json");
750
751            let file_io = FileIO::new_with_fs();
752
753            let table_metadata = {
754                let json_str = fs::read_to_string(format!(
755                    "{}/testdata/example_table_metadata_v2_deep_history.json",
756                    env!("CARGO_MANIFEST_DIR")
757                ))
758                .unwrap();
759                serde_json::from_str::<TableMetadata>(&json_str).unwrap()
760            };
761
762            let table = Table::builder()
763                .metadata(table_metadata)
764                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
765                .file_io(file_io.clone())
766                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
767                .runtime(test_runtime())
768                .build()
769                .unwrap();
770
771            Self {
772                table_location: table_location.to_str().unwrap().to_string(),
773                table,
774            }
775        }
776
777        pub fn new_unpartitioned() -> Self {
778            let tmp_dir = TempDir::new().unwrap();
779            let table_location = tmp_dir.path().join("table1");
780            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
781            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
782            let table_metadata1_location = table_location.join("metadata/v1.json");
783
784            let file_io = FileIO::new_with_fs();
785
786            let mut table_metadata = {
787                let template_json_str = fs::read_to_string(format!(
788                    "{}/testdata/example_table_metadata_v2.json",
789                    env!("CARGO_MANIFEST_DIR")
790                ))
791                .unwrap();
792                let metadata_json = render_template(&template_json_str, context! {
793                    table_location => &table_location,
794                    manifest_list_1_location => &manifest_list1_location,
795                    manifest_list_2_location => &manifest_list2_location,
796                    table_metadata_1_location => &table_metadata1_location,
797                });
798                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
799            };
800
801            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
802            table_metadata.partition_specs.clear();
803            table_metadata.default_partition_type = StructType::new(vec![]);
804            table_metadata
805                .partition_specs
806                .insert(0, table_metadata.default_spec.clone());
807
808            let table = Table::builder()
809                .metadata(table_metadata)
810                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
811                .file_io(file_io.clone())
812                .metadata_location(table_metadata1_location.to_str().unwrap())
813                .runtime(test_runtime())
814                .build()
815                .unwrap();
816
817            Self {
818                table_location: table_location.to_str().unwrap().to_string(),
819                table,
820            }
821        }
822
823        fn next_manifest_file(&self) -> OutputFile {
824            self.table
825                .file_io()
826                .new_output(format!(
827                    "{}/metadata/manifest_{}.avro",
828                    self.table_location,
829                    Uuid::new_v4()
830                ))
831                .unwrap()
832        }
833
834        pub async fn setup_manifest_files(&mut self) {
835            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
836            let parent_snapshot = current_snapshot
837                .parent_snapshot(self.table.metadata())
838                .unwrap();
839            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
840            let current_partition_spec = self.table.metadata().default_partition_spec();
841
842            // Write the data files first, then use the file size in the manifest entries
843            let parquet_file_size = self.write_parquet_data_files();
844
845            let mut writer = ManifestWriterBuilder::new(
846                self.next_manifest_file(),
847                Some(current_snapshot.snapshot_id()),
848                None,
849                current_schema.clone(),
850                current_partition_spec.as_ref().clone(),
851            )
852            .build_v2_data();
853            writer
854                .add_entry(
855                    ManifestEntry::builder()
856                        .status(ManifestStatus::Added)
857                        .data_file(
858                            DataFileBuilder::default()
859                                .partition_spec_id(0)
860                                .content(DataContentType::Data)
861                                .file_path(format!("{}/1.parquet", &self.table_location))
862                                .file_format(DataFileFormat::Parquet)
863                                .file_size_in_bytes(parquet_file_size)
864                                .record_count(1)
865                                .partition(Struct::from_iter([Some(Literal::long(100))]))
866                                .key_metadata(None)
867                                .build()
868                                .unwrap(),
869                        )
870                        .build(),
871                )
872                .unwrap();
873            writer
874                .add_delete_entry(
875                    ManifestEntry::builder()
876                        .status(ManifestStatus::Deleted)
877                        .snapshot_id(parent_snapshot.snapshot_id())
878                        .sequence_number(parent_snapshot.sequence_number())
879                        .file_sequence_number(parent_snapshot.sequence_number())
880                        .data_file(
881                            DataFileBuilder::default()
882                                .partition_spec_id(0)
883                                .content(DataContentType::Data)
884                                .file_path(format!("{}/2.parquet", &self.table_location))
885                                .file_format(DataFileFormat::Parquet)
886                                .file_size_in_bytes(parquet_file_size)
887                                .record_count(1)
888                                .partition(Struct::from_iter([Some(Literal::long(200))]))
889                                .build()
890                                .unwrap(),
891                        )
892                        .build(),
893                )
894                .unwrap();
895            writer
896                .add_existing_entry(
897                    ManifestEntry::builder()
898                        .status(ManifestStatus::Existing)
899                        .snapshot_id(parent_snapshot.snapshot_id())
900                        .sequence_number(parent_snapshot.sequence_number())
901                        .file_sequence_number(parent_snapshot.sequence_number())
902                        .data_file(
903                            DataFileBuilder::default()
904                                .partition_spec_id(0)
905                                .content(DataContentType::Data)
906                                .file_path(format!("{}/3.parquet", &self.table_location))
907                                .file_format(DataFileFormat::Parquet)
908                                .file_size_in_bytes(parquet_file_size)
909                                .record_count(1)
910                                .partition(Struct::from_iter([Some(Literal::long(300))]))
911                                .build()
912                                .unwrap(),
913                        )
914                        .build(),
915                )
916                .unwrap();
917            let data_file_manifest = writer.write_manifest_file().await.unwrap();
918
919            // Write to manifest list
920            let manifest_list_writer = self
921                .table
922                .file_io()
923                .new_output(current_snapshot.manifest_list())
924                .unwrap()
925                .writer()
926                .await
927                .unwrap();
928            let mut manifest_list_write = ManifestListWriter::v2(
929                manifest_list_writer,
930                current_snapshot.snapshot_id(),
931                current_snapshot.parent_snapshot_id(),
932                current_snapshot.sequence_number(),
933            );
934            manifest_list_write
935                .add_manifests(vec![data_file_manifest].into_iter())
936                .unwrap();
937            manifest_list_write.close().await.unwrap();
938        }
939
940        /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet)
941        /// and returns the file size in bytes.
942        fn write_parquet_data_files(&self) -> u64 {
943            std::fs::create_dir_all(&self.table_location).unwrap();
944
945            let schema = {
946                let fields = vec![
947                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
948                        .with_metadata(HashMap::from([(
949                            PARQUET_FIELD_ID_META_KEY.to_string(),
950                            "1".to_string(),
951                        )])),
952                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
953                        .with_metadata(HashMap::from([(
954                            PARQUET_FIELD_ID_META_KEY.to_string(),
955                            "2".to_string(),
956                        )])),
957                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
958                        .with_metadata(HashMap::from([(
959                            PARQUET_FIELD_ID_META_KEY.to_string(),
960                            "3".to_string(),
961                        )])),
962                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
963                        .with_metadata(HashMap::from([(
964                            PARQUET_FIELD_ID_META_KEY.to_string(),
965                            "4".to_string(),
966                        )])),
967                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
968                        .with_metadata(HashMap::from([(
969                            PARQUET_FIELD_ID_META_KEY.to_string(),
970                            "5".to_string(),
971                        )])),
972                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
973                        .with_metadata(HashMap::from([(
974                            PARQUET_FIELD_ID_META_KEY.to_string(),
975                            "6".to_string(),
976                        )])),
977                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
978                        .with_metadata(HashMap::from([(
979                            PARQUET_FIELD_ID_META_KEY.to_string(),
980                            "7".to_string(),
981                        )])),
982                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
983                        .with_metadata(HashMap::from([(
984                            PARQUET_FIELD_ID_META_KEY.to_string(),
985                            "8".to_string(),
986                        )])),
987                ];
988                Arc::new(arrow_schema::Schema::new(fields))
989            };
990            // x: [1, 1, 1, 1, ...]
991            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
992
993            let mut values = vec![2; 512];
994            values.append(vec![3; 200].as_mut());
995            values.append(vec![4; 300].as_mut());
996            values.append(vec![5; 12].as_mut());
997
998            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
999            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1000
1001            let mut values = vec![3; 512];
1002            values.append(vec![4; 512].as_mut());
1003
1004            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
1005            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1006
1007            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
1008            let mut values = vec!["Apache"; 512];
1009            values.append(vec!["Iceberg"; 512].as_mut());
1010            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
1011
1012            // dbl:
1013            let mut values = vec![100.0f64; 512];
1014            values.append(vec![150.0f64; 12].as_mut());
1015            values.append(vec![200.0f64; 500].as_mut());
1016            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
1017
1018            // i32:
1019            let mut values = vec![100i32; 512];
1020            values.append(vec![150i32; 12].as_mut());
1021            values.append(vec![200i32; 500].as_mut());
1022            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
1023
1024            // i64:
1025            let mut values = vec![100i64; 512];
1026            values.append(vec![150i64; 12].as_mut());
1027            values.append(vec![200i64; 500].as_mut());
1028            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1029
1030            // bool:
1031            let mut values = vec![false; 512];
1032            values.append(vec![true; 512].as_mut());
1033            let values: BooleanArray = values.into();
1034            let col8 = Arc::new(values) as ArrayRef;
1035
1036            let to_write = RecordBatch::try_new(schema.clone(), vec![
1037                col1, col2, col3, col4, col5, col6, col7, col8,
1038            ])
1039            .unwrap();
1040
1041            // Write the Parquet files
1042            let props = WriterProperties::builder()
1043                .set_compression(Compression::SNAPPY)
1044                .build();
1045
1046            for n in 1..=3 {
1047                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
1048                let mut writer =
1049                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1050
1051                writer.write(&to_write).expect("Writing batch");
1052
1053                // writer must be closed to write footer
1054                writer.close().unwrap();
1055            }
1056
1057            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
1058                .unwrap()
1059                .len()
1060        }
1061
1062        pub async fn setup_unpartitioned_manifest_files(&mut self) {
1063            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1064            let parent_snapshot = current_snapshot
1065                .parent_snapshot(self.table.metadata())
1066                .unwrap();
1067            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1068            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
1069
1070            // Write the data files first, then use the file size in the manifest entries
1071            let parquet_file_size = self.write_parquet_data_files();
1072
1073            // Write data files using an empty partition for unpartitioned tables.
1074            let mut writer = ManifestWriterBuilder::new(
1075                self.next_manifest_file(),
1076                Some(current_snapshot.snapshot_id()),
1077                None,
1078                current_schema.clone(),
1079                current_partition_spec.as_ref().clone(),
1080            )
1081            .build_v2_data();
1082
1083            // Create an empty partition value.
1084            let empty_partition = Struct::empty();
1085
1086            writer
1087                .add_entry(
1088                    ManifestEntry::builder()
1089                        .status(ManifestStatus::Added)
1090                        .data_file(
1091                            DataFileBuilder::default()
1092                                .partition_spec_id(0)
1093                                .content(DataContentType::Data)
1094                                .file_path(format!("{}/1.parquet", &self.table_location))
1095                                .file_format(DataFileFormat::Parquet)
1096                                .file_size_in_bytes(parquet_file_size)
1097                                .record_count(1)
1098                                .partition(empty_partition.clone())
1099                                .key_metadata(None)
1100                                .build()
1101                                .unwrap(),
1102                        )
1103                        .build(),
1104                )
1105                .unwrap();
1106
1107            writer
1108                .add_delete_entry(
1109                    ManifestEntry::builder()
1110                        .status(ManifestStatus::Deleted)
1111                        .snapshot_id(parent_snapshot.snapshot_id())
1112                        .sequence_number(parent_snapshot.sequence_number())
1113                        .file_sequence_number(parent_snapshot.sequence_number())
1114                        .data_file(
1115                            DataFileBuilder::default()
1116                                .partition_spec_id(0)
1117                                .content(DataContentType::Data)
1118                                .file_path(format!("{}/2.parquet", &self.table_location))
1119                                .file_format(DataFileFormat::Parquet)
1120                                .file_size_in_bytes(parquet_file_size)
1121                                .record_count(1)
1122                                .partition(empty_partition.clone())
1123                                .build()
1124                                .unwrap(),
1125                        )
1126                        .build(),
1127                )
1128                .unwrap();
1129
1130            writer
1131                .add_existing_entry(
1132                    ManifestEntry::builder()
1133                        .status(ManifestStatus::Existing)
1134                        .snapshot_id(parent_snapshot.snapshot_id())
1135                        .sequence_number(parent_snapshot.sequence_number())
1136                        .file_sequence_number(parent_snapshot.sequence_number())
1137                        .data_file(
1138                            DataFileBuilder::default()
1139                                .partition_spec_id(0)
1140                                .content(DataContentType::Data)
1141                                .file_path(format!("{}/3.parquet", &self.table_location))
1142                                .file_format(DataFileFormat::Parquet)
1143                                .file_size_in_bytes(parquet_file_size)
1144                                .record_count(1)
1145                                .partition(empty_partition.clone())
1146                                .build()
1147                                .unwrap(),
1148                        )
1149                        .build(),
1150                )
1151                .unwrap();
1152
1153            let data_file_manifest = writer.write_manifest_file().await.unwrap();
1154
1155            // Write to manifest list
1156            let manifest_list_writer = self
1157                .table
1158                .file_io()
1159                .new_output(current_snapshot.manifest_list())
1160                .unwrap()
1161                .writer()
1162                .await
1163                .unwrap();
1164            let mut manifest_list_write = ManifestListWriter::v2(
1165                manifest_list_writer,
1166                current_snapshot.snapshot_id(),
1167                current_snapshot.parent_snapshot_id(),
1168                current_snapshot.sequence_number(),
1169            );
1170            manifest_list_write
1171                .add_manifests(vec![data_file_manifest].into_iter())
1172                .unwrap();
1173            manifest_list_write.close().await.unwrap();
1174        }
1175
1176        pub async fn setup_deadlock_manifests(&mut self) {
1177            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1178            let _parent_snapshot = current_snapshot
1179                .parent_snapshot(self.table.metadata())
1180                .unwrap();
1181            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1182            let current_partition_spec = self.table.metadata().default_partition_spec();
1183
1184            // 1. Write DATA manifest with MULTIPLE entries to fill buffer
1185            let mut writer = ManifestWriterBuilder::new(
1186                self.next_manifest_file(),
1187                Some(current_snapshot.snapshot_id()),
1188                None,
1189                current_schema.clone(),
1190                current_partition_spec.as_ref().clone(),
1191            )
1192            .build_v2_data();
1193
1194            // Add 10 data entries
1195            for i in 0..10 {
1196                writer
1197                    .add_entry(
1198                        ManifestEntry::builder()
1199                            .status(ManifestStatus::Added)
1200                            .data_file(
1201                                DataFileBuilder::default()
1202                                    .partition_spec_id(0)
1203                                    .content(DataContentType::Data)
1204                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
1205                                    .file_format(DataFileFormat::Parquet)
1206                                    .file_size_in_bytes(100)
1207                                    .record_count(1)
1208                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
1209                                    .key_metadata(None)
1210                                    .build()
1211                                    .unwrap(),
1212                            )
1213                            .build(),
1214                    )
1215                    .unwrap();
1216            }
1217            let data_manifest = writer.write_manifest_file().await.unwrap();
1218
1219            // 2. Write DELETE manifest
1220            let mut writer = ManifestWriterBuilder::new(
1221                self.next_manifest_file(),
1222                Some(current_snapshot.snapshot_id()),
1223                None,
1224                current_schema.clone(),
1225                current_partition_spec.as_ref().clone(),
1226            )
1227            .build_v2_deletes();
1228
1229            writer
1230                .add_entry(
1231                    ManifestEntry::builder()
1232                        .status(ManifestStatus::Added)
1233                        .data_file(
1234                            DataFileBuilder::default()
1235                                .partition_spec_id(0)
1236                                .content(DataContentType::PositionDeletes)
1237                                .file_path(format!("{}/del.parquet", &self.table_location))
1238                                .file_format(DataFileFormat::Parquet)
1239                                .file_size_in_bytes(100)
1240                                .record_count(1)
1241                                .partition(Struct::from_iter([Some(Literal::long(100))]))
1242                                .build()
1243                                .unwrap(),
1244                        )
1245                        .build(),
1246                )
1247                .unwrap();
1248            let delete_manifest = writer.write_manifest_file().await.unwrap();
1249
1250            // Write to manifest list - DATA FIRST then DELETE
1251            // This order is crucial for reproduction
1252            let manifest_list_writer = self
1253                .table
1254                .file_io()
1255                .new_output(current_snapshot.manifest_list())
1256                .unwrap()
1257                .writer()
1258                .await
1259                .unwrap();
1260            let mut manifest_list_write = ManifestListWriter::v2(
1261                manifest_list_writer,
1262                current_snapshot.snapshot_id(),
1263                current_snapshot.parent_snapshot_id(),
1264                current_snapshot.sequence_number(),
1265            );
1266            manifest_list_write
1267                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
1268                .unwrap();
1269            manifest_list_write.close().await.unwrap();
1270        }
1271    }
1272
1273    #[tokio::test]
1274    async fn test_table_scan_columns() {
1275        let table = TableTestFixture::new().table;
1276
1277        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1278        assert_eq!(
1279            Some(vec!["x".to_string(), "y".to_string()]),
1280            table_scan.column_names
1281        );
1282
1283        let table_scan = table
1284            .scan()
1285            .select(["x", "y"])
1286            .select(["z"])
1287            .build()
1288            .unwrap();
1289        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1290    }
1291
1292    #[tokio::test]
1293    async fn test_select_all() {
1294        let table = TableTestFixture::new().table;
1295
1296        let table_scan = table.scan().select_all().build().unwrap();
1297        assert!(table_scan.column_names.is_none());
1298    }
1299
1300    #[test]
1301    fn test_select_no_exist_column() {
1302        let table = TableTestFixture::new().table;
1303
1304        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1305        assert!(table_scan.is_err());
1306    }
1307
1308    #[tokio::test]
1309    async fn test_table_scan_default_snapshot_id() {
1310        let table = TableTestFixture::new().table;
1311
1312        let table_scan = table.scan().build().unwrap();
1313        assert_eq!(
1314            table.metadata().current_snapshot().unwrap().snapshot_id(),
1315            table_scan.snapshot().unwrap().snapshot_id()
1316        );
1317    }
1318
1319    #[test]
1320    fn test_table_scan_non_exist_snapshot_id() {
1321        let table = TableTestFixture::new().table;
1322
1323        let table_scan = table.scan().snapshot_id(1024).build();
1324        assert!(table_scan.is_err());
1325    }
1326
1327    #[tokio::test]
1328    async fn test_table_scan_with_snapshot_id() {
1329        let table = TableTestFixture::new().table;
1330
1331        let table_scan = table
1332            .scan()
1333            .snapshot_id(3051729675574597004)
1334            .with_row_selection_enabled(true)
1335            .build()
1336            .unwrap();
1337        assert_eq!(
1338            table_scan.snapshot().unwrap().snapshot_id(),
1339            3051729675574597004
1340        );
1341    }
1342
1343    fn table_with_property(key: &str, value: &str) -> Table {
1344        let fixture = TableTestFixture::new();
1345        let mut metadata = fixture.table.metadata().clone();
1346        metadata
1347            .properties
1348            .insert(key.to_string(), value.to_string());
1349        Table::builder()
1350            .metadata(metadata)
1351            .identifier(fixture.table.identifier().clone())
1352            .file_io(fixture.table.file_io().clone())
1353            .metadata_location(fixture.table.metadata_location().unwrap().to_string())
1354            .runtime(test_runtime())
1355            .build()
1356            .unwrap()
1357    }
1358
1359    #[test]
1360    fn test_table_scan_without_name_mapping_property() {
1361        let table = TableTestFixture::new().table;
1362
1363        let table_scan = table.scan().build().unwrap();
1364        assert!(
1365            table_scan
1366                .plan_context
1367                .as_ref()
1368                .unwrap()
1369                .name_mapping
1370                .is_none()
1371        );
1372    }
1373
1374    #[test]
1375    fn test_table_scan_with_name_mapping_property() {
1376        let mapping_json = r#"[{"field-id":1,"names":["id","record_id"]}]"#;
1377        let table = table_with_property(DEFAULT_SCHEMA_NAME_MAPPING, mapping_json);
1378
1379        let table_scan = table.scan().build().unwrap();
1380        let mapping = table_scan
1381            .plan_context
1382            .as_ref()
1383            .unwrap()
1384            .name_mapping
1385            .as_ref()
1386            .expect("name_mapping should be parsed from the table property");
1387        let fields = mapping.fields();
1388        assert_eq!(fields.len(), 1);
1389        assert_eq!(fields[0].field_id(), Some(1));
1390        assert_eq!(fields[0].names(), &[
1391            "id".to_string(),
1392            "record_id".to_string()
1393        ]);
1394    }
1395
1396    #[test]
1397    fn test_table_scan_with_malformed_name_mapping_property() {
1398        let table = table_with_property(DEFAULT_SCHEMA_NAME_MAPPING, "{ not valid json");
1399
1400        let err = table
1401            .scan()
1402            .build()
1403            .expect_err("malformed name mapping should fail to parse");
1404        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1405    }
1406
1407    #[tokio::test]
1408    async fn test_plan_files_carries_name_mapping_into_file_scan_task() {
1409        let mut fixture = TableTestFixture::new();
1410        fixture.setup_manifest_files().await;
1411
1412        let mapping_json = r#"[{"field-id":1,"names":["id","record_id"]}]"#;
1413        let mut metadata = fixture.table.metadata().clone();
1414        metadata.properties.insert(
1415            DEFAULT_SCHEMA_NAME_MAPPING.to_string(),
1416            mapping_json.to_string(),
1417        );
1418        let table = Table::builder()
1419            .metadata(metadata)
1420            .identifier(fixture.table.identifier().clone())
1421            .file_io(fixture.table.file_io().clone())
1422            .metadata_location(fixture.table.metadata_location().unwrap().to_string())
1423            .runtime(test_runtime())
1424            .build()
1425            .unwrap();
1426
1427        let tasks: Vec<_> = table
1428            .scan()
1429            .build()
1430            .unwrap()
1431            .plan_files()
1432            .await
1433            .unwrap()
1434            .try_collect()
1435            .await
1436            .unwrap();
1437
1438        assert!(!tasks.is_empty(), "expected at least one FileScanTask");
1439        for task in &tasks {
1440            let mapping = task
1441                .name_mapping
1442                .as_ref()
1443                .expect("name_mapping should reach the FileScanTask");
1444            assert_eq!(mapping.fields().len(), 1);
1445            assert_eq!(mapping.fields()[0].field_id(), Some(1));
1446        }
1447    }
1448
1449    #[tokio::test]
1450    async fn test_plan_files_on_table_without_any_snapshots() {
1451        let table = TableTestFixture::new_empty().table;
1452        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1453        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1454        assert!(batches.is_empty());
1455    }
1456
1457    #[tokio::test]
1458    async fn test_plan_files_no_deletions() {
1459        let mut fixture = TableTestFixture::new();
1460        fixture.setup_manifest_files().await;
1461
1462        // Create table scan for current snapshot and plan files
1463        let table_scan = fixture
1464            .table
1465            .scan()
1466            .with_row_selection_enabled(true)
1467            .build()
1468            .unwrap();
1469
1470        let mut tasks = table_scan
1471            .plan_files()
1472            .await
1473            .unwrap()
1474            .try_fold(vec![], |mut acc, task| async move {
1475                acc.push(task);
1476                Ok(acc)
1477            })
1478            .await
1479            .unwrap();
1480
1481        assert_eq!(tasks.len(), 2);
1482
1483        tasks.sort_by_key(|t| t.data_file_path.to_string());
1484
1485        // Check first task is added data file
1486        assert_eq!(
1487            tasks[0].data_file_path,
1488            format!("{}/1.parquet", &fixture.table_location)
1489        );
1490
1491        // Check second task is existing data file
1492        assert_eq!(
1493            tasks[1].data_file_path,
1494            format!("{}/3.parquet", &fixture.table_location)
1495        );
1496    }
1497
1498    #[tokio::test]
1499    async fn test_open_parquet_no_deletions() {
1500        let mut fixture = TableTestFixture::new();
1501        fixture.setup_manifest_files().await;
1502
1503        // Create table scan for current snapshot and plan files
1504        let table_scan = fixture
1505            .table
1506            .scan()
1507            .with_row_selection_enabled(true)
1508            .build()
1509            .unwrap();
1510
1511        let batch_stream = table_scan.to_arrow().await.unwrap();
1512
1513        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1514
1515        let col = batches[0].column_by_name("x").unwrap();
1516
1517        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1518        assert_eq!(int64_arr.value(0), 1);
1519    }
1520
1521    #[tokio::test]
1522    async fn test_open_parquet_no_deletions_by_separate_reader() {
1523        let mut fixture = TableTestFixture::new();
1524        fixture.setup_manifest_files().await;
1525
1526        // Create table scan for current snapshot and plan files
1527        let table_scan = fixture
1528            .table
1529            .scan()
1530            .with_row_selection_enabled(true)
1531            .build()
1532            .unwrap();
1533
1534        let mut plan_task: Vec<_> = table_scan
1535            .plan_files()
1536            .await
1537            .unwrap()
1538            .try_collect()
1539            .await
1540            .unwrap();
1541        assert_eq!(plan_task.len(), 2);
1542
1543        let reader = ArrowReaderBuilder::new(
1544            fixture.table.file_io().clone(),
1545            fixture.table.runtime().clone(),
1546        )
1547        .build();
1548        let batch_stream = reader
1549            .clone()
1550            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1551            .unwrap()
1552            .stream();
1553        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1554
1555        let reader = ArrowReaderBuilder::new(
1556            fixture.table.file_io().clone(),
1557            fixture.table.runtime().clone(),
1558        )
1559        .build();
1560        let batch_stream = reader
1561            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1562            .unwrap()
1563            .stream();
1564        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1565
1566        assert_eq!(batch_1, batch_2);
1567    }
1568
1569    #[tokio::test]
1570    async fn test_open_parquet_with_projection() {
1571        let mut fixture = TableTestFixture::new();
1572        fixture.setup_manifest_files().await;
1573
1574        // Create table scan for current snapshot and plan files
1575        let table_scan = fixture
1576            .table
1577            .scan()
1578            .select(["x", "z"])
1579            .with_row_selection_enabled(true)
1580            .build()
1581            .unwrap();
1582
1583        let batch_stream = table_scan.to_arrow().await.unwrap();
1584
1585        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1586
1587        assert_eq!(batches[0].num_columns(), 2);
1588
1589        let col1 = batches[0].column_by_name("x").unwrap();
1590        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1591        assert_eq!(int64_arr.value(0), 1);
1592
1593        let col2 = batches[0].column_by_name("z").unwrap();
1594        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1595        assert_eq!(int64_arr.value(0), 3);
1596
1597        // test empty scan
1598        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1599        let batch_stream = table_scan.to_arrow().await.unwrap();
1600        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1601
1602        assert_eq!(batches[0].num_columns(), 0);
1603        assert_eq!(batches[0].num_rows(), 1024);
1604    }
1605
1606    #[tokio::test]
1607    async fn test_filter_on_arrow_lt() {
1608        let mut fixture = TableTestFixture::new();
1609        fixture.setup_manifest_files().await;
1610
1611        // Filter: y < 3
1612        let mut builder = fixture.table.scan();
1613        let predicate = Reference::new("y").less_than(Datum::long(3));
1614        builder = builder
1615            .with_filter(predicate)
1616            .with_row_selection_enabled(true);
1617        let table_scan = builder.build().unwrap();
1618
1619        let batch_stream = table_scan.to_arrow().await.unwrap();
1620
1621        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1622
1623        assert_eq!(batches[0].num_rows(), 512);
1624
1625        let col = batches[0].column_by_name("x").unwrap();
1626        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1627        assert_eq!(int64_arr.value(0), 1);
1628
1629        let col = batches[0].column_by_name("y").unwrap();
1630        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1631        assert_eq!(int64_arr.value(0), 2);
1632    }
1633
1634    #[tokio::test]
1635    async fn test_filter_on_arrow_gt_eq() {
1636        let mut fixture = TableTestFixture::new();
1637        fixture.setup_manifest_files().await;
1638
1639        // Filter: y >= 5
1640        let mut builder = fixture.table.scan();
1641        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1642        builder = builder
1643            .with_filter(predicate)
1644            .with_row_selection_enabled(true);
1645        let table_scan = builder.build().unwrap();
1646
1647        let batch_stream = table_scan.to_arrow().await.unwrap();
1648
1649        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1650
1651        assert_eq!(batches[0].num_rows(), 12);
1652
1653        let col = batches[0].column_by_name("x").unwrap();
1654        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1655        assert_eq!(int64_arr.value(0), 1);
1656
1657        let col = batches[0].column_by_name("y").unwrap();
1658        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1659        assert_eq!(int64_arr.value(0), 5);
1660    }
1661
1662    #[tokio::test]
1663    async fn test_filter_double_eq() {
1664        let mut fixture = TableTestFixture::new();
1665        fixture.setup_manifest_files().await;
1666
1667        // Filter: dbl == 150.0
1668        let mut builder = fixture.table.scan();
1669        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1670        builder = builder
1671            .with_filter(predicate)
1672            .with_row_selection_enabled(true);
1673        let table_scan = builder.build().unwrap();
1674
1675        let batch_stream = table_scan.to_arrow().await.unwrap();
1676
1677        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1678
1679        assert_eq!(batches.len(), 2);
1680        assert_eq!(batches[0].num_rows(), 12);
1681
1682        let col = batches[0].column_by_name("dbl").unwrap();
1683        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1684        assert_eq!(f64_arr.value(1), 150.0f64);
1685    }
1686
1687    #[tokio::test]
1688    async fn test_filter_int_eq() {
1689        let mut fixture = TableTestFixture::new();
1690        fixture.setup_manifest_files().await;
1691
1692        // Filter: i32 == 150
1693        let mut builder = fixture.table.scan();
1694        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1695        builder = builder
1696            .with_filter(predicate)
1697            .with_row_selection_enabled(true);
1698        let table_scan = builder.build().unwrap();
1699
1700        let batch_stream = table_scan.to_arrow().await.unwrap();
1701
1702        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1703
1704        assert_eq!(batches.len(), 2);
1705        assert_eq!(batches[0].num_rows(), 12);
1706
1707        let col = batches[0].column_by_name("i32").unwrap();
1708        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1709        assert_eq!(i32_arr.value(1), 150i32);
1710    }
1711
1712    #[tokio::test]
1713    async fn test_filter_long_eq() {
1714        let mut fixture = TableTestFixture::new();
1715        fixture.setup_manifest_files().await;
1716
1717        // Filter: i64 == 150
1718        let mut builder = fixture.table.scan();
1719        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1720        builder = builder
1721            .with_filter(predicate)
1722            .with_row_selection_enabled(true);
1723        let table_scan = builder.build().unwrap();
1724
1725        let batch_stream = table_scan.to_arrow().await.unwrap();
1726
1727        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1728
1729        assert_eq!(batches.len(), 2);
1730        assert_eq!(batches[0].num_rows(), 12);
1731
1732        let col = batches[0].column_by_name("i64").unwrap();
1733        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1734        assert_eq!(i64_arr.value(1), 150i64);
1735    }
1736
1737    #[tokio::test]
1738    async fn test_filter_bool_eq() {
1739        let mut fixture = TableTestFixture::new();
1740        fixture.setup_manifest_files().await;
1741
1742        // Filter: bool == true
1743        let mut builder = fixture.table.scan();
1744        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1745        builder = builder
1746            .with_filter(predicate)
1747            .with_row_selection_enabled(true);
1748        let table_scan = builder.build().unwrap();
1749
1750        let batch_stream = table_scan.to_arrow().await.unwrap();
1751
1752        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1753
1754        assert_eq!(batches.len(), 2);
1755        assert_eq!(batches[0].num_rows(), 512);
1756
1757        let col = batches[0].column_by_name("bool").unwrap();
1758        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1759        assert!(bool_arr.value(1));
1760    }
1761
1762    #[tokio::test]
1763    async fn test_filter_on_arrow_is_null() {
1764        let mut fixture = TableTestFixture::new();
1765        fixture.setup_manifest_files().await;
1766
1767        // Filter: y is null
1768        let mut builder = fixture.table.scan();
1769        let predicate = Reference::new("y").is_null();
1770        builder = builder
1771            .with_filter(predicate)
1772            .with_row_selection_enabled(true);
1773        let table_scan = builder.build().unwrap();
1774
1775        let batch_stream = table_scan.to_arrow().await.unwrap();
1776
1777        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1778        assert_eq!(batches.len(), 0);
1779    }
1780
1781    #[tokio::test]
1782    async fn test_filter_on_arrow_is_not_null() {
1783        let mut fixture = TableTestFixture::new();
1784        fixture.setup_manifest_files().await;
1785
1786        // Filter: y is not null
1787        let mut builder = fixture.table.scan();
1788        let predicate = Reference::new("y").is_not_null();
1789        builder = builder
1790            .with_filter(predicate)
1791            .with_row_selection_enabled(true);
1792        let table_scan = builder.build().unwrap();
1793
1794        let batch_stream = table_scan.to_arrow().await.unwrap();
1795
1796        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1797        assert_eq!(batches[0].num_rows(), 1024);
1798    }
1799
1800    #[tokio::test]
1801    async fn test_filter_on_arrow_lt_and_gt() {
1802        let mut fixture = TableTestFixture::new();
1803        fixture.setup_manifest_files().await;
1804
1805        // Filter: y < 5 AND z >= 4
1806        let mut builder = fixture.table.scan();
1807        let predicate = Reference::new("y")
1808            .less_than(Datum::long(5))
1809            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1810        builder = builder
1811            .with_filter(predicate)
1812            .with_row_selection_enabled(true);
1813        let table_scan = builder.build().unwrap();
1814
1815        let batch_stream = table_scan.to_arrow().await.unwrap();
1816
1817        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1818        assert_eq!(batches[0].num_rows(), 500);
1819
1820        let col = batches[0].column_by_name("x").unwrap();
1821        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1822        assert_eq!(col, &expected_x);
1823
1824        let col = batches[0].column_by_name("y").unwrap();
1825        let mut values = vec![];
1826        values.append(vec![3; 200].as_mut());
1827        values.append(vec![4; 300].as_mut());
1828        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1829        assert_eq!(col, &expected_y);
1830
1831        let col = batches[0].column_by_name("z").unwrap();
1832        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1833        assert_eq!(col, &expected_z);
1834    }
1835
1836    #[tokio::test]
1837    async fn test_filter_on_arrow_lt_or_gt() {
1838        let mut fixture = TableTestFixture::new();
1839        fixture.setup_manifest_files().await;
1840
1841        // Filter: y < 5 AND z >= 4
1842        let mut builder = fixture.table.scan();
1843        let predicate = Reference::new("y")
1844            .less_than(Datum::long(5))
1845            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1846        builder = builder
1847            .with_filter(predicate)
1848            .with_row_selection_enabled(true);
1849        let table_scan = builder.build().unwrap();
1850
1851        let batch_stream = table_scan.to_arrow().await.unwrap();
1852
1853        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1854        assert_eq!(batches[0].num_rows(), 1024);
1855
1856        let col = batches[0].column_by_name("x").unwrap();
1857        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1858        assert_eq!(col, &expected_x);
1859
1860        let col = batches[0].column_by_name("y").unwrap();
1861        let mut values = vec![2; 512];
1862        values.append(vec![3; 200].as_mut());
1863        values.append(vec![4; 300].as_mut());
1864        values.append(vec![5; 12].as_mut());
1865        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1866        assert_eq!(col, &expected_y);
1867
1868        let col = batches[0].column_by_name("z").unwrap();
1869        let mut values = vec![3; 512];
1870        values.append(vec![4; 512].as_mut());
1871        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1872        assert_eq!(col, &expected_z);
1873    }
1874
1875    #[tokio::test]
1876    async fn test_filter_on_arrow_startswith() {
1877        let mut fixture = TableTestFixture::new();
1878        fixture.setup_manifest_files().await;
1879
1880        // Filter: a STARTSWITH "Ice"
1881        let mut builder = fixture.table.scan();
1882        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1883        builder = builder
1884            .with_filter(predicate)
1885            .with_row_selection_enabled(true);
1886        let table_scan = builder.build().unwrap();
1887
1888        let batch_stream = table_scan.to_arrow().await.unwrap();
1889
1890        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1891
1892        assert_eq!(batches[0].num_rows(), 512);
1893
1894        let col = batches[0].column_by_name("a").unwrap();
1895        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1896        assert_eq!(string_arr.value(0), "Iceberg");
1897    }
1898
1899    #[tokio::test]
1900    async fn test_filter_on_arrow_not_startswith() {
1901        let mut fixture = TableTestFixture::new();
1902        fixture.setup_manifest_files().await;
1903
1904        // Filter: a NOT STARTSWITH "Ice"
1905        let mut builder = fixture.table.scan();
1906        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1907        builder = builder
1908            .with_filter(predicate)
1909            .with_row_selection_enabled(true);
1910        let table_scan = builder.build().unwrap();
1911
1912        let batch_stream = table_scan.to_arrow().await.unwrap();
1913
1914        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1915
1916        assert_eq!(batches[0].num_rows(), 512);
1917
1918        let col = batches[0].column_by_name("a").unwrap();
1919        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1920        assert_eq!(string_arr.value(0), "Apache");
1921    }
1922
1923    #[tokio::test]
1924    async fn test_filter_on_arrow_in() {
1925        let mut fixture = TableTestFixture::new();
1926        fixture.setup_manifest_files().await;
1927
1928        // Filter: a IN ("Sioux", "Iceberg")
1929        let mut builder = fixture.table.scan();
1930        let predicate =
1931            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1932        builder = builder
1933            .with_filter(predicate)
1934            .with_row_selection_enabled(true);
1935        let table_scan = builder.build().unwrap();
1936
1937        let batch_stream = table_scan.to_arrow().await.unwrap();
1938
1939        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1940
1941        assert_eq!(batches[0].num_rows(), 512);
1942
1943        let col = batches[0].column_by_name("a").unwrap();
1944        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1945        assert_eq!(string_arr.value(0), "Iceberg");
1946    }
1947
1948    #[tokio::test]
1949    async fn test_filter_on_arrow_not_in() {
1950        let mut fixture = TableTestFixture::new();
1951        fixture.setup_manifest_files().await;
1952
1953        // Filter: a NOT IN ("Sioux", "Iceberg")
1954        let mut builder = fixture.table.scan();
1955        let predicate =
1956            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1957        builder = builder
1958            .with_filter(predicate)
1959            .with_row_selection_enabled(true);
1960        let table_scan = builder.build().unwrap();
1961
1962        let batch_stream = table_scan.to_arrow().await.unwrap();
1963
1964        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1965
1966        assert_eq!(batches[0].num_rows(), 512);
1967
1968        let col = batches[0].column_by_name("a").unwrap();
1969        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1970        assert_eq!(string_arr.value(0), "Apache");
1971    }
1972
1973    #[test]
1974    fn test_file_scan_task_serialize_deserialize() {
1975        let test_fn = |task: FileScanTask| {
1976            let serialized = serde_json::to_string(&task).unwrap();
1977            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1978
1979            assert_eq!(task.data_file_path, deserialized.data_file_path);
1980            assert_eq!(task.start, deserialized.start);
1981            assert_eq!(task.length, deserialized.length);
1982            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1983            assert_eq!(task.predicate, deserialized.predicate);
1984            assert_eq!(task.schema, deserialized.schema);
1985        };
1986
1987        // without predicate
1988        let schema = Arc::new(
1989            Schema::builder()
1990                .with_fields(vec![Arc::new(NestedField::required(
1991                    1,
1992                    "x",
1993                    Type::Primitive(PrimitiveType::Binary),
1994                ))])
1995                .build()
1996                .unwrap(),
1997        );
1998        let task = FileScanTask::builder()
1999            .with_data_file_path("data_file_path".to_string())
2000            .with_file_size_in_bytes(0)
2001            .with_start(0)
2002            .with_length(100)
2003            .with_project_field_ids(vec![1, 2, 3])
2004            .with_schema(schema.clone())
2005            .with_record_count(Some(100))
2006            .with_data_file_format(DataFileFormat::Parquet)
2007            .with_case_sensitive(false)
2008            .build();
2009        test_fn(task);
2010
2011        // with predicate
2012        let task = FileScanTask::builder()
2013            .with_data_file_path("data_file_path".to_string())
2014            .with_file_size_in_bytes(0)
2015            .with_start(0)
2016            .with_length(100)
2017            .with_project_field_ids(vec![1, 2, 3])
2018            .with_predicate(Some(BoundPredicate::AlwaysTrue))
2019            .with_schema(schema)
2020            .with_data_file_format(DataFileFormat::Avro)
2021            .with_case_sensitive(false)
2022            .build();
2023        test_fn(task);
2024    }
2025
2026    #[tokio::test]
2027    async fn test_select_with_file_column() {
2028        use arrow_array::cast::AsArray;
2029
2030        let mut fixture = TableTestFixture::new();
2031        fixture.setup_manifest_files().await;
2032
2033        // Select regular columns plus the _file column
2034        let table_scan = fixture
2035            .table
2036            .scan()
2037            .select(["x", RESERVED_COL_NAME_FILE])
2038            .with_row_selection_enabled(true)
2039            .build()
2040            .unwrap();
2041
2042        let batch_stream = table_scan.to_arrow().await.unwrap();
2043        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2044
2045        // Verify we have 2 columns: x and _file
2046        assert_eq!(batches[0].num_columns(), 2);
2047
2048        // Verify the x column exists and has correct data
2049        let x_col = batches[0].column_by_name("x").unwrap();
2050        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
2051        assert_eq!(x_arr.value(0), 1);
2052
2053        // Verify the _file column exists
2054        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
2055        assert!(
2056            file_col.is_some(),
2057            "_file column should be present in the batch"
2058        );
2059
2060        // Verify the _file column contains a file path
2061        let file_col = file_col.unwrap();
2062        assert!(
2063            matches!(
2064                file_col.data_type(),
2065                arrow_schema::DataType::RunEndEncoded(_, _)
2066            ),
2067            "_file column should use RunEndEncoded type"
2068        );
2069
2070        // Decode the RunArray to verify it contains the file path
2071        let run_array = file_col
2072            .as_any()
2073            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2074            .expect("_file column should be a RunArray");
2075
2076        let values = run_array.values();
2077        let string_values = values.as_string::<i32>();
2078        assert_eq!(string_values.len(), 1, "Should have a single file path");
2079
2080        let file_path = string_values.value(0);
2081        assert!(
2082            file_path.ends_with(".parquet"),
2083            "File path should end with .parquet, got: {file_path}"
2084        );
2085    }
2086
2087    #[tokio::test]
2088    async fn test_select_file_column_position() {
2089        let mut fixture = TableTestFixture::new();
2090        fixture.setup_manifest_files().await;
2091
2092        // Select columns in specific order: x, _file, z
2093        let table_scan = fixture
2094            .table
2095            .scan()
2096            .select(["x", RESERVED_COL_NAME_FILE, "z"])
2097            .with_row_selection_enabled(true)
2098            .build()
2099            .unwrap();
2100
2101        let batch_stream = table_scan.to_arrow().await.unwrap();
2102        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2103
2104        assert_eq!(batches[0].num_columns(), 3);
2105
2106        // Verify column order: x at position 0, _file at position 1, z at position 2
2107        let schema = batches[0].schema();
2108        assert_eq!(schema.field(0).name(), "x");
2109        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
2110        assert_eq!(schema.field(2).name(), "z");
2111
2112        // Verify columns by name also works
2113        assert!(batches[0].column_by_name("x").is_some());
2114        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
2115        assert!(batches[0].column_by_name("z").is_some());
2116    }
2117
2118    #[tokio::test]
2119    async fn test_select_file_column_only() {
2120        let mut fixture = TableTestFixture::new();
2121        fixture.setup_manifest_files().await;
2122
2123        // Select only the _file column
2124        let table_scan = fixture
2125            .table
2126            .scan()
2127            .select([RESERVED_COL_NAME_FILE])
2128            .with_row_selection_enabled(true)
2129            .build()
2130            .unwrap();
2131
2132        let batch_stream = table_scan.to_arrow().await.unwrap();
2133        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2134
2135        // Should have exactly 1 column
2136        assert_eq!(batches[0].num_columns(), 1);
2137
2138        // Verify it's the _file column
2139        let schema = batches[0].schema();
2140        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2141
2142        // Verify the batch has the correct number of rows
2143        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
2144        // Each file has 1024 rows, so total is 2048 rows
2145        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2146        assert_eq!(total_rows, 2048);
2147    }
2148
2149    #[tokio::test]
2150    async fn test_file_column_with_multiple_files() {
2151        use std::collections::HashSet;
2152
2153        let mut fixture = TableTestFixture::new();
2154        fixture.setup_manifest_files().await;
2155
2156        // Select x and _file columns
2157        let table_scan = fixture
2158            .table
2159            .scan()
2160            .select(["x", RESERVED_COL_NAME_FILE])
2161            .with_row_selection_enabled(true)
2162            .build()
2163            .unwrap();
2164
2165        let batch_stream = table_scan.to_arrow().await.unwrap();
2166        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2167
2168        // Collect all unique file paths from the batches
2169        let mut file_paths = HashSet::new();
2170        for batch in &batches {
2171            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
2172            let run_array = file_col
2173                .as_any()
2174                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2175                .expect("_file column should be a RunArray");
2176
2177            let values = run_array.values();
2178            let string_values = values.as_string::<i32>();
2179            for i in 0..string_values.len() {
2180                file_paths.insert(string_values.value(i).to_string());
2181            }
2182        }
2183
2184        // We should have multiple files (the test creates 1.parquet and 3.parquet)
2185        assert!(!file_paths.is_empty(), "Should have at least one file path");
2186
2187        // All paths should end with .parquet
2188        for path in &file_paths {
2189            assert!(
2190                path.ends_with(".parquet"),
2191                "All file paths should end with .parquet, got: {path}"
2192            );
2193        }
2194    }
2195
2196    #[tokio::test]
2197    async fn test_file_column_at_start() {
2198        let mut fixture = TableTestFixture::new();
2199        fixture.setup_manifest_files().await;
2200
2201        // Select _file at the start
2202        let table_scan = fixture
2203            .table
2204            .scan()
2205            .select([RESERVED_COL_NAME_FILE, "x", "y"])
2206            .with_row_selection_enabled(true)
2207            .build()
2208            .unwrap();
2209
2210        let batch_stream = table_scan.to_arrow().await.unwrap();
2211        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2212
2213        assert_eq!(batches[0].num_columns(), 3);
2214
2215        // Verify _file is at position 0
2216        let schema = batches[0].schema();
2217        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2218        assert_eq!(schema.field(1).name(), "x");
2219        assert_eq!(schema.field(2).name(), "y");
2220    }
2221
2222    #[tokio::test]
2223    async fn test_file_column_at_end() {
2224        let mut fixture = TableTestFixture::new();
2225        fixture.setup_manifest_files().await;
2226
2227        // Select _file at the end
2228        let table_scan = fixture
2229            .table
2230            .scan()
2231            .select(["x", "y", RESERVED_COL_NAME_FILE])
2232            .with_row_selection_enabled(true)
2233            .build()
2234            .unwrap();
2235
2236        let batch_stream = table_scan.to_arrow().await.unwrap();
2237        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2238
2239        assert_eq!(batches[0].num_columns(), 3);
2240
2241        // Verify _file is at position 2 (the end)
2242        let schema = batches[0].schema();
2243        assert_eq!(schema.field(0).name(), "x");
2244        assert_eq!(schema.field(1).name(), "y");
2245        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2246    }
2247
2248    #[tokio::test]
2249    async fn test_select_with_repeated_column_names() {
2250        let mut fixture = TableTestFixture::new();
2251        fixture.setup_manifest_files().await;
2252
2253        // Select with repeated column names - both regular columns and virtual columns
2254        // Repeated columns should appear multiple times in the result (duplicates are allowed)
2255        let table_scan = fixture
2256            .table
2257            .scan()
2258            .select([
2259                "x",
2260                RESERVED_COL_NAME_FILE,
2261                "x", // x repeated
2262                "y",
2263                RESERVED_COL_NAME_FILE, // _file repeated
2264                "y",                    // y repeated
2265            ])
2266            .with_row_selection_enabled(true)
2267            .build()
2268            .unwrap();
2269
2270        let batch_stream = table_scan.to_arrow().await.unwrap();
2271        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2272
2273        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
2274        assert_eq!(
2275            batches[0].num_columns(),
2276            6,
2277            "Should have exactly 6 columns with duplicates"
2278        );
2279
2280        let schema = batches[0].schema();
2281
2282        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
2283        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2284        assert_eq!(
2285            schema.field(1).name(),
2286            RESERVED_COL_NAME_FILE,
2287            "Column 1 should be _file"
2288        );
2289        assert_eq!(
2290            schema.field(2).name(),
2291            "x",
2292            "Column 2 should be x (duplicate)"
2293        );
2294        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2295        assert_eq!(
2296            schema.field(4).name(),
2297            RESERVED_COL_NAME_FILE,
2298            "Column 4 should be _file (duplicate)"
2299        );
2300        assert_eq!(
2301            schema.field(5).name(),
2302            "y",
2303            "Column 5 should be y (duplicate)"
2304        );
2305
2306        // Verify all columns have correct data types
2307        assert!(
2308            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2309            "Column x should be Int64"
2310        );
2311        assert!(
2312            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2313            "Column x (duplicate) should be Int64"
2314        );
2315        assert!(
2316            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2317            "Column y should be Int64"
2318        );
2319        assert!(
2320            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2321            "Column y (duplicate) should be Int64"
2322        );
2323        assert!(
2324            matches!(
2325                schema.field(1).data_type(),
2326                arrow_schema::DataType::RunEndEncoded(_, _)
2327            ),
2328            "_file column should use RunEndEncoded type"
2329        );
2330        assert!(
2331            matches!(
2332                schema.field(4).data_type(),
2333                arrow_schema::DataType::RunEndEncoded(_, _)
2334            ),
2335            "_file column (duplicate) should use RunEndEncoded type"
2336        );
2337    }
2338
2339    #[tokio::test]
2340    async fn test_scan_deadlock() {
2341        let mut fixture = TableTestFixture::new();
2342        fixture.setup_deadlock_manifests().await;
2343
2344        // Create table scan with concurrency limit 1
2345        // This sets channel size to 1.
2346        // Data manifest has 10 entries -> will block producer.
2347        // Delete manifest is 2nd in list -> won't be processed.
2348        // Consumer 2 (Data) not started -> blocked.
2349        // Consumer 1 (Delete) waiting -> blocked.
2350        let table_scan = fixture
2351            .table
2352            .scan()
2353            .with_concurrency_limit(1)
2354            .build()
2355            .unwrap();
2356
2357        // This should timeout/hang if deadlock exists
2358        // We can use tokio::time::timeout
2359        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2360            table_scan
2361                .plan_files()
2362                .await
2363                .unwrap()
2364                .try_collect::<Vec<_>>()
2365                .await
2366        })
2367        .await;
2368
2369        // Assert it finished (didn't timeout)
2370        assert!(result.is_ok(), "Scan timed out - deadlock detected");
2371    }
2372}