1use std::collections::HashMap;
22use std::sync::{Arc, RwLock};
23
24use async_trait::async_trait;
25use bytes::Bytes;
26use futures::StreamExt;
27use futures::stream::BoxStream;
28use iceberg::io::{
29 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
30 StorageFactory,
31};
32use iceberg::{Error, ErrorKind, Result};
33use serde::{Deserialize, Serialize};
34use url::Url;
35
36use crate::OpenDalStorage;
37#[cfg(feature = "opendal-s3")]
38use crate::s3::CustomAwsCredentialLoader;
39
/// `memory` URL scheme; resolved to the `OpenDalStorage::Memory` backend.
pub const SCHEME_MEMORY: &str = "memory";
/// `file` URL scheme; resolved to the `OpenDalStorage::LocalFs` backend.
pub const SCHEME_FILE: &str = "file";
/// `s3` URL scheme; the canonical name of the S3 backend.
pub const SCHEME_S3: &str = "s3";
/// `s3a` URL scheme; an alias resolved to the same `s3` backend as [`SCHEME_S3`].
pub const SCHEME_S3A: &str = "s3a";
/// `s3n` URL scheme; an alias resolved to the same `s3` backend as [`SCHEME_S3`].
pub const SCHEME_S3N: &str = "s3n";
/// `gs` URL scheme; an alias resolved to the `gcs` backend.
pub const SCHEME_GS: &str = "gs";
/// `gcs` URL scheme; the canonical name of the GCS backend.
pub const SCHEME_GCS: &str = "gcs";
/// `oss` URL scheme; resolved to the `oss` backend.
pub const SCHEME_OSS: &str = "oss";
/// `abfss` URL scheme; resolved to the `azdls` backend.
pub const SCHEME_ABFSS: &str = "abfss";
/// `abfs` URL scheme; resolved to the `azdls` backend.
pub const SCHEME_ABFS: &str = "abfs";
/// `wasbs` URL scheme; resolved to the `azdls` backend.
pub const SCHEME_WASBS: &str = "wasbs";
/// `wasb` URL scheme; resolved to the `azdls` backend.
pub const SCHEME_WASB: &str = "wasb";
/// `hf` URL scheme; resolved to the `hf` backend.
pub const SCHEME_HF: &str = "hf";
54
55fn parse_scheme(scheme: &str) -> Result<&'static str> {
57 match scheme {
58 SCHEME_MEMORY => Ok("memory"),
59 SCHEME_FILE | "" => Ok("file"),
60 SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok("s3"),
61 SCHEME_GS | SCHEME_GCS => Ok("gcs"),
62 SCHEME_OSS => Ok("oss"),
63 SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok("azdls"),
64 SCHEME_HF => Ok("hf"),
65 s => Err(Error::new(
66 ErrorKind::FeatureUnsupported,
67 format!("Unsupported storage scheme: {s}"),
68 )),
69 }
70}
71
72fn extract_scheme(path: &str) -> Result<&'static str> {
74 let url = Url::parse(path).map_err(|e| {
75 Error::new(
76 ErrorKind::DataInvalid,
77 format!("Invalid path: {path}, failed to parse URL: {e}"),
78 )
79 })?;
80 parse_scheme(url.scheme())
81}
82
83fn build_storage_for_scheme(
85 scheme: &'static str,
86 props: &HashMap<String, String>,
87 #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
88) -> Result<OpenDalStorage> {
89 match scheme {
90 #[cfg(feature = "opendal-s3")]
91 "s3" => {
92 let config = crate::s3::s3_config_parse(props.clone())?;
93 Ok(OpenDalStorage::S3 {
94 config: Arc::new(config),
95 customized_credential_load: customized_credential_load.clone(),
96 })
97 }
98 #[cfg(feature = "opendal-gcs")]
99 "gcs" => {
100 let config = crate::gcs::gcs_config_parse(props.clone())?;
101 Ok(OpenDalStorage::Gcs {
102 config: Arc::new(config),
103 })
104 }
105 #[cfg(feature = "opendal-oss")]
106 "oss" => {
107 let config = crate::oss::oss_config_parse(props.clone())?;
108 Ok(OpenDalStorage::Oss {
109 config: Arc::new(config),
110 })
111 }
112 #[cfg(feature = "opendal-azdls")]
113 "azdls" => {
114 let config = crate::azdls::azdls_config_parse(props.clone())?;
115 Ok(OpenDalStorage::Azdls {
116 config: Arc::new(config),
117 })
118 }
119 #[cfg(feature = "opendal-fs")]
120 "file" => Ok(OpenDalStorage::LocalFs),
121 #[cfg(feature = "opendal-memory")]
122 "memory" => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
123 #[cfg(feature = "opendal-hf")]
124 "hf" => {
125 let config = crate::hf::hf_config_parse(props.clone())?;
126 Ok(OpenDalStorage::Hf {
127 config: Arc::new(config),
128 })
129 }
130 unsupported => Err(Error::new(
131 ErrorKind::FeatureUnsupported,
132 format!("Unsupported storage scheme: {unsupported}"),
133 )),
134 }
135}
136
/// [`StorageFactory`] producing an [`OpenDalResolvingStorage`], which selects the
/// concrete OpenDAL backend per call from each path's URL scheme.
///
/// The only state held here is the optional S3 credential loader. It is marked
/// `#[serde(skip)]`, so a deserialized factory comes back without a custom loader.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorageFactory {
    /// Custom AWS credential loader copied into every storage this factory
    /// builds, and from there into the S3 backends those storages create.
    /// Not serialized.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
