1use std::ops::Range;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24#[cfg(feature = "storage-azdls")]
25use azdls::AzureStorageScheme;
26use bytes::Bytes;
27use opendal::layers::RetryLayer;
28#[cfg(feature = "storage-azdls")]
29use opendal::services::AzdlsConfig;
30#[cfg(feature = "storage-gcs")]
31use opendal::services::GcsConfig;
32#[cfg(feature = "storage-oss")]
33use opendal::services::OssConfig;
34#[cfg(feature = "storage-s3")]
35use opendal::services::S3Config;
36use opendal::{Operator, Scheme};
37#[cfg(feature = "storage-s3")]
38pub use s3::CustomAwsCredentialLoader;
39use serde::{Deserialize, Serialize};
40
41use super::{
42 FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
43 StorageConfig, StorageFactory,
44};
45use crate::{Error, ErrorKind, Result};
46
47#[cfg(feature = "storage-azdls")]
48mod azdls;
49#[cfg(feature = "storage-fs")]
50mod fs;
51#[cfg(feature = "storage-gcs")]
52mod gcs;
53#[cfg(feature = "storage-memory")]
54mod memory;
55#[cfg(feature = "storage-oss")]
56mod oss;
57#[cfg(feature = "storage-s3")]
58mod s3;
59
60#[cfg(feature = "storage-azdls")]
61use azdls::*;
62#[cfg(feature = "storage-fs")]
63use fs::*;
64#[cfg(feature = "storage-gcs")]
65use gcs::*;
66#[cfg(feature = "storage-memory")]
67use memory::*;
68#[cfg(feature = "storage-oss")]
69use oss::*;
70#[cfg(feature = "storage-s3")]
71pub use s3::*;
72
73#[derive(Clone, Debug, Serialize, Deserialize)]
79pub enum OpenDalStorageFactory {
80 #[cfg(feature = "storage-memory")]
82 Memory,
83 #[cfg(feature = "storage-fs")]
85 Fs,
86 #[cfg(feature = "storage-s3")]
88 S3 {
89 #[serde(skip)]
91 customized_credential_load: Option<CustomAwsCredentialLoader>,
92 },
93 #[cfg(feature = "storage-gcs")]
95 Gcs,
96 #[cfg(feature = "storage-oss")]
98 Oss,
99 #[cfg(feature = "storage-azdls")]
101 Azdls {
102 configured_scheme: AzureStorageScheme,
104 },
105}
106
107#[typetag::serde(name = "OpenDalStorageFactory")]
108impl StorageFactory for OpenDalStorageFactory {
109 #[allow(unused_variables)]
110 fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
111 match self {
112 #[cfg(feature = "storage-memory")]
113 OpenDalStorageFactory::Memory => {
114 Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
115 }
116 #[cfg(feature = "storage-fs")]
117 OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
118 #[cfg(feature = "storage-s3")]
119 OpenDalStorageFactory::S3 {
120 customized_credential_load,
121 } => Ok(Arc::new(OpenDalStorage::S3 {
122 configured_scheme: "s3".to_string(),
123 config: s3_config_parse(config.props().clone())?.into(),
124 customized_credential_load: customized_credential_load.clone(),
125 })),
126 #[cfg(feature = "storage-gcs")]
127 OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
128 config: gcs_config_parse(config.props().clone())?.into(),
129 })),
130 #[cfg(feature = "storage-oss")]
131 OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
132 config: oss_config_parse(config.props().clone())?.into(),
133 })),
134 #[cfg(feature = "storage-azdls")]
135 OpenDalStorageFactory::Azdls { configured_scheme } => {
136 Ok(Arc::new(OpenDalStorage::Azdls {
137 configured_scheme: configured_scheme.clone(),
138 config: azdls_config_parse(config.props().clone())?.into(),
139 }))
140 }
141 #[cfg(all(
142 not(feature = "storage-memory"),
143 not(feature = "storage-fs"),
144 not(feature = "storage-s3"),
145 not(feature = "storage-gcs"),
146 not(feature = "storage-oss"),
147 not(feature = "storage-azdls"),
148 ))]
149 _ => Err(Error::new(
150 ErrorKind::FeatureUnsupported,
151 "No storage service has been enabled",
152 )),
153 }
154 }
155}
156
157#[cfg(feature = "storage-memory")]
159fn default_memory_operator() -> Operator {
160 memory_config_build().expect("Failed to create default memory operator")
161}
162
163#[derive(Clone, Debug, Serialize, Deserialize)]
165pub enum OpenDalStorage {
166 #[cfg(feature = "storage-memory")]
168 Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
169 #[cfg(feature = "storage-fs")]
171 LocalFs,
172 #[cfg(feature = "storage-s3")]
174 S3 {
175 configured_scheme: String,
178 config: Arc<S3Config>,
180 #[serde(skip)]
182 customized_credential_load: Option<CustomAwsCredentialLoader>,
183 },
184 #[cfg(feature = "storage-gcs")]
186 Gcs {
187 config: Arc<GcsConfig>,
189 },
190 #[cfg(feature = "storage-oss")]
192 Oss {
193 config: Arc<OssConfig>,
195 },
196 #[cfg(feature = "storage-azdls")]
201 #[allow(private_interfaces)]
202 Azdls {
203 configured_scheme: AzureStorageScheme,
207 config: Arc<AzdlsConfig>,
209 },
210}
211
212impl OpenDalStorage {
213 pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
217 let (scheme_str, props, extensions) = file_io_builder.into_parts();
218 let _ = (&props, &extensions);
219 let scheme = Self::parse_scheme(&scheme_str)?;
220
221 match scheme {
222 #[cfg(feature = "storage-memory")]
223 Scheme::Memory => Ok(Self::Memory(memory_config_build()?)),
224 #[cfg(feature = "storage-fs")]
225 Scheme::Fs => Ok(Self::LocalFs),
226 #[cfg(feature = "storage-s3")]
227 Scheme::S3 => Ok(Self::S3 {
228 configured_scheme: scheme_str,
229 config: s3_config_parse(props)?.into(),
230 customized_credential_load: extensions
231 .get::<CustomAwsCredentialLoader>()
232 .map(Arc::unwrap_or_clone),
233 }),
234 #[cfg(feature = "storage-gcs")]
235 Scheme::Gcs => Ok(Self::Gcs {
236 config: gcs_config_parse(props)?.into(),
237 }),
238 #[cfg(feature = "storage-oss")]
239 Scheme::Oss => Ok(Self::Oss {
240 config: oss_config_parse(props)?.into(),
241 }),
242 #[cfg(feature = "storage-azdls")]
243 Scheme::Azdls => {
244 let scheme = scheme_str.parse::<AzureStorageScheme>()?;
245 Ok(Self::Azdls {
246 config: azdls_config_parse(props)?.into(),
247 configured_scheme: scheme,
248 })
249 }
250 _ => Err(Error::new(
252 ErrorKind::FeatureUnsupported,
253 format!("Constructing file io from scheme: {scheme} not supported now",),
254 )),
255 }
256 }
257
258 #[allow(unreachable_code, unused_variables)]
271 pub(crate) fn create_operator<'a>(
272 &self,
273 path: &'a impl AsRef<str>,
274 ) -> Result<(Operator, &'a str)> {
275 let path = path.as_ref();
276 let (operator, relative_path): (Operator, &str) = match self {
277 #[cfg(feature = "storage-memory")]
278 OpenDalStorage::Memory(op) => {
279 if let Some(stripped) = path.strip_prefix("memory:/") {
280 (op.clone(), stripped)
281 } else {
282 (op.clone(), &path[1..])
283 }
284 }
285 #[cfg(feature = "storage-fs")]
286 OpenDalStorage::LocalFs => {
287 let op = fs_config_build()?;
288 if let Some(stripped) = path.strip_prefix("file:/") {
289 (op, stripped)
290 } else {
291 (op, &path[1..])
292 }
293 }
294 #[cfg(feature = "storage-s3")]
295 OpenDalStorage::S3 {
296 configured_scheme,
297 config,
298 customized_credential_load,
299 } => {
300 let op = s3_config_build(config, customized_credential_load, path)?;
301 let op_info = op.info();
302
303 let prefix = format!("{}://{}/", configured_scheme, op_info.name());
305 if path.starts_with(&prefix) {
306 (op, &path[prefix.len()..])
307 } else {
308 return Err(Error::new(
309 ErrorKind::DataInvalid,
310 format!("Invalid s3 url: {path}, should start with {prefix}"),
311 ));
312 }
313 }
314 #[cfg(feature = "storage-gcs")]
315 OpenDalStorage::Gcs { config } => {
316 let operator = gcs_config_build(config, path)?;
317 let prefix = format!("gs://{}/", operator.info().name());
318 if path.starts_with(&prefix) {
319 (operator, &path[prefix.len()..])
320 } else {
321 return Err(Error::new(
322 ErrorKind::DataInvalid,
323 format!("Invalid gcs url: {path}, should start with {prefix}"),
324 ));
325 }
326 }
327 #[cfg(feature = "storage-oss")]
328 OpenDalStorage::Oss { config } => {
329 let op = oss_config_build(config, path)?;
330 let prefix = format!("oss://{}/", op.info().name());
331 if path.starts_with(&prefix) {
332 (op, &path[prefix.len()..])
333 } else {
334 return Err(Error::new(
335 ErrorKind::DataInvalid,
336 format!("Invalid oss url: {path}, should start with {prefix}"),
337 ));
338 }
339 }
340 #[cfg(feature = "storage-azdls")]
341 OpenDalStorage::Azdls {
342 configured_scheme,
343 config,
344 } => azdls_create_operator(path, config, configured_scheme)?,
345 #[cfg(all(
346 not(feature = "storage-s3"),
347 not(feature = "storage-fs"),
348 not(feature = "storage-gcs"),
349 not(feature = "storage-oss"),
350 not(feature = "storage-azdls"),
351 ))]
352 _ => {
353 return Err(Error::new(
354 ErrorKind::FeatureUnsupported,
355 "No storage service has been enabled",
356 ));
357 }
358 };
359
360 let operator = operator.layer(RetryLayer::new());
363 Ok((operator, relative_path))
364 }
365
366 fn parse_scheme(scheme: &str) -> Result<Scheme> {
368 match scheme {
369 "memory" => Ok(Scheme::Memory),
370 "file" | "" => Ok(Scheme::Fs),
371 "s3" | "s3a" => Ok(Scheme::S3),
372 "gs" | "gcs" => Ok(Scheme::Gcs),
373 "oss" => Ok(Scheme::Oss),
374 "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
375 s => Ok(s.parse::<Scheme>()?),
376 }
377 }
378}
379
380#[typetag::serde(name = "OpenDalStorage")]
381#[async_trait]
382impl Storage for OpenDalStorage {
383 async fn exists(&self, path: &str) -> Result<bool> {
384 let (op, relative_path) = self.create_operator(&path)?;
385 Ok(op.exists(relative_path).await?)
386 }
387
388 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
389 let (op, relative_path) = self.create_operator(&path)?;
390 let meta = op.stat(relative_path).await?;
391 Ok(FileMetadata {
392 size: meta.content_length(),
393 })
394 }
395
396 async fn read(&self, path: &str) -> Result<Bytes> {
397 let (op, relative_path) = self.create_operator(&path)?;
398 Ok(op.read(relative_path).await?.to_bytes())
399 }
400
401 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
402 let (op, relative_path) = self.create_operator(&path)?;
403 Ok(Box::new(op.reader(relative_path).await?))
404 }
405
406 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
407 let (op, relative_path) = self.create_operator(&path)?;
408 op.write(relative_path, bs).await?;
409 Ok(())
410 }
411
412 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
413 let (op, relative_path) = self.create_operator(&path)?;
414 Ok(Box::new(op.writer(relative_path).await?))
415 }
416
417 async fn delete(&self, path: &str) -> Result<()> {
418 let (op, relative_path) = self.create_operator(&path)?;
419 Ok(op.delete(relative_path).await?)
420 }
421
422 async fn delete_prefix(&self, path: &str) -> Result<()> {
423 let (op, relative_path) = self.create_operator(&path)?;
424 let path = if relative_path.ends_with('/') {
425 relative_path.to_string()
426 } else {
427 format!("{relative_path}/")
428 };
429 Ok(op.remove_all(&path).await?)
430 }
431
432 #[allow(unreachable_code, unused_variables)]
433 fn new_input(&self, path: &str) -> Result<InputFile> {
434 Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
435 }
436
437 #[allow(unreachable_code, unused_variables)]
438 fn new_output(&self, path: &str) -> Result<OutputFile> {
439 Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
440 }
441}
442
443#[async_trait]
446impl FileRead for opendal::Reader {
447 async fn read(&self, range: Range<u64>) -> Result<Bytes> {
448 Ok(opendal::Reader::read(self, range).await?.to_bytes())
449 }
450}
451
452#[async_trait]
453impl FileWrite for opendal::Writer {
454 async fn write(&mut self, bs: Bytes) -> Result<()> {
455 Ok(opendal::Writer::write(self, bs).await?)
456 }
457
458 async fn close(&mut self) -> Result<()> {
459 let _ = opendal::Writer::close(self).await?;
460 Ok(())
461 }
462}
463
464#[cfg(test)]
465mod tests {
466 use super::*;
467
468 #[cfg(feature = "storage-memory")]
469 #[test]
470 fn test_default_memory_operator() {
471 let op = default_memory_operator();
472 assert_eq!(op.info().scheme().to_string(), "memory");
473 }
474}