iceberg_datafusion/table/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Iceberg table providers for DataFusion.
//!
//! This module provides two table provider implementations:
//!
//! - [`IcebergTableProvider`]: Catalog-backed provider with automatic metadata refresh.
//!   Use for write operations and when you need to see the latest table state.
//!
//! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific
//!   table snapshot. Use for consistent analytical queries or time-travel scenarios.
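//!
//! The snippet below is an illustrative sketch (the `table` value and table name are
//! placeholders); it mirrors this module's tests by registering a static provider with
//! a DataFusion `SessionContext` and querying it with SQL:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use datafusion::prelude::SessionContext;
//!
//! // `table` is an `iceberg::table::Table` obtained elsewhere, e.g. from a catalog or
//! // via `StaticTable::from_metadata_file` (see the tests at the bottom of this file).
//! let provider = IcebergStaticTableProvider::try_new_from_table(table).await?;
//!
//! let ctx = SessionContext::new();
//! ctx.register_table("my_table", Arc::new(provider))?;
//! let df = ctx.sql("SELECT * FROM my_table").await?;
//! ```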

pub mod metadata_table;
pub mod table_provider_factory;

use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::inspect::MetadataTableType;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;

use crate::error::to_datafusion_error;
use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::project::project_with_partition;
use crate::physical_plan::repartition::repartition;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::write::IcebergWriteExec;

/// Catalog-backed table provider with automatic metadata refresh.
///
/// This provider loads fresh table metadata from the catalog on every scan and write
/// operation, ensuring you always see the latest table state. Use this when you need
/// write operations or want to see the most up-to-date data.
///
/// For read-only access to a specific snapshot without catalog overhead, use
/// [`IcebergStaticTableProvider`] instead.
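///
/// The constructor is crate-internal, so the snippet below is only a sketch of how the
/// provider is used once constructed (it mirrors this module's tests); `catalog`,
/// `namespace`, and the table name are placeholders:
///
/// ```ignore
/// use std::sync::Arc;
///
/// use datafusion::prelude::SessionContext;
///
/// let provider = IcebergTableProvider::try_new(catalog, namespace, "test_table").await?;
///
/// let ctx = SessionContext::new();
/// ctx.register_table("test_table", Arc::new(provider))?;
///
/// // Both reads and writes reload table metadata from the catalog.
/// ctx.sql("INSERT INTO test_table VALUES (1, 'test')").await?.collect().await?;
/// let df = ctx.sql("SELECT * FROM test_table").await?;
/// ```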
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
    /// The catalog that manages this table
    catalog: Arc<dyn Catalog>,
    /// The table identifier (namespace + name)
    table_ident: TableIdent,
    /// A reference-counted arrow `Schema` (cached at construction)
    schema: ArrowSchemaRef,
}

impl IcebergTableProvider {
    /// Creates a new catalog-backed table provider.
    ///
    /// Loads the table once to get the initial schema, then stores the catalog
    /// reference for future metadata refreshes on each operation.
    pub(crate) async fn try_new(
        catalog: Arc<dyn Catalog>,
        namespace: NamespaceIdent,
        name: impl Into<String>,
    ) -> Result<Self> {
        let table_ident = TableIdent::new(namespace, name.into());

        // Load table once to get initial schema
        let table = catalog.load_table(&table_ident).await?;
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

        Ok(IcebergTableProvider {
            catalog,
            table_ident,
            schema,
        })
    }

    /// Returns a provider for the requested metadata table type, backed by freshly
    /// loaded table metadata.
    pub(crate) async fn metadata_table(
        &self,
        r#type: MetadataTableType,
    ) -> Result<IcebergMetadataTableProvider> {
        // Load fresh table metadata for metadata table access
        let table = self.catalog.load_table(&self.table_ident).await?;
        Ok(IcebergMetadataTableProvider { table, r#type })
    }
}

#[async_trait]
impl TableProvider for IcebergTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Load fresh table metadata from catalog
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        // Create scan with fresh metadata (always use current snapshot)
        Ok(Arc::new(IcebergTableScan::new(
            table,
            None, // Always use current snapshot for catalog-backed provider
            self.schema.clone(),
            projection,
            filters,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        // Report all filters as inexact pushdown; the scanner is the single source of
        // truth and drops any filter it cannot push down.
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Load fresh table metadata from catalog
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        let partition_spec = table.metadata().default_partition_spec();

        // Step 1: Project partition values for partitioned tables
        let plan_with_partition = if !partition_spec.is_unpartitioned() {
            project_with_partition(input, &table)?
        } else {
            input
        };

        // Step 2: Repartition for parallel processing
        let target_partitions =
            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
                DataFusionError::Configuration(
                    "target_partitions must be greater than 0".to_string(),
                )
            })?;

        let repartitioned_plan =
            repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;

        // Step 3: Write data files in parallel
        let write_plan = Arc::new(IcebergWriteExec::new(
            table.clone(),
            repartitioned_plan,
            self.schema.clone(),
        ));

        // Step 4: Merge the outputs of write_plan into one partition so all data files
        // can be committed together
        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));

        // Step 5: Commit the written data files to the table through the catalog
        Ok(Arc::new(IcebergCommitExec::new(
            table,
            self.catalog.clone(),
            coalesce_partitions,
            self.schema.clone(),
        )))
    }
}

/// Static table provider for read-only snapshot access.
///
/// This provider holds a cached table instance and does not refresh metadata or support
/// write operations. Use this for consistent analytical queries, time-travel scenarios,
/// or when you want to avoid catalog overhead.
///
/// For catalog-backed tables with write support and automatic refresh, use
/// [`IcebergTableProvider`] instead.
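///
/// As a sketch of one way to obtain a `Table` for this provider (mirroring this
/// module's tests; the metadata path and identifier are placeholders), load it from a
/// metadata file via `StaticTable`:
///
/// ```ignore
/// use iceberg::TableIdent;
/// use iceberg::io::FileIO;
/// use iceberg::table::StaticTable;
///
/// let metadata_path = "/path/to/table/metadata/v2.metadata.json";
/// let file_io = FileIO::from_path(metadata_path)?.build()?;
/// let ident = TableIdent::from_strs(["static_ns", "static_table"])?;
/// let table = StaticTable::from_metadata_file(metadata_path, ident, file_io)
///     .await?
///     .into_table();
///
/// let provider = IcebergStaticTableProvider::try_new_from_table(table).await?;
/// ```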
#[derive(Debug, Clone)]
pub struct IcebergStaticTableProvider {
    /// The static table instance (never refreshed)
    table: Table,
    /// Optional snapshot ID for this static view
    snapshot_id: Option<i64>,
    /// A reference-counted arrow `Schema`
    schema: ArrowSchemaRef,
}

impl IcebergStaticTableProvider {
    /// Creates a static provider from a table instance.
    ///
    /// Uses the table's current snapshot for all queries. Does not support write operations.
    pub async fn try_new_from_table(table: Table) -> Result<Self> {
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
        Ok(IcebergStaticTableProvider {
            table,
            snapshot_id: None,
            schema,
        })
    }

    /// Creates a static provider for a specific table snapshot.
    ///
    /// Queries the specified snapshot for all operations. Useful for time-travel queries.
    /// Does not support write operations.
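    ///
    /// A minimal sketch (the snapshot id is a placeholder; any id recorded in the table
    /// metadata works), mirroring this module's tests:
    ///
    /// ```ignore
    /// let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
    /// let provider =
    ///     IcebergStaticTableProvider::try_new_from_table_snapshot(table, snapshot_id).await?;
    /// ```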
    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
        let snapshot = table
            .metadata()
            .snapshot_by_id(snapshot_id)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "snapshot id {snapshot_id} not found in table {}",
                        table.identifier().name()
                    ),
                )
            })?;
        let table_schema = snapshot.schema(table.metadata())?;
        let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
        Ok(IcebergStaticTableProvider {
            table,
            snapshot_id: Some(snapshot_id),
            schema,
        })
    }
}

#[async_trait]
impl TableProvider for IcebergStaticTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Use cached table (no refresh)
        Ok(Arc::new(IcebergTableScan::new(
            self.table.clone(),
            self.snapshot_id,
            self.schema.clone(),
            projection,
            filters,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        // Report all filters as inexact pushdown; the scanner is the single source of
        // truth and drops any filter it cannot push down.
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        _state: &dyn Session,
        _input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Err(to_datafusion_error(Error::new(
            ErrorKind::FeatureUnsupported,
            "Write operations are not supported on IcebergStaticTableProvider. \
             Use IcebergTableProvider with a catalog for write support."
                .to_string(),
        )))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::common::Column;
    use datafusion::prelude::SessionContext;
    use iceberg::io::FileIO;
    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
    use iceberg::table::{StaticTable, Table};
    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
    use tempfile::TempDir;

    use super::*;

    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_name = "TableMetadataV2Valid.json";
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            metadata_file_name
        );
        let file_io = FileIO::from_path(&metadata_file_path)
            .unwrap()
            .build()
            .unwrap();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        static_table.into_table()
    }

    async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let table_creation = TableCreation::builder()
            .name("test_table".to_string())
            .location(format!("{}/test_table", warehouse_path))
            .schema(schema)
            .properties(HashMap::new())
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "test_table".to_string(),
            temp_dir,
        )
    }

    // Tests for IcebergStaticTableProvider

    #[tokio::test]
    async fn test_static_provider_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_static_provider_from_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergStaticTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
                .await
                .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_static_provider_rejects_writes() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        // Attempting to insert into the static provider should fail
        let result = ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await;

        // The error may surface during planning or during execution; either way it
        // should indicate that write operations are not supported.
        assert!(
            result.is_err() || {
                let df = result.unwrap();
                df.collect().await.is_err()
            }
        );
    }

    #[tokio::test]
    async fn test_static_provider_scan() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        // Test that scan operations work correctly
        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    // Tests for IcebergTableProvider

    #[tokio::test]
    async fn test_catalog_backed_provider_creation() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        // Test creating a catalog-backed provider
        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        // Verify the schema is loaded correctly
        let schema = provider.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_scan() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // Test that scan operations work correctly
        let df = ctx.sql("SELECT * FROM test_table").await.unwrap();

        // Verify the schema in the query result
        let df_schema = df.schema();
        assert_eq!(df_schema.fields().len(), 2);
        assert_eq!(df_schema.field(0).name(), "id");
        assert_eq!(df_schema.field(1).name(), "name");

        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_insert() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // Test that insert operations work correctly
        let result = ctx.sql("INSERT INTO test_table VALUES (1, 'test')").await;

        // Insert should succeed (or at least not fail during planning)
        assert!(result.is_ok());

        // Try to execute the insert plan
        let df = result.unwrap();
        let execution_result = df.collect().await;

        // The execution should succeed
        assert!(execution_result.is_ok());
    }

    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        // Create a query plan
        let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();

        // Get logical schema before consuming df
        let logical_schema = df.schema().clone();

        // Get physical plan (this consumes df)
        let physical_plan = df.create_physical_plan().await.unwrap();
        let physical_schema = physical_plan.schema();

        // Verify that logical and physical schemas are consistent
        assert_eq!(
            logical_schema.fields().len(),
            physical_schema.fields().len()
        );

        for (logical_field, physical_field) in logical_schema
            .fields()
            .iter()
            .zip(physical_schema.fields().iter())
        {
            assert_eq!(logical_field.name(), physical_field.name());
            assert_eq!(logical_field.data_type(), physical_field.data_type());
        }
    }
}