162
163impl Default for OpenDalResolvingStorageFactory {
164 fn default() -> Self {
165 Self::new()
166 }
167}
168
169impl OpenDalResolvingStorageFactory {
170 pub fn new() -> Self {
172 Self {
173 #[cfg(feature = "opendal-s3")]
174 customized_credential_load: None,
175 }
176 }
177
178 #[cfg(feature = "opendal-s3")]
180 pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
181 self.customized_credential_load = Some(loader);
182 self
183 }
184}
185
186#[typetag::serde]
187impl StorageFactory for OpenDalResolvingStorageFactory {
188 fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
189 Ok(Arc::new(OpenDalResolvingStorage {
190 props: config.props().clone(),
191 storages: RwLock::new(HashMap::new()),
192 #[cfg(feature = "opendal-s3")]
193 customized_credential_load: self.customized_credential_load.clone(),
194 }))
195 }
196}
197
/// [`Storage`] implementation that dispatches each call to an [`OpenDalStorage`]
/// selected by the path's URL scheme.
///
/// Backends are created lazily, one per canonical scheme, and cached for reuse.
/// Only `props` is serialized. The cache and the S3 credential loader are
/// skipped, so a deserialized instance starts with an empty cache and no
/// custom loader.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorage {
    /// Configuration properties handed (cloned) to a backend's config parser
    /// when that backend is first built.
    props: HashMap<String, String>,
    /// Canonical scheme name -> storage built for it. Starts empty (via
    /// `Default`) after deserialization and is repopulated on demand.
    #[serde(skip, default)]
    storages: RwLock<HashMap<&'static str, Arc<OpenDalStorage>>>,
    /// Custom AWS credential loader forwarded to S3 backends built by this
    /// instance. Not serialized.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
