iceberg_sqllogictest/engine/
mod.rs1mod datafusion;
19
20use std::collections::HashMap;
21use std::path::Path;
22
23use anyhow::anyhow;
24use serde::Deserialize;
25use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
26
27use crate::engine::datafusion::DataFusionEngine;
28use crate::error::{Error, Result};
29
30#[derive(Debug, Clone, Deserialize)]
32pub struct DatafusionCatalogConfig {
33 #[serde(rename = "type")]
35 pub catalog_type: String,
36 #[serde(default)]
38 pub props: HashMap<String, String>,
39}
40
41#[derive(Debug, Clone, Deserialize)]
43#[serde(tag = "type", rename_all = "lowercase")]
44pub enum EngineConfig {
45 Datafusion {
46 #[serde(default)]
47 catalog: Option<DatafusionCatalogConfig>,
48 },
49}
50
51#[async_trait::async_trait]
52pub trait EngineRunner: Send {
53 async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
54}
55
56pub async fn load_engine_runner(config: EngineConfig) -> Result<Box<dyn EngineRunner>> {
57 match config {
58 EngineConfig::Datafusion { catalog } => Ok(Box::new(DataFusionEngine::new(catalog).await?)),
59 }
60}
61
62pub async fn run_slt_with_runner<D, M>(
63 mut runner: Runner<D, M>,
64 step_slt_file: impl AsRef<Path>,
65) -> Result<()>
66where
67 D: AsyncDB + Send + 'static,
68 M: MakeConnection<Conn = D> + Send + 'static,
69{
70 let path = step_slt_file.as_ref().canonicalize()?;
71 let records = parse_file(&path).map_err(|e| Error(anyhow!("parsing slt file failed: {e}")))?;
72
73 for record in records {
74 if let Err(err) = runner.run_async(record).await {
75 return Err(Error(anyhow!("SLT record execution failed: {err}")));
76 }
77 }
78
79 Ok(())
80}
81
82#[cfg(test)]
83mod tests {
84 use crate::engine::{DatafusionCatalogConfig, EngineConfig, load_engine_runner};
85
86 #[test]
87 fn test_deserialize_engine_config() {
88 let input = r#"type = "datafusion""#;
89
90 let config: EngineConfig = toml::from_str(input).unwrap();
91 assert!(matches!(config, EngineConfig::Datafusion { catalog: None }));
92 }
93
94 #[test]
95 fn test_deserialize_engine_config_with_catalog() {
96 let input = r#"
97 type = "datafusion"
98
99 [catalog]
100 type = "rest"
101
102 [catalog.props]
103 uri = "http://localhost:8181"
104 "#;
105
106 let config: EngineConfig = toml::from_str(input).unwrap();
107 match config {
108 EngineConfig::Datafusion { catalog: Some(cat) } => {
109 assert_eq!(cat.catalog_type, "rest");
110 assert_eq!(
111 cat.props.get("uri"),
112 Some(&"http://localhost:8181".to_string())
113 );
114 }
115 _ => panic!("Expected Datafusion with catalog"),
116 }
117 }
118
119 #[test]
120 fn test_deserialize_catalog_config() {
121 let input = r#"
122 type = "memory"
123
124 [props]
125 warehouse = "file:///tmp/warehouse"
126 "#;
127
128 let config: DatafusionCatalogConfig = toml::from_str(input).unwrap();
129 assert_eq!(config.catalog_type, "memory");
130 assert_eq!(
131 config.props.get("warehouse"),
132 Some(&"file:///tmp/warehouse".to_string())
133 );
134 }
135
136 #[tokio::test]
137 async fn test_load_datafusion() {
138 let config = EngineConfig::Datafusion { catalog: None };
139
140 let result = load_engine_runner(config).await;
141 assert!(result.is_ok());
142 }
143}