iceberg_sqllogictest/engine/
datafusion.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21
22use datafusion::catalog::CatalogProvider;
23use datafusion::prelude::{SessionConfig, SessionContext};
24use datafusion_sqllogictest::DataFusion;
25use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
26use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec};
27use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
28use iceberg_datafusion::IcebergCatalogProvider;
29use indicatif::ProgressBar;
30
31use crate::engine::{DatafusionCatalogConfig, EngineRunner, run_slt_with_runner};
32use crate::error::Result;
33
34pub struct DataFusionEngine {
35    test_data_path: PathBuf,
36    session_context: SessionContext,
37}
38
39#[async_trait::async_trait]
40impl EngineRunner for DataFusionEngine {
41    async fn run_slt_file(&mut self, path: &Path) -> Result<()> {
42        let ctx = self.session_context.clone();
43        let testdata = self.test_data_path.clone();
44
45        let runner = sqllogictest::Runner::new({
46            move || {
47                let ctx = ctx.clone();
48                let testdata = testdata.clone();
49                async move {
50                    // Everything here is owned; no `self` capture.
51                    Ok(DataFusion::new(ctx, testdata, ProgressBar::new(100)))
52                }
53            }
54        });
55
56        run_slt_with_runner(runner, path).await
57    }
58}
59
60impl DataFusionEngine {
61    pub async fn new(catalog_config: Option<DatafusionCatalogConfig>) -> Result<Self> {
62        let session_config = SessionConfig::new()
63            .with_target_partitions(4)
64            .with_information_schema(true);
65        let ctx = SessionContext::new_with_config(session_config);
66        ctx.register_catalog(
67            "default",
68            Self::create_catalog(catalog_config.as_ref()).await?,
69        );
70
71        Ok(Self {
72            test_data_path: PathBuf::from("testdata"),
73            session_context: ctx,
74        })
75    }
76
77    async fn create_catalog(
78        _catalog_config: Option<&DatafusionCatalogConfig>,
79    ) -> anyhow::Result<Arc<dyn CatalogProvider>> {
80        // TODO: Use catalog_config to load different catalog types via iceberg-catalog-loader
81        // See: https://github.com/apache/iceberg-rust/issues/1780
82        let catalog = MemoryCatalogBuilder::default()
83            .load(
84                "memory",
85                HashMap::from([(
86                    MEMORY_CATALOG_WAREHOUSE.to_string(),
87                    "memory://".to_string(),
88                )]),
89            )
90            .await?;
91
92        // Create a test namespace for INSERT INTO tests
93        let namespace = NamespaceIdent::new("default".to_string());
94        catalog.create_namespace(&namespace, HashMap::new()).await?;
95
96        // Create partitioned test table (unpartitioned tables are now created via SQL)
97        Self::create_partitioned_table(&catalog, &namespace).await?;
98        Self::create_binary_table(&catalog, &namespace).await?;
99
100        Ok(Arc::new(
101            IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
102        ))
103    }
104
105    /// Create a partitioned test table with id, category, and value columns
106    /// Partitioned by category using identity transform
107    /// TODO: this can be removed when we support CREATE EXTERNAL TABLE
108    async fn create_partitioned_table(
109        catalog: &impl Catalog,
110        namespace: &NamespaceIdent,
111    ) -> anyhow::Result<()> {
112        let schema = Schema::builder()
113            .with_fields(vec![
114                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
115                NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
116                NestedField::optional(3, "value", Type::Primitive(PrimitiveType::String)).into(),
117            ])
118            .build()?;
119
120        let partition_spec = UnboundPartitionSpec::builder()
121            .with_spec_id(0)
122            .add_partition_field(2, "category", Transform::Identity)?
123            .build();
124
125        catalog
126            .create_table(
127                namespace,
128                TableCreation::builder()
129                    .name("test_partitioned_table".to_string())
130                    .schema(schema)
131                    .partition_spec(partition_spec)
132                    .build(),
133            )
134            .await?;
135
136        Ok(())
137    }
138
139    /// Create a test table with binary type column
140    /// Used for testing binary predicate pushdown
141    /// TODO: this can be removed when we support CREATE TABLE
142    async fn create_binary_table(
143        catalog: &impl Catalog,
144        namespace: &NamespaceIdent,
145    ) -> anyhow::Result<()> {
146        let schema = Schema::builder()
147            .with_fields(vec![
148                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
149                NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Binary)).into(),
150            ])
151            .build()?;
152
153        catalog
154            .create_table(
155                namespace,
156                TableCreation::builder()
157                    .name("test_binary_table".to_string())
158                    .schema(schema)
159                    .build(),
160            )
161            .await?;
162
163        Ok(())
164    }
165}