1pub mod metadata_table;
29pub mod table_provider_factory;
30
31use std::any::Any;
32use std::num::NonZeroUsize;
33use std::sync::Arc;
34
35use async_trait::async_trait;
36use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
37use datafusion::catalog::Session;
38use datafusion::common::DataFusionError;
39use datafusion::datasource::{TableProvider, TableType};
40use datafusion::error::Result as DFResult;
41use datafusion::logical_expr::dml::InsertOp;
42use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
43use datafusion::physical_plan::ExecutionPlan;
44use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
45use iceberg::arrow::schema_to_arrow_schema;
46use iceberg::inspect::MetadataTableType;
47use iceberg::spec::TableProperties;
48use iceberg::table::Table;
49use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
50use metadata_table::IcebergMetadataTableProvider;
51
52use crate::error::to_datafusion_error;
53use crate::physical_plan::commit::IcebergCommitExec;
54use crate::physical_plan::project::project_with_partition;
55use crate::physical_plan::repartition::repartition;
56use crate::physical_plan::scan::IcebergTableScan;
57use crate::physical_plan::sort::sort_by_partition;
58use crate::physical_plan::write::IcebergWriteExec;
59
/// A DataFusion `TableProvider` backed by an Iceberg catalog.
///
/// The table is re-loaded from the catalog for every scan and insert, so
/// plans always run against the latest committed table state.
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
    /// Catalog used to (re)load the table for each operation.
    catalog: Arc<dyn Catalog>,
    /// Fully-qualified identifier of the table within the catalog.
    table_ident: TableIdent,
    /// Arrow schema derived from the table's current Iceberg schema at
    /// construction time (not refreshed on later loads).
    schema: ArrowSchemaRef,
}
77
78impl IcebergTableProvider {
79 pub(crate) async fn try_new(
84 catalog: Arc<dyn Catalog>,
85 namespace: NamespaceIdent,
86 name: impl Into<String>,
87 ) -> Result<Self> {
88 let table_ident = TableIdent::new(namespace, name.into());
89
90 let table = catalog.load_table(&table_ident).await?;
92 let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
93
94 Ok(IcebergTableProvider {
95 catalog,
96 table_ident,
97 schema,
98 })
99 }
100
101 pub(crate) async fn metadata_table(
102 &self,
103 r#type: MetadataTableType,
104 ) -> Result<IcebergMetadataTableProvider> {
105 let table = self.catalog.load_table(&self.table_ident).await?;
107 Ok(IcebergMetadataTableProvider { table, r#type })
108 }
109}
110
#[async_trait]
impl TableProvider for IcebergTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the Arrow schema captured when the provider was constructed.
    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Plans a scan, reloading the table from the catalog so the scan sees
    /// the latest committed metadata rather than the state at construction.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        Ok(Arc::new(IcebergTableScan::new(
            table,
            // No pinned snapshot id: scan the table's current snapshot.
            None,
            self.schema.clone(),
            projection,
            filters,
            limit,
        )))
    }

    /// Advertises every filter as `Inexact`: filters are handed to the scan
    /// for pruning, and DataFusion re-applies them on the returned batches.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    /// Builds the physical write pipeline for an INSERT:
    /// partition-value projection (if partitioned) -> repartition ->
    /// optional sort by partition -> write data files -> coalesce -> commit.
    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Reload so the write plans against the latest table metadata.
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;

        let partition_spec = table.metadata().default_partition_spec();

        // Partitioned tables need an extra projection that computes each
        // row's partition values before repartitioning/sorting can key on
        // them; unpartitioned tables use the input unchanged.
        let plan_with_partition = if !partition_spec.is_unpartitioned() {
            project_with_partition(input, &table)?
        } else {
            input
        };

        // `target_partitions` comes from session config; zero is a
        // configuration error rather than a silent fallback.
        let target_partitions =
            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
                DataFusionError::Configuration(
                    "target_partitions must be greater than 0".to_string(),
                )
            })?;

        let repartitioned_plan =
            repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;

        // Read the fanout-writer table property: a missing value falls back
        // to the default, an unparsable value (not "true"/"false") is
        // surfaced as a DataInvalid error via `transpose()?`.
        let fanout_enabled = table
            .metadata()
            .properties()
            .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
            .map(|value| {
                value
                    .parse::<bool>()
                    .map_err(|e| {
                        Error::new(
                            ErrorKind::DataInvalid,
                            format!(
                                "Invalid value for {}, expected 'true' or 'false'",
                                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
                            ),
                        )
                        .with_source(e)
                    })
                    .map_err(to_datafusion_error)
            })
            .transpose()?
            .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);

        // When fanout is disabled the input is sorted by partition first —
        // presumably so each partition's rows arrive contiguously at the
        // writer; fanout writers skip the sort.
        let write_input = if fanout_enabled {
            repartitioned_plan
        } else {
            sort_by_partition(repartitioned_plan)?
        };

        let write_plan = Arc::new(IcebergWriteExec::new(
            table.clone(),
            write_input,
            self.schema.clone(),
        ));

        // Funnel all write outputs into a single partition so the commit
        // node below runs exactly once over every written file.
        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));

        Ok(Arc::new(IcebergCommitExec::new(
            table,
            self.catalog.clone(),
            coalesce_partitions,
            self.schema.clone(),
        )))
    }
}
237
/// A read-only DataFusion `TableProvider` over a fixed Iceberg table state.
///
/// Unlike [`IcebergTableProvider`], no catalog is involved: scans always use
/// the captured `Table` (optionally pinned to one snapshot), and writes are
/// rejected.
#[derive(Debug, Clone)]
pub struct IcebergStaticTableProvider {
    /// The table state captured at construction; never refreshed.
    table: Table,
    /// Specific snapshot to scan; `None` means the table's current snapshot.
    snapshot_id: Option<i64>,
    /// Arrow schema matching either the current schema or the pinned
    /// snapshot's schema.
    schema: ArrowSchemaRef,
}
255
256impl IcebergStaticTableProvider {
257 pub async fn try_new_from_table(table: Table) -> Result<Self> {
261 let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
262 Ok(IcebergStaticTableProvider {
263 table,
264 snapshot_id: None,
265 schema,
266 })
267 }
268
269 pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
274 let snapshot = table
275 .metadata()
276 .snapshot_by_id(snapshot_id)
277 .ok_or_else(|| {
278 Error::new(
279 ErrorKind::Unexpected,
280 format!(
281 "snapshot id {snapshot_id} not found in table {}",
282 table.identifier().name()
283 ),
284 )
285 })?;
286 let table_schema = snapshot.schema(table.metadata())?;
287 let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
288 Ok(IcebergStaticTableProvider {
289 table,
290 snapshot_id: Some(snapshot_id),
291 schema,
292 })
293 }
294}
295
296#[async_trait]
297impl TableProvider for IcebergStaticTableProvider {
298 fn as_any(&self) -> &dyn Any {
299 self
300 }
301
302 fn schema(&self) -> ArrowSchemaRef {
303 self.schema.clone()
304 }
305
306 fn table_type(&self) -> TableType {
307 TableType::Base
308 }
309
310 async fn scan(
311 &self,
312 _state: &dyn Session,
313 projection: Option<&Vec<usize>>,
314 filters: &[Expr],
315 limit: Option<usize>,
316 ) -> DFResult<Arc<dyn ExecutionPlan>> {
317 Ok(Arc::new(IcebergTableScan::new(
319 self.table.clone(),
320 self.snapshot_id,
321 self.schema.clone(),
322 projection,
323 filters,
324 limit,
325 )))
326 }
327
328 fn supports_filters_pushdown(
329 &self,
330 filters: &[&Expr],
331 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
332 Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
334 }
335
336 async fn insert_into(
337 &self,
338 _state: &dyn Session,
339 _input: Arc<dyn ExecutionPlan>,
340 _insert_op: InsertOp,
341 ) -> DFResult<Arc<dyn ExecutionPlan>> {
342 Err(to_datafusion_error(Error::new(
343 ErrorKind::FeatureUnsupported,
344 "Write operations are not supported on IcebergStaticTableProvider. \
345 Use IcebergTableProvider with a catalog for write support."
346 .to_string(),
347 )))
348 }
349}
350
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::common::Column;
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::SessionContext;
    use iceberg::io::FileIO;
    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
    use iceberg::table::{StaticTable, Table};
    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
    use tempfile::TempDir;

    use super::*;

    /// Loads the `TableMetadataV2Valid.json` fixture from the crate's test
    /// data directory as a static [`Table`].
    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_name = "TableMetadataV2Valid.json";
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            metadata_file_name
        );
        let file_io = FileIO::new_with_fs();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        static_table.into_table()
    }

    /// Creates an in-memory catalog holding `test_ns.test_table` with an
    /// (id: int, name: string) schema. The returned `TempDir` must be kept
    /// alive so the warehouse directory is not deleted mid-test.
    async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let table_creation = TableCreation::builder()
            .name("test_table".to_string())
            .location(format!("{warehouse_path}/test_table"))
            .schema(schema)
            .properties(HashMap::new())
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "test_table".to_string(),
            temp_dir,
        )
    }

    /// A static provider registered in a SessionContext exposes the
    /// fixture's three-column schema (x/y/z, Int64) through SQL.
    #[tokio::test]
    async fn test_static_provider_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    /// Pinning the provider to the fixture's first snapshot yields the same
    /// schema as the unpinned case for this fixture.
    #[tokio::test]
    async fn test_static_provider_from_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergStaticTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
                .await
                .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    /// An INSERT against a static provider must fail — either at planning
    /// time or, at the latest, when the plan is executed.
    #[tokio::test]
    async fn test_static_provider_rejects_writes() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let result = ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await;

        // The rejection may surface during SQL planning or during collect().
        assert!(
            result.is_err() || {
                let df = result.unwrap();
                df.collect().await.is_err()
            }
        );
    }

    /// A read query over the static provider can be lowered to a physical
    /// plan without error.
    #[tokio::test]
    async fn test_static_provider_scan() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();

        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    /// The catalog-backed provider reports the schema of the table it was
    /// resolved against.
    #[tokio::test]
    async fn test_catalog_backed_provider_creation() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let schema = provider.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    /// SELECT * over the catalog-backed provider plans successfully and
    /// preserves the table's column names.
    #[tokio::test]
    async fn test_catalog_backed_provider_scan() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT * FROM test_table").await.unwrap();

        let df_schema = df.schema();
        assert_eq!(df_schema.fields().len(), 2);
        assert_eq!(df_schema.field(0).name(), "id");
        assert_eq!(df_schema.field(1).name(), "name");

        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }

    /// An INSERT through the catalog-backed provider plans and executes
    /// end-to-end against the in-memory catalog.
    #[tokio::test]
    async fn test_catalog_backed_provider_insert() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let result = ctx.sql("INSERT INTO test_table VALUES (1, 'test')").await;

        assert!(result.is_ok());

        let df = result.unwrap();
        let execution_result = df.collect().await;

        assert!(execution_result.is_ok());
    }

    /// The physical plan's schema must agree field-by-field (name and data
    /// type) with the logical plan's schema.
    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("test_table", Arc::new(provider))
            .unwrap();

        let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();

        let logical_schema = df.schema().clone();

        let physical_plan = df.create_physical_plan().await.unwrap();
        let physical_schema = physical_plan.schema();

        assert_eq!(
            logical_schema.fields().len(),
            physical_schema.fields().len()
        );

        for (logical_field, physical_field) in logical_schema
            .fields()
            .iter()
            .zip(physical_schema.fields().iter())
        {
            assert_eq!(logical_field.name(), physical_field.name());
            assert_eq!(logical_field.data_type(), physical_field.data_type());
        }
    }

    /// Like `get_test_catalog_and_table`, but the table is identity
    /// partitioned on `category`, and the fanout-enabled table property is
    /// set when `fanout_enabled` is `Some`.
    async fn get_partitioned_test_catalog_and_table(
        fanout_enabled: Option<bool>,
    ) -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
        use iceberg::spec::{Transform, UnboundPartitionSpec};

        let temp_dir = TempDir::new().unwrap();
        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

        let catalog = MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
            )
            .await
            .unwrap();

        let namespace = NamespaceIdent::new("test_ns".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "category", Transform::Identity)
            .unwrap()
            .build();

        let mut properties = HashMap::new();
        if let Some(enabled) = fanout_enabled {
            properties.insert(
                iceberg::spec::TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
                    .to_string(),
                enabled.to_string(),
            );
        }

        let table_creation = TableCreation::builder()
            .name("partitioned_table".to_string())
            .location(format!("{warehouse_path}/partitioned_table"))
            .schema(schema)
            .partition_spec(partition_spec)
            .properties(properties)
            .build();

        catalog
            .create_table(&namespace, table_creation)
            .await
            .unwrap();

        (
            Arc::new(catalog),
            namespace,
            "partitioned_table".to_string(),
            temp_dir,
        )
    }

    /// Recursively checks whether `plan` or any descendant is a SortExec.
    fn plan_contains_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
        if plan.name() == "SortExec" {
            return true;
        }
        for child in plan.children() {
            if plan_contains_sort(child) {
                return true;
            }
        }
        false
    }

    /// With fanout writing enabled, insert_into must not add a SortExec.
    #[tokio::test]
    async fn test_insert_plan_fanout_enabled_no_sort() {
        use datafusion::datasource::TableProvider;
        use datafusion::logical_expr::dml::InsertOp;
        use datafusion::physical_plan::empty::EmptyExec;

        let (catalog, namespace, table_name, _temp_dir) =
            get_partitioned_test_catalog_and_table(Some(true)).await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let input_schema = provider.schema();
        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

        let state = ctx.state();
        let insert_plan = provider
            .insert_into(&state, input, InsertOp::Append)
            .await
            .unwrap();

        assert!(
            !plan_contains_sort(&insert_plan),
            "Plan should NOT contain SortExec when fanout is enabled"
        );
    }

    /// With fanout writing disabled, insert_into must sort by partition.
    #[tokio::test]
    async fn test_insert_plan_fanout_disabled_has_sort() {
        use datafusion::datasource::TableProvider;
        use datafusion::logical_expr::dml::InsertOp;
        use datafusion::physical_plan::empty::EmptyExec;

        let (catalog, namespace, table_name, _temp_dir) =
            get_partitioned_test_catalog_and_table(Some(false)).await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let input_schema = provider.schema();
        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

        let state = ctx.state();
        let insert_plan = provider
            .insert_into(&state, input, InsertOp::Append)
            .await
            .unwrap();

        assert!(
            plan_contains_sort(&insert_plan),
            "Plan should contain SortExec when fanout is disabled"
        );
    }

    /// A limit passed to scan() is forwarded into the IcebergTableScan node
    /// (static provider).
    #[tokio::test]
    async fn test_limit_pushdown_static_provider() {
        use datafusion::datasource::TableProvider;

        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let scan_plan = table_provider
            .scan(&state, None, &[], Some(10))
            .await
            .unwrap();

        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        assert_eq!(
            iceberg_scan.limit(),
            Some(10),
            "Limit should be set to 10 in the scan plan"
        );
    }

    /// A limit passed to scan() is forwarded into the IcebergTableScan node
    /// (catalog-backed provider).
    #[tokio::test]
    async fn test_limit_pushdown_catalog_backed_provider() {
        use datafusion::datasource::TableProvider;

        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;

        let provider =
            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
                .await
                .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let scan_plan = provider.scan(&state, None, &[], Some(5)).await.unwrap();

        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        assert_eq!(
            iceberg_scan.limit(),
            Some(5),
            "Limit should be set to 5 in the scan plan"
        );
    }

    /// When no limit is given, the scan node carries `None`.
    #[tokio::test]
    async fn test_no_limit_pushdown() {
        use datafusion::datasource::TableProvider;

        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let scan_plan = table_provider.scan(&state, None, &[], None).await.unwrap();

        let iceberg_scan = scan_plan
            .as_any()
            .downcast_ref::<IcebergTableScan>()
            .expect("Expected IcebergTableScan");

        assert_eq!(
            iceberg_scan.limit(),
            None,
            "Limit should be None when not specified"
        );
    }
}