iceberg_catalog_loader/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use iceberg::io::StorageFactory;
23use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result};
24use iceberg_catalog_glue::GlueCatalogBuilder;
25use iceberg_catalog_hms::HmsCatalogBuilder;
26use iceberg_catalog_rest::RestCatalogBuilder;
27use iceberg_catalog_s3tables::S3TablesCatalogBuilder;
28use iceberg_catalog_sql::SqlCatalogBuilder;
29
30/// A CatalogBuilderFactory creating a new catalog builder.
31type CatalogBuilderFactory = fn() -> Box<dyn BoxedCatalogBuilder>;
32
33/// A registry of catalog builders.
34static CATALOG_REGISTRY: &[(&str, CatalogBuilderFactory)] = &[
35    ("rest", || Box::new(RestCatalogBuilder::default())),
36    ("glue", || Box::new(GlueCatalogBuilder::default())),
37    ("s3tables", || Box::new(S3TablesCatalogBuilder::default())),
38    ("hms", || Box::new(HmsCatalogBuilder::default())),
39    ("sql", || Box::new(SqlCatalogBuilder::default())),
40];
41
42/// Return the list of supported catalog types.
43pub fn supported_types() -> Vec<&'static str> {
44    CATALOG_REGISTRY.iter().map(|(k, _)| *k).collect()
45}
46
47#[async_trait]
48pub trait BoxedCatalogBuilder: Send {
49    fn with_storage_factory(
50        self: Box<Self>,
51        storage_factory: Arc<dyn StorageFactory>,
52    ) -> Box<dyn BoxedCatalogBuilder>;
53
54    async fn load(
55        self: Box<Self>,
56        name: String,
57        props: HashMap<String, String>,
58    ) -> Result<Arc<dyn Catalog>>;
59}
60
61#[async_trait]
62impl<T: CatalogBuilder + 'static> BoxedCatalogBuilder for T {
63    fn with_storage_factory(
64        self: Box<Self>,
65        storage_factory: Arc<dyn StorageFactory>,
66    ) -> Box<dyn BoxedCatalogBuilder> {
67        Box::new(CatalogBuilder::with_storage_factory(*self, storage_factory))
68    }
69
70    async fn load(
71        self: Box<Self>,
72        name: String,
73        props: HashMap<String, String>,
74    ) -> Result<Arc<dyn Catalog>> {
75        let builder = *self;
76        Ok(Arc::new(builder.load(name, props).await?) as Arc<dyn Catalog>)
77    }
78}
79
80/// Load a catalog from a string.
81pub fn load(r#type: &str) -> Result<Box<dyn BoxedCatalogBuilder>> {
82    let key = r#type.trim();
83    if let Some((_, factory)) = CATALOG_REGISTRY
84        .iter()
85        .find(|(k, _)| k.eq_ignore_ascii_case(key))
86    {
87        Ok(factory())
88    } else {
89        Err(Error::new(
90            ErrorKind::FeatureUnsupported,
91            format!(
92                "Unsupported catalog type: {}. Supported types: {}",
93                r#type,
94                supported_types().join(", ")
95            ),
96        ))
97    }
98}
99
100/// Ergonomic catalog loader builder pattern.
101pub struct CatalogLoader<'a> {
102    catalog_type: &'a str,
103}
104
105impl<'a> From<&'a str> for CatalogLoader<'a> {
106    fn from(s: &'a str) -> Self {
107        Self { catalog_type: s }
108    }
109}
110
111impl CatalogLoader<'_> {
112    pub async fn load(
113        self,
114        name: String,
115        props: HashMap<String, String>,
116    ) -> Result<Arc<dyn Catalog>> {
117        let builder = load(self.catalog_type)?;
118        builder.load(name, props).await
119    }
120}
121
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use iceberg::io::LocalFsStorageFactory;
    use sqlx::migrate::MigrateDatabase;
    use tempfile::TempDir;

    use crate::{CatalogLoader, load, supported_types};

    #[tokio::test]
    async fn test_load_unsupported_catalog() {
        assert!(load("unsupported").is_err());
    }

    #[tokio::test]
    async fn test_load_ignores_case_and_surrounding_whitespace() {
        for name in ["rest", "REST", "  Rest  ", "\tsql\n"] {
            assert!(load(name).is_ok(), "expected {name:?} to resolve to a builder");
        }
    }

    #[tokio::test]
    async fn test_supported_types_lists_builtin_catalogs() {
        let types = supported_types();
        for expected in ["rest", "glue", "s3tables", "hms", "sql"] {
            assert!(types.contains(&expected), "{expected} missing from {types:?}");
        }
    }

    #[tokio::test]
    async fn test_catalog_loader_pattern() {
        use iceberg_catalog_rest::REST_CATALOG_PROP_URI;

        let catalog = CatalogLoader::from("rest")
            .load(
                "rest".to_string(),
                HashMap::from([
                    (
                        REST_CATALOG_PROP_URI.to_string(),
                        "http://localhost:8080".to_string(),
                    ),
                    ("key".to_string(), "value".to_string()),
                ]),
            )
            .await;

        // `expect` rather than `assert!(is_ok())` so a failure shows the error.
        catalog.expect("rest catalog should load via CatalogLoader");
    }

    #[tokio::test]
    async fn test_catalog_loader_pattern_rest_catalog() {
        use iceberg_catalog_rest::REST_CATALOG_PROP_URI;

        let catalog_loader = load("rest").unwrap();
        let catalog = catalog_loader
            .load(
                "rest".to_string(),
                HashMap::from([
                    (
                        REST_CATALOG_PROP_URI.to_string(),
                        "http://localhost:8080".to_string(),
                    ),
                    ("key".to_string(), "value".to_string()),
                ]),
            )
            .await;

        catalog.expect("rest catalog should load");
    }

    #[tokio::test]
    async fn test_catalog_loader_pattern_glue_catalog() {
        use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE;

        let catalog_loader = load("glue").unwrap();
        let catalog = catalog_loader
            .load(
                "glue".to_string(),
                HashMap::from([
                    (
                        GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
                        "s3://test".to_string(),
                    ),
                    ("key".to_string(), "value".to_string()),
                ]),
            )
            .await;

        catalog.expect("glue catalog should load");
    }

    #[tokio::test]
    async fn test_catalog_loader_pattern_s3tables() {
        use iceberg_catalog_s3tables::S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN;

        let catalog = CatalogLoader::from("s3tables")
            .load(
                "s3tables".to_string(),
                HashMap::from([
                    (
                        S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
                        "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
                    ),
                    ("key".to_string(), "value".to_string()),
                ]),
            )
            .await;

        catalog.expect("s3tables catalog should load");
    }

    #[tokio::test]
    async fn test_catalog_loader_pattern_hms_catalog() {
        use iceberg_catalog_hms::{HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE};

        let catalog_loader = load("hms").unwrap();
        let catalog = catalog_loader
            .with_storage_factory(Arc::new(LocalFsStorageFactory))
            .load(
                "hms".to_string(),
                HashMap::from([
                    (HMS_CATALOG_PROP_URI.to_string(), "127.0.0.1:1".to_string()),
                    (
                        HMS_CATALOG_PROP_WAREHOUSE.to_string(),
                        "s3://warehouse".to_string(),
                    ),
                    ("key".to_string(), "value".to_string()),
                ]),
            )
            .await;

        catalog.expect("hms catalog should load");
    }

    /// Creates a fresh temporary directory and returns it together with the
    /// path of a (not yet created) SQLite database file inside it.
    ///
    /// The returned `TempDir` must stay alive while the database is in use:
    /// dropping it deletes the directory and everything in it.
    fn temp_db_path() -> (TempDir, String) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir
            .path()
            .join("catalog.db")
            .to_str()
            .unwrap()
            .to_string();
        (temp_dir, db_path)
    }

    #[tokio::test]
    async fn test_catalog_loader_pattern_sql_catalog() {
        use iceberg_catalog_sql::{SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE};

        // Bind (not `_`) so the directory outlives the catalog and is removed,
        // together with the database file, when the test ends.
        let (_temp_dir, db_path) = temp_db_path();
        let uri = format!("sqlite:{db_path}");
        sqlx::Sqlite::create_database(&uri).await.unwrap();

        let catalog_loader = load("sql").unwrap();
        let catalog = catalog_loader
            .with_storage_factory(Arc::new(LocalFsStorageFactory))
            .load(
                "sql".to_string(),
                HashMap::from([
                    (SQL_CATALOG_PROP_URI.to_string(), uri),
                    (
                        SQL_CATALOG_PROP_WAREHOUSE.to_string(),
                        "s3://warehouse".to_string(),
                    ),
                ]),
            )
            .await;

        catalog.expect("sql catalog should load");
    }

    #[tokio::test]
    async fn test_error_message_includes_supported_types() {
        let Err(err) = load("does-not-exist") else {
            panic!("expected error for unsupported type");
        };
        let msg = err.message().to_string();
        assert!(msg.contains("Supported types:"));
        // The rejected name and every registered type should be reported.
        assert!(msg.contains("does-not-exist"));
        for ty in supported_types() {
            assert!(msg.contains(ty), "{ty} missing from error message: {msg}");
        }
    }
}