iceberg_sqllogictest/engine/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod datafusion;
19
20use std::collections::HashMap;
21use std::path::Path;
22
23use anyhow::anyhow;
24use serde::Deserialize;
25use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
26
27use crate::engine::datafusion::DataFusionEngine;
28use crate::error::{Error, Result};
29
/// Configuration for the catalog used by the DataFusion engine.
///
/// Deserialized from a table with a required `type` key and an optional
/// `props` sub-table, for example:
///
/// ```toml
/// type = "rest"
///
/// [props]
/// uri = "http://localhost:8181"
/// ```
#[derive(Debug, Clone, Deserialize)]
pub struct DatafusionCatalogConfig {
    /// Catalog type: "memory", "rest", "glue", "hms", "s3tables", "sql".
    ///
    /// Read from the `type` key of the serialized form (hence the rename).
    #[serde(rename = "type")]
    pub catalog_type: String,
    /// Catalog properties passed to the catalog loader.
    ///
    /// Falls back to an empty map when the `props` table is omitted.
    #[serde(default)]
    pub props: HashMap<String, String>,
}
40
/// Engine configuration as a tagged enum.
///
/// The variant is chosen by the `type` key of the serialized form, matched
/// against the lowercase variant name (e.g. `type = "datafusion"`).
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum EngineConfig {
    /// Selects the DataFusion-backed engine.
    Datafusion {
        /// Optional catalog configuration read from the `catalog` table;
        /// `None` when that table is absent.
        #[serde(default)]
        catalog: Option<DatafusionCatalogConfig>,
    },
}
50
/// An engine capable of executing sqllogictest (`.slt`) files.
///
/// Implementors must be `Send` so a boxed runner can be moved across threads.
#[async_trait::async_trait]
pub trait EngineRunner: Send {
    /// Runs the `.slt` file located at `path`.
    ///
    /// Takes `&mut self`, so a runner may carry state from one file to the next.
    async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
}
55
56pub async fn load_engine_runner(config: EngineConfig) -> Result<Box<dyn EngineRunner>> {
57    match config {
58        EngineConfig::Datafusion { catalog } => Ok(Box::new(DataFusionEngine::new(catalog).await?)),
59    }
60}
61
62pub async fn run_slt_with_runner<D, M>(
63    mut runner: Runner<D, M>,
64    step_slt_file: impl AsRef<Path>,
65) -> Result<()>
66where
67    D: AsyncDB + Send + 'static,
68    M: MakeConnection<Conn = D> + Send + 'static,
69{
70    let path = step_slt_file.as_ref().canonicalize()?;
71    let records = parse_file(&path).map_err(|e| Error(anyhow!("parsing slt file failed: {e}")))?;
72
73    for record in records {
74        if let Err(err) = runner.run_async(record).await {
75            return Err(Error(anyhow!("SLT record execution failed: {err}")));
76        }
77    }
78
79    Ok(())
80}
81
#[cfg(test)]
mod tests {
    use super::{DatafusionCatalogConfig, EngineConfig, load_engine_runner};

    /// A bare `type = "datafusion"` document selects DataFusion with no catalog.
    #[test]
    fn test_deserialize_engine_config() {
        let parsed = toml::from_str::<EngineConfig>(r#"type = "datafusion""#).unwrap();

        let EngineConfig::Datafusion { catalog } = parsed;
        assert!(catalog.is_none());
    }

    /// A nested `[catalog]` table is deserialized into the catalog config.
    #[test]
    fn test_deserialize_engine_config_with_catalog() {
        let toml_doc = r#"
            type = "datafusion"

            [catalog]
            type = "rest"

            [catalog.props]
            uri = "http://localhost:8181"
        "#;

        let parsed = toml::from_str::<EngineConfig>(toml_doc).unwrap();
        let EngineConfig::Datafusion {
            catalog: Some(catalog),
        } = parsed
        else {
            panic!("Expected Datafusion with catalog");
        };

        assert_eq!(catalog.catalog_type, "rest");
        assert_eq!(
            catalog.props.get("uri").map(String::as_str),
            Some("http://localhost:8181")
        );
    }

    /// The catalog config can be deserialized on its own, including `props`.
    #[test]
    fn test_deserialize_catalog_config() {
        let toml_doc = r#"
            type = "memory"

            [props]
            warehouse = "file:///tmp/warehouse"
        "#;

        let catalog = toml::from_str::<DatafusionCatalogConfig>(toml_doc).unwrap();

        assert_eq!(catalog.catalog_type, "memory");
        assert_eq!(
            catalog.props.get("warehouse").map(String::as_str),
            Some("file:///tmp/warehouse")
        );
    }

    /// Loading DataFusion without a catalog config succeeds.
    #[tokio::test]
    async fn test_load_datafusion() {
        let loaded = load_engine_runner(EngineConfig::Datafusion { catalog: None }).await;

        assert!(loaded.is_ok());
    }
}