1pub mod metadata_table;
29pub mod table_provider_factory;
30
31use std::any::Any;
32use std::num::NonZeroUsize;
33use std::sync::Arc;
34
35use async_trait::async_trait;
36use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
37use datafusion::catalog::Session;
38use datafusion::common::DataFusionError;
39use datafusion::datasource::{TableProvider, TableType};
40use datafusion::error::Result as DFResult;
41use datafusion::logical_expr::dml::InsertOp;
42use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
43use datafusion::physical_plan::ExecutionPlan;
44use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
45use iceberg::arrow::schema_to_arrow_schema;
46use iceberg::inspect::MetadataTableType;
47use iceberg::spec::TableProperties;
48use iceberg::table::Table;
49use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
50use metadata_table::IcebergMetadataTableProvider;
51
52use crate::error::to_datafusion_error;
53use crate::physical_plan::commit::IcebergCommitExec;
54use crate::physical_plan::project::project_with_partition;
55use crate::physical_plan::repartition::repartition;
56use crate::physical_plan::scan::IcebergTableScan;
57use crate::physical_plan::sort::sort_by_partition;
58use crate::physical_plan::write::IcebergWriteExec;
59
60#[derive(Debug, Clone)]
69pub struct IcebergTableProvider {
70 catalog: Arc<dyn Catalog>,
72 table_ident: TableIdent,
74 schema: ArrowSchemaRef,
76}
77
78impl IcebergTableProvider {
79 pub(crate) async fn try_new(
84 catalog: Arc<dyn Catalog>,
85 namespace: NamespaceIdent,
86 name: impl Into<String>,
87 ) -> Result<Self> {
88 let table_ident = TableIdent::new(namespace, name.into());
89
90 let table = catalog.load_table(&table_ident).await?;
92 let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
93
94 Ok(IcebergTableProvider {
95 catalog,
96 table_ident,
97 schema,
98 })
99 }
100
101 pub(crate) async fn metadata_table(
102 &self,
103 r#type: MetadataTableType,
104 ) -> Result<IcebergMetadataTableProvider> {
105 let table = self.catalog.load_table(&self.table_ident).await?;
107 Ok(IcebergMetadataTableProvider { table, r#type })
108 }
109}
110
111#[async_trait]
112impl TableProvider for IcebergTableProvider {
113 fn as_any(&self) -> &dyn Any {
114 self
115 }
116
117 fn schema(&self) -> ArrowSchemaRef {
118 self.schema.clone()
119 }
120
121 fn table_type(&self) -> TableType {
122 TableType::Base
123 }
124
125 async fn scan(
126 &self,
127 _state: &dyn Session,
128 projection: Option<&Vec<usize>>,
129 filters: &[Expr],
130 limit: Option<usize>,
131 ) -> DFResult<Arc<dyn ExecutionPlan>> {
132 let table = self
134 .catalog
135 .load_table(&self.table_ident)
136 .await
137 .map_err(to_datafusion_error)?;
138
139 Ok(Arc::new(IcebergTableScan::new(
141 table,
142 None, self.schema.clone(),
144 projection,
145 filters,
146 limit,
147 )))
148 }
149
150 fn supports_filters_pushdown(
151 &self,
152 filters: &[&Expr],
153 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
154 Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
156 }
157
158 async fn insert_into(
159 &self,
160 state: &dyn Session,
161 input: Arc<dyn ExecutionPlan>,
162 _insert_op: InsertOp,
163 ) -> DFResult<Arc<dyn ExecutionPlan>> {
164 if _insert_op != InsertOp::Append {
165 return Err(DataFusionError::NotImplemented(format!(
166 "IcebergTableProvider supports only append inserts, got {_insert_op}"
167 )));
168 }
169
170 let table = self
172 .catalog
173 .load_table(&self.table_ident)
174 .await
175 .map_err(to_datafusion_error)?;
176
177 let partition_spec = table.metadata().default_partition_spec();
178
179 let plan_with_partition = if !partition_spec.is_unpartitioned() {
181 project_with_partition(input, &table)?
182 } else {
183 input
184 };
185
186 let target_partitions =
188 NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
189 DataFusionError::Configuration(
190 "target_partitions must be greater than 0".to_string(),
191 )
192 })?;
193
194 let repartitioned_plan =
195 repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;
196
197 let fanout_enabled = table
199 .metadata()
200 .properties()
201 .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
202 .map(|value| {
203 value
204 .parse::<bool>()
205 .map_err(|e| {
206 Error::new(
207 ErrorKind::DataInvalid,
208 format!(
209 "Invalid value for {}, expected 'true' or 'false'",
210 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
211 ),
212 )
213 .with_source(e)
214 })
215 .map_err(to_datafusion_error)
216 })
217 .transpose()?
218 .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
219
220 let write_input = if fanout_enabled {
221 repartitioned_plan
222 } else {
223 sort_by_partition(repartitioned_plan)?
224 };
225
226 let write_plan = Arc::new(IcebergWriteExec::new(
227 table.clone(),
228 write_input,
229 self.schema.clone(),
230 ));
231
232 let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));
234
235 Ok(Arc::new(IcebergCommitExec::new(
236 table,
237 self.catalog.clone(),
238 coalesce_partitions,
239 self.schema.clone(),
240 )))
241 }
242}
243
244#[derive(Debug, Clone)]
253pub struct IcebergStaticTableProvider {
254 table: Table,
256 snapshot_id: Option<i64>,
258 schema: ArrowSchemaRef,
260}
261
262impl IcebergStaticTableProvider {
263 pub async fn try_new_from_table(table: Table) -> Result<Self> {
267 let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
268 Ok(IcebergStaticTableProvider {
269 table,
270 snapshot_id: None,
271 schema,
272 })
273 }
274
275 pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
280 let snapshot = table
281 .metadata()
282 .snapshot_by_id(snapshot_id)
283 .ok_or_else(|| {
284 Error::new(
285 ErrorKind::Unexpected,
286 format!(
287 "snapshot id {snapshot_id} not found in table {}",
288 table.identifier().name()
289 ),
290 )
291 })?;
292 let table_schema = snapshot.schema(table.metadata())?;
293 let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
294 Ok(IcebergStaticTableProvider {
295 table,
296 snapshot_id: Some(snapshot_id),
297 schema,
298 })
299 }
300}
301
302#[async_trait]
303impl TableProvider for IcebergStaticTableProvider {
304 fn as_any(&self) -> &dyn Any {
305 self
306 }
307
308 fn schema(&self) -> ArrowSchemaRef {
309 self.schema.clone()
310 }
311
312 fn table_type(&self) -> TableType {
313 TableType::Base
314 }
315
316 async fn scan(
317 &self,
318 _state: &dyn Session,
319 projection: Option<&Vec<usize>>,
320 filters: &[Expr],
321 limit: Option<usize>,
322 ) -> DFResult<Arc<dyn ExecutionPlan>> {
323 Ok(Arc::new(IcebergTableScan::new(
325 self.table.clone(),
326 self.snapshot_id,
327 self.schema.clone(),
328 projection,
329 filters,
330 limit,
331 )))
332 }
333
334 fn supports_filters_pushdown(
335 &self,
336 filters: &[&Expr],
337 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
338 Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
340 }
341
342 async fn insert_into(
343 &self,
344 _state: &dyn Session,
345 _input: Arc<dyn ExecutionPlan>,
346 _insert_op: InsertOp,
347 ) -> DFResult<Arc<dyn ExecutionPlan>> {
348 Err(to_datafusion_error(Error::new(
349 ErrorKind::FeatureUnsupported,
350 "Write operations are not supported on IcebergStaticTableProvider. \
351 Use IcebergTableProvider with a catalog for write support."
352 .to_string(),
353 )))
354 }
355}
356
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::common::Column;
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::physical_plan::empty::EmptyExec;
    use datafusion::prelude::SessionContext;
    use iceberg::io::FileIO;
    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
    use iceberg::spec::{
        NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec,
    };
    use iceberg::table::{StaticTable, Table};
    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
    use tempfile::TempDir;

    use super::*;

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Loads the static table described by `tests/test_data/TableMetadataV2Valid.json`.
    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_name = "TableMetadataV2Valid.json";
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            metadata_file_name
        );
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();

        StaticTable::from_metadata_file(
            &metadata_file_path,
            static_identifier,
            FileIO::new_with_fs(),
        )
        .await
        .unwrap()
        .into_table()
    }

    /// Creates a memory catalog whose warehouse is a fresh temporary directory
    /// and registers the `test_ns` namespace in it.
    ///
    /// Returns the catalog, the namespace, the warehouse path and the
    /// `TempDir` guard (dropping the guard deletes the directory).
    async fn new_catalog_with_namespace() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog: Arc<dyn Catalog> = Arc::new(
            MemoryCatalogBuilder::default()
                .load(
                    "memory",
                    HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
                )
                .await
                .unwrap(),
        );

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        (catalog, namespace, warehouse_path, temp_dir)
    }

    /// Creates an unpartitioned table `test_table(id: int, name: string)`.
    async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let (catalog, namespace, warehouse_path, temp_dir) = new_catalog_with_namespace().await;

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let table_creation = TableCreation::builder()
            .name("test_table".to_string())
            .location(format!("{warehouse_path}/test_table"))
            .schema(schema)
            .properties(HashMap::new())
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (catalog, namespace, "test_table".to_string(), temp_dir)
    }

    /// Creates `partitioned_table(id: int, category: string)` partitioned by
    /// identity on `category`.
    ///
    /// When `fanout_enabled` is `Some`, the write fan-out table property is set
    /// to that value; when `None`, the property is left unset.
    async fn get_partitioned_test_catalog_and_table(
        fanout_enabled: Option<bool>,
    ) -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let (catalog, namespace, warehouse_path, temp_dir) = new_catalog_with_namespace().await;

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "category", Transform::Identity)
            .unwrap()
            .build();

        let mut properties = HashMap::new();
        if let Some(enabled) = fanout_enabled {
            properties.insert(
                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED.to_string(),
                enabled.to_string(),
            );
        }

        let table_creation = TableCreation::builder()
            .name("partitioned_table".to_string())
            .location(format!("{warehouse_path}/partitioned_table"))
            .schema(schema)
            .partition_spec(partition_spec)
            .properties(properties)
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (catalog, namespace, "partitioned_table".to_string(), temp_dir)
    }

    // ---------------------------------------------------------------------
    // Shared assertions and plan helpers
    // ---------------------------------------------------------------------

    /// Registers `provider` as `mytable` and checks the schema of
    /// `SELECT * FROM mytable`: three columns, the first one `x: Int64`, and a
    /// column named `z` present.
    async fn assert_exposes_expected_schema(provider: IcebergStaticTableProvider) {
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(provider)).unwrap();

        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);

        let x_column = df_columns.first().unwrap();
        assert_eq!(x_column.name(), "x");
        assert_eq!(x_column.data_type().to_string(), "Int64");

        assert!(df_schema.has_column(&Column::from_name("z")));
    }

    /// Returns `true` if `plan` or any node below it is named `SortExec`.
    fn plan_contains_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
        plan.name() == "SortExec" || plan.children().into_iter().any(plan_contains_sort)
    }

    /// Builds the append plan for the partitioned test table with the given
    /// fan-out setting, using an empty input with the provider's schema.
    ///
    /// The `TempDir` guard is returned so the warehouse outlives the plan.
    async fn build_append_plan(fanout_enabled: Option<bool>) -> (Arc<dyn ExecutionPlan>, TempDir) {
        let (catalog, namespace, table_name, temp_dir) =
            get_partitioned_test_catalog_and_table(fanout_enabled).await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let input = Arc::new(EmptyExec::new(provider.schema())) as Arc<dyn ExecutionPlan>;
        let state = SessionContext::new().state();
        let insert_plan = provider
            .insert_into(&state, input, InsertOp::Append)
            .await
            .unwrap();

        (insert_plan, temp_dir)
    }

    /// Downcasts the root of `plan` to [`IcebergTableScan`], panicking if the
    /// root is some other operator.
    fn as_iceberg_scan(plan: &Arc<dyn ExecutionPlan>) -> &IcebergTableScan {
        plan.as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan")
    }

    // ---------------------------------------------------------------------
    // Static provider
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_static_provider_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();

        assert_exposes_expected_schema(table_provider).await;
    }

    #[tokio::test]
    async fn test_static_provider_from_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergStaticTableProvider::try_new_from_table_snapshot(table, snapshot_id)
                .await
                .unwrap();

        assert_exposes_expected_schema(table_provider).await;
    }

    #[tokio::test]
    async fn test_static_provider_rejects_writes() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        // The write may be rejected either while planning or while executing.
        let rejected = match ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await {
            Err(_) => true,
            Ok(df) => df.collect().await.is_err(),
        };

        assert!(rejected, "static provider must reject INSERT");
    }

    #[tokio::test]
    async fn test_static_provider_scan() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        df.create_physical_plan()
            .await
            .expect("physical planning of count(*) should succeed");
    }

    // ---------------------------------------------------------------------
    // Catalog-backed provider
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_catalog_backed_provider_creation() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let schema = provider.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_scan() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT * FROM test_table").await.unwrap();

        let df_schema = df.schema();
        assert_eq!(df_schema.fields().len(), 2);
        assert_eq!(df_schema.field(0).name(), "id");
        assert_eq!(df_schema.field(1).name(), "name");

        df.create_physical_plan()
            .await
            .expect("physical planning of SELECT * should succeed");
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_insert() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx
            .sql("INSERT INTO test_table VALUES (1, 'test')")
            .await
            .expect("planning INSERT should succeed");

        df.collect()
            .await
            .expect("executing INSERT should succeed");
    }

    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();

        // Capture the logical schema before the DataFrame is consumed.
        let logical_schema = df.schema().clone();

        let physical_plan = df.create_physical_plan().await.unwrap();
        let physical_schema = physical_plan.schema();

        assert_eq!(
            logical_schema.fields().len(),
            physical_schema.fields().len()
        );

        for (logical_field, physical_field) in logical_schema
            .fields()
            .iter()
            .zip(physical_schema.fields().iter())
        {
            assert_eq!(logical_field.name(), physical_field.name());
            assert_eq!(logical_field.data_type(), physical_field.data_type());
        }
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_rejects_non_append_op() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();
        let state = SessionContext::new().state();

        for (insert_op, expected_message) in [
            (
                InsertOp::Overwrite,
                "IcebergTableProvider supports only append inserts, got Insert Overwrite",
            ),
            (
                InsertOp::Replace,
                "IcebergTableProvider supports only append inserts, got Replace Into",
            ),
        ] {
            let input = Arc::new(EmptyExec::new(provider.schema())) as Arc<dyn ExecutionPlan>;
            let error = provider
                .insert_into(&state, input, insert_op)
                .await
                .expect_err("non-append inserts should be rejected");

            assert!(
                matches!(
                    error,
                    DataFusionError::NotImplemented(ref message) if message == expected_message
                ),
                "unexpected error: {error}"
            );
        }
    }

    // ---------------------------------------------------------------------
    // Insert plan shape
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_insert_plan_fanout_enabled_no_sort() {
        let (insert_plan, _temp_dir) = build_append_plan(Some(true)).await;

        assert!(
            !plan_contains_sort(&insert_plan),
            "Plan should NOT contain SortExec when fanout is enabled"
        );
    }

    #[tokio::test]
    async fn test_insert_plan_fanout_disabled_has_sort() {
        let (insert_plan, _temp_dir) = build_append_plan(Some(false)).await;

        assert!(
            plan_contains_sort(&insert_plan),
            "Plan should contain SortExec when fanout is disabled"
        );
    }

    // ---------------------------------------------------------------------
    // Limit pushdown
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_limit_pushdown_static_provider() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();

        let state = SessionContext::new().state();
        let scan_plan = table_provider
            .scan(&state, None, &[], Some(10))
            .await
            .unwrap();

        assert_eq!(
            as_iceberg_scan(&scan_plan).limit(),
            Some(10),
            "Limit should be set to 10 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_limit_pushdown_catalog_backed_provider() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
            .await
            .unwrap();

        let state = SessionContext::new().state();
        let scan_plan = provider.scan(&state, None, &[], Some(5)).await.unwrap();

        assert_eq!(
            as_iceberg_scan(&scan_plan).limit(),
            Some(5),
            "Limit should be set to 5 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_no_limit_pushdown() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table)
            .await
            .unwrap();

        let state = SessionContext::new().state();
        let scan_plan = table_provider.scan(&state, None, &[], None).await.unwrap();

        assert_eq!(
            as_iceberg_scan(&scan_plan).limit(),
            None,
            "Limit should be None when not specified"
        );
    }
}