// iceberg_datafusion/table/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Iceberg table providers for DataFusion.
19//!
20//! This module provides two table provider implementations:
21//!
22//! - [`IcebergTableProvider`]: Catalog-backed provider with automatic metadata refresh.
23//!   Use for write operations and when you need to see the latest table state.
24//!
25//! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific
26//!   table snapshot. Use for consistent analytical queries or time-travel scenarios.
27
28pub mod metadata_table;
29pub mod table_provider_factory;
30
31use std::any::Any;
32use std::num::NonZeroUsize;
33use std::sync::Arc;
34
35use async_trait::async_trait;
36use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
37use datafusion::catalog::Session;
38use datafusion::common::DataFusionError;
39use datafusion::datasource::{TableProvider, TableType};
40use datafusion::error::Result as DFResult;
41use datafusion::logical_expr::dml::InsertOp;
42use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
43use datafusion::physical_plan::ExecutionPlan;
44use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
45use iceberg::arrow::schema_to_arrow_schema;
46use iceberg::inspect::MetadataTableType;
47use iceberg::spec::TableProperties;
48use iceberg::table::Table;
49use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
50use metadata_table::IcebergMetadataTableProvider;
51
52use crate::error::to_datafusion_error;
53use crate::physical_plan::commit::IcebergCommitExec;
54use crate::physical_plan::project::project_with_partition;
55use crate::physical_plan::repartition::repartition;
56use crate::physical_plan::scan::IcebergTableScan;
57use crate::physical_plan::sort::sort_by_partition;
58use crate::physical_plan::write::IcebergWriteExec;
59
/// Catalog-backed table provider with automatic metadata refresh.
///
/// This provider loads fresh table metadata from the catalog on every scan and write
/// operation, ensuring you always see the latest table state. Use this when you need
/// write operations or want to see the most up-to-date data.
///
/// For read-only access to a specific snapshot without catalog overhead, use
/// [`IcebergStaticTableProvider`] instead.
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
    /// The catalog that manages this table; consulted on every scan/insert
    /// to reload fresh metadata.
    catalog: Arc<dyn Catalog>,
    /// The table identifier (namespace + name) used for catalog lookups
    table_ident: TableIdent,
    /// A reference-counted arrow `Schema`, cached once at construction.
    /// NOTE(review): this is NOT refreshed on later loads, so a schema change
    /// in the catalog after construction is not reflected here.
    schema: ArrowSchemaRef,
}
77
78impl IcebergTableProvider {
79    /// Creates a new catalog-backed table provider.
80    ///
81    /// Loads the table once to get the initial schema, then stores the catalog
82    /// reference for future metadata refreshes on each operation.
83    pub(crate) async fn try_new(
84        catalog: Arc<dyn Catalog>,
85        namespace: NamespaceIdent,
86        name: impl Into<String>,
87    ) -> Result<Self> {
88        let table_ident = TableIdent::new(namespace, name.into());
89
90        // Load table once to get initial schema
91        let table = catalog.load_table(&table_ident).await?;
92        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
93
94        Ok(IcebergTableProvider {
95            catalog,
96            table_ident,
97            schema,
98        })
99    }
100
101    pub(crate) async fn metadata_table(
102        &self,
103        r#type: MetadataTableType,
104    ) -> Result<IcebergMetadataTableProvider> {
105        // Load fresh table metadata for metadata table access
106        let table = self.catalog.load_table(&self.table_ident).await?;
107        Ok(IcebergMetadataTableProvider { table, r#type })
108    }
109}
110
111#[async_trait]
112impl TableProvider for IcebergTableProvider {
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116
117    fn schema(&self) -> ArrowSchemaRef {
118        self.schema.clone()
119    }
120
121    fn table_type(&self) -> TableType {
122        TableType::Base
123    }
124
125    async fn scan(
126        &self,
127        _state: &dyn Session,
128        projection: Option<&Vec<usize>>,
129        filters: &[Expr],
130        limit: Option<usize>,
131    ) -> DFResult<Arc<dyn ExecutionPlan>> {
132        // Load fresh table metadata from catalog
133        let table = self
134            .catalog
135            .load_table(&self.table_ident)
136            .await
137            .map_err(to_datafusion_error)?;
138
139        // Create scan with fresh metadata (always use current snapshot)
140        Ok(Arc::new(IcebergTableScan::new(
141            table,
142            None, // Always use current snapshot for catalog-backed provider
143            self.schema.clone(),
144            projection,
145            filters,
146            limit,
147        )))
148    }
149
150    fn supports_filters_pushdown(
151        &self,
152        filters: &[&Expr],
153    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
154        // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down
155        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
156    }
157
158    async fn insert_into(
159        &self,
160        state: &dyn Session,
161        input: Arc<dyn ExecutionPlan>,
162        _insert_op: InsertOp,
163    ) -> DFResult<Arc<dyn ExecutionPlan>> {
164        // Load fresh table metadata from catalog
165        let table = self
166            .catalog
167            .load_table(&self.table_ident)
168            .await
169            .map_err(to_datafusion_error)?;
170
171        let partition_spec = table.metadata().default_partition_spec();
172
173        // Step 1: Project partition values for partitioned tables
174        let plan_with_partition = if !partition_spec.is_unpartitioned() {
175            project_with_partition(input, &table)?
176        } else {
177            input
178        };
179
180        // Step 2: Repartition for parallel processing
181        let target_partitions =
182            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
183                DataFusionError::Configuration(
184                    "target_partitions must be greater than 0".to_string(),
185                )
186            })?;
187
188        let repartitioned_plan =
189            repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;
190
191        // Apply sort node when it's not fanout mode
192        let fanout_enabled = table
193            .metadata()
194            .properties()
195            .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
196            .map(|value| {
197                value
198                    .parse::<bool>()
199                    .map_err(|e| {
200                        Error::new(
201                            ErrorKind::DataInvalid,
202                            format!(
203                                "Invalid value for {}, expected 'true' or 'false'",
204                                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
205                            ),
206                        )
207                        .with_source(e)
208                    })
209                    .map_err(to_datafusion_error)
210            })
211            .transpose()?
212            .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
213
214        let write_input = if fanout_enabled {
215            repartitioned_plan
216        } else {
217            sort_by_partition(repartitioned_plan)?
218        };
219
220        let write_plan = Arc::new(IcebergWriteExec::new(
221            table.clone(),
222            write_input,
223            self.schema.clone(),
224        ));
225
226        // Merge the outputs of write_plan into one so we can commit all files together
227        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));
228
229        Ok(Arc::new(IcebergCommitExec::new(
230            table,
231            self.catalog.clone(),
232            coalesce_partitions,
233            self.schema.clone(),
234        )))
235    }
236}
237
/// Static table provider for read-only snapshot access.
///
/// This provider holds a cached table instance and does not refresh metadata or support
/// write operations. Use this for consistent analytical queries, time-travel scenarios,
/// or when you want to avoid catalog overhead.
///
/// For catalog-backed tables with write support and automatic refresh, use
/// [`IcebergTableProvider`] instead.
#[derive(Debug, Clone)]
pub struct IcebergStaticTableProvider {
    /// The static table instance (never refreshed after construction)
    table: Table,
    /// Optional snapshot ID pinning this static view; `None` means the
    /// table's current snapshot at scan time.
    snapshot_id: Option<i64>,
    /// A reference-counted arrow `Schema` derived from the pinned snapshot
    /// (or the current schema when no snapshot is pinned).
    schema: ArrowSchemaRef,
}
255
256impl IcebergStaticTableProvider {
257    /// Creates a static provider from a table instance.
258    ///
259    /// Uses the table's current snapshot for all queries. Does not support write operations.
260    pub async fn try_new_from_table(table: Table) -> Result<Self> {
261        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
262        Ok(IcebergStaticTableProvider {
263            table,
264            snapshot_id: None,
265            schema,
266        })
267    }
268
269    /// Creates a static provider for a specific table snapshot.
270    ///
271    /// Queries the specified snapshot for all operations. Useful for time-travel queries.
272    /// Does not support write operations.
273    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
274        let snapshot = table
275            .metadata()
276            .snapshot_by_id(snapshot_id)
277            .ok_or_else(|| {
278                Error::new(
279                    ErrorKind::Unexpected,
280                    format!(
281                        "snapshot id {snapshot_id} not found in table {}",
282                        table.identifier().name()
283                    ),
284                )
285            })?;
286        let table_schema = snapshot.schema(table.metadata())?;
287        let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
288        Ok(IcebergStaticTableProvider {
289            table,
290            snapshot_id: Some(snapshot_id),
291            schema,
292        })
293    }
294}
295
296#[async_trait]
297impl TableProvider for IcebergStaticTableProvider {
298    fn as_any(&self) -> &dyn Any {
299        self
300    }
301
302    fn schema(&self) -> ArrowSchemaRef {
303        self.schema.clone()
304    }
305
306    fn table_type(&self) -> TableType {
307        TableType::Base
308    }
309
310    async fn scan(
311        &self,
312        _state: &dyn Session,
313        projection: Option<&Vec<usize>>,
314        filters: &[Expr],
315        limit: Option<usize>,
316    ) -> DFResult<Arc<dyn ExecutionPlan>> {
317        // Use cached table (no refresh)
318        Ok(Arc::new(IcebergTableScan::new(
319            self.table.clone(),
320            self.snapshot_id,
321            self.schema.clone(),
322            projection,
323            filters,
324            limit,
325        )))
326    }
327
328    fn supports_filters_pushdown(
329        &self,
330        filters: &[&Expr],
331    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
332        // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down
333        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
334    }
335
336    async fn insert_into(
337        &self,
338        _state: &dyn Session,
339        _input: Arc<dyn ExecutionPlan>,
340        _insert_op: InsertOp,
341    ) -> DFResult<Arc<dyn ExecutionPlan>> {
342        Err(to_datafusion_error(Error::new(
343            ErrorKind::FeatureUnsupported,
344            "Write operations are not supported on IcebergStaticTableProvider. \
345             Use IcebergTableProvider with a catalog for write support."
346                .to_string(),
347        )))
348    }
349}
350
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::common::Column;
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::SessionContext;
    use iceberg::io::FileIO;
    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
    use iceberg::table::{StaticTable, Table};
    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
    use tempfile::TempDir;

    use super::*;

    /// Loads a read-only test table from the bundled `TableMetadataV2Valid.json`
    /// fixture. The assertions below rely on this fixture exposing three
    /// columns, the first named "x" (Int64) and one named "z".
    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_name = "TableMetadataV2Valid.json";
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            metadata_file_name
        );
        let file_io = FileIO::new_with_fs();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        static_table.into_table()
    }

    /// Creates an in-memory catalog containing `test_ns.test_table` with an
    /// `id: int, name: string` schema. The returned `TempDir` must be kept
    /// alive by the caller so the warehouse path stays valid for the test.
    async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let table_creation = TableCreation::builder()
            .name("test_table".to_string())
            .location(format!("{warehouse_path}/test_table"))
            .schema(schema)
            .properties(HashMap::new())
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "test_table".to_string(),
            temp_dir,
        )
    }

    // Tests for IcebergStaticTableProvider

    #[test_provider_schema_check]: verifies the static provider exposes the
    // fixture schema through DataFusion's logical planner.
    #[tokio::test]
    async fn test_static_provider_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    // Same schema check as above, but with the provider pinned to an explicit
    // snapshot id taken from the fixture.
    #[tokio::test]
    async fn test_static_provider_from_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergStaticTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
                .await
                .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_static_provider_rejects_writes() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        // Attempt to insert into the static provider should fail
        let result = ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await;

        // The error should occur during planning or execution
        // We expect an error indicating write operations are not supported
        assert!(
            result.is_err() || {
                let df = result.unwrap();
                df.collect().await.is_err()
            }
        );
    }

    #[tokio::test]
    async fn test_static_provider_scan() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        // Test that scan operations work correctly
        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    // Tests for IcebergTableProvider

    #[tokio::test]
    async fn test_catalog_backed_provider_creation() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        // Test creating a catalog-backed provider
        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        // Verify the schema is loaded correctly
        let schema = provider.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_scan() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // Test that scan operations work correctly
        let df = ctx.sql("SELECT * FROM test_table").await.unwrap();

        // Verify the schema in the query result
        let df_schema = df.schema();
        assert_eq!(df_schema.fields().len(), 2);
        assert_eq!(df_schema.field(0).name(), "id");
        assert_eq!(df_schema.field(1).name(), "name");

        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_insert() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // Test that insert operations work correctly
        let result = ctx.sql("INSERT INTO test_table VALUES (1, 'test')").await;

        // Insert should succeed (or at least not fail during planning)
        assert!(result.is_ok());

        // Try to execute the insert plan
        let df = result.unwrap();
        let execution_result = df.collect().await;

        // The execution should succeed
        assert!(execution_result.is_ok());
    }

    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // Create a query plan
        let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();

        // Get logical schema before consuming df
        let logical_schema = df.schema().clone();

        // Get physical plan (this consumes df)
        let physical_plan = df.create_physical_plan().await.unwrap();
        let physical_schema = physical_plan.schema();

        // Verify that logical and physical schemas are consistent
        assert_eq!(
            logical_schema.fields().len(),
            physical_schema.fields().len()
        );

        for (logical_field, physical_field) in logical_schema
            .fields()
            .iter()
            .zip(physical_schema.fields().iter())
        {
            assert_eq!(logical_field.name(), physical_field.name());
            assert_eq!(logical_field.data_type(), physical_field.data_type());
        }
    }

    /// Like `get_test_catalog_and_table`, but creates a table identity-
    /// partitioned on `category`, optionally setting the fanout-enabled
    /// table property (`None` leaves the property unset).
    async fn get_partitioned_test_catalog_and_table(
        fanout_enabled: Option<bool>,
    ) -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        use iceberg::spec::{Transform, UnboundPartitionSpec};

        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "category", Transform::Identity)
            .unwrap()
            .build();

        let mut properties = HashMap::new();
        if let Some(enabled) = fanout_enabled {
            properties.insert(
                iceberg::spec::TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
                    .to_string(),
                enabled.to_string(),
            );
        }

        let table_creation = TableCreation::builder()
            .name("partitioned_table".to_string())
            .location(format!("{warehouse_path}/partitioned_table"))
            .schema(schema)
            .partition_spec(partition_spec)
            .properties(properties)
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "partitioned_table".to_string(),
            temp_dir,
        )
    }

    /// Helper to check if a plan contains a SortExec node
    /// (recursive depth-first search over the plan tree by node name).
    fn plan_contains_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
        if plan.name() == "SortExec" {
            return true;
        }
        for child in plan.children() {
            if plan_contains_sort(child) {
                return true;
            }
        }
        false
    }

    #[tokio::test]
    async fn test_insert_plan_fanout_enabled_no_sort() {
        use datafusion::datasource::TableProvider;
        use datafusion::logical_expr::dml::InsertOp;
        use datafusion::physical_plan::empty::EmptyExec;

        // When fanout is enabled (default), no sort node should be added
        let (catalog, namespace, table_name, _temp_dir) =
            get_partitioned_test_catalog_and_table(Some(true)).await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let input_schema = provider.schema();
        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

        let state = ctx.state();
        let insert_plan = provider
            .insert_into(&state, input, InsertOp::Append)
            .await
            .unwrap();

        // With fanout enabled, there should be no SortExec in the plan
        assert!(
            !plan_contains_sort(&insert_plan),
            "Plan should NOT contain SortExec when fanout is enabled"
        );
    }

    #[tokio::test]
    async fn test_insert_plan_fanout_disabled_has_sort() {
        use datafusion::datasource::TableProvider;
        use datafusion::logical_expr::dml::InsertOp;
        use datafusion::physical_plan::empty::EmptyExec;

        // When fanout is disabled, a sort node should be added
        let (catalog, namespace, table_name, _temp_dir) =
            get_partitioned_test_catalog_and_table(Some(false)).await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let input_schema = provider.schema();
        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

        let state = ctx.state();
        let insert_plan = provider
            .insert_into(&state, input, InsertOp::Append)
            .await
            .unwrap();

        // With fanout disabled, there should be a SortExec in the plan
        assert!(
            plan_contains_sort(&insert_plan),
            "Plan should contain SortExec when fanout is disabled"
        );
    }

    #[tokio::test]
    async fn test_limit_pushdown_static_provider() {
        use datafusion::datasource::TableProvider;

        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        // Test scan with limit
        let scan_plan = table_provider
            .scan(&state, None, &[], Some(10))
            .await
            .unwrap();

        // Verify that the scan plan is an IcebergTableScan
        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        // Verify the limit is set
        assert_eq!(
            iceberg_scan.limit(),
            Some(10),
            "Limit should be set to 10 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_limit_pushdown_catalog_backed_provider() {
        use datafusion::datasource::TableProvider;

        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        // Test scan with limit
        let scan_plan = provider.scan(&state, None, &[], Some(5)).await.unwrap();

        // Verify that the scan plan is an IcebergTableScan
        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        // Verify the limit is set
        assert_eq!(
            iceberg_scan.limit(),
            Some(5),
            "Limit should be set to 5 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_no_limit_pushdown() {
        use datafusion::datasource::TableProvider;

        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        // Test scan without limit
        let scan_plan = table_provider.scan(&state, None, &[], None).await.unwrap();

        // Verify that the scan plan is an IcebergTableScan
        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        // Verify the limit is None
        assert_eq!(
            iceberg_scan.limit(),
            None,
            "Limit should be None when not specified"
        );
    }
}