iceberg_storage_opendal/
resolving.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Resolving storage that auto-detects the scheme from a path and delegates
19//! to the appropriate [`OpenDalStorage`] variant.
20
21use std::collections::HashMap;
22use std::sync::{Arc, RwLock};
23
24use async_trait::async_trait;
25use bytes::Bytes;
26use futures::StreamExt;
27use futures::stream::BoxStream;
28use iceberg::io::{
29    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
30    StorageFactory,
31};
32use iceberg::{Error, ErrorKind, Result};
33use serde::{Deserialize, Serialize};
34use url::Url;
35
36use crate::OpenDalStorage;
37#[cfg(feature = "opendal-s3")]
38use crate::s3::CustomAwsCredentialLoader;
39
// URL schemes recognised by `OpenDalResolvingStorage`. Several of them are
// aliases that resolve to the same backend instance.

/// URL scheme for the OpenDAL memory backend (`memory://`).
pub const SCHEME_MEMORY: &str = "memory";
/// URL scheme for the local filesystem backend (`file://`).
pub const SCHEME_FILE: &str = "file";
/// URL scheme for the S3 backend (`s3://`).
pub const SCHEME_S3: &str = "s3";
/// Alias of [`SCHEME_S3`] (`s3a://`); served by the same S3 backend.
pub const SCHEME_S3A: &str = "s3a";
/// Alias of [`SCHEME_S3`] (`s3n://`); served by the same S3 backend.
pub const SCHEME_S3N: &str = "s3n";
/// Alias of [`SCHEME_GCS`] (`gs://`); served by the same GCS backend.
pub const SCHEME_GS: &str = "gs";
/// URL scheme for the Google Cloud Storage backend (`gcs://`).
pub const SCHEME_GCS: &str = "gcs";
/// URL scheme for the OSS backend (`oss://`).
pub const SCHEME_OSS: &str = "oss";
/// Azure scheme (`abfss://`); served by the shared Azure (`azdls`) backend.
pub const SCHEME_ABFSS: &str = "abfss";
/// Azure scheme (`abfs://`); served by the shared Azure (`azdls`) backend.
pub const SCHEME_ABFS: &str = "abfs";
/// Azure scheme (`wasbs://`); served by the shared Azure (`azdls`) backend.
pub const SCHEME_WASBS: &str = "wasbs";
/// Azure scheme (`wasb://`); served by the shared Azure (`azdls`) backend.
pub const SCHEME_WASB: &str = "wasb";
53
54/// Parse a URL scheme string.
55fn parse_scheme(scheme: &str) -> Result<&'static str> {
56    match scheme {
57        SCHEME_MEMORY => Ok("memory"),
58        SCHEME_FILE | "" => Ok("file"),
59        SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok("s3"),
60        SCHEME_GS | SCHEME_GCS => Ok("gcs"),
61        SCHEME_OSS => Ok("oss"),
62        SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok("azdls"),
63        s => Err(Error::new(
64            ErrorKind::FeatureUnsupported,
65            format!("Unsupported storage scheme: {s}"),
66        )),
67    }
68}
69
70/// Extract the scheme from a path URL.
71fn extract_scheme(path: &str) -> Result<&'static str> {
72    let url = Url::parse(path).map_err(|e| {
73        Error::new(
74            ErrorKind::DataInvalid,
75            format!("Invalid path: {path}, failed to parse URL: {e}"),
76        )
77    })?;
78    parse_scheme(url.scheme())
79}
80
81/// Build an [`OpenDalStorage`] variant for the given scheme and config properties.
82fn build_storage_for_scheme(
83    scheme: &'static str,
84    props: &HashMap<String, String>,
85    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
86) -> Result<OpenDalStorage> {
87    match scheme {
88        #[cfg(feature = "opendal-s3")]
89        "s3" => {
90            let config = crate::s3::s3_config_parse(props.clone())?;
91            Ok(OpenDalStorage::S3 {
92                config: Arc::new(config),
93                customized_credential_load: customized_credential_load.clone(),
94            })
95        }
96        #[cfg(feature = "opendal-gcs")]
97        "gcs" => {
98            let config = crate::gcs::gcs_config_parse(props.clone())?;
99            Ok(OpenDalStorage::Gcs {
100                config: Arc::new(config),
101            })
102        }
103        #[cfg(feature = "opendal-oss")]
104        "oss" => {
105            let config = crate::oss::oss_config_parse(props.clone())?;
106            Ok(OpenDalStorage::Oss {
107                config: Arc::new(config),
108            })
109        }
110        #[cfg(feature = "opendal-azdls")]
111        "azdls" => {
112            let config = crate::azdls::azdls_config_parse(props.clone())?;
113            Ok(OpenDalStorage::Azdls {
114                config: Arc::new(config),
115            })
116        }
117        #[cfg(feature = "opendal-fs")]
118        "file" => Ok(OpenDalStorage::LocalFs),
119        #[cfg(feature = "opendal-memory")]
120        "memory" => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
121        unsupported => Err(Error::new(
122            ErrorKind::FeatureUnsupported,
123            format!("Unsupported storage scheme: {unsupported}"),
124        )),
125    }
126}
127
128/// A resolving storage factory that creates [`OpenDalResolvingStorage`] instances.
129///
130/// This factory accepts paths from any supported storage system and dynamically
131/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
132/// the path scheme.
133///
134/// # Example
135///
136/// ```rust,ignore
137/// use std::sync::Arc;
138/// use iceberg::io::FileIOBuilder;
139/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
140///
141/// let factory = OpenDalResolvingStorageFactory::new();
142/// let file_io = FileIOBuilder::new(Arc::new(factory))
143///     .with_prop("s3.region", "us-east-1")
144///     .build();
145/// ```
146#[derive(Clone, Debug, Serialize, Deserialize)]
147pub struct OpenDalResolvingStorageFactory {
148    /// Custom AWS credential loader for S3 storage.
149    #[cfg(feature = "opendal-s3")]
150    #[serde(skip)]
151    customized_credential_load: Option<CustomAwsCredentialLoader>,
152}
153
154impl Default for OpenDalResolvingStorageFactory {
155    fn default() -> Self {
156        Self::new()
157    }
158}
159
160impl OpenDalResolvingStorageFactory {
161    /// Create a new resolving storage factory.
162    pub fn new() -> Self {
163        Self {
164            #[cfg(feature = "opendal-s3")]
165            customized_credential_load: None,
166        }
167    }
168
169    /// Set a custom AWS credential loader for S3 storage.
170    #[cfg(feature = "opendal-s3")]
171    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
172        self.customized_credential_load = Some(loader);
173        self
174    }
175}
176
177#[typetag::serde]
178impl StorageFactory for OpenDalResolvingStorageFactory {
179    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
180        Ok(Arc::new(OpenDalResolvingStorage {
181            props: config.props().clone(),
182            storages: RwLock::new(HashMap::new()),
183            #[cfg(feature = "opendal-s3")]
184            customized_credential_load: self.customized_credential_load.clone(),
185        }))
186    }
187}
188
/// A resolving storage that auto-detects the scheme from a path and delegates
/// to the appropriate [`OpenDalStorage`] variant.
///
/// Sub-storages are lazily created on first use for each scheme and cached
/// for subsequent operations. Scheme aliases like `s3`/`s3a`/`s3n` map to
/// the same canonical scheme, so they share a storage instance.
///
/// # Serialization
///
/// Only `props` is serialized. The sub-storage cache is skipped and starts
/// empty after deserialization. The custom S3 credential loader (present with
/// the `opendal-s3` feature) is also skipped, so a deserialized instance has
/// no custom loader.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorage {
    /// Configuration properties shared across all backends. The S3, GCS, OSS
    /// and Azure backends parse their settings from a copy of this map when
    /// they are first built.
    props: HashMap<String, String>,
    /// Cache from canonical scheme name (e.g. `s3`, `azdls`) to the backend
    /// built for it.
    #[serde(skip, default)]
    storages: RwLock<HashMap<&'static str, Arc<OpenDalStorage>>>,
    /// Custom AWS credential loader for S3 storage. It is cloned into the S3
    /// backend when that backend is built, and it is not serialized.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
