1mod utils;
26
27use std::collections::HashMap;
28use std::collections::hash_map::Entry;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use cfg_if::cfg_if;
34use futures::StreamExt;
35use futures::stream::BoxStream;
36use iceberg::io::{
37 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38 StorageFactory,
39};
40use iceberg::{Error, ErrorKind, Result};
41use opendal::Operator;
42use opendal::layers::RetryLayer;
43use serde::{Deserialize, Serialize};
44use utils::from_opendal_error;
45
46cfg_if! {
47 if #[cfg(feature = "opendal-azdls")] {
48 mod azdls;
49 use azdls::AzureStorageScheme;
50 use azdls::*;
51 use opendal::services::AzdlsConfig;
52 }
53}
54
55cfg_if! {
56 if #[cfg(feature = "opendal-fs")] {
57 mod fs;
58 use fs::*;
59 }
60}
61
62cfg_if! {
63 if #[cfg(feature = "opendal-gcs")] {
64 mod gcs;
65 use gcs::*;
66 use opendal::services::GcsConfig;
67 }
68}
69
70cfg_if! {
71 if #[cfg(feature = "opendal-memory")] {
72 mod memory;
73 use memory::*;
74 }
75}
76
77cfg_if! {
78 if #[cfg(feature = "opendal-oss")] {
79 mod oss;
80 use opendal::services::OssConfig;
81 use oss::*;
82 }
83}
84
85cfg_if! {
86 if #[cfg(feature = "opendal-s3")] {
87 mod s3;
88 use opendal::services::S3Config;
89 pub use s3::*;
90 }
91}
92
93mod resolving;
94pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
95
96#[derive(Clone, Debug, Serialize, Deserialize)]
101pub enum OpenDalStorageFactory {
102 #[cfg(feature = "opendal-memory")]
104 Memory,
105 #[cfg(feature = "opendal-fs")]
107 Fs,
108 #[cfg(feature = "opendal-s3")]
110 S3 {
111 configured_scheme: String,
114 #[serde(skip)]
116 customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
117 },
118 #[cfg(feature = "opendal-gcs")]
120 Gcs,
121 #[cfg(feature = "opendal-oss")]
123 Oss,
124 #[cfg(feature = "opendal-azdls")]
126 Azdls {
127 configured_scheme: AzureStorageScheme,
129 },
130}
131
132#[typetag::serde(name = "OpenDalStorageFactory")]
133impl StorageFactory for OpenDalStorageFactory {
134 #[allow(unused_variables)]
135 fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
136 match self {
137 #[cfg(feature = "opendal-memory")]
138 OpenDalStorageFactory::Memory => {
139 Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
140 }
141 #[cfg(feature = "opendal-fs")]
142 OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
143 #[cfg(feature = "opendal-s3")]
144 OpenDalStorageFactory::S3 {
145 configured_scheme,
146 customized_credential_load,
147 } => Ok(Arc::new(OpenDalStorage::S3 {
148 configured_scheme: configured_scheme.clone(),
149 config: s3_config_parse(config.props().clone())?.into(),
150 customized_credential_load: customized_credential_load.clone(),
151 })),
152 #[cfg(feature = "opendal-gcs")]
153 OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
154 config: gcs_config_parse(config.props().clone())?.into(),
155 })),
156 #[cfg(feature = "opendal-oss")]
157 OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
158 config: oss_config_parse(config.props().clone())?.into(),
159 })),
160 #[cfg(feature = "opendal-azdls")]
161 OpenDalStorageFactory::Azdls { configured_scheme } => {
162 Ok(Arc::new(OpenDalStorage::Azdls {
163 configured_scheme: configured_scheme.clone(),
164 config: azdls_config_parse(config.props().clone())?.into(),
165 }))
166 }
167 #[cfg(all(
168 not(feature = "opendal-memory"),
169 not(feature = "opendal-fs"),
170 not(feature = "opendal-s3"),
171 not(feature = "opendal-gcs"),
172 not(feature = "opendal-oss"),
173 not(feature = "opendal-azdls"),
174 ))]
175 _ => Err(Error::new(
176 ErrorKind::FeatureUnsupported,
177 "No storage service has been enabled",
178 )),
179 }
180 }
181}
182
/// Creates the in-memory operator used as the serde default for
/// `OpenDalStorage::Memory`, whose operator field is skipped during
/// (de)serialization.
///
/// # Panics
///
/// Panics if `memory_config_build` returns an error.
#[cfg(feature = "opendal-memory")]
fn default_memory_operator() -> Operator {
    let built = memory_config_build();
    built.expect("Failed to create default memory operator")
}
188
189#[derive(Clone, Debug, Serialize, Deserialize)]
191pub enum OpenDalStorage {
192 #[cfg(feature = "opendal-memory")]
194 Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
195 #[cfg(feature = "opendal-fs")]
197 LocalFs,
198 #[cfg(feature = "opendal-s3")]
200 S3 {
201 configured_scheme: String,
204 config: Arc<S3Config>,
206 #[serde(skip)]
208 customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
209 },
210 #[cfg(feature = "opendal-gcs")]
212 Gcs {
213 config: Arc<GcsConfig>,
215 },
216 #[cfg(feature = "opendal-oss")]
218 Oss {
219 config: Arc<OssConfig>,
221 },
222 #[cfg(feature = "opendal-azdls")]
227 #[allow(private_interfaces)]
228 Azdls {
229 configured_scheme: AzureStorageScheme,
233 config: Arc<AzdlsConfig>,
235 },
236}
237
238impl OpenDalStorage {
239 #[allow(unreachable_code, unused_variables)]
252 pub(crate) fn create_operator<'a>(
253 &self,
254 path: &'a impl AsRef<str>,
255 ) -> Result<(Operator, &'a str)> {
256 let path = path.as_ref();
257 let (operator, relative_path): (Operator, &str) = match self {
258 #[cfg(feature = "opendal-memory")]
259 OpenDalStorage::Memory(op) => {
260 if let Some(stripped) = path.strip_prefix("memory:/") {
261 (op.clone(), stripped)
262 } else {
263 (op.clone(), &path[1..])
264 }
265 }
266 #[cfg(feature = "opendal-fs")]
267 OpenDalStorage::LocalFs => {
268 let op = fs_config_build()?;
269 if let Some(stripped) = path.strip_prefix("file:/") {
270 (op, stripped)
271 } else {
272 (op, &path[1..])
273 }
274 }
275 #[cfg(feature = "opendal-s3")]
276 OpenDalStorage::S3 {
277 configured_scheme,
278 config,
279 customized_credential_load,
280 } => {
281 let op = s3_config_build(config, customized_credential_load, path)?;
282 let op_info = op.info();
283
284 let prefix = format!("{}://{}/", configured_scheme, op_info.name());
286 if path.starts_with(&prefix) {
287 (op, &path[prefix.len()..])
288 } else {
289 return Err(Error::new(
290 ErrorKind::DataInvalid,
291 format!("Invalid s3 url: {path}, should start with {prefix}"),
292 ));
293 }
294 }
295 #[cfg(feature = "opendal-gcs")]
296 OpenDalStorage::Gcs { config } => {
297 let operator = gcs_config_build(config, path)?;
298 let prefix = format!("gs://{}/", operator.info().name());
299 if path.starts_with(&prefix) {
300 (operator, &path[prefix.len()..])
301 } else {
302 return Err(Error::new(
303 ErrorKind::DataInvalid,
304 format!("Invalid gcs url: {path}, should start with {prefix}"),
305 ));
306 }
307 }
308 #[cfg(feature = "opendal-oss")]
309 OpenDalStorage::Oss { config } => {
310 let op = oss_config_build(config, path)?;
311 let prefix = format!("oss://{}/", op.info().name());
312 if path.starts_with(&prefix) {
313 (op, &path[prefix.len()..])
314 } else {
315 return Err(Error::new(
316 ErrorKind::DataInvalid,
317 format!("Invalid oss url: {path}, should start with {prefix}"),
318 ));
319 }
320 }
321 #[cfg(feature = "opendal-azdls")]
322 OpenDalStorage::Azdls {
323 configured_scheme,
324 config,
325 } => azdls_create_operator(path, config, configured_scheme)?,
326 #[cfg(all(
327 not(feature = "opendal-s3"),
328 not(feature = "opendal-fs"),
329 not(feature = "opendal-gcs"),
330 not(feature = "opendal-oss"),
331 not(feature = "opendal-azdls"),
332 ))]
333 _ => {
334 return Err(Error::new(
335 ErrorKind::FeatureUnsupported,
336 "No storage service has been enabled",
337 ));
338 }
339 };
340
341 let operator = operator.layer(RetryLayer::new());
344 Ok((operator, relative_path))
345 }
346
347 #[allow(unreachable_code, unused_variables)]
353 pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str> {
354 match self {
355 #[cfg(feature = "opendal-memory")]
356 OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
357 #[cfg(feature = "opendal-fs")]
358 OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
359 #[cfg(feature = "opendal-s3")]
360 OpenDalStorage::S3 {
361 configured_scheme, ..
362 } => {
363 let url = url::Url::parse(path)?;
364 let bucket = url.host_str().ok_or_else(|| {
365 Error::new(
366 ErrorKind::DataInvalid,
367 format!("Invalid s3 url: {path}, missing bucket"),
368 )
369 })?;
370 let prefix = format!("{}://{}/", configured_scheme, bucket);
371 if path.starts_with(&prefix) {
372 Ok(&path[prefix.len()..])
373 } else {
374 Err(Error::new(
375 ErrorKind::DataInvalid,
376 format!("Invalid s3 url: {path}, should start with {prefix}"),
377 ))
378 }
379 }
380 #[cfg(feature = "opendal-gcs")]
381 OpenDalStorage::Gcs { .. } => {
382 let url = url::Url::parse(path)?;
383 let bucket = url.host_str().ok_or_else(|| {
384 Error::new(
385 ErrorKind::DataInvalid,
386 format!("Invalid gcs url: {path}, missing bucket"),
387 )
388 })?;
389 let prefix = format!("gs://{}/", bucket);
390 if path.starts_with(&prefix) {
391 Ok(&path[prefix.len()..])
392 } else {
393 Err(Error::new(
394 ErrorKind::DataInvalid,
395 format!("Invalid gcs url: {path}, should start with {prefix}"),
396 ))
397 }
398 }
399 #[cfg(feature = "opendal-oss")]
400 OpenDalStorage::Oss { .. } => {
401 let url = url::Url::parse(path)?;
402 let bucket = url.host_str().ok_or_else(|| {
403 Error::new(
404 ErrorKind::DataInvalid,
405 format!("Invalid oss url: {path}, missing bucket"),
406 )
407 })?;
408 let prefix = format!("oss://{}/", bucket);
409 if path.starts_with(&prefix) {
410 Ok(&path[prefix.len()..])
411 } else {
412 Err(Error::new(
413 ErrorKind::DataInvalid,
414 format!("Invalid oss url: {path}, should start with {prefix}"),
415 ))
416 }
417 }
418 #[cfg(feature = "opendal-azdls")]
419 OpenDalStorage::Azdls {
420 configured_scheme,
421 config,
422 } => {
423 let azure_path = path.parse::<AzureStoragePath>()?;
424 match_path_with_config(&azure_path, config, configured_scheme)?;
425 let relative_path_len = azure_path.path.len();
426 Ok(&path[path.len() - relative_path_len..])
427 }
428 #[cfg(all(
429 not(feature = "opendal-s3"),
430 not(feature = "opendal-fs"),
431 not(feature = "opendal-gcs"),
432 not(feature = "opendal-oss"),
433 not(feature = "opendal-azdls"),
434 ))]
435 _ => Err(Error::new(
436 ErrorKind::FeatureUnsupported,
437 "No storage service has been enabled",
438 )),
439 }
440 }
441}
442
443#[typetag::serde(name = "OpenDalStorage")]
444#[async_trait]
445impl Storage for OpenDalStorage {
446 async fn exists(&self, path: &str) -> Result<bool> {
447 let (op, relative_path) = self.create_operator(&path)?;
448 Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
449 }
450
451 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
452 let (op, relative_path) = self.create_operator(&path)?;
453 let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
454 Ok(FileMetadata {
455 size: meta.content_length(),
456 })
457 }
458
459 async fn read(&self, path: &str) -> Result<Bytes> {
460 let (op, relative_path) = self.create_operator(&path)?;
461 Ok(op
462 .read(relative_path)
463 .await
464 .map_err(from_opendal_error)?
465 .to_bytes())
466 }
467
468 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
469 let (op, relative_path) = self.create_operator(&path)?;
470 Ok(Box::new(OpenDalReader(
471 op.reader(relative_path).await.map_err(from_opendal_error)?,
472 )))
473 }
474
475 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
476 let (op, relative_path) = self.create_operator(&path)?;
477 op.write(relative_path, bs)
478 .await
479 .map_err(from_opendal_error)?;
480 Ok(())
481 }
482
483 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
484 let (op, relative_path) = self.create_operator(&path)?;
485 Ok(Box::new(OpenDalWriter(
486 op.writer(relative_path).await.map_err(from_opendal_error)?,
487 )))
488 }
489
490 async fn delete(&self, path: &str) -> Result<()> {
491 let (op, relative_path) = self.create_operator(&path)?;
492 Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
493 }
494
495 async fn delete_prefix(&self, path: &str) -> Result<()> {
496 let (op, relative_path) = self.create_operator(&path)?;
497 let path = if relative_path.ends_with('/') {
498 relative_path.to_string()
499 } else {
500 format!("{relative_path}/")
501 };
502 Ok(op.remove_all(&path).await.map_err(from_opendal_error)?)
503 }
504
505 async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
506 let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
507
508 while let Some(path) = paths.next().await {
509 let bucket = url::Url::parse(&path)
510 .ok()
511 .and_then(|u| u.host_str().map(|s| s.to_string()))
512 .unwrap_or_default();
513
514 let (relative_path, deleter) = match deleters.entry(bucket) {
515 Entry::Occupied(entry) => {
516 (self.relativize_path(&path)?.to_string(), entry.into_mut())
517 }
518 Entry::Vacant(entry) => {
519 let (op, rel) = self.create_operator(&path)?;
520 let rel = rel.to_string();
521 let deleter = op.deleter().await.map_err(from_opendal_error)?;
522 (rel, entry.insert(deleter))
523 }
524 };
525
526 deleter
527 .delete(relative_path)
528 .await
529 .map_err(from_opendal_error)?;
530 }
531
532 for (_, mut deleter) in deleters {
533 deleter.close().await.map_err(from_opendal_error)?;
534 }
535
536 Ok(())
537 }
538
539 #[allow(unreachable_code, unused_variables)]
540 fn new_input(&self, path: &str) -> Result<InputFile> {
541 Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
542 }
543
544 #[allow(unreachable_code, unused_variables)]
545 fn new_output(&self, path: &str) -> Result<OutputFile> {
546 Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
547 }
548}
549
550pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
556
557#[async_trait]
558impl FileRead for OpenDalReader {
559 async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
560 Ok(opendal::Reader::read(&self.0, range)
561 .await
562 .map_err(from_opendal_error)?
563 .to_bytes())
564 }
565}
566
567pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
569
570#[async_trait]
571impl FileWrite for OpenDalWriter {
572 async fn write(&mut self, bs: Bytes) -> Result<()> {
573 Ok(opendal::Writer::write(&mut self.0, bs)
574 .await
575 .map_err(from_opendal_error)?)
576 }
577
578 async fn close(&mut self) -> Result<()> {
579 let _ = opendal::Writer::close(&mut self.0)
580 .await
581 .map_err(from_opendal_error)?;
582 Ok(())
583 }
584}
585
#[cfg(test)]
mod tests {
    use super::*;

