1use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use iceberg::io::StorageFactory;
23use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result};
24use iceberg_catalog_glue::GlueCatalogBuilder;
25use iceberg_catalog_hms::HmsCatalogBuilder;
26use iceberg_catalog_rest::RestCatalogBuilder;
27use iceberg_catalog_s3tables::S3TablesCatalogBuilder;
28use iceberg_catalog_sql::SqlCatalogBuilder;
29
30type CatalogBuilderFactory = fn() -> Box<dyn BoxedCatalogBuilder>;
32
33static CATALOG_REGISTRY: &[(&str, CatalogBuilderFactory)] = &[
35 ("rest", || Box::new(RestCatalogBuilder::default())),
36 ("glue", || Box::new(GlueCatalogBuilder::default())),
37 ("s3tables", || Box::new(S3TablesCatalogBuilder::default())),
38 ("hms", || Box::new(HmsCatalogBuilder::default())),
39 ("sql", || Box::new(SqlCatalogBuilder::default())),
40];
41
42pub fn supported_types() -> Vec<&'static str> {
44 CATALOG_REGISTRY.iter().map(|(k, _)| *k).collect()
45}
46
47#[async_trait]
48pub trait BoxedCatalogBuilder: Send {
49 fn with_storage_factory(
50 self: Box<Self>,
51 storage_factory: Arc<dyn StorageFactory>,
52 ) -> Box<dyn BoxedCatalogBuilder>;
53
54 async fn load(
55 self: Box<Self>,
56 name: String,
57 props: HashMap<String, String>,
58 ) -> Result<Arc<dyn Catalog>>;
59}
60
61#[async_trait]
62impl<T: CatalogBuilder + 'static> BoxedCatalogBuilder for T {
63 fn with_storage_factory(
64 self: Box<Self>,
65 storage_factory: Arc<dyn StorageFactory>,
66 ) -> Box<dyn BoxedCatalogBuilder> {
67 Box::new(CatalogBuilder::with_storage_factory(*self, storage_factory))
68 }
69
70 async fn load(
71 self: Box<Self>,
72 name: String,
73 props: HashMap<String, String>,
74 ) -> Result<Arc<dyn Catalog>> {
75 let builder = *self;
76 Ok(Arc::new(builder.load(name, props).await?) as Arc<dyn Catalog>)
77 }
78}
79
80pub fn load(r#type: &str) -> Result<Box<dyn BoxedCatalogBuilder>> {
82 let key = r#type.trim();
83 if let Some((_, factory)) = CATALOG_REGISTRY
84 .iter()
85 .find(|(k, _)| k.eq_ignore_ascii_case(key))
86 {
87 Ok(factory())
88 } else {
89 Err(Error::new(
90 ErrorKind::FeatureUnsupported,
91 format!(
92 "Unsupported catalog type: {}. Supported types: {}",
93 r#type,
94 supported_types().join(", ")
95 ),
96 ))
97 }
98}
99
100pub struct CatalogLoader<'a> {
102 catalog_type: &'a str,
103}
104
105impl<'a> From<&'a str> for CatalogLoader<'a> {
106 fn from(s: &'a str) -> Self {
107 Self { catalog_type: s }
108 }
109}
110
111impl CatalogLoader<'_> {
112 pub async fn load(
113 self,
114 name: String,
115 props: HashMap<String, String>,
116 ) -> Result<Arc<dyn Catalog>> {
117 let builder = load(self.catalog_type)?;
118 builder.load(name, props).await
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use std::collections::HashMap;
125 use std::sync::Arc;
126
127 use iceberg::io::LocalFsStorageFactory;
128 use sqlx::migrate::MigrateDatabase;
129 use tempfile::TempDir;
130
131 use crate::{CatalogLoader, load};
132
133 #[tokio::test]
134 async fn test_load_unsupported_catalog() {
135 let result = load("unsupported");
136 assert!(result.is_err());
137 }
138
139 #[tokio::test]
140 async fn test_catalog_loader_pattern() {
141 use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
142
143 let catalog = CatalogLoader::from("rest")
144 .load(
145 "rest".to_string(),
146 HashMap::from([
147 (
148 REST_CATALOG_PROP_URI.to_string(),
149 "http://localhost:8080".to_string(),
150 ),
151 ("key".to_string(), "value".to_string()),
152 ]),
153 )
154 .await;
155
156 assert!(catalog.is_ok());
157 }
158
159 #[tokio::test]
160 async fn test_catalog_loader_pattern_rest_catalog() {
161 use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
162
163 let catalog_loader = load("rest").unwrap();
164 let catalog = catalog_loader
165 .load(
166 "rest".to_string(),
167 HashMap::from([
168 (
169 REST_CATALOG_PROP_URI.to_string(),
170 "http://localhost:8080".to_string(),
171 ),
172 ("key".to_string(), "value".to_string()),
173 ]),
174 )
175 .await;
176
177 assert!(catalog.is_ok());
178 }
179
180 #[tokio::test]
181 async fn test_catalog_loader_pattern_glue_catalog() {
182 use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE;
183
184 let catalog_loader = load("glue").unwrap();
185 let catalog = catalog_loader
186 .load(
187 "glue".to_string(),
188 HashMap::from([
189 (
190 GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
191 "s3://test".to_string(),
192 ),
193 ("key".to_string(), "value".to_string()),
194 ]),
195 )
196 .await;
197
198 assert!(catalog.is_ok());
199 }
200
201 #[tokio::test]
202 async fn test_catalog_loader_pattern_s3tables() {
203 use iceberg_catalog_s3tables::S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN;
204
205 let catalog = CatalogLoader::from("s3tables")
206 .load(
207 "s3tables".to_string(),
208 HashMap::from([
209 (
210 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
211 "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
212 ),
213 ("key".to_string(), "value".to_string()),
214 ]),
215 )
216 .await;
217
218 assert!(catalog.is_ok());
219 }
220
221 #[tokio::test]
222 async fn test_catalog_loader_pattern_hms_catalog() {
223 use iceberg_catalog_hms::{HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE};
224
225 let catalog_loader = load("hms").unwrap();
226 let catalog = catalog_loader
227 .with_storage_factory(Arc::new(LocalFsStorageFactory))
228 .load(
229 "hms".to_string(),
230 HashMap::from([
231 (HMS_CATALOG_PROP_URI.to_string(), "127.0.0.1:1".to_string()),
232 (
233 HMS_CATALOG_PROP_WAREHOUSE.to_string(),
234 "s3://warehouse".to_string(),
235 ),
236 ("key".to_string(), "value".to_string()),
237 ]),
238 )
239 .await;
240
241 assert!(catalog.is_ok());
242 }
243
244 fn temp_path() -> String {
245 let temp_dir = TempDir::new().unwrap();
246 temp_dir.path().to_str().unwrap().to_string()
247 }
248
249 #[tokio::test]
250 async fn test_catalog_loader_pattern_sql_catalog() {
251 use iceberg_catalog_sql::{SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE};
252
253 let uri = format!("sqlite:{}", temp_path());
254 sqlx::Sqlite::create_database(&uri).await.unwrap();
255
256 let catalog_loader = load("sql").unwrap();
257 let catalog = catalog_loader
258 .with_storage_factory(Arc::new(LocalFsStorageFactory))
259 .load(
260 "sql".to_string(),
261 HashMap::from([
262 (SQL_CATALOG_PROP_URI.to_string(), uri),
263 (
264 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
265 "s3://warehouse".to_string(),
266 ),
267 ]),
268 )
269 .await;
270
271 assert!(catalog.is_ok());
272 }
273
274 #[tokio::test]
275 async fn test_error_message_includes_supported_types() {
276 let err = match load("does-not-exist") {
277 Ok(_) => panic!("expected error for unsupported type"),
278 Err(e) => e,
279 };
280 let msg = err.message().to_string();
281 assert!(msg.contains("Supported types:"));
282 assert!(msg.contains("rest"));
284 }
285}