207
208impl OpenDalResolvingStorage {
209    /// Resolve the storage for the given path by extracting the scheme and
210    /// returning the cached or newly-created [`OpenDalStorage`].
211    fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
212        let scheme = extract_scheme(path)?;
213
214        // Fast path: check read lock first.
215        {
216            let cache = self
217                .storages
218                .read()
219                .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
220            if let Some(storage) = cache.get(&scheme) {
221                return Ok(storage.clone());
222            }
223        }
224
225        // Slow path: build and insert under write lock.
226        let mut cache = self
227            .storages
228            .write()
229            .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
230
231        // Double-check after acquiring write lock.
232        if let Some(storage) = cache.get(&scheme) {
233            return Ok(storage.clone());
234        }
235
236        let storage = build_storage_for_scheme(
237            scheme,
238            &self.props,
239            #[cfg(feature = "opendal-s3")]
240            &self.customized_credential_load,
241        )?;
242        let storage = Arc::new(storage);
243        cache.insert(scheme, storage.clone());
244        Ok(storage)
245    }
246}
247
248#[async_trait]
249#[typetag::serde]
250impl Storage for OpenDalResolvingStorage {
251    async fn exists(&self, path: &str) -> Result<bool> {
252        self.resolve(path)?.exists(path).await
253    }
254
255    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
256        self.resolve(path)?.metadata(path).await
257    }
258
259    async fn read(&self, path: &str) -> Result<Bytes> {
260        self.resolve(path)?.read(path).await
261    }
262
263    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
264        self.resolve(path)?.reader(path).await
265    }
266
267    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
268        self.resolve(path)?.write(path, bs).await
269    }
270
271    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
272        self.resolve(path)?.writer(path).await
273    }
274
275    async fn delete(&self, path: &str) -> Result<()> {
276        self.resolve(path)?.delete(path).await
277    }
278
279    async fn delete_prefix(&self, path: &str) -> Result<()> {
280        self.resolve(path)?.delete_prefix(path).await
281    }
282
283    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
284        // Group paths by scheme so each resolved storage receives a batch,
285        // avoiding repeated operator creation per path.
286        let mut grouped: HashMap<&'static str, Vec<String>> = HashMap::new();
287        while let Some(path) = paths.next().await {
288            let scheme = extract_scheme(&path)?;
289            grouped.entry(scheme).or_default().push(path);
290        }
291
292        for (_, paths) in grouped {
293            let storage = self.resolve(&paths[0])?;
294            storage
295                .delete_stream(futures::stream::iter(paths).boxed())
296                .await?;
297        }
298        Ok(())
299    }
300
301    fn new_input(&self, path: &str) -> Result<InputFile> {
302        Ok(InputFile::new(
303            Arc::new(self.resolve(path)?.as_ref().clone()),
304            path.to_string(),
305        ))
306    }
307
308    fn new_output(&self, path: &str) -> Result<OutputFile> {
309        Ok(OutputFile::new(
310            Arc::new(self.resolve(path)?.as_ref().clone()),
311            path.to_string(),
312        ))
313    }
314}
315
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a resolving storage with empty props, suitable for `resolve()`
    /// calls that don't actually hit any backend.
    fn empty_resolving_storage() -> OpenDalResolvingStorage {
        OpenDalResolvingStorage {
            props: HashMap::new(),
            storages: RwLock::new(HashMap::new()),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_resolve_s3_aliases_share_instance() {
        let storage = empty_resolving_storage();

        // All three S3-family schemes must collapse to a single cached
        // `Arc<OpenDalStorage>` so that catalogs handing the resolver a mix
        // of `s3://`, `s3a://`, `s3n://` paths don't rebuild operators.
        let a = storage.resolve("s3://bucket/key").unwrap();
        let b = storage.resolve("s3a://bucket/key").unwrap();
        let c = storage.resolve("s3n://bucket/key").unwrap();

        assert!(Arc::ptr_eq(&a, &b), "s3 and s3a should share one instance");
        assert!(Arc::ptr_eq(&a, &c), "s3 and s3n should share one instance");
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_resolve_azdls_aliases_share_instance() {
        let storage = empty_resolving_storage();

        let path_for = |scheme: &str| {
            format!("{scheme}://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
        };

        // All Azure schemes collapse onto one cached instance.
        let abfss = storage.resolve(&path_for("abfss")).unwrap();
        for alias in ["abfs", "wasbs", "wasb"] {
            let other = storage.resolve(&path_for(alias)).unwrap();
            assert!(
                Arc::ptr_eq(&abfss, &other),
                "abfss and {alias} should share one instance"
            );
        }
    }

    #[test]
    fn test_resolve_rejects_unsupported_scheme() {
        let storage = empty_resolving_storage();

        let err = storage.resolve("ftp://host/path/to/file").unwrap_err();
        assert!(matches!(err.kind(), ErrorKind::FeatureUnsupported));
    }

    #[test]
    fn test_resolve_rejects_unparsable_path() {
        let storage = empty_resolving_storage();

        // No scheme and not an absolute path: must be reported as invalid data.
        let err = storage.resolve("not a url").unwrap_err();
        assert!(matches!(err.kind(), ErrorKind::DataInvalid));
    }
}