iceberg/scan/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Table scan API.
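//!
//! The sketch below is illustrative only (it assumes an already-loaded [`Table`]
//! value named `table`, mirroring the test fixtures at the bottom of this file):
//! build a scan with the builder, then either plan the [`FileScanTask`]s yourself
//! or read the matching data directly as Arrow record batches.
//!
//! ```ignore
//! use futures::TryStreamExt;
//!
//! use crate::expr::Reference;
//! use crate::spec::Datum;
//!
//! // Select two columns and keep only rows where `y < 3`.
//! let scan = table
//!     .scan()
//!     .select(["x", "y"])
//!     .with_filter(Reference::new("y").less_than(Datum::long(3)))
//!     .build()?;
//!
//! // Stream the results back as Arrow `RecordBatch`es.
//! let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
//! ```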

mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};

/// A stream of arrow [`RecordBatch`]es.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    // Defaults to `None`, which means all columns are selected
    column_names: Option<Vec<String>>,
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    /// Sets the desired size of batches in the response
    /// to something other than the default
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the scan's case sensitivity
    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

    /// Specifies a predicate to use as a filter
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        // calls rewrite_not to remove Not nodes, which must be absent
        // when applying the manifest evaluator
        self.filter = Some(predicate.rewrite_not());
        self
    }
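
    // Illustrative sketch of supplying a filter (assuming a `Table` value named
    // `table`, as in the tests below); `Reference` lives in `crate::expr` and
    // `Datum` in `crate::spec`:
    //
    //     let predicate = Reference::new("y").less_than(Datum::long(3));
    //     let scan = table.scan().with_filter(predicate).build()?;
    //
    // Because `with_filter` calls `rewrite_not`, any `Not` nodes in the supplied
    // predicate are rewritten away before the filter is stored.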

    /// Select all columns.
    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

    /// Select empty columns.
    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

    /// Select some columns of the table.
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

    /// Set the snapshot to scan. When not set, the current snapshot is used.
    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

    /// Sets the concurrency limit for manifest files, manifest entries,
    /// and data files for this scan
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the data file concurrency limit for this scan
    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the manifest entry concurrency limit for this scan
    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

    /// Determines whether to enable row group filtering.
    /// When enabled, if a read is performed with a filter predicate,
    /// then the metadata for each row group in the parquet file is
    /// evaluated against the filter predicate, and row groups
    /// that can't contain matching rows will be skipped entirely.
    ///
    /// Defaults to enabled, as it generally improves performance or
    /// leaves it unchanged; performance degradation is unlikely.
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    /// Determines whether to enable row selection.
    /// When enabled, if a read is performed with a filter predicate,
    /// then (for row groups that have not been skipped) the page index
    /// for each row group in a parquet file is parsed and evaluated
    /// against the filter predicate to determine if ranges of rows
    /// within a row group can be skipped, based upon the page-level
    /// statistics for each column.
    ///
    /// Defaults to disabled. Enabling it requires parsing the parquet page
    /// index, which can be slow enough that the cost outweighs any gains
    /// from the reduced number of rows that need scanning.
    /// It is recommended to experiment with the partitioning, sorting, row group size,
    /// page size, and page row limit settings of the Iceberg table being scanned in
    /// order to get the best performance from row selection.
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }
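
    // A minimal sketch of tuning both read optimizations together (same `table`
    // assumption as the module-level example); both calls are optional and shown
    // here only for illustration:
    //
    //     let scan = table
    //         .scan()
    //         .with_filter(Reference::new("y").less_than(Datum::long(3)))
    //         .with_row_group_filtering_enabled(true)
    //         .with_row_selection_enabled(true)
    //         .build()?;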

    /// Build the table scan.
    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        // Check that all column names exist in the schema.
        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                        ),
                    )
                })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}

/// Table scan.
#[derive(Debug)]
pub struct TableScan {
    /// A [`PlanContext`], if this table has at least one snapshot, otherwise None.
    ///
    /// If this is None, then the scan contains no rows.
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently
    concurrency_limit_manifest_files: usize,

    /// The maximum number of [`ManifestEntry`]s that will
    /// be processed in parallel
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will
    /// be read concurrently
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
    /// Returns a stream of [`FileScanTask`]s.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // used to stream ManifestEntryContexts between stages of the file plan operation
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
        // whose partitions cannot match this scan's filter
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete file [`ManifestEntry`] stream in parallel
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data file [`ManifestEntry`] stream in parallel
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }
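
    // Illustrative sketch of consuming the planned tasks directly and handing them
    // to a separate reader, mirroring `test_open_parquet_no_deletions_by_separate_reader`
    // below (assumes a built `TableScan` named `table_scan` and a `FileIO` named `file_io`):
    //
    //     let tasks: Vec<FileScanTask> = table_scan.plan_files().await?.try_collect().await?;
    //     let reader = ArrowReaderBuilder::new(file_io).build();
    //     let batches = reader.read(Box::pin(futures::stream::iter(tasks.into_iter().map(Ok))))?;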

    /// Returns an [`ArrowRecordBatchStream`].
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }
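
    // Minimal usage sketch (same `table` assumption as the module-level example);
    // the batch size only needs to be set when the default is not suitable:
    //
    //     let scan = table.scan().with_batch_size(Some(1024)).build()?;
    //     let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;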

    /// Returns a reference to the column names of the table scan.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns a reference to the snapshot of the table scan.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry for a delete file
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // skip any data file whose metrics don't match this scan's filter
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // congratulations! the manifest entry has made its way through the
        // entire plan without getting filtered out. Create a corresponding
        // FileScanTask and push it to the result stream
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry that is not for a delete file
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

#[cfg(test)]
pub mod tests {
    //! shared tests for the table scan API
    #![allow(missing_docs)]

    use std::collections::HashMap;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::{
        ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
    };
    use futures::{TryStreamExt, stream};
    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::TableIdent;
    use crate::arrow::ArrowReaderBuilder;
    use crate::expr::{BoundPredicate, Reference};
    use crate::io::{FileIO, OutputFile};
    use crate::scan::FileScanTask;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
    };
    use crate::table::Table;

    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

    pub struct TableTestFixture {
        pub table_location: String,
        pub table: Table,
    }

    impl TableTestFixture {
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write data files
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            // prepare data
            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };
            // x: [1, 1, 1, 1, ...]
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());

            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());

            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            // dbl:
            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            // i32:
            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            // i64:
            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // bool:
            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            // Write the Parquet files
            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                // writer must be closed to write footer
                writer.close().unwrap();
            }
        }

        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            // Write data files using an empty partition for unpartitioned tables.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Create an empty partition value.
            let empty_partition = Struct::empty();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            // prepare data for parquet files
            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            // Build the arrays for the RecordBatch
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            // Write the Parquet files
            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                // writer must be closed to write footer
                writer.close().unwrap();
            }
        }
    }

    #[test]
    fn test_table_scan_columns() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
        assert_eq!(
            Some(vec!["x".to_string(), "y".to_string()]),
            table_scan.column_names
        );

        let table_scan = table
            .scan()
            .select(["x", "y"])
            .select(["z"])
            .build()
            .unwrap();
        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
    }

    #[test]
    fn test_select_all() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select_all().build().unwrap();
        assert!(table_scan.column_names.is_none());
    }

    #[test]
    fn test_select_no_exist_column() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_default_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().build().unwrap();
        assert_eq!(
            table.metadata().current_snapshot().unwrap().snapshot_id(),
            table_scan.snapshot().unwrap().snapshot_id()
        );
    }

    #[test]
    fn test_table_scan_non_exist_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().snapshot_id(1024).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_with_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table
            .scan()
            .snapshot_id(3051729675574597004)
            .with_row_selection_enabled(true)
            .build()
            .unwrap();
        assert_eq!(
            table_scan.snapshot().unwrap().snapshot_id(),
            3051729675574597004
        );
    }

    #[tokio::test]
    async fn test_plan_files_on_table_without_any_snapshots() {
        let table = TableTestFixture::new_empty().table;
        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert!(batches.is_empty());
    }

    #[tokio::test]
    async fn test_plan_files_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create table scan for current snapshot and plan files
        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut tasks = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_fold(vec![], |mut acc, task| async move {
                acc.push(task);
                Ok(acc)
            })
            .await
            .unwrap();

        assert_eq!(tasks.len(), 2);

        tasks.sort_by_key(|t| t.data_file_path.to_string());

        // Check first task is added data file
        assert_eq!(
            tasks[0].data_file_path,
            format!("{}/1.parquet", &fixture.table_location)
        );

        // Check second task is existing data file
        assert_eq!(
            tasks[1].data_file_path,
            format!("{}/3.parquet", &fixture.table_location)
        );
    }
1279
1280    #[tokio::test]
1281    async fn test_open_parquet_no_deletions() {
1282        let mut fixture = TableTestFixture::new();
1283        fixture.setup_manifest_files().await;
1284
1285        // Create table scan for current snapshot and plan files
1286        let table_scan = fixture
1287            .table
1288            .scan()
1289            .with_row_selection_enabled(true)
1290            .build()
1291            .unwrap();
1292
1293        let batch_stream = table_scan.to_arrow().await.unwrap();
1294
1295        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1296
1297        let col = batches[0].column_by_name("x").unwrap();
1298
1299        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1300        assert_eq!(int64_arr.value(0), 1);
1301    }
1302
1303    #[tokio::test]
1304    async fn test_open_parquet_no_deletions_by_separate_reader() {
1305        let mut fixture = TableTestFixture::new();
1306        fixture.setup_manifest_files().await;
1307
1308        // Create table scan for current snapshot and plan files
1309        let table_scan = fixture
1310            .table
1311            .scan()
1312            .with_row_selection_enabled(true)
1313            .build()
1314            .unwrap();
1315
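        // Plan the scan and collect the individual file scan tasks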
1316        let mut plan_task: Vec<_> = table_scan
1317            .plan_files()
1318            .await
1319            .unwrap()
1320            .try_collect()
1321            .await
1322            .unwrap();
1323        assert_eq!(plan_task.len(), 2);
1324
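        // Read each task with a separate reader; both reads should yield identical batches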
1325        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1326        let batch_stream = reader
1327            .clone()
1328            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1329            .unwrap();
1330        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1331
1332        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1333        let batch_stream = reader
1334            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1335            .unwrap();
1336        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1337
1338        assert_eq!(batch_1, batch_2);
1339    }
1340
1341    #[tokio::test]
1342    async fn test_open_parquet_with_projection() {
1343        let mut fixture = TableTestFixture::new();
1344        fixture.setup_manifest_files().await;
1345
1346        // Create a table scan of the current snapshot, projecting only the "x" and "z" columns
1347        let table_scan = fixture
1348            .table
1349            .scan()
1350            .select(["x", "z"])
1351            .with_row_selection_enabled(true)
1352            .build()
1353            .unwrap();
1354
1355        let batch_stream = table_scan.to_arrow().await.unwrap();
1356
1357        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1358
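        // Only the two projected columns ("x" and "z") should be present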
1359        assert_eq!(batches[0].num_columns(), 2);
1360
1361        let col1 = batches[0].column_by_name("x").unwrap();
1362        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1363        assert_eq!(int64_arr.value(0), 1);
1364
1365        let col2 = batches[0].column_by_name("z").unwrap();
1366        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1367        assert_eq!(int64_arr.value(0), 3);
1368
1369        // Empty projection: select_empty() should yield batches with rows but no columns
1370        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1371        let batch_stream = table_scan.to_arrow().await.unwrap();
1372        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1373
1374        assert_eq!(batches[0].num_columns(), 0);
1375        assert_eq!(batches[0].num_rows(), 1024);
1376    }
1377
1378    #[tokio::test]
1379    async fn test_filter_on_arrow_lt() {
1380        let mut fixture = TableTestFixture::new();
1381        fixture.setup_manifest_files().await;
1382
1383        // Filter: y < 3
1384        let mut builder = fixture.table.scan();
1385        let predicate = Reference::new("y").less_than(Datum::long(3));
1386        builder = builder
1387            .with_filter(predicate)
1388            .with_row_selection_enabled(true);
1389        let table_scan = builder.build().unwrap();
1390
1391        let batch_stream = table_scan.to_arrow().await.unwrap();
1392
1393        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1394
1395        assert_eq!(batches[0].num_rows(), 512);
1396
1397        let col = batches[0].column_by_name("x").unwrap();
1398        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1399        assert_eq!(int64_arr.value(0), 1);
1400
1401        let col = batches[0].column_by_name("y").unwrap();
1402        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1403        assert_eq!(int64_arr.value(0), 2);
1404    }
1405
1406    #[tokio::test]
1407    async fn test_filter_on_arrow_gt_eq() {
1408        let mut fixture = TableTestFixture::new();
1409        fixture.setup_manifest_files().await;
1410
1411        // Filter: y >= 5
1412        let mut builder = fixture.table.scan();
1413        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1414        builder = builder
1415            .with_filter(predicate)
1416            .with_row_selection_enabled(true);
1417        let table_scan = builder.build().unwrap();
1418
1419        let batch_stream = table_scan.to_arrow().await.unwrap();
1420
1421        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1422
1423        assert_eq!(batches[0].num_rows(), 12);
1424
1425        let col = batches[0].column_by_name("x").unwrap();
1426        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1427        assert_eq!(int64_arr.value(0), 1);
1428
1429        let col = batches[0].column_by_name("y").unwrap();
1430        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1431        assert_eq!(int64_arr.value(0), 5);
1432    }
1433
1434    #[tokio::test]
1435    async fn test_filter_double_eq() {
1436        let mut fixture = TableTestFixture::new();
1437        fixture.setup_manifest_files().await;
1438
1439        // Filter: dbl == 150.0
1440        let mut builder = fixture.table.scan();
1441        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1442        builder = builder
1443            .with_filter(predicate)
1444            .with_row_selection_enabled(true);
1445        let table_scan = builder.build().unwrap();
1446
1447        let batch_stream = table_scan.to_arrow().await.unwrap();
1448
1449        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1450
1451        assert_eq!(batches.len(), 2);
1452        assert_eq!(batches[0].num_rows(), 12);
1453
1454        let col = batches[0].column_by_name("dbl").unwrap();
1455        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1456        assert_eq!(f64_arr.value(1), 150.0f64);
1457    }
1458
1459    #[tokio::test]
1460    async fn test_filter_int_eq() {
1461        let mut fixture = TableTestFixture::new();
1462        fixture.setup_manifest_files().await;
1463
1464        // Filter: i32 == 150
1465        let mut builder = fixture.table.scan();
1466        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1467        builder = builder
1468            .with_filter(predicate)
1469            .with_row_selection_enabled(true);
1470        let table_scan = builder.build().unwrap();
1471
1472        let batch_stream = table_scan.to_arrow().await.unwrap();
1473
1474        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1475
1476        assert_eq!(batches.len(), 2);
1477        assert_eq!(batches[0].num_rows(), 12);
1478
1479        let col = batches[0].column_by_name("i32").unwrap();
1480        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1481        assert_eq!(i32_arr.value(1), 150i32);
1482    }
1483
1484    #[tokio::test]
1485    async fn test_filter_long_eq() {
1486        let mut fixture = TableTestFixture::new();
1487        fixture.setup_manifest_files().await;
1488
1489        // Filter: i64 == 150
1490        let mut builder = fixture.table.scan();
1491        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1492        builder = builder
1493            .with_filter(predicate)
1494            .with_row_selection_enabled(true);
1495        let table_scan = builder.build().unwrap();
1496
1497        let batch_stream = table_scan.to_arrow().await.unwrap();
1498
1499        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1500
1501        assert_eq!(batches.len(), 2);
1502        assert_eq!(batches[0].num_rows(), 12);
1503
1504        let col = batches[0].column_by_name("i64").unwrap();
1505        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1506        assert_eq!(i64_arr.value(1), 150i64);
1507    }
1508
1509    #[tokio::test]
1510    async fn test_filter_bool_eq() {
1511        let mut fixture = TableTestFixture::new();
1512        fixture.setup_manifest_files().await;
1513
1514        // Filter: bool == true
1515        let mut builder = fixture.table.scan();
1516        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1517        builder = builder
1518            .with_filter(predicate)
1519            .with_row_selection_enabled(true);
1520        let table_scan = builder.build().unwrap();
1521
1522        let batch_stream = table_scan.to_arrow().await.unwrap();
1523
1524        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1525
1526        assert_eq!(batches.len(), 2);
1527        assert_eq!(batches[0].num_rows(), 512);
1528
1529        let col = batches[0].column_by_name("bool").unwrap();
1530        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1531        assert!(bool_arr.value(1));
1532    }
1533
1534    #[tokio::test]
1535    async fn test_filter_on_arrow_is_null() {
1536        let mut fixture = TableTestFixture::new();
1537        fixture.setup_manifest_files().await;
1538
1539        // Filter: y is null
1540        let mut builder = fixture.table.scan();
1541        let predicate = Reference::new("y").is_null();
1542        builder = builder
1543            .with_filter(predicate)
1544            .with_row_selection_enabled(true);
1545        let table_scan = builder.build().unwrap();
1546
1547        let batch_stream = table_scan.to_arrow().await.unwrap();
1548
1549        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
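        // "y" contains no null values in the fixture, so the filter matches no rows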
1550        assert_eq!(batches.len(), 0);
1551    }
1552
1553    #[tokio::test]
1554    async fn test_filter_on_arrow_is_not_null() {
1555        let mut fixture = TableTestFixture::new();
1556        fixture.setup_manifest_files().await;
1557
1558        // Filter: y is not null
1559        let mut builder = fixture.table.scan();
1560        let predicate = Reference::new("y").is_not_null();
1561        builder = builder
1562            .with_filter(predicate)
1563            .with_row_selection_enabled(true);
1564        let table_scan = builder.build().unwrap();
1565
1566        let batch_stream = table_scan.to_arrow().await.unwrap();
1567
1568        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
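        // All 1024 rows in the first batch have a non-null "y"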
1569        assert_eq!(batches[0].num_rows(), 1024);
1570    }
1571
1572    #[tokio::test]
1573    async fn test_filter_on_arrow_lt_and_gt() {
1574        let mut fixture = TableTestFixture::new();
1575        fixture.setup_manifest_files().await;
1576
1577        // Filter: y < 5 AND z >= 4
1578        let mut builder = fixture.table.scan();
1579        let predicate = Reference::new("y")
1580            .less_than(Datum::long(5))
1581            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1582        builder = builder
1583            .with_filter(predicate)
1584            .with_row_selection_enabled(true);
1585        let table_scan = builder.build().unwrap();
1586
1587        let batch_stream = table_scan.to_arrow().await.unwrap();
1588
1589        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1590        assert_eq!(batches[0].num_rows(), 500);
1591
1592        let col = batches[0].column_by_name("x").unwrap();
1593        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1594        assert_eq!(col, &expected_x);
1595
1596        let col = batches[0].column_by_name("y").unwrap();
1597        let mut values: Vec<i64> = vec![];
1598        values.extend_from_slice(&[3; 200]);
1599        values.extend_from_slice(&[4; 300]);
1600        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1601        assert_eq!(col, &expected_y);
1602
1603        let col = batches[0].column_by_name("z").unwrap();
1604        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1605        assert_eq!(col, &expected_z);
1606    }
1607
1608    #[tokio::test]
1609    async fn test_filter_on_arrow_lt_or_gt() {
1610        let mut fixture = TableTestFixture::new();
1611        fixture.setup_manifest_files().await;
1612
1613        // Filter: y < 5 OR z >= 4
1614        let mut builder = fixture.table.scan();
1615        let predicate = Reference::new("y")
1616            .less_than(Datum::long(5))
1617            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1618        builder = builder
1619            .with_filter(predicate)
1620            .with_row_selection_enabled(true);
1621        let table_scan = builder.build().unwrap();
1622
1623        let batch_stream = table_scan.to_arrow().await.unwrap();
1624
1625        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1626        assert_eq!(batches[0].num_rows(), 1024);
1627
1628        let col = batches[0].column_by_name("x").unwrap();
1629        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1630        assert_eq!(col, &expected_x);
1631
1632        let col = batches[0].column_by_name("y").unwrap();
1633        let mut values: Vec<i64> = vec![2; 512];
1634        values.extend_from_slice(&[3; 200]);
1635        values.extend_from_slice(&[4; 300]);
1636        values.extend_from_slice(&[5; 12]);
1637        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1638        assert_eq!(col, &expected_y);
1639
1640        let col = batches[0].column_by_name("z").unwrap();
1641        let mut values: Vec<i64> = vec![3; 512];
1642        values.extend_from_slice(&[4; 512]);
1643        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1644        assert_eq!(col, &expected_z);
1645    }
1646
1647    #[tokio::test]
1648    async fn test_filter_on_arrow_startswith() {
1649        let mut fixture = TableTestFixture::new();
1650        fixture.setup_manifest_files().await;
1651
1652        // Filter: a STARTSWITH "Ice"
1653        let mut builder = fixture.table.scan();
1654        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1655        builder = builder
1656            .with_filter(predicate)
1657            .with_row_selection_enabled(true);
1658        let table_scan = builder.build().unwrap();
1659
1660        let batch_stream = table_scan.to_arrow().await.unwrap();
1661
1662        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1663
1664        assert_eq!(batches[0].num_rows(), 512);
1665
1666        let col = batches[0].column_by_name("a").unwrap();
1667        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1668        assert_eq!(string_arr.value(0), "Iceberg");
1669    }
1670
1671    #[tokio::test]
1672    async fn test_filter_on_arrow_not_startswith() {
1673        let mut fixture = TableTestFixture::new();
1674        fixture.setup_manifest_files().await;
1675
1676        // Filter: a NOT STARTSWITH "Ice"
1677        let mut builder = fixture.table.scan();
1678        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1679        builder = builder
1680            .with_filter(predicate)
1681            .with_row_selection_enabled(true);
1682        let table_scan = builder.build().unwrap();
1683
1684        let batch_stream = table_scan.to_arrow().await.unwrap();
1685
1686        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1687
1688        assert_eq!(batches[0].num_rows(), 512);
1689
1690        let col = batches[0].column_by_name("a").unwrap();
1691        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1692        assert_eq!(string_arr.value(0), "Apache");
1693    }
1694
1695    #[tokio::test]
1696    async fn test_filter_on_arrow_in() {
1697        let mut fixture = TableTestFixture::new();
1698        fixture.setup_manifest_files().await;
1699
1700        // Filter: a IN ("Sioux", "Iceberg")
1701        let mut builder = fixture.table.scan();
1702        let predicate =
1703            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1704        builder = builder
1705            .with_filter(predicate)
1706            .with_row_selection_enabled(true);
1707        let table_scan = builder.build().unwrap();
1708
1709        let batch_stream = table_scan.to_arrow().await.unwrap();
1710
1711        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1712
1713        assert_eq!(batches[0].num_rows(), 512);
1714
1715        let col = batches[0].column_by_name("a").unwrap();
1716        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1717        assert_eq!(string_arr.value(0), "Iceberg");
1718    }
1719
1720    #[tokio::test]
1721    async fn test_filter_on_arrow_not_in() {
1722        let mut fixture = TableTestFixture::new();
1723        fixture.setup_manifest_files().await;
1724
1725        // Filter: a NOT IN ("Sioux", "Iceberg")
1726        let mut builder = fixture.table.scan();
1727        let predicate =
1728            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1729        builder = builder
1730            .with_filter(predicate)
1731            .with_row_selection_enabled(true);
1732        let table_scan = builder.build().unwrap();
1733
1734        let batch_stream = table_scan.to_arrow().await.unwrap();
1735
1736        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1737
1738        assert_eq!(batches[0].num_rows(), 512);
1739
1740        let col = batches[0].column_by_name("a").unwrap();
1741        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1742        assert_eq!(string_arr.value(0), "Apache");
1743    }
1744
1745    #[test]
1746    fn test_file_scan_task_serialize_deserialize() {
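        // Serialize a FileScanTask to JSON and back, checking the key fields survive the round trip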
1747        let test_fn = |task: FileScanTask| {
1748            let serialized = serde_json::to_string(&task).unwrap();
1749            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1750
1751            assert_eq!(task.data_file_path, deserialized.data_file_path);
1752            assert_eq!(task.start, deserialized.start);
1753            assert_eq!(task.length, deserialized.length);
1754            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1755            assert_eq!(task.predicate, deserialized.predicate);
1756            assert_eq!(task.schema, deserialized.schema);
1757        };
1758
1759        // without predicate
1760        let schema = Arc::new(
1761            Schema::builder()
1762                .with_fields(vec![Arc::new(NestedField::required(
1763                    1,
1764                    "x",
1765                    Type::Primitive(PrimitiveType::Binary),
1766                ))])
1767                .build()
1768                .unwrap(),
1769        );
1770        let task = FileScanTask {
1771            data_file_path: "data_file_path".to_string(),
1772            start: 0,
1773            length: 100,
1774            project_field_ids: vec![1, 2, 3],
1775            predicate: None,
1776            schema: schema.clone(),
1777            record_count: Some(100),
1778            data_file_format: DataFileFormat::Parquet,
1779            deletes: vec![],
1780            partition: None,
1781            partition_spec: None,
1782            name_mapping: None,
1783        };
1784        test_fn(task);
1785
1786        // with predicate
1787        let task = FileScanTask {
1788            data_file_path: "data_file_path".to_string(),
1789            start: 0,
1790            length: 100,
1791            project_field_ids: vec![1, 2, 3],
1792            predicate: Some(BoundPredicate::AlwaysTrue),
1793            schema,
1794            record_count: None,
1795            data_file_format: DataFileFormat::Avro,
1796            deletes: vec![],
1797            partition: None,
1798            partition_spec: None,
1799            name_mapping: None,
1800        };
1801        test_fn(task);
1802    }
1803}