Skip to main content

iceberg_datafusion/table/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Iceberg table providers for DataFusion.
19//!
20//! This module provides two table provider implementations:
21//!
22//! - [`IcebergTableProvider`]: Catalog-backed provider with automatic metadata refresh.
23//!   Use for write operations and when you need to see the latest table state.
24//!
25//! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific
26//!   table snapshot. Use for consistent analytical queries or time-travel scenarios.
27
28pub mod metadata_table;
29pub mod table_provider_factory;
30
31use std::any::Any;
32use std::num::NonZeroUsize;
33use std::sync::Arc;
34
35use async_trait::async_trait;
36use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
37use datafusion::catalog::Session;
38use datafusion::common::DataFusionError;
39use datafusion::datasource::{TableProvider, TableType};
40use datafusion::error::Result as DFResult;
41use datafusion::logical_expr::dml::InsertOp;
42use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
43use datafusion::physical_plan::ExecutionPlan;
44use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
45use iceberg::arrow::schema_to_arrow_schema;
46use iceberg::inspect::MetadataTableType;
47use iceberg::spec::TableProperties;
48use iceberg::table::Table;
49use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
50use metadata_table::IcebergMetadataTableProvider;
51
52use crate::error::to_datafusion_error;
53use crate::physical_plan::commit::IcebergCommitExec;
54use crate::physical_plan::project::project_with_partition;
55use crate::physical_plan::repartition::repartition;
56use crate::physical_plan::scan::IcebergTableScan;
57use crate::physical_plan::sort::sort_by_partition;
58use crate::physical_plan::write::IcebergWriteExec;
59
60/// Catalog-backed table provider with automatic metadata refresh.
61///
62/// This provider loads fresh table metadata from the catalog on every scan and write
63/// operation, ensuring you always see the latest table state. Use this when you need
64/// write operations or want to see the most up-to-date data.
65///
66/// For read-only access to a specific snapshot without catalog overhead, use
67/// [`IcebergStaticTableProvider`] instead.
68#[derive(Debug, Clone)]
69pub struct IcebergTableProvider {
70    /// The catalog that manages this table
71    catalog: Arc<dyn Catalog>,
72    /// The table identifier (namespace + name)
73    table_ident: TableIdent,
74    /// A reference-counted arrow `Schema` (cached at construction)
75    schema: ArrowSchemaRef,
76}
77
78impl IcebergTableProvider {
79    /// Creates a new catalog-backed table provider.
80    ///
81    /// Loads the table once to get the initial schema, then stores the catalog
82    /// reference for future metadata refreshes on each operation.
83    pub(crate) async fn try_new(
84        catalog: Arc<dyn Catalog>,
85        namespace: NamespaceIdent,
86        name: impl Into<String>,
87    ) -> Result<Self> {
88        let table_ident = TableIdent::new(namespace, name.into());
89
90        // Load table once to get initial schema
91        let table = catalog.load_table(&table_ident).await?;
92        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
93
94        Ok(IcebergTableProvider {
95            catalog,
96            table_ident,
97            schema,
98        })
99    }
100
101    pub(crate) async fn metadata_table(
102        &self,
103        r#type: MetadataTableType,
104    ) -> Result<IcebergMetadataTableProvider> {
105        // Load fresh table metadata for metadata table access
106        let table = self.catalog.load_table(&self.table_ident).await?;
107        Ok(IcebergMetadataTableProvider { table, r#type })
108    }
109}
110
111#[async_trait]
112impl TableProvider for IcebergTableProvider {
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116
117    fn schema(&self) -> ArrowSchemaRef {
118        self.schema.clone()
119    }
120
121    fn table_type(&self) -> TableType {
122        TableType::Base
123    }
124
125    async fn scan(
126        &self,
127        _state: &dyn Session,
128        projection: Option<&Vec<usize>>,
129        filters: &[Expr],
130        limit: Option<usize>,
131    ) -> DFResult<Arc<dyn ExecutionPlan>> {
132        // Load fresh table metadata from catalog
133        let table = self
134            .catalog
135            .load_table(&self.table_ident)
136            .await
137            .map_err(to_datafusion_error)?;
138
139        // Create scan with fresh metadata (always use current snapshot)
140        Ok(Arc::new(IcebergTableScan::new(
141            table,
142            None, // Always use current snapshot for catalog-backed provider
143            self.schema.clone(),
144            projection,
145            filters,
146            limit,
147        )))
148    }
149
150    fn supports_filters_pushdown(
151        &self,
152        filters: &[&Expr],
153    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
154        // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down
155        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
156    }
157
158    async fn insert_into(
159        &self,
160        state: &dyn Session,
161        input: Arc<dyn ExecutionPlan>,
162        _insert_op: InsertOp,
163    ) -> DFResult<Arc<dyn ExecutionPlan>> {
164        if _insert_op != InsertOp::Append {
165            return Err(DataFusionError::NotImplemented(format!(
166                "IcebergTableProvider supports only append inserts, got {_insert_op}"
167            )));
168        }
169
170        // Load fresh table metadata from catalog
171        let table = self
172            .catalog
173            .load_table(&self.table_ident)
174            .await
175            .map_err(to_datafusion_error)?;
176
177        let partition_spec = table.metadata().default_partition_spec();
178
179        // Step 1: Project partition values for partitioned tables
180        let plan_with_partition = if !partition_spec.is_unpartitioned() {
181            project_with_partition(input, &table)?
182        } else {
183            input
184        };
185
186        // Step 2: Repartition for parallel processing
187        let target_partitions =
188            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
189                DataFusionError::Configuration(
190                    "target_partitions must be greater than 0".to_string(),
191                )
192            })?;
193
194        let repartitioned_plan =
195            repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;
196
197        // Apply sort node when it's not fanout mode
198        let fanout_enabled = table
199            .metadata()
200            .properties()
201            .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
202            .map(|value| {
203                value
204                    .parse::<bool>()
205                    .map_err(|e| {
206                        Error::new(
207                            ErrorKind::DataInvalid,
208                            format!(
209                                "Invalid value for {}, expected 'true' or 'false'",
210                                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
211                            ),
212                        )
213                        .with_source(e)
214                    })
215                    .map_err(to_datafusion_error)
216            })
217            .transpose()?
218            .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
219
220        let write_input = if fanout_enabled {
221            repartitioned_plan
222        } else {
223            sort_by_partition(repartitioned_plan)?
224        };
225
226        let write_plan = Arc::new(IcebergWriteExec::new(
227            table.clone(),
228            write_input,
229            self.schema.clone(),
230        ));
231
232        // Merge the outputs of write_plan into one so we can commit all files together
233        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));
234
235        Ok(Arc::new(IcebergCommitExec::new(
236            table,
237            self.catalog.clone(),
238            coalesce_partitions,
239            self.schema.clone(),
240        )))
241    }
242}
243
244/// Static table provider for read-only snapshot access.
245///
246/// This provider holds a cached table instance and does not refresh metadata or support
247/// write operations. Use this for consistent analytical queries, time-travel scenarios,
248/// or when you want to avoid catalog overhead.
249///
250/// For catalog-backed tables with write support and automatic refresh, use
251/// [`IcebergTableProvider`] instead.
252#[derive(Debug, Clone)]
253pub struct IcebergStaticTableProvider {
254    /// The static table instance (never refreshed)
255    table: Table,
256    /// Optional snapshot ID for this static view
257    snapshot_id: Option<i64>,
258    /// A reference-counted arrow `Schema`
259    schema: ArrowSchemaRef,
260}
261
262impl IcebergStaticTableProvider {
263    /// Creates a static provider from a table instance.
264    ///
265    /// Uses the table's current snapshot for all queries. Does not support write operations.
266    pub async fn try_new_from_table(table: Table) -> Result<Self> {
267        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
268        Ok(IcebergStaticTableProvider {
269            table,
270            snapshot_id: None,
271            schema,
272        })
273    }
274
275    /// Creates a static provider for a specific table snapshot.
276    ///
277    /// Queries the specified snapshot for all operations. Useful for time-travel queries.
278    /// Does not support write operations.
279    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
280        let snapshot = table
281            .metadata()
282            .snapshot_by_id(snapshot_id)
283            .ok_or_else(|| {
284                Error::new(
285                    ErrorKind::Unexpected,
286                    format!(
287                        "snapshot id {snapshot_id} not found in table {}",
288                        table.identifier().name()
289                    ),
290                )
291            })?;
292        let table_schema = snapshot.schema(table.metadata())?;
293        let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
294        Ok(IcebergStaticTableProvider {
295            table,
296            snapshot_id: Some(snapshot_id),
297            schema,
298        })
299    }
300}
301
302#[async_trait]
303impl TableProvider for IcebergStaticTableProvider {
304    fn as_any(&self) -> &dyn Any {
305        self
306    }
307
308    fn schema(&self) -> ArrowSchemaRef {
309        self.schema.clone()
310    }
311
312    fn table_type(&self) -> TableType {
313        TableType::Base
314    }
315
316    async fn scan(
317        &self,
318        _state: &dyn Session,
319        projection: Option<&Vec<usize>>,
320        filters: &[Expr],
321        limit: Option<usize>,
322    ) -> DFResult<Arc<dyn ExecutionPlan>> {
323        // Use cached table (no refresh)
324        Ok(Arc::new(IcebergTableScan::new(
325            self.table.clone(),
326            self.snapshot_id,
327            self.schema.clone(),
328            projection,
329            filters,
330            limit,
331        )))
332    }
333
334    fn supports_filters_pushdown(
335        &self,
336        filters: &[&Expr],
337    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
338        // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down
339        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
340    }
341
342    async fn insert_into(
343        &self,
344        _state: &dyn Session,
345        _input: Arc<dyn ExecutionPlan>,
346        _insert_op: InsertOp,
347    ) -> DFResult<Arc<dyn ExecutionPlan>> {
348        Err(to_datafusion_error(Error::new(
349            ErrorKind::FeatureUnsupported,
350            "Write operations are not supported on IcebergStaticTableProvider. \
351             Use IcebergTableProvider with a catalog for write support."
352                .to_string(),
353        )))
354    }
355}
356
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::common::{Column, DFSchema};
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::physical_plan::empty::EmptyExec;
    use datafusion::prelude::SessionContext;
    use iceberg::io::FileIO;
    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec};
    use iceberg::table::{StaticTable, Table};
    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
    use tempfile::TempDir;

    use super::*;

    /// Loads the `TableMetadataV2Valid.json` fixture as a static table.
    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            "TableMetadataV2Valid.json"
        );
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        StaticTable::from_metadata_file(
            &metadata_file_path,
            static_identifier,
            FileIO::new_with_fs(),
        )
        .await
        .unwrap()
        .into_table()
    }

    /// Creates an in-memory catalog rooted in a fresh temporary warehouse and registers the
    /// `test_ns` namespace in it.
    ///
    /// Returns the catalog, the namespace, the warehouse path and the `TempDir` guard. The
    /// guard must stay alive for as long as the catalog is used.
    async fn new_test_catalog() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog: Arc<dyn Catalog> = Arc::new(
            MemoryCatalogBuilder::default()
                .load(
                    "memory",
                    HashMap::from([(
                        MEMORY_CATALOG_WAREHOUSE.to_string(),
                        warehouse_path.clone(),
                    )]),
                )
                .await
                .unwrap(),
        );

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        (catalog, namespace, warehouse_path, temp_dir)
    }

    /// Creates the unpartitioned `test_ns.test_table` table with schema
    /// `(id: int required, name: string required)`.
    async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let (catalog, namespace, warehouse_path, temp_dir) = new_test_catalog().await;

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let table_creation = TableCreation::builder()
            .name("test_table".to_string())
            .location(format!("{warehouse_path}/test_table"))
            .schema(schema)
            .properties(HashMap::new())
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (catalog, namespace, "test_table".to_string(), temp_dir)
    }

    /// Creates `test_ns.partitioned_table`, identity-partitioned on `category`.
    ///
    /// The fanout property is set only when `fanout_enabled` is `Some`.
    async fn get_partitioned_test_catalog_and_table(
        fanout_enabled: Option<bool>,
    ) -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let (catalog, namespace, warehouse_path, temp_dir) = new_test_catalog().await;

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "category", Transform::Identity)
            .unwrap()
            .build();

        let mut properties = HashMap::new();
        if let Some(enabled) = fanout_enabled {
            properties.insert(
                iceberg::spec::TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
                    .to_string(),
                enabled.to_string(),
            );
        }

        let table_creation = TableCreation::builder()
            .name("partitioned_table".to_string())
            .location(format!("{warehouse_path}/partitioned_table"))
            .schema(schema)
            .partition_spec(partition_spec)
            .properties(properties)
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (catalog, namespace, "partitioned_table".to_string(), temp_dir)
    }

    /// Asserts the schema expected from `TableMetadataV2Valid.json`: three columns, the first
    /// being `x: Int64`, and a column named `z`.
    fn assert_static_test_schema(df_schema: &DFSchema) {
        let fields = df_schema.fields();
        assert_eq!(fields.len(), 3);
        let first = fields.first().unwrap();
        assert_eq!(first.name(), "x");
        assert_eq!(first.data_type().to_string(), "Int64");
        assert!(df_schema.has_column(&Column::from_name("z")));
    }

    /// Returns true if `plan` or any of its descendants is a `SortExec`.
    fn plan_contains_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
        plan.name() == "SortExec" || plan.children().into_iter().any(plan_contains_sort)
    }

    /// Downcasts `plan` to [`IcebergTableScan`] and returns the limit pushed into it.
    fn pushed_down_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<usize> {
        plan.as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan")
            .limit()
    }

    /// Plans (without executing) an append of an empty input into a partitioned table created
    /// with the given fanout property.
    ///
    /// The `TempDir` guard is returned so the warehouse outlives the plan.
    async fn plan_partitioned_append(
        fanout_enabled: Option<bool>,
    ) -> (Arc<dyn ExecutionPlan>, TempDir) {
        let (catalog, namespace, table_name, temp_dir) =
            get_partitioned_test_catalog_and_table(fanout_enabled).await;
        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let input = Arc::new(EmptyExec::new(provider.schema())) as Arc<dyn ExecutionPlan>;
        let plan = provider
            .insert_into(&ctx.state(), input, InsertOp::Append)
            .await
            .unwrap();
        (plan, temp_dir)
    }

    // Tests for IcebergStaticTableProvider

    #[tokio::test]
    async fn test_static_provider_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        assert_static_test_schema(df.schema());
    }

    #[tokio::test]
    async fn test_static_provider_from_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergStaticTableProvider::try_new_from_table_snapshot(table, snapshot_id)
                .await
                .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        assert_static_test_schema(df.schema());
    }

    #[tokio::test]
    async fn test_static_provider_rejects_writes() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        // The insert may fail either while planning or while executing. Either way, the
        // failure must come from the provider refusing writes, not from some unrelated error.
        let error = match ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await {
            Err(error) => error,
            Ok(df) => df
                .collect()
                .await
                .expect_err("inserting into a static provider must fail"),
        };
        assert!(
            error
                .to_string()
                .contains("Write operations are not supported on IcebergStaticTableProvider"),
            "unexpected error: {error}"
        );
    }

    #[tokio::test]
    async fn test_static_provider_scan() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        df.create_physical_plan()
            .await
            .expect("planning a scan over the static provider should succeed");
    }

    // Tests for IcebergTableProvider

    #[tokio::test]
    async fn test_catalog_backed_provider_creation() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let schema = provider.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_scan() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT * FROM test_table").await.unwrap();

        let df_schema = df.schema();
        assert_eq!(df_schema.fields().len(), 2);
        assert_eq!(df_schema.field(0).name(), "id");
        assert_eq!(df_schema.field(1).name(), "name");

        df.create_physical_plan()
            .await
            .expect("planning a scan over the catalog-backed provider should succeed");
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_insert() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // `expect` (rather than `assert!(is_ok())`) surfaces the underlying error on failure.
        let df = ctx
            .sql("INSERT INTO test_table VALUES (1, 'test')")
            .await
            .expect("planning the insert should succeed");
        df.collect()
            .await
            .expect("executing the insert should succeed");

        // The catalog-backed provider reloads metadata on every scan, so the committed row
        // must be visible to a later query through the same registered provider.
        let batches = ctx
            .sql("SELECT * FROM test_table")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(row_count, 1);
    }

    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();

        // Capture the logical schema before `create_physical_plan` consumes `df`.
        let logical_schema = df.schema().clone();

        let physical_plan = df.create_physical_plan().await.unwrap();
        let physical_schema = physical_plan.schema();

        assert_eq!(
            logical_schema.fields().len(),
            physical_schema.fields().len()
        );

        for (logical_field, physical_field) in logical_schema
            .fields()
            .iter()
            .zip(physical_schema.fields().iter())
        {
            assert_eq!(logical_field.name(), physical_field.name());
            assert_eq!(logical_field.data_type(), physical_field.data_type());
        }
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_rejects_non_append_op() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();
        let ctx = SessionContext::new();

        for (insert_op, expected_message) in [
            (
                InsertOp::Overwrite,
                "IcebergTableProvider supports only append inserts, got Insert Overwrite",
            ),
            (
                InsertOp::Replace,
                "IcebergTableProvider supports only append inserts, got Replace Into",
            ),
        ] {
            let input = Arc::new(EmptyExec::new(provider.schema())) as Arc<dyn ExecutionPlan>;
            let error = provider
                .insert_into(&ctx.state(), input, insert_op)
                .await
                .expect_err("non-append inserts should be rejected");

            assert!(
                matches!(
                    error,
                    DataFusionError::NotImplemented(ref message) if message == expected_message
                ),
                "unexpected error: {error}"
            );
        }
    }

    #[tokio::test]
    async fn test_insert_plan_fanout_enabled_no_sort() {
        // With fanout explicitly enabled, no sort node should be added.
        let (insert_plan, _temp_dir) = plan_partitioned_append(Some(true)).await;

        assert!(
            !plan_contains_sort(&insert_plan),
            "Plan should NOT contain SortExec when fanout is enabled"
        );
    }

    #[tokio::test]
    async fn test_insert_plan_fanout_disabled_has_sort() {
        // With fanout disabled, input must be clustered by partition, so a sort is added.
        let (insert_plan, _temp_dir) = plan_partitioned_append(Some(false)).await;

        assert!(
            plan_contains_sort(&insert_plan),
            "Plan should contain SortExec when fanout is disabled"
        );
    }

    #[tokio::test]
    async fn test_limit_pushdown_static_provider() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let scan_plan = table_provider
            .scan(&state, None, &[], Some(10))
            .await
            .unwrap();

        assert_eq!(
            pushed_down_limit(&scan_plan),
            Some(10),
            "Limit should be set to 10 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_limit_pushdown_catalog_backed_provider() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let scan_plan = provider.scan(&state, None, &[], Some(5)).await.unwrap();

        assert_eq!(
            pushed_down_limit(&scan_plan),
            Some(5),
            "Limit should be set to 5 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_no_limit_pushdown() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let scan_plan = table_provider.scan(&state, None, &[], None).await.unwrap();

        assert_eq!(
            pushed_down_limit(&scan_plan),
            None,
            "Limit should be None when not specified"
        );
    }
}