iceberg_sqllogictest/engine/
datafusion.rs1use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21
22use datafusion::catalog::CatalogProvider;
23use datafusion::prelude::{SessionConfig, SessionContext};
24use datafusion_sqllogictest::DataFusion;
25use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
26use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec};
27use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
28use iceberg_datafusion::IcebergCatalogProvider;
29use indicatif::ProgressBar;
30
31use crate::engine::{DatafusionCatalogConfig, EngineRunner, run_slt_with_runner};
32use crate::error::Result;
33
34pub struct DataFusionEngine {
35 test_data_path: PathBuf,
36 session_context: SessionContext,
37}
38
39#[async_trait::async_trait]
40impl EngineRunner for DataFusionEngine {
41 async fn run_slt_file(&mut self, path: &Path) -> Result<()> {
42 let ctx = self.session_context.clone();
43 let testdata = self.test_data_path.clone();
44
45 let runner = sqllogictest::Runner::new({
46 move || {
47 let ctx = ctx.clone();
48 let testdata = testdata.clone();
49 async move {
50 Ok(DataFusion::new(ctx, testdata, ProgressBar::new(100)))
52 }
53 }
54 });
55
56 run_slt_with_runner(runner, path).await
57 }
58}
59
60impl DataFusionEngine {
61 pub async fn new(catalog_config: Option<DatafusionCatalogConfig>) -> Result<Self> {
62 let session_config = SessionConfig::new()
63 .with_target_partitions(4)
64 .with_information_schema(true);
65 let ctx = SessionContext::new_with_config(session_config);
66 ctx.register_catalog(
67 "default",
68 Self::create_catalog(catalog_config.as_ref()).await?,
69 );
70
71 Ok(Self {
72 test_data_path: PathBuf::from("testdata"),
73 session_context: ctx,
74 })
75 }
76
77 async fn create_catalog(
78 _catalog_config: Option<&DatafusionCatalogConfig>,
79 ) -> anyhow::Result<Arc<dyn CatalogProvider>> {
80 let catalog = MemoryCatalogBuilder::default()
83 .load(
84 "memory",
85 HashMap::from([(
86 MEMORY_CATALOG_WAREHOUSE.to_string(),
87 "memory://".to_string(),
88 )]),
89 )
90 .await?;
91
92 let namespace = NamespaceIdent::new("default".to_string());
94 catalog.create_namespace(&namespace, HashMap::new()).await?;
95
96 Self::create_partitioned_table(&catalog, &namespace).await?;
98 Self::create_binary_table(&catalog, &namespace).await?;
99
100 Ok(Arc::new(
101 IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
102 ))
103 }
104
105 async fn create_partitioned_table(
109 catalog: &impl Catalog,
110 namespace: &NamespaceIdent,
111 ) -> anyhow::Result<()> {
112 let schema = Schema::builder()
113 .with_fields(vec![
114 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
115 NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
116 NestedField::optional(3, "value", Type::Primitive(PrimitiveType::String)).into(),
117 ])
118 .build()?;
119
120 let partition_spec = UnboundPartitionSpec::builder()
121 .with_spec_id(0)
122 .add_partition_field(2, "category", Transform::Identity)?
123 .build();
124
125 catalog
126 .create_table(
127 namespace,
128 TableCreation::builder()
129 .name("test_partitioned_table".to_string())
130 .schema(schema)
131 .partition_spec(partition_spec)
132 .build(),
133 )
134 .await?;
135
136 Ok(())
137 }
138
139 async fn create_binary_table(
143 catalog: &impl Catalog,
144 namespace: &NamespaceIdent,
145 ) -> anyhow::Result<()> {
146 let schema = Schema::builder()
147 .with_fields(vec![
148 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
149 NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Binary)).into(),
150 ])
151 .build()?;
152
153 catalog
154 .create_table(
155 namespace,
156 TableCreation::builder()
157 .name("test_binary_table".to_string())
158 .schema(schema)
159 .build(),
160 )
161 .await?;
162
163 Ok(())
164 }
165}