use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::BoxStream;
use iceberg::io::{
    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
    StorageFactory,
};
use iceberg::{Error, ErrorKind, Result};
use opendal::Scheme;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::OpenDalStorage;
#[cfg(feature = "opendal-s3")]
use crate::s3::CustomAwsCredentialLoader;

pub const SCHEME_MEMORY: &str = "memory";
pub const SCHEME_FILE: &str = "file";
pub const SCHEME_S3: &str = "s3";
pub const SCHEME_S3A: &str = "s3a";
pub const SCHEME_S3N: &str = "s3n";
pub const SCHEME_GS: &str = "gs";
pub const SCHEME_GCS: &str = "gcs";
pub const SCHEME_OSS: &str = "oss";
pub const SCHEME_ABFSS: &str = "abfss";
pub const SCHEME_ABFS: &str = "abfs";
pub const SCHEME_WASBS: &str = "wasbs";
pub const SCHEME_WASB: &str = "wasb";

/// Maps a scheme string to the OpenDAL [`Scheme`] that serves it, folding
/// aliases (`s3a`/`s3n`, `gs`, `abfs(s)`/`wasb(s)`) onto a single backend.
fn parse_scheme(scheme: &str) -> Result<Scheme> {
    match scheme {
        SCHEME_MEMORY => Ok(Scheme::Memory),
        SCHEME_FILE | "" => Ok(Scheme::Fs),
        SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3),
        SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs),
        SCHEME_OSS => Ok(Scheme::Oss),
        SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok(Scheme::Azdls),
        s => s.parse::<Scheme>().map_err(|e| {
            Error::new(
                ErrorKind::FeatureUnsupported,
                format!("Unsupported storage scheme: {s}: {e}"),
            )
        }),
    }
}

/// Extracts the scheme from a full storage path (e.g. `s3://bucket/key`) and
/// normalizes it via [`parse_scheme`].
fn extract_scheme(path: &str) -> Result<Scheme> {
    let url = Url::parse(path).map_err(|e| {
        Error::new(
            ErrorKind::DataInvalid,
            format!("Invalid path: {path}, failed to parse URL: {e}"),
        )
    })?;
    parse_scheme(url.scheme())
}

/// Builds the concrete [`OpenDalStorage`] variant for `scheme` from the shared
/// property map. Each backend is gated behind its cargo feature.
fn build_storage_for_scheme(
    scheme: Scheme,
    props: &HashMap<String, String>,
    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
) -> Result<OpenDalStorage> {
    match scheme {
        #[cfg(feature = "opendal-s3")]
        Scheme::S3 => {
            let config = crate::s3::s3_config_parse(props.clone())?;
            Ok(OpenDalStorage::S3 {
                config: Arc::new(config),
                customized_credential_load: customized_credential_load.clone(),
            })
        }
        #[cfg(feature = "opendal-gcs")]
        Scheme::Gcs => {
            let config = crate::gcs::gcs_config_parse(props.clone())?;
            Ok(OpenDalStorage::Gcs {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-oss")]
        Scheme::Oss => {
            let config = crate::oss::oss_config_parse(props.clone())?;
            Ok(OpenDalStorage::Oss {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-azdls")]
        Scheme::Azdls => {
            let config = crate::azdls::azdls_config_parse(props.clone())?;
            Ok(OpenDalStorage::Azdls {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-fs")]
        Scheme::Fs => Ok(OpenDalStorage::LocalFs),
        #[cfg(feature = "opendal-memory")]
        Scheme::Memory => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
        unsupported => Err(Error::new(
            ErrorKind::FeatureUnsupported,
            format!("Unsupported storage scheme: {unsupported}"),
        )),
    }
}

/// A [`StorageFactory`] that produces [`OpenDalResolvingStorage`] instances:
/// storages that pick the backend per path at call time rather than binding
/// to a single scheme up front.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorageFactory {
    /// Optional custom AWS credential loader; injected at runtime, never
    /// serialized.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}

impl Default for OpenDalResolvingStorageFactory {
    fn default() -> Self {
        Self::new()
    }
}

impl OpenDalResolvingStorageFactory {
    /// Creates a factory with no custom credential loader.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    /// Attaches a custom AWS credential loader that every S3 storage built by
    /// this factory will use.
    #[cfg(feature = "opendal-s3")]
    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
        self.customized_credential_load = Some(loader);
        self
    }
}
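
// A minimal usage sketch (the `loader` and `config` values are hypothetical;
// the builder method requires the `opendal-s3` feature):
//
//     let factory = OpenDalResolvingStorageFactory::new()
//         .with_s3_credential_loader(loader);
//     let storage = factory.build(&config)?;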

#[typetag::serde]
impl StorageFactory for OpenDalResolvingStorageFactory {
    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
        Ok(Arc::new(OpenDalResolvingStorage {
            props: config.props().clone(),
            storages: RwLock::new(HashMap::new()),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: self.customized_credential_load.clone(),
        }))
    }
}

/// A [`Storage`] implementation that inspects each path's scheme and lazily
/// builds (then caches) the matching [`OpenDalStorage`] backend.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorage {
    /// Shared configuration properties used to build every backend.
    props: HashMap<String, String>,
    /// Per-scheme cache of built storages; restored empty after
    /// deserialization.
    #[serde(skip, default)]
    storages: RwLock<HashMap<Scheme, Arc<OpenDalStorage>>>,
    /// Optional custom AWS credential loader; injected at runtime, never
    /// serialized.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}

impl OpenDalResolvingStorage {
    /// Resolves the backend for `path`, building and caching it on first use.
    fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
        let scheme = extract_scheme(path)?;

        // Fast path: the storage for this scheme has already been built.
        {
            let cache = self
                .storages
                .read()
                .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
            if let Some(storage) = cache.get(&scheme) {
                return Ok(storage.clone());
            }
        }

        // Slow path: take the write lock to build the storage.
        let mut cache = self
            .storages
            .write()
            .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;

        // Re-check under the write lock in case another thread raced us here.
        if let Some(storage) = cache.get(&scheme) {
            return Ok(storage.clone());
        }

        let storage = build_storage_for_scheme(
            scheme,
            &self.props,
            #[cfg(feature = "opendal-s3")]
            &self.customized_credential_load,
        )?;
        let storage = Arc::new(storage);
        cache.insert(scheme, storage.clone());
        Ok(storage)
    }
}

#[async_trait]
#[typetag::serde]
impl Storage for OpenDalResolvingStorage {
    async fn exists(&self, path: &str) -> Result<bool> {
        self.resolve(path)?.exists(path).await
    }

    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
        self.resolve(path)?.metadata(path).await
    }

    async fn read(&self, path: &str) -> Result<Bytes> {
        self.resolve(path)?.read(path).await
    }

    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
        self.resolve(path)?.reader(path).await
    }

    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
        self.resolve(path)?.write(path, bs).await
    }

    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
        self.resolve(path)?.writer(path).await
    }

    async fn delete(&self, path: &str) -> Result<()> {
        self.resolve(path)?.delete(path).await
    }

    async fn delete_prefix(&self, path: &str) -> Result<()> {
        self.resolve(path)?.delete_prefix(path).await
    }

    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
        // Group the incoming paths by scheme so that each backend deletes its
        // own batch.
        let mut grouped: HashMap<Scheme, Vec<String>> = HashMap::new();
        while let Some(path) = paths.next().await {
            let scheme = extract_scheme(&path)?;
            grouped.entry(scheme).or_default().push(path);
        }

        for (_, paths) in grouped {
            let storage = self.resolve(&paths[0])?;
            storage
                .delete_stream(futures::stream::iter(paths).boxed())
                .await?;
        }
        Ok(())
    }

    fn new_input(&self, path: &str) -> Result<InputFile> {
        Ok(InputFile::new(
            Arc::new(self.resolve(path)?.as_ref().clone()),
            path.to_string(),
        ))
    }

    fn new_output(&self, path: &str) -> Result<OutputFile> {
        Ok(OutputFile::new(
            Arc::new(self.resolve(path)?.as_ref().clone()),
            path.to_string(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a resolving storage with empty props, enough to exercise scheme
    /// resolution and caching.
    fn empty_resolving_storage() -> OpenDalResolvingStorage {
        OpenDalResolvingStorage {
            props: HashMap::new(),
            storages: RwLock::new(HashMap::new()),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

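    // An extra check not in the original tests: `extract_scheme` should fold
    // aliases onto one `Scheme` and reject strings that do not parse as URLs.
    #[test]
    fn test_extract_scheme_folds_aliases() {
        assert_eq!(extract_scheme("s3a://bucket/key").unwrap(), Scheme::S3);
        assert_eq!(extract_scheme("gcs://bucket/key").unwrap(), Scheme::Gcs);
        assert_eq!(
            extract_scheme("wasb://container@account.blob.core.windows.net/p").unwrap(),
            Scheme::Azdls
        );
        assert!(extract_scheme("not a url").is_err());
    }
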
    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_resolve_s3_aliases_share_instance() {
        let storage = empty_resolving_storage();

        let a = storage.resolve("s3://bucket/key").unwrap();
        let b = storage.resolve("s3a://bucket/key").unwrap();
        let c = storage.resolve("s3n://bucket/key").unwrap();

        assert!(Arc::ptr_eq(&a, &b), "s3 and s3a should share one instance");
        assert!(Arc::ptr_eq(&a, &c), "s3 and s3n should share one instance");
    }

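    // A caching check in the spirit of the alias tests above, using the local
    // filesystem backend since it is built without any config parsing:
    // resolving two paths with the same scheme should return the same Arc.
    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_resolve_fs_is_cached() {
        let storage = empty_resolving_storage();

        let a = storage.resolve("file:///tmp/a.parquet").unwrap();
        let b = storage.resolve("file:///tmp/b.parquet").unwrap();

        assert!(Arc::ptr_eq(&a, &b), "fs storages should be cached per scheme");
    }
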
    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_resolve_azdls_aliases_share_instance() {
        let storage = empty_resolving_storage();

        let path_for = |scheme: &str| {
            format!("{scheme}://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
        };

        let abfss = storage.resolve(&path_for("abfss")).unwrap();
        let abfs = storage.resolve(&path_for("abfs")).unwrap();

        assert!(
            Arc::ptr_eq(&abfss, &abfs),
            "abfss and abfs should share one instance"
        );
    }
}