// iceberg_storage_opendal/resolving.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Resolving storage that auto-detects the scheme from a path and delegates
19//! to the appropriate [`OpenDalStorage`] variant.
20
21use std::collections::HashMap;
22use std::sync::{Arc, RwLock};
23
24use async_trait::async_trait;
25use bytes::Bytes;
26use futures::StreamExt;
27use futures::stream::BoxStream;
28use iceberg::io::{
29    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
30    StorageFactory,
31};
32use iceberg::{Error, ErrorKind, Result};
33use opendal::Scheme;
34use serde::{Deserialize, Serialize};
35use url::Url;
36
37use crate::OpenDalStorage;
38#[cfg(feature = "opendal-s3")]
39use crate::s3::CustomAwsCredentialLoader;
40
// Schemes supported by OpenDalResolvingStorage. Aliases within a family
// (see `parse_scheme`) collapse onto one `opendal::Scheme` variant.
/// In-memory backend (maps to [`Scheme::Memory`]).
pub const SCHEME_MEMORY: &str = "memory";
/// Local filesystem (maps to [`Scheme::Fs`]).
pub const SCHEME_FILE: &str = "file";
/// Amazon S3 (maps to [`Scheme::S3`]).
pub const SCHEME_S3: &str = "s3";
/// Hadoop-style S3 alias; same backend as `s3`.
pub const SCHEME_S3A: &str = "s3a";
/// Legacy Hadoop S3 alias; same backend as `s3`.
pub const SCHEME_S3N: &str = "s3n";
/// Google Cloud Storage (maps to [`Scheme::Gcs`]).
pub const SCHEME_GS: &str = "gs";
/// Alternate GCS spelling; same backend as `gs`.
pub const SCHEME_GCS: &str = "gcs";
/// OSS object storage (maps to [`Scheme::Oss`]).
pub const SCHEME_OSS: &str = "oss";
/// Azure Data Lake Storage, TLS (maps to [`Scheme::Azdls`]).
pub const SCHEME_ABFSS: &str = "abfss";
/// Azure Data Lake Storage; same backend as `abfss`.
pub const SCHEME_ABFS: &str = "abfs";
/// Azure blob alias, TLS; same backend as `abfss`.
pub const SCHEME_WASBS: &str = "wasbs";
/// Azure blob alias; same backend as `abfss`.
pub const SCHEME_WASB: &str = "wasb";
54
55/// Parse a URL scheme string into an [`opendal::Scheme`].
56fn parse_scheme(scheme: &str) -> Result<Scheme> {
57    match scheme {
58        SCHEME_MEMORY => Ok(Scheme::Memory),
59        SCHEME_FILE | "" => Ok(Scheme::Fs),
60        SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3),
61        SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs),
62        SCHEME_OSS => Ok(Scheme::Oss),
63        SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok(Scheme::Azdls),
64        s => s.parse::<Scheme>().map_err(|e| {
65            Error::new(
66                ErrorKind::FeatureUnsupported,
67                format!("Unsupported storage scheme: {s}: {e}"),
68            )
69        }),
70    }
71}
72
73/// Extract the [`Scheme`] family from a path URL.
74fn extract_scheme(path: &str) -> Result<Scheme> {
75    let url = Url::parse(path).map_err(|e| {
76        Error::new(
77            ErrorKind::DataInvalid,
78            format!("Invalid path: {path}, failed to parse URL: {e}"),
79        )
80    })?;
81    parse_scheme(url.scheme())
82}
83
/// Build an [`OpenDalStorage`] variant for the given scheme and config properties.
///
/// Every backend arm is gated on its cargo feature: with a feature disabled
/// the arm is compiled out and that scheme falls through to the
/// `FeatureUnsupported` error at the bottom.
fn build_storage_for_scheme(
    scheme: Scheme,
    props: &HashMap<String, String>,
    // Only the S3 backend accepts a caller-supplied credential loader, so
    // the parameter itself only exists when S3 support is compiled in.
    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
) -> Result<OpenDalStorage> {
    match scheme {
        #[cfg(feature = "opendal-s3")]
        Scheme::S3 => {
            let config = crate::s3::s3_config_parse(props.clone())?;
            Ok(OpenDalStorage::S3 {
                config: Arc::new(config),
                customized_credential_load: customized_credential_load.clone(),
            })
        }
        #[cfg(feature = "opendal-gcs")]
        Scheme::Gcs => {
            let config = crate::gcs::gcs_config_parse(props.clone())?;
            Ok(OpenDalStorage::Gcs {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-oss")]
        Scheme::Oss => {
            let config = crate::oss::oss_config_parse(props.clone())?;
            Ok(OpenDalStorage::Oss {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-azdls")]
        Scheme::Azdls => {
            let config = crate::azdls::azdls_config_parse(props.clone())?;
            Ok(OpenDalStorage::Azdls {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-fs")]
        Scheme::Fs => Ok(OpenDalStorage::LocalFs),
        #[cfg(feature = "opendal-memory")]
        Scheme::Memory => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
        // Reached for any scheme whose backend feature is disabled, or a
        // scheme this crate has no variant for at all.
        unsupported => Err(Error::new(
            ErrorKind::FeatureUnsupported,
            format!("Unsupported storage scheme: {unsupported}"),
        )),
    }
}
130
/// A resolving storage factory that creates [`OpenDalResolvingStorage`] instances.
///
/// This factory accepts paths from any supported storage system and dynamically
/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
/// the path scheme.
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use iceberg::io::FileIOBuilder;
/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
///
/// let factory = OpenDalResolvingStorageFactory::new();
/// let file_io = FileIOBuilder::new(Arc::new(factory))
///     .with_prop("s3.region", "us-east-1")
///     .build();
/// ```
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorageFactory {
    /// Custom AWS credential loader for S3 storage.
    ///
    /// Skipped during (de)serialization, so a deserialized factory starts
    /// with `None`; re-attach via `with_s3_credential_loader` if needed.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
156
impl Default for OpenDalResolvingStorageFactory {
    /// Equivalent to [`OpenDalResolvingStorageFactory::new`].
    fn default() -> Self {
        Self::new()
    }
}
162
163impl OpenDalResolvingStorageFactory {
164    /// Create a new resolving storage factory.
165    pub fn new() -> Self {
166        Self {
167            #[cfg(feature = "opendal-s3")]
168            customized_credential_load: None,
169        }
170    }
171
172    /// Set a custom AWS credential loader for S3 storage.
173    #[cfg(feature = "opendal-s3")]
174    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
175        self.customized_credential_load = Some(loader);
176        self
177    }
178}
179
180#[typetag::serde]
181impl StorageFactory for OpenDalResolvingStorageFactory {
182    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
183        Ok(Arc::new(OpenDalResolvingStorage {
184            props: config.props().clone(),
185            storages: RwLock::new(HashMap::new()),
186            #[cfg(feature = "opendal-s3")]
187            customized_credential_load: self.customized_credential_load.clone(),
188        }))
189    }
190}
191
/// A resolving storage that auto-detects the scheme from a path and delegates
/// to the appropriate [`OpenDalStorage`] variant.
///
/// Sub-storages are lazily created on first use for each scheme and cached
/// for subsequent operations. Scheme aliases like `s3`/`s3a`/`s3n` map to
/// the same [`Scheme`] variant, so they share a storage instance.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorage {
    /// Configuration properties shared across all backends.
    props: HashMap<String, String>,
    /// Cache of scheme to storage mappings.
    ///
    /// Lazily populated by `resolve()`. Skipped by serde, so a deserialized
    /// storage starts with an empty cache and rebuilds backends on demand.
    #[serde(skip, default)]
    storages: RwLock<HashMap<Scheme, Arc<OpenDalStorage>>>,
    /// Custom AWS credential loader for S3 storage.
    ///
    /// Skipped by serde; a deserialized storage starts with `None`.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
210
211impl OpenDalResolvingStorage {
212    /// Resolve the storage for the given path by extracting the scheme and
213    /// returning the cached or newly-created [`OpenDalStorage`].
214    fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
215        let scheme = extract_scheme(path)?;
216
217        // Fast path: check read lock first.
218        {
219            let cache = self
220                .storages
221                .read()
222                .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
223            if let Some(storage) = cache.get(&scheme) {
224                return Ok(storage.clone());
225            }
226        }
227
228        // Slow path: build and insert under write lock.
229        let mut cache = self
230            .storages
231            .write()
232            .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
233
234        // Double-check after acquiring write lock.
235        if let Some(storage) = cache.get(&scheme) {
236            return Ok(storage.clone());
237        }
238
239        let storage = build_storage_for_scheme(
240            scheme,
241            &self.props,
242            #[cfg(feature = "opendal-s3")]
243            &self.customized_credential_load,
244        )?;
245        let storage = Arc::new(storage);
246        cache.insert(scheme, storage.clone());
247        Ok(storage)
248    }
249}
250
251#[async_trait]
252#[typetag::serde]
253impl Storage for OpenDalResolvingStorage {
254    async fn exists(&self, path: &str) -> Result<bool> {
255        self.resolve(path)?.exists(path).await
256    }
257
258    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
259        self.resolve(path)?.metadata(path).await
260    }
261
262    async fn read(&self, path: &str) -> Result<Bytes> {
263        self.resolve(path)?.read(path).await
264    }
265
266    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
267        self.resolve(path)?.reader(path).await
268    }
269
270    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
271        self.resolve(path)?.write(path, bs).await
272    }
273
274    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
275        self.resolve(path)?.writer(path).await
276    }
277
278    async fn delete(&self, path: &str) -> Result<()> {
279        self.resolve(path)?.delete(path).await
280    }
281
282    async fn delete_prefix(&self, path: &str) -> Result<()> {
283        self.resolve(path)?.delete_prefix(path).await
284    }
285
286    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
287        // Group paths by scheme so each resolved storage receives a batch,
288        // avoiding repeated operator creation per path.
289        let mut grouped: HashMap<Scheme, Vec<String>> = HashMap::new();
290        while let Some(path) = paths.next().await {
291            let scheme = extract_scheme(&path)?;
292            grouped.entry(scheme).or_default().push(path);
293        }
294
295        for (_, paths) in grouped {
296            let storage = self.resolve(&paths[0])?;
297            storage
298                .delete_stream(futures::stream::iter(paths).boxed())
299                .await?;
300        }
301        Ok(())
302    }
303
304    fn new_input(&self, path: &str) -> Result<InputFile> {
305        Ok(InputFile::new(
306            Arc::new(self.resolve(path)?.as_ref().clone()),
307            path.to_string(),
308        ))
309    }
310
311    fn new_output(&self, path: &str) -> Result<OutputFile> {
312        Ok(OutputFile::new(
313            Arc::new(self.resolve(path)?.as_ref().clone()),
314            path.to_string(),
315        ))
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolving storage over empty props. `resolve()` alone never contacts
    /// a backend, so empty configuration is all these tests need.
    fn empty_resolving_storage() -> OpenDalResolvingStorage {
        OpenDalResolvingStorage {
            props: HashMap::new(),
            storages: RwLock::new(HashMap::new()),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_resolve_s3_aliases_share_instance() {
        let storage = empty_resolving_storage();

        // All three S3-family schemes must collapse to a single cached
        // `Arc<OpenDalStorage>` so that catalogs handing the resolver a mix
        // of `s3://`, `s3a://`, `s3n://` paths don't rebuild operators.
        let resolved: Vec<_> = ["s3://bucket/key", "s3a://bucket/key", "s3n://bucket/key"]
            .iter()
            .map(|p| storage.resolve(p).unwrap())
            .collect();

        assert!(
            Arc::ptr_eq(&resolved[0], &resolved[1]),
            "s3 and s3a should share one instance"
        );
        assert!(
            Arc::ptr_eq(&resolved[0], &resolved[2]),
            "s3 and s3n should share one instance"
        );
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_resolve_azdls_aliases_share_instance() {
        let storage = empty_resolving_storage();

        let url_of = |scheme: &str| {
            format!("{scheme}://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
        };

        // Both Azure schemes collapse onto one cached instance.
        let abfss = storage.resolve(&url_of("abfss")).unwrap();
        let abfs = storage.resolve(&url_of("abfs")).unwrap();

        assert!(
            Arc::ptr_eq(&abfss, &abfs),
            "abfss and abfs should share one instance"
        );
    }
}