iceberg_storage_opendal/
resolving.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Resolving storage that auto-detects the scheme from a path and delegates
19//! to the appropriate [`OpenDalStorage`] variant.
20
21use std::collections::HashMap;
22use std::sync::{Arc, RwLock};
23
24use async_trait::async_trait;
25use bytes::Bytes;
26use futures::StreamExt;
27use futures::stream::BoxStream;
28use iceberg::io::{
29    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
30    StorageFactory,
31};
32use iceberg::{Error, ErrorKind, Result};
33use opendal::Scheme;
34use serde::{Deserialize, Serialize};
35use url::Url;
36
37use crate::OpenDalStorage;
38#[cfg(feature = "opendal-s3")]
39use crate::s3::CustomAwsCredentialLoader;
40
/// Schemes supported by OpenDalResolvingStorage
// Local / testing backends.
pub const SCHEME_MEMORY: &str = "memory";
pub const SCHEME_FILE: &str = "file";
// Amazon S3 and its Hadoop-era aliases (all map to the same backend).
pub const SCHEME_S3: &str = "s3";
pub const SCHEME_S3A: &str = "s3a";
pub const SCHEME_S3N: &str = "s3n";
// Google Cloud Storage aliases.
pub const SCHEME_GS: &str = "gs";
pub const SCHEME_GCS: &str = "gcs";
// Alibaba Cloud OSS.
pub const SCHEME_OSS: &str = "oss";
// Azure blob/datalake scheme variants (all map to the Azdls backend).
pub const SCHEME_ABFSS: &str = "abfss";
pub const SCHEME_ABFS: &str = "abfs";
pub const SCHEME_WASBS: &str = "wasbs";
pub const SCHEME_WASB: &str = "wasb";
54
55/// Parse a URL scheme string into an [`opendal::Scheme`].
56fn parse_scheme(scheme: &str) -> Result<Scheme> {
57    match scheme {
58        SCHEME_MEMORY => Ok(Scheme::Memory),
59        SCHEME_FILE | "" => Ok(Scheme::Fs),
60        SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3),
61        SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs),
62        SCHEME_OSS => Ok(Scheme::Oss),
63        SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok(Scheme::Azdls),
64        s => s.parse::<Scheme>().map_err(|e| {
65            Error::new(
66                ErrorKind::FeatureUnsupported,
67                format!("Unsupported storage scheme: {s}: {e}"),
68            )
69        }),
70    }
71}
72
73/// Extract the scheme string from a path URL.
74fn extract_scheme(path: &str) -> Result<String> {
75    let url = Url::parse(path).map_err(|e| {
76        Error::new(
77            ErrorKind::DataInvalid,
78            format!("Invalid path: {path}, failed to parse URL: {e}"),
79        )
80    })?;
81    Ok(url.scheme().to_string())
82}
83
/// Build an [`OpenDalStorage`] variant for the given scheme and config properties.
///
/// The scheme string is normalized via [`parse_scheme`], then the matching
/// backend is constructed from `props` using that backend's own config parser.
/// Each arm is compiled in only when its `opendal-*` feature is enabled, so a
/// scheme whose feature is disabled falls through to the catch-all error arm.
fn build_storage_for_scheme(
    scheme: &str,
    props: &HashMap<String, String>,
    // Only the S3 backend supports injecting a caller-provided credential loader.
    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
) -> Result<OpenDalStorage> {
    match parse_scheme(scheme)? {
        #[cfg(feature = "opendal-s3")]
        Scheme::S3 => {
            let config = crate::s3::s3_config_parse(props.clone())?;
            Ok(OpenDalStorage::S3 {
                // Retain the caller's original scheme string (s3/s3a/s3n)
                // alongside the parsed config.
                configured_scheme: scheme.to_string(),
                config: Arc::new(config),
                customized_credential_load: customized_credential_load.clone(),
            })
        }
        #[cfg(feature = "opendal-gcs")]
        Scheme::Gcs => {
            let config = crate::gcs::gcs_config_parse(props.clone())?;
            Ok(OpenDalStorage::Gcs {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-oss")]
        Scheme::Oss => {
            let config = crate::oss::oss_config_parse(props.clone())?;
            Ok(OpenDalStorage::Oss {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-azdls")]
        Scheme::Azdls => {
            // Azure keeps a typed scheme (abfss/abfs/wasbs/wasb) rather than the
            // raw string; parsing may fail for unrecognized variants.
            let configured_scheme: crate::azdls::AzureStorageScheme = scheme.parse()?;
            let config = crate::azdls::azdls_config_parse(props.clone())?;
            Ok(OpenDalStorage::Azdls {
                configured_scheme,
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-fs")]
        Scheme::Fs => Ok(OpenDalStorage::LocalFs),
        #[cfg(feature = "opendal-memory")]
        Scheme::Memory => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
        // Reached for schemes OpenDAL knows but this crate doesn't wrap, and for
        // schemes whose backing feature flag is disabled at compile time.
        unsupported => Err(Error::new(
            ErrorKind::FeatureUnsupported,
            format!("Unsupported storage scheme: {unsupported}"),
        )),
    }
}
133
/// A resolving storage factory that creates [`OpenDalResolvingStorage`] instances.
///
/// This factory accepts paths from any supported storage system and dynamically
/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
/// the path scheme.
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use iceberg::io::FileIOBuilder;
/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
///
/// let factory = OpenDalResolvingStorageFactory::new();
/// let file_io = FileIOBuilder::new(Arc::new(factory))
///     .with_prop("s3.region", "us-east-1")
///     .build();
/// ```
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorageFactory {
    /// Custom AWS credential loader for S3 storage.
    ///
    /// Skipped during (de)serialization: a deserialized factory comes back with
    /// `None` here, so the loader must be re-attached if it is needed.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
159
impl Default for OpenDalResolvingStorageFactory {
    /// Equivalent to [`OpenDalResolvingStorageFactory::new`].
    fn default() -> Self {
        Self::new()
    }
}
165
impl OpenDalResolvingStorageFactory {
    /// Create a new resolving storage factory.
    ///
    /// When the `opendal-s3` feature is enabled, no custom credential loader is
    /// set initially; attach one via [`Self::with_s3_credential_loader`].
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    /// Set a custom AWS credential loader for S3 storage.
    ///
    /// Builder-style: consumes and returns `self`.
    #[cfg(feature = "opendal-s3")]
    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
        self.customized_credential_load = Some(loader);
        self
    }
}
182
183#[typetag::serde]
184impl StorageFactory for OpenDalResolvingStorageFactory {
185    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
186        Ok(Arc::new(OpenDalResolvingStorage {
187            props: config.props().clone(),
188            storages: RwLock::new(HashMap::new()),
189            #[cfg(feature = "opendal-s3")]
190            customized_credential_load: self.customized_credential_load.clone(),
191        }))
192    }
193}
194
/// A resolving storage that auto-detects the scheme from a path and delegates
/// to the appropriate [`OpenDalStorage`] variant.
///
/// Sub-storages are lazily created on first use for each scheme and cached
/// for subsequent operations.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorage {
    /// Configuration properties shared across all backends.
    props: HashMap<String, String>,
    /// Cache of scheme → storage mappings.
    ///
    /// Not serialized; a deserialized instance starts with an empty cache and
    /// rebuilds entries lazily from `props`.
    #[serde(skip, default)]
    storages: RwLock<HashMap<String, Arc<OpenDalStorage>>>,
    /// Custom AWS credential loader for S3 storage.
    // Skipped by serde, so it is lost across (de)serialization and defaults to `None`.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
212
213impl OpenDalResolvingStorage {
214    /// Resolve the storage for the given path by extracting the scheme and
215    /// returning the cached or newly-created [`OpenDalStorage`].
216    fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
217        let scheme = extract_scheme(path)?;
218
219        // Fast path: check read lock first.
220        {
221            let cache = self
222                .storages
223                .read()
224                .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
225            if let Some(storage) = cache.get(&scheme) {
226                return Ok(storage.clone());
227            }
228        }
229
230        // Slow path: build and insert under write lock.
231        let mut cache = self
232            .storages
233            .write()
234            .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
235
236        // Double-check after acquiring write lock.
237        if let Some(storage) = cache.get(&scheme) {
238            return Ok(storage.clone());
239        }
240
241        let storage = build_storage_for_scheme(
242            &scheme,
243            &self.props,
244            #[cfg(feature = "opendal-s3")]
245            &self.customized_credential_load,
246        )?;
247        let storage = Arc::new(storage);
248        cache.insert(scheme, storage.clone());
249        Ok(storage)
250    }
251}
252
253#[async_trait]
254#[typetag::serde]
255impl Storage for OpenDalResolvingStorage {
256    async fn exists(&self, path: &str) -> Result<bool> {
257        self.resolve(path)?.exists(path).await
258    }
259
260    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
261        self.resolve(path)?.metadata(path).await
262    }
263
264    async fn read(&self, path: &str) -> Result<Bytes> {
265        self.resolve(path)?.read(path).await
266    }
267
268    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
269        self.resolve(path)?.reader(path).await
270    }
271
272    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
273        self.resolve(path)?.write(path, bs).await
274    }
275
276    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
277        self.resolve(path)?.writer(path).await
278    }
279
280    async fn delete(&self, path: &str) -> Result<()> {
281        self.resolve(path)?.delete(path).await
282    }
283
284    async fn delete_prefix(&self, path: &str) -> Result<()> {
285        self.resolve(path)?.delete_prefix(path).await
286    }
287
288    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
289        // Group paths by scheme so each resolved storage receives a batch,
290        // avoiding repeated operator creation per path.
291        let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
292        while let Some(path) = paths.next().await {
293            let scheme = extract_scheme(&path)?;
294            grouped.entry(scheme).or_default().push(path);
295        }
296
297        for (_, paths) in grouped {
298            let storage = self.resolve(&paths[0])?;
299            storage
300                .delete_stream(futures::stream::iter(paths).boxed())
301                .await?;
302        }
303        Ok(())
304    }
305
306    fn new_input(&self, path: &str) -> Result<InputFile> {
307        Ok(InputFile::new(
308            Arc::new(self.resolve(path)?.as_ref().clone()),
309            path.to_string(),
310        ))
311    }
312
313    fn new_output(&self, path: &str) -> Result<OutputFile> {
314        Ok(OutputFile::new(
315            Arc::new(self.resolve(path)?.as_ref().clone()),
316            path.to_string(),
317        ))
318    }
319}