iceberg_sqllogictest/
schedule.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fs::read_to_string;
20use std::path::{Path, PathBuf};
21
22use anyhow::{Context, anyhow};
23use serde::{Deserialize, Serialize};
24use tracing::info;
25
26use crate::engine::{EngineConfig, EngineRunner, load_engine_runner};
27
28/// Raw configuration parsed from the schedule TOML file
29#[derive(Debug, Clone, Deserialize)]
30pub struct ScheduleConfig {
31    /// Engine name to engine configuration
32    pub engines: HashMap<String, EngineConfig>,
33    /// List of test steps to run
34    pub steps: Vec<Step>,
35}
36
37pub struct Schedule {
38    /// Engine names to engine instances
39    engines: HashMap<String, Box<dyn EngineRunner>>,
40    /// List of test steps to run
41    steps: Vec<Step>,
42    /// Path of the schedule file
43    schedule_file: String,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct Step {
48    /// Engine name
49    engine: String,
50    /// Stl file path
51    slt: String,
52}
53
54impl Schedule {
55    pub fn new(
56        engines: HashMap<String, Box<dyn EngineRunner>>,
57        steps: Vec<Step>,
58        schedule_file: String,
59    ) -> Self {
60        Self {
61            engines,
62            steps,
63            schedule_file,
64        }
65    }
66
67    pub async fn from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
68        let path_str = path.as_ref().to_string_lossy().to_string();
69        let content = read_to_string(path)?;
70
71        let config: ScheduleConfig = toml::from_str(&content)
72            .with_context(|| format!("Failed to parse schedule file: {path_str}"))?;
73
74        let engines = Self::instantiate_engines(config.engines).await?;
75
76        Ok(Self::new(engines, config.steps, path_str))
77    }
78
79    /// Instantiate engine runners from their configurations
80    async fn instantiate_engines(
81        configs: HashMap<String, EngineConfig>,
82    ) -> anyhow::Result<HashMap<String, Box<dyn EngineRunner>>> {
83        let mut engines = HashMap::new();
84
85        for (name, config) in configs {
86            let engine = load_engine_runner(config).await?;
87            engines.insert(name, engine);
88        }
89
90        Ok(engines)
91    }
92
93    pub async fn run(mut self) -> anyhow::Result<()> {
94        info!("Starting test run with schedule: {}", self.schedule_file);
95
96        for (idx, step) in self.steps.iter().enumerate() {
97            info!(
98                "Running step {}/{}, using engine {}, slt file path: {}",
99                idx + 1,
100                self.steps.len(),
101                &step.engine,
102                &step.slt
103            );
104
105            let engine = self
106                .engines
107                .get_mut(&step.engine)
108                .ok_or_else(|| anyhow!("Engine {} not found", step.engine))?;
109
110            let step_sql_path = PathBuf::from(format!(
111                "{}/testdata/slts/{}",
112                env!("CARGO_MANIFEST_DIR"),
113                &step.slt
114            ));
115
116            engine.run_slt_file(&step_sql_path).await?;
117
118            info!(
119                "Completed step {}/{}, engine {}, slt file path: {}",
120                idx + 1,
121                self.steps.len(),
122                &step.engine,
123                &step.slt
124            );
125        }
126        Ok(())
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::ScheduleConfig;
    use crate::engine::EngineConfig;

    /// A single engine with a single step round-trips into the config.
    #[test]
    fn test_deserialize_schedule_config() {
        let toml_src = r#"
            [engines]
            df = { type = "datafusion" }

            [[steps]]
            engine = "df"
            slt = "test.slt"
        "#;

        let config = toml::from_str::<ScheduleConfig>(toml_src).unwrap();

        assert_eq!(config.engines.len(), 1);
        assert!(config.engines.contains_key("df"));
        assert!(matches!(
            config.engines.get("df"),
            Some(EngineConfig::Datafusion { catalog: None })
        ));

        let [step] = config.steps.as_slice() else {
            panic!("expected exactly one step, got {}", config.steps.len());
        };
        assert_eq!(step.engine, "df");
        assert_eq!(step.slt, "test.slt");
    }

    /// Steps keep the order in which they appear in the file.
    #[test]
    fn test_deserialize_multiple_steps() {
        let toml_src = r#"
            [engines]
            datafusion = { type = "datafusion" }

            [[steps]]
            engine = "datafusion"
            slt = "test.slt"

            [[steps]]
            engine = "datafusion"
            slt = "test2.slt"
        "#;

        let config = toml::from_str::<ScheduleConfig>(toml_src).unwrap();

        let expected = [("datafusion", "test.slt"), ("datafusion", "test2.slt")];
        assert_eq!(config.steps.len(), expected.len());
        for (step, (engine, slt)) in config.steps.iter().zip(expected) {
            assert_eq!(step.engine, engine);
            assert_eq!(step.slt, slt);
        }
    }

    /// A nested catalog table is parsed into the engine's catalog config.
    #[test]
    fn test_deserialize_with_catalog_config() {
        let toml_src = r#"
            [engines.df]
            type = "datafusion"

            [engines.df.catalog]
            type = "rest"

            [engines.df.catalog.props]
            uri = "http://localhost:8181"

            [[steps]]
            engine = "df"
            slt = "test.slt"
        "#;

        let config = toml::from_str::<ScheduleConfig>(toml_src).unwrap();

        let Some(EngineConfig::Datafusion { catalog: Some(cat) }) = config.engines.get("df")
        else {
            panic!("Expected Datafusion with catalog config");
        };
        assert_eq!(cat.catalog_type, "rest");
        assert_eq!(
            cat.props.get("uri").map(String::as_str),
            Some("http://localhost:8181")
        );
    }

    /// An engine table without a `type` key is rejected.
    #[test]
    fn test_deserialize_missing_engine_type() {
        let toml_src = r#"
            [engines]
            df = { }

            [[steps]]
            engine = "df"
            slt = "test.slt"
        "#;

        assert!(toml::from_str::<ScheduleConfig>(toml_src).is_err());
    }

    /// An unrecognised engine `type` is rejected.
    #[test]
    fn test_deserialize_invalid_engine_type() {
        let toml_src = r#"
            [engines]
            df = { type = "unknown_engine" }

            [[steps]]
            engine = "df"
            slt = "test.slt"
        "#;

        assert!(toml::from_str::<ScheduleConfig>(toml_src).is_err());
    }

    /// A step with neither `engine` nor `slt` is rejected.
    #[test]
    fn test_deserialize_missing_step_fields() {
        let toml_src = r#"
            [engines]
            df = { type = "datafusion" }

            [[steps]]
        "#;

        assert!(toml::from_str::<ScheduleConfig>(toml_src).is_err());
    }
}