pub mod metadata_table;
pub mod table_provider_factory;

use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::inspect::MetadataTableType;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;

use crate::error::to_datafusion_error;
use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::project::project_with_partition;
use crate::physical_plan::repartition::repartition;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::write::IcebergWriteExec;
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
    /// The catalog that the table belongs to.
    catalog: Arc<dyn Catalog>,
    /// The identifier of the table within the catalog.
    table_ident: TableIdent,
    /// The Arrow schema projected from the table's current Iceberg schema.
    schema: ArrowSchemaRef,
}

impl IcebergTableProvider {
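    /// Loads the referenced table once to capture its current Arrow schema
    /// and returns a provider for it.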
    pub(crate) async fn try_new(
        catalog: Arc<dyn Catalog>,
        namespace: NamespaceIdent,
        name: impl Into<String>,
    ) -> Result<Self> {
        let table_ident = TableIdent::new(namespace, name.into());

        let table = catalog.load_table(&table_ident).await?;
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

        Ok(IcebergTableProvider {
            catalog,
            table_ident,
            schema,
        })
    }

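    /// Returns a provider for one of the table's metadata tables (snapshots,
    /// manifests, etc.), backed by a freshly loaded copy of the table.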
    pub(crate) async fn metadata_table(
        &self,
        r#type: MetadataTableType,
    ) -> Result<IcebergMetadataTableProvider> {
        let table = self.catalog.load_table(&self.table_ident).await?;
        Ok(IcebergMetadataTableProvider { table, r#type })
    }
}

#[async_trait]
impl TableProvider for IcebergTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Reload the table so the scan sees the latest snapshot.
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        Ok(Arc::new(IcebergTableScan::new(
            table,
            // No snapshot id pinned: scan the table's current snapshot.
            None,
            self.schema.clone(),
            projection,
            filters,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        // Push filters down as `Inexact`: the Iceberg scan prunes what it
        // can, and DataFusion re-applies the filters on the returned batches.
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Reload the table so the write plans against the latest metadata.
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        let partition_spec = table.metadata().default_partition_spec();

        // For partitioned tables, project partition values onto the input.
        let plan_with_partition = if !partition_spec.is_unpartitioned() {
            project_with_partition(input, &table)?
        } else {
            input
        };

        let target_partitions =
            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
                DataFusionError::Configuration(
                    "target_partitions must be greater than 0".to_string(),
                )
            })?;

        let repartitioned_plan =
            repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;

        // Write data files in parallel, then coalesce the result streams so a
        // single commit node collects every written file.
        let write_plan = Arc::new(IcebergWriteExec::new(
            table.clone(),
            repartitioned_plan,
            self.schema.clone(),
        ));

        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));

        Ok(Arc::new(IcebergCommitExec::new(
            table,
            self.catalog.clone(),
            coalesce_partitions,
            self.schema.clone(),
        )))
    }
}

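/// A read-only [`TableProvider`] over a fixed view of an Iceberg [`Table`],
/// optionally pinned to a specific snapshot. No catalog is attached, so the
/// view never refreshes and write operations are rejected.
///
/// A minimal usage sketch (the metadata path and identifiers are
/// illustrative; this mirrors the tests at the bottom of this module):
///
/// ```ignore
/// use std::sync::Arc;
///
/// use datafusion::prelude::SessionContext;
/// use iceberg::TableIdent;
/// use iceberg::io::FileIO;
/// use iceberg::table::StaticTable;
///
/// // Load a table directly from a metadata file, without a catalog.
/// let path = "/path/to/metadata.json";
/// let file_io = FileIO::from_path(path)?.build()?;
/// let ident = TableIdent::from_strs(["ns", "tbl"])?;
/// let table = StaticTable::from_metadata_file(path, ident, file_io)
///     .await?
///     .into_table();
///
/// // Register the static provider and query it through DataFusion.
/// let provider = IcebergStaticTableProvider::try_new_from_table(table).await?;
/// let ctx = SessionContext::new();
/// ctx.register_table("mytable", Arc::new(provider))?;
/// let df = ctx.sql("SELECT * FROM mytable").await?;
/// ```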
#[derive(Debug, Clone)]
pub struct IcebergStaticTableProvider {
    /// The static table being exposed.
    table: Table,
    /// An optional snapshot id that scans are pinned to.
    snapshot_id: Option<i64>,
    /// The Arrow schema of the exposed (possibly snapshot-pinned) view.
    schema: ArrowSchemaRef,
}

impl IcebergStaticTableProvider {
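    /// Creates a read-only provider from an already loaded [`Table`], using
    /// the table's current schema.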
    pub async fn try_new_from_table(table: Table) -> Result<Self> {
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
        Ok(IcebergStaticTableProvider {
            table,
            snapshot_id: None,
            schema,
        })
    }

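    /// Creates a read-only provider pinned to `snapshot_id`, using the schema
    /// of that snapshot. Returns an error if the snapshot does not exist in
    /// the table metadata.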
    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
        let snapshot = table
            .metadata()
            .snapshot_by_id(snapshot_id)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "snapshot id {snapshot_id} not found in table {}",
                        table.identifier().name()
                    ),
                )
            })?;
        let table_schema = snapshot.schema(table.metadata())?;
        let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
        Ok(IcebergStaticTableProvider {
            table,
            snapshot_id: Some(snapshot_id),
            schema,
        })
    }
}

#[async_trait]
impl TableProvider for IcebergStaticTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Scan the static table, honoring the pinned snapshot id if present.
        Ok(Arc::new(IcebergTableScan::new(
            self.table.clone(),
            self.snapshot_id,
            self.schema.clone(),
            projection,
            filters,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        _state: &dyn Session,
        _input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Err(to_datafusion_error(Error::new(
            ErrorKind::FeatureUnsupported,
            "Write operations are not supported on IcebergStaticTableProvider. \
             Use IcebergTableProvider with a catalog for write support."
                .to_string(),
        )))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::common::Column;
    use datafusion::prelude::SessionContext;
    use iceberg::io::FileIO;
    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
    use iceberg::table::{StaticTable, Table};
    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
    use tempfile::TempDir;

    use super::*;

    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_name = "TableMetadataV2Valid.json";
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            metadata_file_name
        );
        let file_io = FileIO::from_path(&metadata_file_path)
            .unwrap()
            .build()
            .unwrap();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        static_table.into_table()
    }

    async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let table_creation = TableCreation::builder()
            .name("test_table".to_string())
            .location(format!("{}/test_table", warehouse_path))
            .schema(schema)
            .properties(HashMap::new())
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "test_table".to_string(),
            temp_dir,
        )
    }

    #[tokio::test]
    async fn test_static_provider_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_static_provider_from_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergStaticTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
                .await
                .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_static_provider_rejects_writes() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let result = ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await;

        // The rejection may surface at planning time or at execution time.
        assert!(
            result.is_err() || {
                let df = result.unwrap();
                df.collect().await.is_err()
            }
        );
    }

    #[tokio::test]
    async fn test_static_provider_scan() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_creation() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let schema = provider.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_scan() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT * FROM test_table").await.unwrap();

        let df_schema = df.schema();
        assert_eq!(df_schema.fields().len(), 2);
        assert_eq!(df_schema.field(0).name(), "id");
        assert_eq!(df_schema.field(1).name(), "name");

        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_insert() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let result = ctx.sql("INSERT INTO test_table VALUES (1, 'test')").await;
        assert!(result.is_ok());

        let df = result.unwrap();
        let execution_result = df.collect().await;
        assert!(execution_result.is_ok());
    }

    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();

        let logical_schema = df.schema().clone();

        let physical_plan = df.create_physical_plan().await.unwrap();
        let physical_schema = physical_plan.schema();

        assert_eq!(
            logical_schema.fields().len(),
            physical_schema.fields().len()
        );

        for (logical_field, physical_field) in logical_schema
            .fields()
            .iter()
            .zip(physical_schema.fields().iter())
        {
            assert_eq!(logical_field.name(), physical_field.name());
            assert_eq!(logical_field.data_type(), physical_field.data_type());
        }
    }
}