216
217impl OpenDalResolvingStorage {
218 fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
221 let scheme = extract_scheme(path)?;
222
223 {
225 let cache = self
226 .storages
227 .read()
228 .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
229 if let Some(storage) = cache.get(&scheme) {
230 return Ok(storage.clone());
231 }
232 }
233
234 let mut cache = self
236 .storages
237 .write()
238 .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
239
240 if let Some(storage) = cache.get(&scheme) {
242 return Ok(storage.clone());
243 }
244
245 let storage = build_storage_for_scheme(
246 scheme,
247 &self.props,
248 #[cfg(feature = "opendal-s3")]
249 &self.customized_credential_load,
250 )?;
251 let storage = Arc::new(storage);
252 cache.insert(scheme, storage.clone());
253 Ok(storage)
254 }
255}
256
257#[async_trait]
258#[typetag::serde]
259impl Storage for OpenDalResolvingStorage {
260 async fn exists(&self, path: &str) -> Result<bool> {
261 self.resolve(path)?.exists(path).await
262 }
263
264 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
265 self.resolve(path)?.metadata(path).await
266 }
267
268 async fn read(&self, path: &str) -> Result<Bytes> {
269 self.resolve(path)?.read(path).await
270 }
271
272 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
273 self.resolve(path)?.reader(path).await
274 }
275
276 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
277 self.resolve(path)?.write(path, bs).await
278 }
279
280 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
281 self.resolve(path)?.writer(path).await
282 }
283
284 async fn delete(&self, path: &str) -> Result<()> {
285 self.resolve(path)?.delete(path).await
286 }
287
288 async fn delete_prefix(&self, path: &str) -> Result<()> {
289 self.resolve(path)?.delete_prefix(path).await
290 }
291
292 async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
293 let mut grouped: HashMap<&'static str, Vec<String>> = HashMap::new();
296 while let Some(path) = paths.next().await {
297 let scheme = extract_scheme(&path)?;
298 grouped.entry(scheme).or_default().push(path);
299 }
300
301 for (_, paths) in grouped {
302 let storage = self.resolve(&paths[0])?;
303 storage
304 .delete_stream(futures::stream::iter(paths).boxed())
305 .await?;
306 }
307 Ok(())
308 }
309
310 fn new_input(&self, path: &str) -> Result<InputFile> {
311 Ok(InputFile::new(
312 Arc::new(self.resolve(path)?.as_ref().clone()),
313 path.to_string(),
314 ))
315 }
316
317 fn new_output(&self, path: &str) -> Result<OutputFile> {
318 Ok(OutputFile::new(
319 Arc::new(self.resolve(path)?.as_ref().clone()),
320 path.to_string(),
321 ))
322 }
323}
324
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a resolving storage with no properties and a cold cache.
    fn empty_resolving_storage() -> OpenDalResolvingStorage {
        OpenDalResolvingStorage {
            props: HashMap::new(),
            storages: RwLock::new(HashMap::new()),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    #[test]
    fn test_parse_scheme_canonicalises_aliases() {
        let cases = [
            (SCHEME_MEMORY, "memory"),
            (SCHEME_FILE, "file"),
            ("", "file"),
            (SCHEME_S3, "s3"),
            (SCHEME_S3A, "s3"),
            (SCHEME_S3N, "s3"),
            (SCHEME_GS, "gcs"),
            (SCHEME_GCS, "gcs"),
            (SCHEME_OSS, "oss"),
            (SCHEME_ABFSS, "azdls"),
            (SCHEME_ABFS, "azdls"),
            (SCHEME_WASBS, "azdls"),
            (SCHEME_WASB, "azdls"),
            (SCHEME_HF, "hf"),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_scheme(input).unwrap(), expected, "scheme {input:?}");
        }
    }

    #[test]
    fn test_parse_scheme_rejects_unknown() {
        let err = parse_scheme("ftp").unwrap_err();
        assert!(matches!(err.kind(), ErrorKind::FeatureUnsupported));
    }

    #[test]
    fn test_extract_scheme_from_urls() {
        assert_eq!(extract_scheme("s3a://bucket/key").unwrap(), "s3");
        assert_eq!(extract_scheme("gs://bucket/object").unwrap(), "gcs");
        assert!(extract_scheme("ftp://example.com/file").is_err());
    }

    #[test]
    fn test_resolve_unknown_scheme_fails() {
        let storage = empty_resolving_storage();
        assert!(storage.resolve("ftp://example.com/file").is_err());
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_resolve_s3_aliases_share_instance() {
        let storage = empty_resolving_storage();

        let a = storage.resolve("s3://bucket/key").unwrap();
        let b = storage.resolve("s3a://bucket/key").unwrap();
        let c = storage.resolve("s3n://bucket/key").unwrap();

        assert!(Arc::ptr_eq(&a, &b), "s3 and s3a should share one instance");
        assert!(Arc::ptr_eq(&a, &c), "s3 and s3n should share one instance");
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_resolve_azdls_aliases_share_instance() {
        let storage = empty_resolving_storage();

        let path_for = |scheme: &str| {
            format!("{scheme}://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
        };

        let abfss = storage.resolve(&path_for("abfss")).unwrap();
        for alias in ["abfs", "wasbs", "wasb"] {
            let other = storage.resolve(&path_for(alias)).unwrap();
            assert!(
                Arc::ptr_eq(&abfss, &other),
                "abfss and {alias} should share one instance"
            );
        }
    }
}