pub mod metadata_table;
pub mod table_provider_factory;

use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::inspect::MetadataTableType;
use iceberg::spec::TableProperties;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;

use crate::error::to_datafusion_error;
use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::project::project_with_partition;
use crate::physical_plan::repartition::repartition;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::sort::sort_by_partition;
use crate::physical_plan::write::IcebergWriteExec;

/// A catalog-backed DataFusion `TableProvider` for an Iceberg table.
///
/// The table is reloaded from the catalog for every scan and insert, so plans
/// always operate on the latest table metadata.
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
    /// The catalog that the table belongs to.
    catalog: Arc<dyn Catalog>,
    /// The identifier of the table within the catalog.
    table_ident: TableIdent,
    /// The Arrow schema derived from the table's current Iceberg schema.
    schema: ArrowSchemaRef,
}

impl IcebergTableProvider {
    /// Creates a provider by loading the table `namespace.name` from the catalog
    /// and converting its current schema to an Arrow schema.
    pub(crate) async fn try_new(
        catalog: Arc<dyn Catalog>,
        namespace: NamespaceIdent,
        name: impl Into<String>,
    ) -> Result<Self> {
        let table_ident = TableIdent::new(namespace, name.into());

        let table = catalog.load_table(&table_ident).await?;
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

        Ok(IcebergTableProvider {
            catalog,
            table_ident,
            schema,
        })
    }

    /// Returns a provider for one of the table's metadata tables (e.g. snapshots
    /// or manifests), backed by a fresh load of the table from the catalog.
    pub(crate) async fn metadata_table(
        &self,
        r#type: MetadataTableType,
    ) -> Result<IcebergMetadataTableProvider> {
        let table = self.catalog.load_table(&self.table_ident).await?;
        Ok(IcebergMetadataTableProvider { table, r#type })
    }
}
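
// A minimal, crate-internal usage sketch (hypothetical variable names; it assumes
// an already-configured catalog plus an existing namespace and table, as in the
// tests below): build the provider and register it with DataFusion so the table
// can be queried with SQL.
//
//     let provider = IcebergTableProvider::try_new(
//         catalog.clone(),
//         NamespaceIdent::new("test_ns".to_string()),
//         "test_table",
//     )
//     .await?;
//     let ctx = SessionContext::new();
//     ctx.register_table("test_table", Arc::new(provider))?;
//     let df = ctx.sql("SELECT * FROM test_table").await?;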

#[async_trait]
impl TableProvider for IcebergTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Reload the table so the scan sees the latest snapshot.
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        Ok(Arc::new(IcebergTableScan::new(
            table,
            // No pinned snapshot: scan the table's current snapshot.
            None,
            self.schema.clone(),
            projection,
            filters,
            limit,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        // Pushdown is inexact: filters are used to prune data files, and
        // DataFusion re-applies them to the returned rows.
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Reload the table so the write plan is built against the latest metadata.
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        let partition_spec = table.metadata().default_partition_spec();

        // For partitioned tables, compute partition values alongside the input rows.
        let plan_with_partition = if !partition_spec.is_unpartitioned() {
            project_with_partition(input, &table)?
        } else {
            input
        };

        let target_partitions =
            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
                DataFusionError::Configuration(
                    "target_partitions must be greater than 0".to_string(),
                )
            })?;

        let repartitioned_plan =
            repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;

        // Read the table property controlling fanout writes; an invalid value is
        // surfaced as an error rather than silently falling back to the default.
        let fanout_enabled = table
            .metadata()
            .properties()
            .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
            .map(|value| {
                value
                    .parse::<bool>()
                    .map_err(|e| {
                        Error::new(
                            ErrorKind::DataInvalid,
                            format!(
                                "Invalid value for {}, expected 'true' or 'false'",
                                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
                            ),
                        )
                        .with_source(e)
                    })
                    .map_err(to_datafusion_error)
            })
            .transpose()?
            .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
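
        // Where this property typically comes from (a sketch mirroring the
        // partitioned-table tests below; the name and builder values are
        // illustrative only):
        //
        //     let mut properties = HashMap::new();
        //     properties.insert(
        //         TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED.to_string(),
        //         "false".to_string(),
        //     );
        //     let creation = TableCreation::builder()
        //         .name("partitioned_table".to_string())
        //         .schema(schema)
        //         .properties(properties)
        //         .build();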

        // With fanout enabled, rows go straight to the writer; otherwise they are
        // first sorted by partition value so data files are written partition by
        // partition.
        let write_input = if fanout_enabled {
            repartitioned_plan
        } else {
            sort_by_partition(repartitioned_plan)?
        };

        let write_plan = Arc::new(IcebergWriteExec::new(
            table.clone(),
            write_input,
            self.schema.clone(),
        ));

        // Collect the data-file results from all partitions into a single stream
        // before committing them.
        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));

        Ok(Arc::new(IcebergCommitExec::new(
            table,
            self.catalog.clone(),
            coalesce_partitions,
            self.schema.clone(),
        )))
    }
}
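
// A minimal write-path sketch (assuming the provider is registered as
// "test_table" in a `SessionContext` named `ctx`, as in the tests below): an
// INSERT statement is planned through `insert_into` above, producing the
// project -> repartition -> optional sort -> write -> coalesce -> commit chain.
//
//     let df = ctx.sql("INSERT INTO test_table VALUES (1, 'test')").await?;
//     df.collect().await?; // executing the plan writes data files and commits them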

/// A DataFusion `TableProvider` over an already-loaded Iceberg [`Table`],
/// without a catalog. It serves reads only; `insert_into` returns an error.
#[derive(Debug, Clone)]
pub struct IcebergStaticTableProvider {
    /// The statically loaded table.
    table: Table,
    /// Optional snapshot to pin scans to; `None` scans the current snapshot.
    snapshot_id: Option<i64>,
    /// The Arrow schema exposed to DataFusion.
    schema: ArrowSchemaRef,
}

impl IcebergStaticTableProvider {
    /// Creates a read-only provider from a loaded table, using its current schema.
    pub async fn try_new_from_table(table: Table) -> Result<Self> {
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
        Ok(IcebergStaticTableProvider {
            table,
            snapshot_id: None,
            schema,
        })
    }

    /// Creates a read-only provider pinned to the given snapshot, using that
    /// snapshot's schema. Fails if the snapshot does not exist in the table.
    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
        let snapshot = table
            .metadata()
            .snapshot_by_id(snapshot_id)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "snapshot id {snapshot_id} not found in table {}",
                        table.identifier().name()
                    ),
                )
            })?;
        let table_schema = snapshot.schema(table.metadata())?;
        let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
        Ok(IcebergStaticTableProvider {
            table,
            snapshot_id: Some(snapshot_id),
            schema,
        })
    }
}
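
// A minimal read-only sketch (assuming a table-metadata JSON file on disk; the
// path, identifier, and `snapshot_id` are illustrative): load a `StaticTable`,
// wrap it in the provider, and optionally pin it to a snapshot for time travel.
//
//     let file_io = FileIO::from_path(&metadata_path)?.build()?;
//     let ident = TableIdent::from_strs(["ns", "tbl"])?;
//     let table = StaticTable::from_metadata_file(&metadata_path, ident, file_io)
//         .await?
//         .into_table();
//     let latest = IcebergStaticTableProvider::try_new_from_table(table.clone()).await?;
//     let pinned =
//         IcebergStaticTableProvider::try_new_from_table_snapshot(table, snapshot_id).await?;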

#[async_trait]
impl TableProvider for IcebergStaticTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Scan the statically loaded table, pinned to `snapshot_id` if one was given.
        Ok(Arc::new(IcebergTableScan::new(
            self.table.clone(),
            self.snapshot_id,
            self.schema.clone(),
            projection,
            filters,
            limit,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        _state: &dyn Session,
        _input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Err(to_datafusion_error(Error::new(
            ErrorKind::FeatureUnsupported,
            "Write operations are not supported on IcebergStaticTableProvider. \
             Use IcebergTableProvider with a catalog for write support."
                .to_string(),
        )))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::common::Column;
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::SessionContext;
    use iceberg::io::FileIO;
    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
    use iceberg::table::{StaticTable, Table};
    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
    use tempfile::TempDir;

    use super::*;

    /// Loads a static test table from the bundled metadata JSON file.
    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_name = "TableMetadataV2Valid.json";
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            metadata_file_name
        );
        let file_io = FileIO::from_path(&metadata_file_path)
            .unwrap()
            .build()
            .unwrap();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        static_table.into_table()
    }

    /// Creates an in-memory catalog with one namespace and an unpartitioned
    /// two-column table, returning the catalog, namespace, table name, and the
    /// temp dir that must stay alive for the warehouse path to remain valid.
    async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let table_creation = TableCreation::builder()
            .name("test_table".to_string())
            .location(format!("{warehouse_path}/test_table"))
            .schema(schema)
            .properties(HashMap::new())
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "test_table".to_string(),
            temp_dir,
        )
    }

    #[tokio::test]
    async fn test_static_provider_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_static_provider_from_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergStaticTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
                .await
                .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_static_provider_rejects_writes() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let result = ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await;

        // The rejection may surface either while planning the SQL statement or
        // when the resulting plan is executed.
        assert!(
            result.is_err() || {
                let df = result.unwrap();
                df.collect().await.is_err()
            }
        );
    }

    #[tokio::test]
    async fn test_static_provider_scan() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_creation() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let schema = provider.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_scan() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT * FROM test_table").await.unwrap();

        let df_schema = df.schema();
        assert_eq!(df_schema.fields().len(), 2);
        assert_eq!(df_schema.field(0).name(), "id");
        assert_eq!(df_schema.field(1).name(), "name");

        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    #[tokio::test]
    async fn test_catalog_backed_provider_insert() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let result = ctx.sql("INSERT INTO test_table VALUES (1, 'test')").await;

        assert!(result.is_ok());

        let df = result.unwrap();
        let execution_result = df.collect().await;

        assert!(execution_result.is_ok());
    }

    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();

        let logical_schema = df.schema().clone();

        let physical_plan = df.create_physical_plan().await.unwrap();
        let physical_schema = physical_plan.schema();

        assert_eq!(
            logical_schema.fields().len(),
            physical_schema.fields().len()
        );

        for (logical_field, physical_field) in logical_schema
            .fields()
            .iter()
            .zip(physical_schema.fields().iter())
        {
            assert_eq!(logical_field.name(), physical_field.name());
            assert_eq!(logical_field.data_type(), physical_field.data_type());
        }
    }

    /// Like `get_test_catalog_and_table`, but the table is partitioned by the
    /// `category` column via an identity transform, and the fanout write
    /// property can optionally be set on the table.
    async fn get_partitioned_test_catalog_and_table(
        fanout_enabled: Option<bool>,
    ) -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        use iceberg::spec::{Transform, UnboundPartitionSpec};

        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "category", Transform::Identity)
            .unwrap()
            .build();

        let mut properties = HashMap::new();
        if let Some(enabled) = fanout_enabled {
            properties.insert(
                iceberg::spec::TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
                    .to_string(),
                enabled.to_string(),
            );
        }

        let table_creation = TableCreation::builder()
            .name("partitioned_table".to_string())
            .location(format!("{warehouse_path}/partitioned_table"))
            .schema(schema)
            .partition_spec(partition_spec)
            .properties(properties)
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "partitioned_table".to_string(),
            temp_dir,
        )
    }

    /// Recursively checks whether the plan tree contains a `SortExec` node.
    fn plan_contains_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
        if plan.name() == "SortExec" {
            return true;
        }
        for child in plan.children() {
            if plan_contains_sort(child) {
                return true;
            }
        }
        false
    }

    #[tokio::test]
    async fn test_insert_plan_fanout_enabled_no_sort() {
        use datafusion::datasource::TableProvider;
        use datafusion::logical_expr::dml::InsertOp;
        use datafusion::physical_plan::empty::EmptyExec;

        let (catalog, namespace, table_name, _temp_dir) =
            get_partitioned_test_catalog_and_table(Some(true)).await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let input_schema = provider.schema();
        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

        let state = ctx.state();
        let insert_plan = provider
            .insert_into(&state, input, InsertOp::Append)
            .await
            .unwrap();

        assert!(
            !plan_contains_sort(&insert_plan),
            "Plan should NOT contain SortExec when fanout is enabled"
        );
    }

    #[tokio::test]
    async fn test_insert_plan_fanout_disabled_has_sort() {
        use datafusion::datasource::TableProvider;
        use datafusion::logical_expr::dml::InsertOp;
        use datafusion::physical_plan::empty::EmptyExec;

        let (catalog, namespace, table_name, _temp_dir) =
            get_partitioned_test_catalog_and_table(Some(false)).await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let input_schema = provider.schema();
        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

        let state = ctx.state();
        let insert_plan = provider
            .insert_into(&state, input, InsertOp::Append)
            .await
            .unwrap();

        assert!(
            plan_contains_sort(&insert_plan),
            "Plan should contain SortExec when fanout is disabled"
        );
    }

    #[tokio::test]
    async fn test_limit_pushdown_static_provider() {
        use datafusion::datasource::TableProvider;

        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let scan_plan = table_provider
            .scan(&state, None, &[], Some(10))
            .await
            .unwrap();

        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        assert_eq!(
            iceberg_scan.limit(),
            Some(10),
            "Limit should be set to 10 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_limit_pushdown_catalog_backed_provider() {
        use datafusion::datasource::TableProvider;

        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let scan_plan = provider.scan(&state, None, &[], Some(5)).await.unwrap();

        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        assert_eq!(
            iceberg_scan.limit(),
            Some(5),
            "Limit should be set to 5 in the scan plan"
        );
    }

    #[tokio::test]
    async fn test_no_limit_pushdown() {
        use datafusion::datasource::TableProvider;

        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let scan_plan = table_provider.scan(&state, None, &[], None).await.unwrap();

        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        assert_eq!(
            iceberg_scan.limit(),
            None,
            "Limit should be None when not specified"
        );
    }
}