iceberg_datafusion/table/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Iceberg table providers for DataFusion.
//!
//! This module provides two table provider implementations:
//!
//! - [`IcebergTableProvider`]: Catalog-backed provider with automatic metadata refresh.
//!   Use for write operations and when you need to see the latest table state.
//!
//! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific
//!   table snapshot. Use for consistent analytical queries or time-travel scenarios.
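//!
//! # Example
//!
//! A minimal, illustrative sketch (not compiled as a doctest) of registering a static
//! provider with DataFusion. The metadata file path and table identifier are
//! placeholders, and imports for this crate's own types are omitted.
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use datafusion::prelude::SessionContext;
//! use iceberg::TableIdent;
//! use iceberg::io::FileIO;
//! use iceberg::table::StaticTable;
//!
//! # async fn register_static_table() {
//! // Load a table directly from a metadata file (no catalog involved).
//! let metadata_path = "/path/to/v1.metadata.json"; // placeholder path
//! let file_io = FileIO::from_path(metadata_path).unwrap().build().unwrap();
//! let ident = TableIdent::from_strs(["my_ns", "my_table"]).unwrap();
//! let table = StaticTable::from_metadata_file(metadata_path, ident, file_io)
//!     .await
//!     .unwrap()
//!     .into_table();
//!
//! // Wrap the table in a static provider and register it with DataFusion.
//! let provider = IcebergStaticTableProvider::try_new_from_table(table).await.unwrap();
//! let ctx = SessionContext::new();
//! ctx.register_table("my_table", Arc::new(provider)).unwrap();
//! let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
//! # }
//! ```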

pub mod metadata_table;
pub mod table_provider_factory;

use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::inspect::MetadataTableType;
use iceberg::spec::TableProperties;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;

use crate::error::to_datafusion_error;
use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::project::project_with_partition;
use crate::physical_plan::repartition::repartition;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::sort::sort_by_partition;
use crate::physical_plan::write::IcebergWriteExec;

/// Catalog-backed table provider with automatic metadata refresh.
///
/// This provider loads fresh table metadata from the catalog on every scan and write
/// operation, ensuring you always see the latest table state. Use this when you need
/// write operations or want to see the most up-to-date data.
///
/// For read-only access to a specific snapshot without catalog overhead, use
/// [`IcebergStaticTableProvider`] instead.
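///
/// # Example
///
/// A minimal sketch (not compiled as a doctest). It assumes a `provider` value of this
/// type has already been obtained through this crate's catalog integration, since
/// direct construction is crate-internal.
///
/// ```ignore
/// use std::sync::Arc;
///
/// use datafusion::prelude::SessionContext;
///
/// # async fn run(provider: IcebergTableProvider) {
/// let ctx = SessionContext::new();
/// ctx.register_table("my_table", Arc::new(provider)).unwrap();
///
/// // Writes go through the catalog and commit a new snapshot.
/// let insert = ctx.sql("INSERT INTO my_table VALUES (1, 'a')").await.unwrap();
/// insert.collect().await.unwrap();
///
/// // Each scan reloads the latest table metadata from the catalog.
/// let rows = ctx.sql("SELECT * FROM my_table").await.unwrap().collect().await.unwrap();
/// # }
/// ```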
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
    /// The catalog that manages this table
    catalog: Arc<dyn Catalog>,
    /// The table identifier (namespace + name)
    table_ident: TableIdent,
    /// A reference-counted arrow `Schema` (cached at construction)
    schema: ArrowSchemaRef,
}

impl IcebergTableProvider {
    /// Creates a new catalog-backed table provider.
    ///
    /// Loads the table once to get the initial schema, then stores the catalog
    /// reference for future metadata refreshes on each operation.
    pub(crate) async fn try_new(
        catalog: Arc<dyn Catalog>,
        namespace: NamespaceIdent,
        name: impl Into<String>,
    ) -> Result<Self> {
        let table_ident = TableIdent::new(namespace, name.into());

        // Load table once to get initial schema
        let table = catalog.load_table(&table_ident).await?;
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

        Ok(IcebergTableProvider {
            catalog,
            table_ident,
            schema,
        })
    }

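    /// Builds a provider for the requested Iceberg metadata table, backed by freshly
    /// loaded table metadata.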
    pub(crate) async fn metadata_table(
        &self,
        r#type: MetadataTableType,
    ) -> Result<IcebergMetadataTableProvider> {
        // Load fresh table metadata for metadata table access
        let table = self.catalog.load_table(&self.table_ident).await?;
        Ok(IcebergMetadataTableProvider { table, r#type })
    }
}

#[async_trait]
impl TableProvider for IcebergTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Load fresh table metadata from catalog
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        // Create scan with fresh metadata (always use current snapshot)
        Ok(Arc::new(IcebergTableScan::new(
            table,
            None, // Always use current snapshot for catalog-backed provider
            self.schema.clone(),
            projection,
            filters,
            limit,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        // Report every filter as an inexact pushdown; the scanner is the single source
        // of truth and drops any filter it cannot push down.
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Load fresh table metadata from catalog
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        let partition_spec = table.metadata().default_partition_spec();

        // Step 1: Project partition values for partitioned tables
        let plan_with_partition = if !partition_spec.is_unpartitioned() {
            project_with_partition(input, &table)?
        } else {
            input
        };

        // Step 2: Repartition for parallel processing
        let target_partitions =
            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
                DataFusionError::Configuration(
                    "target_partitions must be greater than 0".to_string(),
                )
            })?;

        let repartitioned_plan =
            repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;

        // Step 3: Read the fanout setting from table properties; when fanout is
        // disabled, sort by partition so each partition's rows arrive contiguously.
        let fanout_enabled = table
            .metadata()
            .properties()
            .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
            .map(|value| {
                value
                    .parse::<bool>()
                    .map_err(|e| {
                        Error::new(
                            ErrorKind::DataInvalid,
                            format!(
                                "Invalid value for {}, expected 'true' or 'false'",
                                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
                            ),
                        )
                        .with_source(e)
                    })
                    .map_err(to_datafusion_error)
            })
            .transpose()?
            .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);

        let write_input = if fanout_enabled {
            repartitioned_plan
        } else {
            sort_by_partition(repartitioned_plan)?
        };

        let write_plan = Arc::new(IcebergWriteExec::new(
            table.clone(),
            write_input,
            self.schema.clone(),
        ));

        // Merge the outputs of write_plan into one partition so all data files can be
        // committed together
        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));

        Ok(Arc::new(IcebergCommitExec::new(
            table,
            self.catalog.clone(),
            coalesce_partitions,
            self.schema.clone(),
        )))
    }
}

/// Static table provider for read-only snapshot access.
///
/// This provider holds a cached table instance and does not refresh metadata or support
/// write operations. Use this for consistent analytical queries, time-travel scenarios,
/// or when you want to avoid catalog overhead.
///
/// For catalog-backed tables with write support and automatic refresh, use
/// [`IcebergTableProvider`] instead.
#[derive(Debug, Clone)]
pub struct IcebergStaticTableProvider {
    /// The static table instance (never refreshed)
    table: Table,
    /// Optional snapshot ID for this static view
    snapshot_id: Option<i64>,
    /// A reference-counted arrow `Schema`
    schema: ArrowSchemaRef,
}

impl IcebergStaticTableProvider {
    /// Creates a static provider from a table instance.
    ///
    /// Uses the table's current snapshot for all queries. Does not support write operations.
    pub async fn try_new_from_table(table: Table) -> Result<Self> {
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
        Ok(IcebergStaticTableProvider {
            table,
            snapshot_id: None,
            schema,
        })
    }

    /// Creates a static provider for a specific table snapshot.
    ///
    /// Queries the specified snapshot for all operations. Useful for time-travel queries.
    /// Does not support write operations.
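    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) of a time-travel query; `table` is
    /// assumed to be an already loaded [`Table`], and the snapshot is picked arbitrarily
    /// for illustration.
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// use datafusion::prelude::SessionContext;
    ///
    /// # async fn run(table: iceberg::table::Table) {
    /// // Pin the provider to one snapshot of the table.
    /// let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
    /// let provider =
    ///     IcebergStaticTableProvider::try_new_from_table_snapshot(table, snapshot_id)
    ///         .await
    ///         .unwrap();
    ///
    /// let ctx = SessionContext::new();
    /// ctx.register_table("my_table_at_snapshot", Arc::new(provider)).unwrap();
    /// let df = ctx.sql("SELECT * FROM my_table_at_snapshot").await.unwrap();
    /// # }
    /// ```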
    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
        let snapshot = table
            .metadata()
            .snapshot_by_id(snapshot_id)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "snapshot id {snapshot_id} not found in table {}",
                        table.identifier().name()
                    ),
                )
            })?;
        let table_schema = snapshot.schema(table.metadata())?;
        let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
        Ok(IcebergStaticTableProvider {
            table,
            snapshot_id: Some(snapshot_id),
            schema,
        })
    }
}

#[async_trait]
impl TableProvider for IcebergStaticTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Use cached table (no refresh)
        Ok(Arc::new(IcebergTableScan::new(
            self.table.clone(),
            self.snapshot_id,
            self.schema.clone(),
            projection,
            filters,
            limit,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        // Report every filter as an inexact pushdown; the scanner is the single source
        // of truth and drops any filter it cannot push down.
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        _state: &dyn Session,
        _input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Err(to_datafusion_error(Error::new(
            ErrorKind::FeatureUnsupported,
            "Write operations are not supported on IcebergStaticTableProvider. \
             Use IcebergTableProvider with a catalog for write support."
                .to_string(),
        )))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::common::Column;
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::SessionContext;
    use iceberg::io::FileIO;
    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
    use iceberg::table::{StaticTable, Table};
    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
    use tempfile::TempDir;

    use super::*;

    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_name = "TableMetadataV2Valid.json";
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            metadata_file_name
        );
        let file_io = FileIO::from_path(&metadata_file_path)
            .unwrap()
            .build()
            .unwrap();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        static_table.into_table()
    }

    async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let table_creation = TableCreation::builder()
            .name("test_table".to_string())
            .location(format!("{warehouse_path}/test_table"))
            .schema(schema)
            .properties(HashMap::new())
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "test_table".to_string(),
            temp_dir,
        )
    }

    // Tests for IcebergStaticTableProvider

    #[tokio::test]
    async fn test_static_provider_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_static_provider_from_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergStaticTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
                .await
                .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_static_provider_rejects_writes() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        // Attempting to insert into the static provider should fail
        let result = ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await;

        // The error should occur during planning or execution
        // We expect an error indicating write operations are not supported
        assert!(
            result.is_err() || {
                let df = result.unwrap();
                df.collect().await.is_err()
            }
        );
    }

    #[tokio::test]
    async fn test_static_provider_scan() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        // Test that scan operations work correctly
        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    // Tests for IcebergTableProvider

    #[tokio::test]
    async fn test_catalog_backed_provider_creation() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        // Test creating a catalog-backed provider
        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        // Verify the schema is loaded correctly
        let schema = provider.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_scan() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // Test that scan operations work correctly
        let df = ctx.sql("SELECT * FROM test_table").await.unwrap();

        // Verify the schema in the query result
        let df_schema = df.schema();
        assert_eq!(df_schema.fields().len(), 2);
        assert_eq!(df_schema.field(0).name(), "id");
        assert_eq!(df_schema.field(1).name(), "name");

        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_insert() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // Test that insert operations work correctly
        let result = ctx.sql("INSERT INTO test_table VALUES (1, 'test')").await;

        // Insert should succeed (or at least not fail during planning)
        assert!(result.is_ok());

        // Try to execute the insert plan
        let df = result.unwrap();
        let execution_result = df.collect().await;

        // The execution should succeed
        assert!(execution_result.is_ok());
    }

    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // Create a query plan
        let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();

        // Get logical schema before consuming df
        let logical_schema = df.schema().clone();

        // Get physical plan (this consumes df)
        let physical_plan = df.create_physical_plan().await.unwrap();
        let physical_schema = physical_plan.schema();

        // Verify that logical and physical schemas are consistent
        assert_eq!(
            logical_schema.fields().len(),
            physical_schema.fields().len()
        );

        for (logical_field, physical_field) in logical_schema
            .fields()
            .iter()
            .zip(physical_schema.fields().iter())
        {
            assert_eq!(logical_field.name(), physical_field.name());
            assert_eq!(logical_field.data_type(), physical_field.data_type());
        }
    }

    async fn get_partitioned_test_catalog_and_table(
        fanout_enabled: Option<bool>,
    ) -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        use iceberg::spec::{Transform, UnboundPartitionSpec};

        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "category", Transform::Identity)
            .unwrap()
            .build();

        let mut properties = HashMap::new();
        if let Some(enabled) = fanout_enabled {
            properties.insert(
                iceberg::spec::TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
                    .to_string(),
                enabled.to_string(),
            );
        }

        let table_creation = TableCreation::builder()
            .name("partitioned_table".to_string())
            .location(format!("{warehouse_path}/partitioned_table"))
            .schema(schema)
            .partition_spec(partition_spec)
            .properties(properties)
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "partitioned_table".to_string(),
            temp_dir,
        )
    }

    /// Helper to check if a plan contains a SortExec node
    fn plan_contains_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
        if plan.name() == "SortExec" {
            return true;
        }
        for child in plan.children() {
            if plan_contains_sort(child) {
                return true;
            }
        }
        false
    }

    #[tokio::test]
    async fn test_insert_plan_fanout_enabled_no_sort() {
        use datafusion::datasource::TableProvider;
        use datafusion::logical_expr::dml::InsertOp;
        use datafusion::physical_plan::empty::EmptyExec;

        // When fanout is enabled (default), no sort node should be added
        let (catalog, namespace, table_name, _temp_dir) =
            get_partitioned_test_catalog_and_table(Some(true)).await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let input_schema = provider.schema();
        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

        let state = ctx.state();
        let insert_plan = provider
            .insert_into(&state, input, InsertOp::Append)
            .await
            .unwrap();

        // With fanout enabled, there should be no SortExec in the plan
        assert!(
            !plan_contains_sort(&insert_plan),
            "Plan should NOT contain SortExec when fanout is enabled"
        );
    }

    #[tokio::test]
    async fn test_insert_plan_fanout_disabled_has_sort() {
        use datafusion::datasource::TableProvider;
        use datafusion::logical_expr::dml::InsertOp;
        use datafusion::physical_plan::empty::EmptyExec;

        // When fanout is disabled, a sort node should be added
        let (catalog, namespace, table_name, _temp_dir) =
            get_partitioned_test_catalog_and_table(Some(false)).await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let input_schema = provider.schema();
        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

        let state = ctx.state();
        let insert_plan = provider
            .insert_into(&state, input, InsertOp::Append)
            .await
            .unwrap();

        // With fanout disabled, there should be a SortExec in the plan
        assert!(
            plan_contains_sort(&insert_plan),
            "Plan should contain SortExec when fanout is disabled"
        );
    }

    #[tokio::test]
    async fn test_limit_pushdown_static_provider() {
        use datafusion::datasource::TableProvider;

        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        // Test scan with limit
        let scan_plan = table_provider
            .scan(&state, None, &[], Some(10))
            .await
            .unwrap();

        // Verify that the scan plan is an IcebergTableScan
        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        // Verify the limit is set
        assert_eq!(
            iceberg_scan.limit(),
            Some(10),
            "Limit should be set to 10 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_limit_pushdown_catalog_backed_provider() {
        use datafusion::datasource::TableProvider;

        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        // Test scan with limit
        let scan_plan = provider.scan(&state, None, &[], Some(5)).await.unwrap();

        // Verify that the scan plan is an IcebergTableScan
        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        // Verify the limit is set
        assert_eq!(
            iceberg_scan.limit(),
            Some(5),
            "Limit should be set to 5 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_no_limit_pushdown() {
        use datafusion::datasource::TableProvider;

        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        // Test scan without limit
        let scan_plan = table_provider.scan(&state, None, &[], None).await.unwrap();

        // Verify that the scan plan is an IcebergTableScan
        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        // Verify the limit is None
        assert_eq!(
            iceberg_scan.limit(),
            None,
            "Limit should be None when not specified"
        );
    }
871}