    /// S3 storage with default config that expects `scheme` in its URLs.
    #[cfg(feature = "opendal-s3")]
    fn s3_storage(scheme: &str) -> OpenDalStorage {
        OpenDalStorage::S3 {
            configured_scheme: scheme.to_string(),
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        }
    }

    /// GCS storage with default config.
    #[cfg(feature = "opendal-gcs")]
    fn gcs_storage() -> OpenDalStorage {
        OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        }
    }

    /// OSS storage with default config.
    #[cfg(feature = "opendal-oss")]
    fn oss_storage() -> OpenDalStorage {
        OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        }
    }

    /// Azure storage configured for the `abfss` scheme and the `myaccount` account.
    #[cfg(feature = "opendal-azdls")]
    fn abfss_storage() -> OpenDalStorage {
        let config = AzdlsConfig {
            account_name: Some("myaccount".to_string()),
            endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
            ..Default::default()
        };
        OpenDalStorage::Azdls {
            configured_scheme: AzureStorageScheme::Abfss,
            config: Arc::new(config),
        }
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let scheme = default_memory_operator().info().scheme().to_string();
        assert_eq!(scheme, "memory");
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_relativize_path_memory() {
        let storage = OpenDalStorage::Memory(default_memory_operator());

        // Both the scheme-prefixed and the bare absolute form map to the same
        // relative path.
        for input in ["memory:/path/to/file", "/path/to/file"] {
            assert_eq!(storage.relativize_path(input).unwrap(), "path/to/file");
        }
    }

    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_relativize_path_fs() {
        let storage = OpenDalStorage::LocalFs;

        // Both the scheme-prefixed and the bare absolute form map to the same
        // relative path.
        for input in ["file:/tmp/data/file.parquet", "/tmp/data/file.parquet"] {
            assert_eq!(
                storage.relativize_path(input).unwrap(),
                "tmp/data/file.parquet"
            );
        }
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3() {
        let relative = s3_storage("s3")
            .relativize_path("s3://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");

        // A storage configured for `s3a` accepts `s3a://` URLs.
        let relative_s3a = s3_storage("s3a")
            .relativize_path("s3a://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative_s3a, "path/to/file.parquet");
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3_scheme_mismatch() {
        // The URL scheme differs from the configured one, so it is rejected.
        let outcome = s3_storage("s3").relativize_path("s3a://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs() {
        let relative = gcs_storage()
            .relativize_path("gs://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs_invalid_scheme() {
        let outcome = gcs_storage().relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss() {
        let relative = oss_storage()
            .relativize_path("oss://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss_invalid_scheme() {
        let outcome = oss_storage().relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls() {
        let relative = abfss_storage()
            .relativize_path("abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
            .unwrap();
        // Unlike the bucket-based services, the leading `/` is kept here.
        assert_eq!(relative, "/path/to/file.parquet");
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls_scheme_mismatch() {
        // The URL scheme differs from the configured `abfss`, so it is rejected.
        let outcome = abfss_storage()
            .relativize_path("wasbs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet");
        assert!(outcome.is_err());
    }
}