1mod utils;
26
27use std::collections::HashMap;
28use std::collections::hash_map::Entry;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use cfg_if::cfg_if;
34use futures::StreamExt;
35use futures::stream::BoxStream;
36use iceberg::io::{
37 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38 StorageFactory,
39};
40use iceberg::{Error, ErrorKind, Result};
41use opendal::Operator;
42use opendal::layers::{RetryLayer, TimeoutLayer};
43use serde::{Deserialize, Serialize};
44use utils::from_opendal_error;
45
46cfg_if! {
47 if #[cfg(feature = "opendal-azdls")] {
48 mod azdls;
49 use azdls::*;
50 use opendal::services::AzdlsConfig;
51 }
52}
53
54cfg_if! {
55 if #[cfg(feature = "opendal-hf")] {
56 mod hf;
57 use hf::*;
58 use opendal::services::HfConfig;
59 }
60}
61
62cfg_if! {
63 if #[cfg(feature = "opendal-fs")] {
64 mod fs;
65 use fs::*;
66 }
67}
68
69cfg_if! {
70 if #[cfg(feature = "opendal-gcs")] {
71 mod gcs;
72 use gcs::*;
73 use opendal::services::GcsConfig;
74 }
75}
76
77cfg_if! {
78 if #[cfg(feature = "opendal-memory")] {
79 mod memory;
80 use memory::*;
81 }
82}
83
84cfg_if! {
85 if #[cfg(feature = "opendal-oss")] {
86 mod oss;
87 use opendal::services::OssConfig;
88 use oss::*;
89 }
90}
91
92cfg_if! {
93 if #[cfg(feature = "opendal-s3")] {
94 mod s3;
95 use opendal::services::S3Config;
96 pub use s3::*;
97 }
98}
99
100mod resolving;
101pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
102
/// Serializable factory that builds an [`OpenDalStorage`] for one storage service.
///
/// Each variant exists only when its `opendal-*` cargo feature is enabled; the
/// `StorageFactory::build` implementation maps every variant to the `OpenDalStorage`
/// variant of the same service.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorageFactory {
    #[cfg(feature = "opendal-memory")]
    /// Builds [`OpenDalStorage::Memory`] around a new operator from `memory_config_build()`.
    Memory,
    #[cfg(feature = "opendal-fs")]
    /// Builds [`OpenDalStorage::LocalFs`]; the storage properties are not used.
    Fs,
    #[cfg(feature = "opendal-s3")]
    /// Builds [`OpenDalStorage::S3`] from the storage properties parsed by `s3_config_parse`.
    S3 {
        #[serde(skip)]
        /// Optional custom credential loader, cloned into the built storage.
        ///
        /// Skipped by serde, so it is always `None` after deserialization.
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    #[cfg(feature = "opendal-gcs")]
    /// Builds [`OpenDalStorage::Gcs`] from the storage properties parsed by `gcs_config_parse`.
    Gcs,
    #[cfg(feature = "opendal-oss")]
    /// Builds [`OpenDalStorage::Oss`] from the storage properties parsed by `oss_config_parse`.
    Oss,
    #[cfg(feature = "opendal-azdls")]
    /// Builds [`OpenDalStorage::Azdls`] from the storage properties parsed by
    /// `azdls_config_parse`.
    Azdls,
    #[cfg(feature = "opendal-hf")]
    /// Builds [`OpenDalStorage::Hf`] from the storage properties parsed by `hf_config_parse`.
    Hf,
}
135
136#[typetag::serde(name = "OpenDalStorageFactory")]
137impl StorageFactory for OpenDalStorageFactory {
138 #[allow(unused_variables)]
139 fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
140 match self {
141 #[cfg(feature = "opendal-memory")]
142 OpenDalStorageFactory::Memory => {
143 Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
144 }
145 #[cfg(feature = "opendal-fs")]
146 OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
147 #[cfg(feature = "opendal-s3")]
148 OpenDalStorageFactory::S3 {
149 customized_credential_load,
150 } => Ok(Arc::new(OpenDalStorage::S3 {
151 config: s3_config_parse(config.props().clone())?.into(),
152 customized_credential_load: customized_credential_load.clone(),
153 })),
154 #[cfg(feature = "opendal-gcs")]
155 OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
156 config: gcs_config_parse(config.props().clone())?.into(),
157 })),
158 #[cfg(feature = "opendal-oss")]
159 OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
160 config: oss_config_parse(config.props().clone())?.into(),
161 })),
162 #[cfg(feature = "opendal-azdls")]
163 OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls {
164 config: azdls_config_parse(config.props().clone())?.into(),
165 })),
166 #[cfg(feature = "opendal-hf")]
167 OpenDalStorageFactory::Hf => Ok(Arc::new(OpenDalStorage::Hf {
168 config: hf_config_parse(config.props().clone())?.into(),
169 })),
170 #[cfg(all(
171 not(feature = "opendal-memory"),
172 not(feature = "opendal-fs"),
173 not(feature = "opendal-s3"),
174 not(feature = "opendal-gcs"),
175 not(feature = "opendal-oss"),
176 not(feature = "opendal-azdls"),
177 not(feature = "opendal-hf"),
178 ))]
179 _ => Err(Error::new(
180 ErrorKind::FeatureUnsupported,
181 "No storage service has been enabled",
182 )),
183 }
184 }
185}
186
#[cfg(feature = "opendal-memory")]
/// Serde default for the skipped operator field of [`OpenDalStorage::Memory`].
///
/// # Panics
///
/// Panics if `memory_config_build()` returns an error.
fn default_memory_operator() -> Operator {
    let built = memory_config_build();
    built.expect("Failed to create default memory operator")
}
192
/// Storage backed by OpenDAL, one variant per storage service.
///
/// Each variant exists only when its `opendal-*` cargo feature is enabled. `Memory` holds its
/// [`Operator`] directly; every other variant keeps only configuration and builds an operator
/// for each path in `create_operator`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorage {
    #[cfg(feature = "opendal-memory")]
    /// In-memory storage; paths may carry a `memory:/` prefix.
    ///
    /// The operator is skipped by serde: a deserialized value gets a new operator from
    /// `default_memory_operator()` rather than the one that was serialized.
    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
    #[cfg(feature = "opendal-fs")]
    /// Local filesystem storage; paths may carry a `file:/` prefix.
    LocalFs,
    #[cfg(feature = "opendal-s3")]
    /// S3 storage addressed by URLs of the form `<scheme>://<bucket>/<path>`.
    S3 {
        /// S3 service configuration, shared between clones through an `Arc`.
        config: Arc<S3Config>,
        #[serde(skip)]
        /// Optional custom credential loader passed to `s3_config_build`.
        ///
        /// Skipped by serde, so it is always `None` after deserialization.
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    #[cfg(feature = "opendal-gcs")]
    /// Google Cloud Storage addressed by URLs of the form `gs://<bucket>/<path>`.
    Gcs {
        /// GCS service configuration, shared between clones through an `Arc`.
        config: Arc<GcsConfig>,
    },
    #[cfg(feature = "opendal-oss")]
    /// OSS storage addressed by URLs of the form `oss://<bucket>/<path>`.
    Oss {
        /// OSS service configuration, shared between clones through an `Arc`.
        config: Arc<OssConfig>,
    },
    #[cfg(feature = "opendal-azdls")]
    /// Azure Data Lake Storage addressed by URLs such as
    /// `abfss://myfs@myaccount.dfs.core.windows.net/path`.
    ///
    /// Paths are parsed as `AzureStoragePath` and checked against `config` by
    /// `match_path_with_config`.
    Azdls {
        /// Azure DLS service configuration, shared between clones through an `Arc`.
        config: Arc<AzdlsConfig>,
    },
    #[cfg(feature = "opendal-hf")]
    /// HF storage; paths are parsed by `hf::HfUri` and batched by `hf_batch_key`.
    Hf {
        /// HF service configuration, shared between clones through an `Arc`.
        config: Arc<HfConfig>,
    },
}
248
249impl OpenDalStorage {
250 #[allow(unreachable_code, unused_variables)]
263 pub(crate) fn create_operator<'a>(
264 &self,
265 path: &'a impl AsRef<str>,
266 ) -> Result<(Operator, &'a str)> {
267 let path = path.as_ref();
268 let (operator, relative_path): (Operator, &str) = match self {
269 #[cfg(feature = "opendal-memory")]
270 OpenDalStorage::Memory(op) => {
271 if let Some(stripped) = path.strip_prefix("memory:/") {
272 (op.clone(), stripped)
273 } else {
274 (op.clone(), &path[1..])
275 }
276 }
277 #[cfg(feature = "opendal-fs")]
278 OpenDalStorage::LocalFs => {
279 let op = fs_config_build()?;
280 if let Some(stripped) = path.strip_prefix("file:/") {
281 (op, stripped)
282 } else {
283 (op, &path[1..])
284 }
285 }
286 #[cfg(feature = "opendal-s3")]
287 OpenDalStorage::S3 {
288 config,
289 customized_credential_load,
290 } => {
291 let op = s3_config_build(config, customized_credential_load, path)?;
292 let op_info = op.info();
293
294 let url = url::Url::parse(path).map_err(|e| {
297 Error::new(
298 ErrorKind::DataInvalid,
299 format!("Invalid s3 url: {path}: {e}"),
300 )
301 })?;
302 let prefix = format!("{}://{}/", url.scheme(), op_info.name());
303 if path.starts_with(&prefix) {
304 (op, &path[prefix.len()..])
305 } else {
306 return Err(Error::new(
307 ErrorKind::DataInvalid,
308 format!("Invalid s3 url: {path}, should start with {prefix}"),
309 ));
310 }
311 }
312 #[cfg(feature = "opendal-gcs")]
313 OpenDalStorage::Gcs { config } => {
314 let operator = gcs_config_build(config, path)?;
315 let prefix = format!("gs://{}/", operator.info().name());
316 if path.starts_with(&prefix) {
317 (operator, &path[prefix.len()..])
318 } else {
319 return Err(Error::new(
320 ErrorKind::DataInvalid,
321 format!("Invalid gcs url: {path}, should start with {prefix}"),
322 ));
323 }
324 }
325 #[cfg(feature = "opendal-oss")]
326 OpenDalStorage::Oss { config } => {
327 let op = oss_config_build(config, path)?;
328 let prefix = format!("oss://{}/", op.info().name());
329 if path.starts_with(&prefix) {
330 (op, &path[prefix.len()..])
331 } else {
332 return Err(Error::new(
333 ErrorKind::DataInvalid,
334 format!("Invalid oss url: {path}, should start with {prefix}"),
335 ));
336 }
337 }
338 #[cfg(feature = "opendal-azdls")]
339 OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?,
340 #[cfg(feature = "opendal-hf")]
341 OpenDalStorage::Hf { config } => hf_config_build(config, path)?,
342 #[cfg(all(
343 not(feature = "opendal-s3"),
344 not(feature = "opendal-fs"),
345 not(feature = "opendal-gcs"),
346 not(feature = "opendal-oss"),
347 not(feature = "opendal-azdls"),
348 not(feature = "opendal-hf"),
349 ))]
350 _ => {
351 return Err(Error::new(
352 ErrorKind::FeatureUnsupported,
353 "No storage service has been enabled",
354 ));
355 }
356 };
357
358 let operator = operator.layer(TimeoutLayer::new()).layer(RetryLayer::new());
369 Ok((operator, relative_path))
370 }
371
372 fn batch_key_for_path(&self, path: &str) -> String {
377 match self {
378 #[cfg(feature = "opendal-hf")]
379 OpenDalStorage::Hf { .. } => hf_batch_key(path),
380 _ => url::Url::parse(path)
381 .ok()
382 .and_then(|u| u.host_str().map(|s| s.to_string()))
383 .unwrap_or_default(),
384 }
385 }
386
387 #[allow(unreachable_code, unused_variables)]
393 pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str> {
394 match self {
395 #[cfg(feature = "opendal-memory")]
396 OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
397 #[cfg(feature = "opendal-fs")]
398 OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
399 #[cfg(feature = "opendal-s3")]
400 OpenDalStorage::S3 { .. } => {
401 let url = url::Url::parse(path)?;
402 let bucket = url.host_str().ok_or_else(|| {
403 Error::new(
404 ErrorKind::DataInvalid,
405 format!("Invalid s3 url: {path}, missing bucket"),
406 )
407 })?;
408 let prefix = format!("{}://{}/", url.scheme(), bucket);
409 if path.starts_with(&prefix) {
410 Ok(&path[prefix.len()..])
411 } else {
412 Err(Error::new(
413 ErrorKind::DataInvalid,
414 format!("Invalid s3 url: {path}, should start with {prefix}"),
415 ))
416 }
417 }
418 #[cfg(feature = "opendal-gcs")]
419 OpenDalStorage::Gcs { .. } => {
420 let url = url::Url::parse(path)?;
421 let bucket = url.host_str().ok_or_else(|| {
422 Error::new(
423 ErrorKind::DataInvalid,
424 format!("Invalid gcs url: {path}, missing bucket"),
425 )
426 })?;
427 let prefix = format!("gs://{}/", bucket);
428 if path.starts_with(&prefix) {
429 Ok(&path[prefix.len()..])
430 } else {
431 Err(Error::new(
432 ErrorKind::DataInvalid,
433 format!("Invalid gcs url: {path}, should start with {prefix}"),
434 ))
435 }
436 }
437 #[cfg(feature = "opendal-oss")]
438 OpenDalStorage::Oss { .. } => {
439 let url = url::Url::parse(path)?;
440 let bucket = url.host_str().ok_or_else(|| {
441 Error::new(
442 ErrorKind::DataInvalid,
443 format!("Invalid oss url: {path}, missing bucket"),
444 )
445 })?;
446 let prefix = format!("oss://{}/", bucket);
447 if path.starts_with(&prefix) {
448 Ok(&path[prefix.len()..])
449 } else {
450 Err(Error::new(
451 ErrorKind::DataInvalid,
452 format!("Invalid oss url: {path}, should start with {prefix}"),
453 ))
454 }
455 }
456 #[cfg(feature = "opendal-azdls")]
457 OpenDalStorage::Azdls { config } => {
458 let azure_path = path.parse::<AzureStoragePath>()?;
459 match_path_with_config(&azure_path, config)?;
460 let relative_path_len = azure_path.path.len();
461 Ok(&path[path.len() - relative_path_len..])
462 }
463 #[cfg(feature = "opendal-hf")]
464 OpenDalStorage::Hf { .. } => {
465 let parsed = hf::HfUri::parse(path).ok_or_else(|| {
466 Error::new(ErrorKind::DataInvalid, format!("Invalid hf url: {path}"))
467 })?;
468 Ok(&path[path.len() - parsed.path.len()..])
469 }
470 #[cfg(all(
471 not(feature = "opendal-s3"),
472 not(feature = "opendal-fs"),
473 not(feature = "opendal-gcs"),
474 not(feature = "opendal-oss"),
475 not(feature = "opendal-azdls"),
476 not(feature = "opendal-hf"),
477 ))]
478 _ => Err(Error::new(
479 ErrorKind::FeatureUnsupported,
480 "No storage service has been enabled",
481 )),
482 }
483 }
484}
485
486#[typetag::serde(name = "OpenDalStorage")]
487#[async_trait]
488impl Storage for OpenDalStorage {
489 async fn exists(&self, path: &str) -> Result<bool> {
490 let (op, relative_path) = self.create_operator(&path)?;
491 Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
492 }
493
494 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
495 let (op, relative_path) = self.create_operator(&path)?;
496 let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
497 Ok(FileMetadata {
498 size: meta.content_length(),
499 })
500 }
501
502 async fn read(&self, path: &str) -> Result<Bytes> {
503 let (op, relative_path) = self.create_operator(&path)?;
504 Ok(op
505 .read(relative_path)
506 .await
507 .map_err(from_opendal_error)?
508 .to_bytes())
509 }
510
511 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
512 let (op, relative_path) = self.create_operator(&path)?;
513 Ok(Box::new(OpenDalReader(
514 op.reader(relative_path).await.map_err(from_opendal_error)?,
515 )))
516 }
517
518 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
519 let (op, relative_path) = self.create_operator(&path)?;
520 op.write(relative_path, bs)
521 .await
522 .map_err(from_opendal_error)?;
523 Ok(())
524 }
525
526 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
527 let (op, relative_path) = self.create_operator(&path)?;
528 Ok(Box::new(OpenDalWriter(
529 op.writer(relative_path).await.map_err(from_opendal_error)?,
530 )))
531 }
532
533 async fn delete(&self, path: &str) -> Result<()> {
534 let (op, relative_path) = self.create_operator(&path)?;
535 Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
536 }
537
538 async fn delete_prefix(&self, path: &str) -> Result<()> {
539 let (op, relative_path) = self.create_operator(&path)?;
540 let path = if relative_path.ends_with('/') {
541 relative_path.to_string()
542 } else {
543 format!("{relative_path}/")
544 };
545 Ok(op
546 .delete_with(&path)
547 .recursive(true)
548 .await
549 .map_err(from_opendal_error)?)
550 }
551
552 async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
553 let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
554
555 while let Some(path) = paths.next().await {
556 let bucket = self.batch_key_for_path(&path);
557
558 let (relative_path, deleter) = match deleters.entry(bucket) {
559 Entry::Occupied(entry) => {
560 (self.relativize_path(&path)?.to_string(), entry.into_mut())
561 }
562 Entry::Vacant(entry) => {
563 let (op, rel) = self.create_operator(&path)?;
564 let rel = rel.to_string();
565 let deleter = op.deleter().await.map_err(from_opendal_error)?;
566 (rel, entry.insert(deleter))
567 }
568 };
569
570 deleter
571 .delete(relative_path)
572 .await
573 .map_err(from_opendal_error)?;
574 }
575
576 for (_, mut deleter) in deleters {
577 deleter.close().await.map_err(from_opendal_error)?;
578 }
579
580 Ok(())
581 }
582
583 #[allow(unreachable_code, unused_variables)]
584 fn new_input(&self, path: &str) -> Result<InputFile> {
585 Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
586 }
587
588 #[allow(unreachable_code, unused_variables)]
589 fn new_output(&self, path: &str) -> Result<OutputFile> {
590 Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
591 }
592}
593
594pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
600
601#[async_trait]
602impl FileRead for OpenDalReader {
603 async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
604 Ok(opendal::Reader::read(&self.0, range)
605 .await
606 .map_err(from_opendal_error)?
607 .to_bytes())
608 }
609}
610
611pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
613
614#[async_trait]
615impl FileWrite for OpenDalWriter {
616 async fn write(&mut self, bs: Bytes) -> Result<()> {
617 Ok(opendal::Writer::write(&mut self.0, bs)
618 .await
619 .map_err(from_opendal_error)?)
620 }
621
622 async fn close(&mut self) -> Result<()> {
623 let _ = opendal::Writer::close(&mut self.0)
624 .await
625 .map_err(from_opendal_error)?;
626 Ok(())
627 }
628}
629
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `storage.relativize_path(input)` succeeds and equals `expected`.
    // Unused when no storage feature is enabled, because every test below is feature-gated.
    #[allow(dead_code)]
    fn assert_relative(storage: &OpenDalStorage, input: &str, expected: &str) {
        let relative = storage
            .relativize_path(input)
            .unwrap_or_else(|e| panic!("relativize_path({input:?}) failed: {e:?}"));
        assert_eq!(relative, expected);
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let scheme = default_memory_operator().info().scheme().to_string();
        assert_eq!(scheme, "memory");
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_relativize_path_memory() {
        let storage = OpenDalStorage::Memory(default_memory_operator());
        // Both the `memory:/` form and a bare absolute path are accepted.
        for input in ["memory:/path/to/file", "/path/to/file"] {
            assert_relative(&storage, input, "path/to/file");
        }
    }

    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_relativize_path_fs() {
        let storage = OpenDalStorage::LocalFs;
        // Both the `file:/` form and a bare absolute path are accepted.
        for input in ["file:/tmp/data/file.parquet", "/tmp/data/file.parquet"] {
            assert_relative(&storage, input, "tmp/data/file.parquet");
        }
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3() {
        let storage = OpenDalStorage::S3 {
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        };
        // Each of these schemes strips down to the key after the bucket.
        for scheme in ["s3", "s3a", "s3n", "minio"] {
            let input = format!("{scheme}://my-bucket/path/to/file.parquet");
            assert_relative(&storage, &input, "path/to/file.parquet");
        }
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };
        assert_relative(
            &storage,
            "gs://my-bucket/path/to/file.parquet",
            "path/to/file.parquet",
        );
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs_invalid_scheme() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };
        let outcome = storage.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };
        assert_relative(
            &storage,
            "oss://my-bucket/path/to/file.parquet",
            "path/to/file.parquet",
        );
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss_invalid_scheme() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };
        let outcome = storage.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls() {
        let config = AzdlsConfig {
            account_name: Some("myaccount".to_string()),
            endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
            ..Default::default()
        };
        let storage = OpenDalStorage::Azdls {
            config: Arc::new(config),
        };
        // Azure relative paths keep their leading slash.
        assert_relative(
            &storage,
            "abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
            "/path/to/file.parquet",
        );
    }
}