iceberg_sqllogictest/
schedule.rs1use std::collections::HashMap;
19use std::fs::read_to_string;
20use std::path::{Path, PathBuf};
21
22use anyhow::{Context, anyhow};
23use serde::{Deserialize, Serialize};
24use tracing::info;
25
26use crate::engine::{EngineConfig, EngineRunner, load_engine_runner};
27
28#[derive(Debug, Clone, Deserialize)]
30pub struct ScheduleConfig {
31 pub engines: HashMap<String, EngineConfig>,
33 pub steps: Vec<Step>,
35}
36
37pub struct Schedule {
38 engines: HashMap<String, Box<dyn EngineRunner>>,
40 steps: Vec<Step>,
42 schedule_file: String,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct Step {
48 engine: String,
50 slt: String,
52}
53
54impl Schedule {
55 pub fn new(
56 engines: HashMap<String, Box<dyn EngineRunner>>,
57 steps: Vec<Step>,
58 schedule_file: String,
59 ) -> Self {
60 Self {
61 engines,
62 steps,
63 schedule_file,
64 }
65 }
66
67 pub async fn from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
68 let path_str = path.as_ref().to_string_lossy().to_string();
69 let content = read_to_string(path)?;
70
71 let config: ScheduleConfig = toml::from_str(&content)
72 .with_context(|| format!("Failed to parse schedule file: {path_str}"))?;
73
74 let engines = Self::instantiate_engines(config.engines).await?;
75
76 Ok(Self::new(engines, config.steps, path_str))
77 }
78
79 async fn instantiate_engines(
81 configs: HashMap<String, EngineConfig>,
82 ) -> anyhow::Result<HashMap<String, Box<dyn EngineRunner>>> {
83 let mut engines = HashMap::new();
84
85 for (name, config) in configs {
86 let engine = load_engine_runner(config).await?;
87 engines.insert(name, engine);
88 }
89
90 Ok(engines)
91 }
92
93 pub async fn run(mut self) -> anyhow::Result<()> {
94 info!("Starting test run with schedule: {}", self.schedule_file);
95
96 for (idx, step) in self.steps.iter().enumerate() {
97 info!(
98 "Running step {}/{}, using engine {}, slt file path: {}",
99 idx + 1,
100 self.steps.len(),
101 &step.engine,
102 &step.slt
103 );
104
105 let engine = self
106 .engines
107 .get_mut(&step.engine)
108 .ok_or_else(|| anyhow!("Engine {} not found", step.engine))?;
109
110 let step_sql_path = PathBuf::from(format!(
111 "{}/testdata/slts/{}",
112 env!("CARGO_MANIFEST_DIR"),
113 &step.slt
114 ));
115
116 engine.run_slt_file(&step_sql_path).await?;
117
118 info!(
119 "Completed step {}/{}, engine {}, slt file path: {}",
120 idx + 1,
121 self.steps.len(),
122 &step.engine,
123 &step.slt
124 );
125 }
126 Ok(())
127 }
128}
129
130#[cfg(test)]
131mod tests {
132 use crate::engine::EngineConfig;
133 use crate::schedule::ScheduleConfig;
134
135 #[test]
136 fn test_deserialize_schedule_config() {
137 let input = r#"
138 [engines]
139 df = { type = "datafusion" }
140
141 [[steps]]
142 engine = "df"
143 slt = "test.slt"
144 "#;
145
146 let config: ScheduleConfig = toml::from_str(input).unwrap();
147
148 assert_eq!(config.engines.len(), 1);
149 assert!(config.engines.contains_key("df"));
150 assert!(matches!(config.engines["df"], EngineConfig::Datafusion {
151 catalog: None
152 }));
153 assert_eq!(config.steps.len(), 1);
154 assert_eq!(config.steps[0].engine, "df");
155 assert_eq!(config.steps[0].slt, "test.slt");
156 }
157
158 #[test]
159 fn test_deserialize_multiple_steps() {
160 let input = r#"
161 [engines]
162 datafusion = { type = "datafusion" }
163
164 [[steps]]
165 engine = "datafusion"
166 slt = "test.slt"
167
168 [[steps]]
169 engine = "datafusion"
170 slt = "test2.slt"
171 "#;
172
173 let config: ScheduleConfig = toml::from_str(input).unwrap();
174
175 assert_eq!(config.steps.len(), 2);
176 assert_eq!(config.steps[0].engine, "datafusion");
177 assert_eq!(config.steps[0].slt, "test.slt");
178 assert_eq!(config.steps[1].engine, "datafusion");
179 assert_eq!(config.steps[1].slt, "test2.slt");
180 }
181
182 #[test]
183 fn test_deserialize_with_catalog_config() {
184 let input = r#"
185 [engines.df]
186 type = "datafusion"
187
188 [engines.df.catalog]
189 type = "rest"
190
191 [engines.df.catalog.props]
192 uri = "http://localhost:8181"
193
194 [[steps]]
195 engine = "df"
196 slt = "test.slt"
197 "#;
198
199 let config: ScheduleConfig = toml::from_str(input).unwrap();
200
201 match &config.engines["df"] {
202 EngineConfig::Datafusion { catalog: Some(cat) } => {
203 assert_eq!(cat.catalog_type, "rest");
204 assert_eq!(
205 cat.props.get("uri"),
206 Some(&"http://localhost:8181".to_string())
207 );
208 }
209 _ => panic!("Expected Datafusion with catalog config"),
210 }
211 }
212
213 #[test]
214 fn test_deserialize_missing_engine_type() {
215 let input = r#"
216 [engines]
217 df = { }
218
219 [[steps]]
220 engine = "df"
221 slt = "test.slt"
222 "#;
223
224 let result: Result<ScheduleConfig, _> = toml::from_str(input);
225 assert!(result.is_err());
226 }
227
228 #[test]
229 fn test_deserialize_invalid_engine_type() {
230 let input = r#"
231 [engines]
232 df = { type = "unknown_engine" }
233
234 [[steps]]
235 engine = "df"
236 slt = "test.slt"
237 "#;
238
239 let result: Result<ScheduleConfig, _> = toml::from_str(input);
240 assert!(result.is_err());
241 }
242
243 #[test]
244 fn test_deserialize_missing_step_fields() {
245 let input = r#"
246 [engines]
247 df = { type = "datafusion" }
248
249 [[steps]]
250 "#;
251
252 let result: Result<ScheduleConfig, _> = toml::from_str(input);
253 assert!(result.is_err());
254 }
255}