1use std::collections::HashMap;
22use std::sync::{Arc, RwLock};
23
24use async_trait::async_trait;
25use bytes::Bytes;
26use futures::StreamExt;
27use futures::stream::BoxStream;
28use iceberg::io::{
29 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
30 StorageFactory,
31};
32use iceberg::{Error, ErrorKind, Result};
33use serde::{Deserialize, Serialize};
34use url::Url;
35
36use crate::OpenDalStorage;
37#[cfg(feature = "opendal-s3")]
38use crate::s3::CustomAwsCredentialLoader;
39
/// URL scheme served by the in-memory backend.
pub const SCHEME_MEMORY: &str = "memory";
/// URL scheme served by the local filesystem backend.
pub const SCHEME_FILE: &str = "file";
/// Primary URL scheme for S3 storage.
pub const SCHEME_S3: &str = "s3";
/// Alias resolved to the same backend as [`SCHEME_S3`].
pub const SCHEME_S3A: &str = "s3a";
/// Alias resolved to the same backend as [`SCHEME_S3`].
pub const SCHEME_S3N: &str = "s3n";
/// URL scheme for Google Cloud Storage.
pub const SCHEME_GS: &str = "gs";
/// Alias resolved to the same backend as [`SCHEME_GS`].
pub const SCHEME_GCS: &str = "gcs";
/// URL scheme for OSS storage.
pub const SCHEME_OSS: &str = "oss";
/// Azure scheme resolved to the `azdls` backend.
pub const SCHEME_ABFSS: &str = "abfss";
/// Azure scheme resolved to the `azdls` backend.
pub const SCHEME_ABFS: &str = "abfs";
/// Azure scheme resolved to the `azdls` backend.
pub const SCHEME_WASBS: &str = "wasbs";
/// Azure scheme resolved to the `azdls` backend.
pub const SCHEME_WASB: &str = "wasb";
53
54fn parse_scheme(scheme: &str) -> Result<&'static str> {
56 match scheme {
57 SCHEME_MEMORY => Ok("memory"),
58 SCHEME_FILE | "" => Ok("file"),
59 SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok("s3"),
60 SCHEME_GS | SCHEME_GCS => Ok("gcs"),
61 SCHEME_OSS => Ok("oss"),
62 SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok("azdls"),
63 s => Err(Error::new(
64 ErrorKind::FeatureUnsupported,
65 format!("Unsupported storage scheme: {s}"),
66 )),
67 }
68}
69
70fn extract_scheme(path: &str) -> Result<&'static str> {
72 let url = Url::parse(path).map_err(|e| {
73 Error::new(
74 ErrorKind::DataInvalid,
75 format!("Invalid path: {path}, failed to parse URL: {e}"),
76 )
77 })?;
78 parse_scheme(url.scheme())
79}
80
81fn build_storage_for_scheme(
83 scheme: &'static str,
84 props: &HashMap<String, String>,
85 #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
86) -> Result<OpenDalStorage> {
87 match scheme {
88 #[cfg(feature = "opendal-s3")]
89 "s3" => {
90 let config = crate::s3::s3_config_parse(props.clone())?;
91 Ok(OpenDalStorage::S3 {
92 config: Arc::new(config),
93 customized_credential_load: customized_credential_load.clone(),
94 })
95 }
96 #[cfg(feature = "opendal-gcs")]
97 "gcs" => {
98 let config = crate::gcs::gcs_config_parse(props.clone())?;
99 Ok(OpenDalStorage::Gcs {
100 config: Arc::new(config),
101 })
102 }
103 #[cfg(feature = "opendal-oss")]
104 "oss" => {
105 let config = crate::oss::oss_config_parse(props.clone())?;
106 Ok(OpenDalStorage::Oss {
107 config: Arc::new(config),
108 })
109 }
110 #[cfg(feature = "opendal-azdls")]
111 "azdls" => {
112 let config = crate::azdls::azdls_config_parse(props.clone())?;
113 Ok(OpenDalStorage::Azdls {
114 config: Arc::new(config),
115 })
116 }
117 #[cfg(feature = "opendal-fs")]
118 "file" => Ok(OpenDalStorage::LocalFs),
119 #[cfg(feature = "opendal-memory")]
120 "memory" => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
121 unsupported => Err(Error::new(
122 ErrorKind::FeatureUnsupported,
123 format!("Unsupported storage scheme: {unsupported}"),
124 )),
125 }
126}
127
128#[derive(Clone, Debug, Serialize, Deserialize)]
147pub struct OpenDalResolvingStorageFactory {
148 #[cfg(feature = "opendal-s3")]
150 #[serde(skip)]
151 customized_credential_load: Option<CustomAwsCredentialLoader>,
152}
153
154impl Default for OpenDalResolvingStorageFactory {
155 fn default() -> Self {
156 Self::new()
157 }
158}
159
160impl OpenDalResolvingStorageFactory {
161 pub fn new() -> Self {
163 Self {
164 #[cfg(feature = "opendal-s3")]
165 customized_credential_load: None,
166 }
167 }
168
169 #[cfg(feature = "opendal-s3")]
171 pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
172 self.customized_credential_load = Some(loader);
173 self
174 }
175}
176
177#[typetag::serde]
178impl StorageFactory for OpenDalResolvingStorageFactory {
179 fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
180 Ok(Arc::new(OpenDalResolvingStorage {
181 props: config.props().clone(),
182 storages: RwLock::new(HashMap::new()),
183 #[cfg(feature = "opendal-s3")]
184 customized_credential_load: self.customized_credential_load.clone(),
185 }))
186 }
187}
188
189#[derive(Debug, Serialize, Deserialize)]
196pub struct OpenDalResolvingStorage {
197 props: HashMap<String, String>,
199 #[serde(skip, default)]
201 storages: RwLock<HashMap<&'static str, Arc<OpenDalStorage>>>,
202 #[cfg(feature = "opendal-s3")]
204 #[serde(skip)]
205 customized_credential_load: Option<CustomAwsCredentialLoader>,
206}
207
208impl OpenDalResolvingStorage {
209 fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
212 let scheme = extract_scheme(path)?;
213
214 {
216 let cache = self
217 .storages
218 .read()
219 .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
220 if let Some(storage) = cache.get(&scheme) {
221 return Ok(storage.clone());
222 }
223 }
224
225 let mut cache = self
227 .storages
228 .write()
229 .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
230
231 if let Some(storage) = cache.get(&scheme) {
233 return Ok(storage.clone());
234 }
235
236 let storage = build_storage_for_scheme(
237 scheme,
238 &self.props,
239 #[cfg(feature = "opendal-s3")]
240 &self.customized_credential_load,
241 )?;
242 let storage = Arc::new(storage);
243 cache.insert(scheme, storage.clone());
244 Ok(storage)
245 }
246}
247
248#[async_trait]
249#[typetag::serde]
250impl Storage for OpenDalResolvingStorage {
251 async fn exists(&self, path: &str) -> Result<bool> {
252 self.resolve(path)?.exists(path).await
253 }
254
255 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
256 self.resolve(path)?.metadata(path).await
257 }
258
259 async fn read(&self, path: &str) -> Result<Bytes> {
260 self.resolve(path)?.read(path).await
261 }
262
263 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
264 self.resolve(path)?.reader(path).await
265 }
266
267 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
268 self.resolve(path)?.write(path, bs).await
269 }
270
271 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
272 self.resolve(path)?.writer(path).await
273 }
274
275 async fn delete(&self, path: &str) -> Result<()> {
276 self.resolve(path)?.delete(path).await
277 }
278
279 async fn delete_prefix(&self, path: &str) -> Result<()> {
280 self.resolve(path)?.delete_prefix(path).await
281 }
282
283 async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
284 let mut grouped: HashMap<&'static str, Vec<String>> = HashMap::new();
287 while let Some(path) = paths.next().await {
288 let scheme = extract_scheme(&path)?;
289 grouped.entry(scheme).or_default().push(path);
290 }
291
292 for (_, paths) in grouped {
293 let storage = self.resolve(&paths[0])?;
294 storage
295 .delete_stream(futures::stream::iter(paths).boxed())
296 .await?;
297 }
298 Ok(())
299 }
300
301 fn new_input(&self, path: &str) -> Result<InputFile> {
302 Ok(InputFile::new(
303 Arc::new(self.resolve(path)?.as_ref().clone()),
304 path.to_string(),
305 ))
306 }
307
308 fn new_output(&self, path: &str) -> Result<OutputFile> {
309 Ok(OutputFile::new(
310 Arc::new(self.resolve(path)?.as_ref().clone()),
311 path.to_string(),
312 ))
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a resolving storage with no properties and an empty backend cache.
    fn empty_resolving_storage() -> OpenDalResolvingStorage {
        OpenDalResolvingStorage {
            props: HashMap::new(),
            storages: RwLock::default(),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_resolve_s3_aliases_share_instance() {
        let storage = empty_resolving_storage();
        let canonical = storage.resolve("s3://bucket/key").unwrap();

        let aliases = [
            ("s3a://bucket/key", "s3 and s3a should share one instance"),
            ("s3n://bucket/key", "s3 and s3n should share one instance"),
        ];
        for (alias_path, message) in aliases {
            let resolved = storage.resolve(alias_path).unwrap();
            assert!(Arc::ptr_eq(&canonical, &resolved), "{message}");
        }
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_resolve_azdls_aliases_share_instance() {
        let storage = empty_resolving_storage();
        let resolve_with = |scheme: &str| {
            let full_path =
                format!("{scheme}://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet");
            storage.resolve(&full_path).unwrap()
        };

        let abfss = resolve_with("abfss");
        let abfs = resolve_with("abfs");

        assert!(
            Arc::ptr_eq(&abfss, &abfs),
            "abfss and abfs should share one instance"
        );
    }
}