iceberg_storage_opendal/
resolving.rs1use std::collections::HashMap;
22use std::sync::{Arc, RwLock};
23
24use async_trait::async_trait;
25use bytes::Bytes;
26use futures::StreamExt;
27use futures::stream::BoxStream;
28use iceberg::io::{
29 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
30 StorageFactory,
31};
32use iceberg::{Error, ErrorKind, Result};
33use opendal::Scheme;
34use serde::{Deserialize, Serialize};
35use url::Url;
36
37use crate::OpenDalStorage;
38#[cfg(feature = "opendal-s3")]
39use crate::s3::CustomAwsCredentialLoader;
40
// Canonical URI scheme strings recognized by the resolving storage.
// Aliases in each family are collapsed onto a single OpenDAL backend by
// `parse_scheme` below.
pub const SCHEME_MEMORY: &str = "memory";
pub const SCHEME_FILE: &str = "file";
// S3 family: "s3a"/"s3n" are legacy Hadoop aliases for "s3".
pub const SCHEME_S3: &str = "s3";
pub const SCHEME_S3A: &str = "s3a";
pub const SCHEME_S3N: &str = "s3n";
// Google Cloud Storage: "gs" is the gsutil-style alias for "gcs".
pub const SCHEME_GS: &str = "gs";
pub const SCHEME_GCS: &str = "gcs";
pub const SCHEME_OSS: &str = "oss";
// Azure family: ABFS(S) and WASB(S) are all routed to the Azdls backend.
pub const SCHEME_ABFSS: &str = "abfss";
pub const SCHEME_ABFS: &str = "abfs";
pub const SCHEME_WASBS: &str = "wasbs";
pub const SCHEME_WASB: &str = "wasb";
54
55fn parse_scheme(scheme: &str) -> Result<Scheme> {
57 match scheme {
58 SCHEME_MEMORY => Ok(Scheme::Memory),
59 SCHEME_FILE | "" => Ok(Scheme::Fs),
60 SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3),
61 SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs),
62 SCHEME_OSS => Ok(Scheme::Oss),
63 SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok(Scheme::Azdls),
64 s => s.parse::<Scheme>().map_err(|e| {
65 Error::new(
66 ErrorKind::FeatureUnsupported,
67 format!("Unsupported storage scheme: {s}: {e}"),
68 )
69 }),
70 }
71}
72
73fn extract_scheme(path: &str) -> Result<String> {
75 let url = Url::parse(path).map_err(|e| {
76 Error::new(
77 ErrorKind::DataInvalid,
78 format!("Invalid path: {path}, failed to parse URL: {e}"),
79 )
80 })?;
81 Ok(url.scheme().to_string())
82}
83
/// Construct the concrete `OpenDalStorage` variant for `scheme`.
///
/// `props` carries the backend-specific configuration; each compiled-in
/// backend parses only the subset of properties it understands. Schemes
/// whose backend feature is disabled at compile time fall through to the
/// final arm and are reported as `FeatureUnsupported`.
fn build_storage_for_scheme(
    scheme: &str,
    props: &HashMap<String, String>,
    // Optional caller-supplied AWS credential loader; the parameter itself
    // only exists when the S3 backend is compiled in.
    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
) -> Result<OpenDalStorage> {
    match parse_scheme(scheme)? {
        #[cfg(feature = "opendal-s3")]
        Scheme::S3 => {
            let config = crate::s3::s3_config_parse(props.clone())?;
            Ok(OpenDalStorage::S3 {
                // Keep the exact scheme the caller used (s3/s3a/s3n) so
                // paths can be handled under their original spelling.
                configured_scheme: scheme.to_string(),
                config: Arc::new(config),
                customized_credential_load: customized_credential_load.clone(),
            })
        }
        #[cfg(feature = "opendal-gcs")]
        Scheme::Gcs => {
            let config = crate::gcs::gcs_config_parse(props.clone())?;
            Ok(OpenDalStorage::Gcs {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-oss")]
        Scheme::Oss => {
            let config = crate::oss::oss_config_parse(props.clone())?;
            Ok(OpenDalStorage::Oss {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-azdls")]
        Scheme::Azdls => {
            // The Azure variant keeps a typed scheme (abfss/abfs/wasbs/wasb).
            let configured_scheme: crate::azdls::AzureStorageScheme = scheme.parse()?;
            let config = crate::azdls::azdls_config_parse(props.clone())?;
            Ok(OpenDalStorage::Azdls {
                configured_scheme,
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-fs")]
        Scheme::Fs => Ok(OpenDalStorage::LocalFs),
        #[cfg(feature = "opendal-memory")]
        Scheme::Memory => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
        // Reached for schemes OpenDAL knows but this crate does not support,
        // or for backends whose feature flag is disabled.
        unsupported => Err(Error::new(
            ErrorKind::FeatureUnsupported,
            format!("Unsupported storage scheme: {unsupported}"),
        )),
    }
}
133
/// Factory producing [`OpenDalResolvingStorage`] instances from a
/// [`StorageConfig`].
///
/// Registered via `typetag` (see the `StorageFactory` impl below) so it can
/// be (de)serialized behind a trait object.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorageFactory {
    /// Optional custom AWS credential loader forwarded to every S3 storage
    /// this factory builds. Runtime-only state: `#[serde(skip)]` means it is
    /// `None` after deserialization.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
159
impl Default for OpenDalResolvingStorageFactory {
    /// Same as [`OpenDalResolvingStorageFactory::new`]: no custom credential
    /// loader configured.
    fn default() -> Self {
        Self::new()
    }
}
165
impl OpenDalResolvingStorageFactory {
    /// Create a factory with default (non-customized) credential loading.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    /// Builder-style setter: use `loader` to resolve AWS credentials in every
    /// S3 storage subsequently produced by this factory.
    #[cfg(feature = "opendal-s3")]
    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
        self.customized_credential_load = Some(loader);
        self
    }
}
182
// `typetag` registration allows serialization through `dyn StorageFactory`.
#[typetag::serde]
impl StorageFactory for OpenDalResolvingStorageFactory {
    /// Build a scheme-resolving storage that shares `config`'s properties
    /// across all backends, starting with an empty per-scheme cache.
    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
        Ok(Arc::new(OpenDalResolvingStorage {
            props: config.props().clone(),
            storages: RwLock::new(HashMap::new()),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: self.customized_credential_load.clone(),
        }))
    }
}
194
/// A [`Storage`] implementation that dispatches every operation to a
/// concrete OpenDAL backend chosen from the path's URI scheme.
///
/// Backends are built lazily on first use and cached per scheme, so a single
/// instance can serve e.g. `s3://` and `file://` paths simultaneously.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorage {
    /// Backend configuration properties shared by all schemes.
    props: HashMap<String, String>,
    /// Lazily-populated scheme -> storage cache. Not serialized: it is
    /// rebuilt empty (`default`) after deserialization.
    #[serde(skip, default)]
    storages: RwLock<HashMap<String, Arc<OpenDalStorage>>>,
    /// Optional custom AWS credential loader for S3 backends (runtime-only,
    /// skipped by serde).
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
212
213impl OpenDalResolvingStorage {
214 fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
217 let scheme = extract_scheme(path)?;
218
219 {
221 let cache = self
222 .storages
223 .read()
224 .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
225 if let Some(storage) = cache.get(&scheme) {
226 return Ok(storage.clone());
227 }
228 }
229
230 let mut cache = self
232 .storages
233 .write()
234 .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
235
236 if let Some(storage) = cache.get(&scheme) {
238 return Ok(storage.clone());
239 }
240
241 let storage = build_storage_for_scheme(
242 &scheme,
243 &self.props,
244 #[cfg(feature = "opendal-s3")]
245 &self.customized_credential_load,
246 )?;
247 let storage = Arc::new(storage);
248 cache.insert(scheme, storage.clone());
249 Ok(storage)
250 }
251}
252
253#[async_trait]
254#[typetag::serde]
255impl Storage for OpenDalResolvingStorage {
256 async fn exists(&self, path: &str) -> Result<bool> {
257 self.resolve(path)?.exists(path).await
258 }
259
260 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
261 self.resolve(path)?.metadata(path).await
262 }
263
264 async fn read(&self, path: &str) -> Result<Bytes> {
265 self.resolve(path)?.read(path).await
266 }
267
268 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
269 self.resolve(path)?.reader(path).await
270 }
271
272 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
273 self.resolve(path)?.write(path, bs).await
274 }
275
276 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
277 self.resolve(path)?.writer(path).await
278 }
279
280 async fn delete(&self, path: &str) -> Result<()> {
281 self.resolve(path)?.delete(path).await
282 }
283
284 async fn delete_prefix(&self, path: &str) -> Result<()> {
285 self.resolve(path)?.delete_prefix(path).await
286 }
287
288 async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
289 let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
292 while let Some(path) = paths.next().await {
293 let scheme = extract_scheme(&path)?;
294 grouped.entry(scheme).or_default().push(path);
295 }
296
297 for (_, paths) in grouped {
298 let storage = self.resolve(&paths[0])?;
299 storage
300 .delete_stream(futures::stream::iter(paths).boxed())
301 .await?;
302 }
303 Ok(())
304 }
305
306 fn new_input(&self, path: &str) -> Result<InputFile> {
307 Ok(InputFile::new(
308 Arc::new(self.resolve(path)?.as_ref().clone()),
309 path.to_string(),
310 ))
311 }
312
313 fn new_output(&self, path: &str) -> Result<OutputFile> {
314 Ok(OutputFile::new(
315 Arc::new(self.resolve(path)?.as_ref().clone()),
316 path.to_string(),
317 ))
318 }
319}