iceberg_storage_opendal/
resolving.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Resolving storage that auto-detects the scheme from a path and delegates
19//! to the appropriate [`OpenDalStorage`] variant.
20
21use std::collections::HashMap;
22use std::sync::{Arc, RwLock};
23
24use async_trait::async_trait;
25use bytes::Bytes;
26use futures::StreamExt;
27use futures::stream::BoxStream;
28use iceberg::io::{
29    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
30    StorageFactory,
31};
32use iceberg::{Error, ErrorKind, Result};
33use serde::{Deserialize, Serialize};
34use url::Url;
35
36use crate::OpenDalStorage;
37#[cfg(feature = "opendal-s3")]
38use crate::s3::CustomAwsCredentialLoader;
39
// URL schemes recognized by `OpenDalResolvingStorage`. Several of these are
// aliases for the same backend; `parse_scheme` maps each one onto the canonical
// name used as the storage cache key.

/// In-memory storage (`memory://`).
pub const SCHEME_MEMORY: &str = "memory";
/// Local filesystem (`file://`).
pub const SCHEME_FILE: &str = "file";
/// S3 (`s3://`); canonical name for the S3 family.
pub const SCHEME_S3: &str = "s3";
/// `s3a://` alias, resolved to the same backend as [`SCHEME_S3`].
pub const SCHEME_S3A: &str = "s3a";
/// `s3n://` alias, resolved to the same backend as [`SCHEME_S3`].
pub const SCHEME_S3N: &str = "s3n";
/// `gs://` alias, resolved to the same backend as [`SCHEME_GCS`].
pub const SCHEME_GS: &str = "gs";
/// Google Cloud Storage (`gcs://`).
pub const SCHEME_GCS: &str = "gcs";
/// OSS (`oss://`).
pub const SCHEME_OSS: &str = "oss";
/// Azure `abfss://`, served by the `azdls` backend.
pub const SCHEME_ABFSS: &str = "abfss";
/// Azure `abfs://`, served by the `azdls` backend.
pub const SCHEME_ABFS: &str = "abfs";
/// Azure `wasbs://`, served by the `azdls` backend.
pub const SCHEME_WASBS: &str = "wasbs";
/// Azure `wasb://`, served by the `azdls` backend.
pub const SCHEME_WASB: &str = "wasb";
/// Hugging Face (`hf://`).
pub const SCHEME_HF: &str = "hf";
54
55/// Parse a URL scheme string.
56fn parse_scheme(scheme: &str) -> Result<&'static str> {
57    match scheme {
58        SCHEME_MEMORY => Ok("memory"),
59        SCHEME_FILE | "" => Ok("file"),
60        SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok("s3"),
61        SCHEME_GS | SCHEME_GCS => Ok("gcs"),
62        SCHEME_OSS => Ok("oss"),
63        SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok("azdls"),
64        SCHEME_HF => Ok("hf"),
65        s => Err(Error::new(
66            ErrorKind::FeatureUnsupported,
67            format!("Unsupported storage scheme: {s}"),
68        )),
69    }
70}
71
72/// Extract the scheme from a path URL.
73fn extract_scheme(path: &str) -> Result<&'static str> {
74    let url = Url::parse(path).map_err(|e| {
75        Error::new(
76            ErrorKind::DataInvalid,
77            format!("Invalid path: {path}, failed to parse URL: {e}"),
78        )
79    })?;
80    parse_scheme(url.scheme())
81}
82
83/// Build an [`OpenDalStorage`] variant for the given scheme and config properties.
84fn build_storage_for_scheme(
85    scheme: &'static str,
86    props: &HashMap<String, String>,
87    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
88) -> Result<OpenDalStorage> {
89    match scheme {
90        #[cfg(feature = "opendal-s3")]
91        "s3" => {
92            let config = crate::s3::s3_config_parse(props.clone())?;
93            Ok(OpenDalStorage::S3 {
94                config: Arc::new(config),
95                customized_credential_load: customized_credential_load.clone(),
96            })
97        }
98        #[cfg(feature = "opendal-gcs")]
99        "gcs" => {
100            let config = crate::gcs::gcs_config_parse(props.clone())?;
101            Ok(OpenDalStorage::Gcs {
102                config: Arc::new(config),
103            })
104        }
105        #[cfg(feature = "opendal-oss")]
106        "oss" => {
107            let config = crate::oss::oss_config_parse(props.clone())?;
108            Ok(OpenDalStorage::Oss {
109                config: Arc::new(config),
110            })
111        }
112        #[cfg(feature = "opendal-azdls")]
113        "azdls" => {
114            let config = crate::azdls::azdls_config_parse(props.clone())?;
115            Ok(OpenDalStorage::Azdls {
116                config: Arc::new(config),
117            })
118        }
119        #[cfg(feature = "opendal-fs")]
120        "file" => Ok(OpenDalStorage::LocalFs),
121        #[cfg(feature = "opendal-memory")]
122        "memory" => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
123        #[cfg(feature = "opendal-hf")]
124        "hf" => {
125            let config = crate::hf::hf_config_parse(props.clone())?;
126            Ok(OpenDalStorage::Hf {
127                config: Arc::new(config),
128            })
129        }
130        unsupported => Err(Error::new(
131            ErrorKind::FeatureUnsupported,
132            format!("Unsupported storage scheme: {unsupported}"),
133        )),
134    }
135}
136
137/// A resolving storage factory that creates [`OpenDalResolvingStorage`] instances.
138///
139/// This factory accepts paths from any supported storage system and dynamically
140/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
141/// the path scheme.
142///
143/// # Example
144///
145/// ```rust,ignore
146/// use std::sync::Arc;
147/// use iceberg::io::FileIOBuilder;
148/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
149///
150/// let factory = OpenDalResolvingStorageFactory::new();
151/// let file_io = FileIOBuilder::new(Arc::new(factory))
152///     .with_prop("s3.region", "us-east-1")
153///     .build();
154/// ```
155#[derive(Clone, Debug, Serialize, Deserialize)]
156pub struct OpenDalResolvingStorageFactory {
157    /// Custom AWS credential loader for S3 storage.
158    #[cfg(feature = "opendal-s3")]
159    #[serde(skip)]
160    customized_credential_load: Option<CustomAwsCredentialLoader>,
161}
162
163impl Default for OpenDalResolvingStorageFactory {
164    fn default() -> Self {
165        Self::new()
166    }
167}
168
169impl OpenDalResolvingStorageFactory {
170    /// Create a new resolving storage factory.
171    pub fn new() -> Self {
172        Self {
173            #[cfg(feature = "opendal-s3")]
174            customized_credential_load: None,
175        }
176    }
177
178    /// Set a custom AWS credential loader for S3 storage.
179    #[cfg(feature = "opendal-s3")]
180    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
181        self.customized_credential_load = Some(loader);
182        self
183    }
184}
185
186#[typetag::serde]
187impl StorageFactory for OpenDalResolvingStorageFactory {
188    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
189        Ok(Arc::new(OpenDalResolvingStorage {
190            props: config.props().clone(),
191            storages: RwLock::new(HashMap::new()),
192            #[cfg(feature = "opendal-s3")]
193            customized_credential_load: self.customized_credential_load.clone(),
194        }))
195    }
196}
197
198/// A resolving storage that auto-detects the scheme from a path and delegates
199/// to the appropriate [`OpenDalStorage`] variant.
200///
201/// Sub-storages are lazily created on first use for each scheme and cached
202/// for subsequent operations. Scheme aliases like `s3`/`s3a`/`s3n` map to
203/// the same canonical scheme, so they share a storage instance.
204#[derive(Debug, Serialize, Deserialize)]
205pub struct OpenDalResolvingStorage {
206    /// Configuration properties shared across all backends.
207    props: HashMap<String, String>,
208    /// Cache of canonical scheme to storage mappings.
209    #[serde(skip, default)]
210    storages: RwLock<HashMap<&'static str, Arc<OpenDalStorage>>>,
211    /// Custom AWS credential loader for S3 storage.
212    #[cfg(feature = "opendal-s3")]
213    #[serde(skip)]
214    customized_credential_load: Option<CustomAwsCredentialLoader>,
215}
216
217impl OpenDalResolvingStorage {
218    /// Resolve the storage for the given path by extracting the canonical scheme and
219    /// returning the cached or newly-created [`OpenDalStorage`].
220    fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
221        let scheme = extract_scheme(path)?;
222
223        // Fast path: check read lock first.
224        {
225            let cache = self
226                .storages
227                .read()
228                .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
229            if let Some(storage) = cache.get(&scheme) {
230                return Ok(storage.clone());
231            }
232        }
233
234        // Slow path: build and insert under write lock.
235        let mut cache = self
236            .storages
237            .write()
238            .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
239
240        // Double-check after acquiring write lock.
241        if let Some(storage) = cache.get(&scheme) {
242            return Ok(storage.clone());
243        }
244
245        let storage = build_storage_for_scheme(
246            scheme,
247            &self.props,
248            #[cfg(feature = "opendal-s3")]
249            &self.customized_credential_load,
250        )?;
251        let storage = Arc::new(storage);
252        cache.insert(scheme, storage.clone());
253        Ok(storage)
254    }
255}
256
257#[async_trait]
258#[typetag::serde]
259impl Storage for OpenDalResolvingStorage {
260    async fn exists(&self, path: &str) -> Result<bool> {
261        self.resolve(path)?.exists(path).await
262    }
263
264    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
265        self.resolve(path)?.metadata(path).await
266    }
267
268    async fn read(&self, path: &str) -> Result<Bytes> {
269        self.resolve(path)?.read(path).await
270    }
271
272    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
273        self.resolve(path)?.reader(path).await
274    }
275
276    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
277        self.resolve(path)?.write(path, bs).await
278    }
279
280    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
281        self.resolve(path)?.writer(path).await
282    }
283
284    async fn delete(&self, path: &str) -> Result<()> {
285        self.resolve(path)?.delete(path).await
286    }
287
288    async fn delete_prefix(&self, path: &str) -> Result<()> {
289        self.resolve(path)?.delete_prefix(path).await
290    }
291
292    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
293        // Group paths by canonical scheme so each resolved storage receives a batch,
294        // avoiding repeated operator creation per path.
295        let mut grouped: HashMap<&'static str, Vec<String>> = HashMap::new();
296        while let Some(path) = paths.next().await {
297            let scheme = extract_scheme(&path)?;
298            grouped.entry(scheme).or_default().push(path);
299        }
300
301        for (_, paths) in grouped {
302            let storage = self.resolve(&paths[0])?;
303            storage
304                .delete_stream(futures::stream::iter(paths).boxed())
305                .await?;
306        }
307        Ok(())
308    }
309
310    fn new_input(&self, path: &str) -> Result<InputFile> {
311        Ok(InputFile::new(
312            Arc::new(self.resolve(path)?.as_ref().clone()),
313            path.to_string(),
314        ))
315    }
316
317    fn new_output(&self, path: &str) -> Result<OutputFile> {
318        Ok(OutputFile::new(
319            Arc::new(self.resolve(path)?.as_ref().clone()),
320            path.to_string(),
321        ))
322    }
323}
324
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a resolving storage with empty props, suitable for `resolve()`
    /// calls that don't actually hit any backend.
    fn empty_resolving_storage() -> OpenDalResolvingStorage {
        OpenDalResolvingStorage {
            props: HashMap::new(),
            storages: RwLock::new(HashMap::new()),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    #[test]
    fn test_resolve_rejects_unsupported_scheme() {
        let err = empty_resolving_storage()
            .resolve("ftp://host/path/to/file.parquet")
            .unwrap_err();
        assert!(
            matches!(err.kind(), ErrorKind::FeatureUnsupported),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_resolve_rejects_unparseable_path() {
        let err = empty_resolving_storage().resolve("not a url").unwrap_err();
        assert!(
            matches!(err.kind(), ErrorKind::DataInvalid),
            "unexpected error: {err}"
        );
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_resolve_caches_instance_per_scheme() {
        let storage = empty_resolving_storage();

        // Different authorities/paths under one scheme reuse the cached backend.
        let first = storage.resolve("memory://a/one").unwrap();
        let second = storage.resolve("memory://b/two").unwrap();

        assert!(
            Arc::ptr_eq(&first, &second),
            "repeated memory lookups should share one instance"
        );
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_resolve_s3_aliases_share_instance() {
        let storage = empty_resolving_storage();

        // All three S3-family schemes must collapse to a single cached
        // `Arc<OpenDalStorage>` so that catalogs handing the resolver a mix
        // of `s3://`, `s3a://`, `s3n://` paths don't rebuild operators.
        let a = storage.resolve("s3://bucket/key").unwrap();
        let b = storage.resolve("s3a://bucket/key").unwrap();
        let c = storage.resolve("s3n://bucket/key").unwrap();

        assert!(Arc::ptr_eq(&a, &b), "s3 and s3a should share one instance");
        assert!(Arc::ptr_eq(&a, &c), "s3 and s3n should share one instance");
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_resolve_azdls_aliases_share_instance() {
        let storage = empty_resolving_storage();

        let path_for = |scheme: &str| {
            format!("{scheme}://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
        };

        // All four Azure schemes collapse onto one cached instance.
        let abfss = storage.resolve(&path_for("abfss")).unwrap();
        let abfs = storage.resolve(&path_for("abfs")).unwrap();
        let wasbs = storage.resolve(&path_for("wasbs")).unwrap();
        let wasb = storage.resolve(&path_for("wasb")).unwrap();

        assert!(
            Arc::ptr_eq(&abfss, &abfs),
            "abfss and abfs should share one instance"
        );
        assert!(
            Arc::ptr_eq(&abfss, &wasbs),
            "abfss and wasbs should share one instance"
        );
        assert!(
            Arc::ptr_eq(&abfss, &wasb),
            "abfss and wasb should share one instance"
        );
    }
}