1mod utils;
26
27use std::collections::HashMap;
28use std::collections::hash_map::Entry;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use cfg_if::cfg_if;
34use futures::StreamExt;
35use futures::stream::BoxStream;
36use iceberg::io::{
37 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38 StorageFactory,
39};
40use iceberg::{Error, ErrorKind, Result};
41use opendal::Operator;
42use opendal::layers::RetryLayer;
43use serde::{Deserialize, Serialize};
44use utils::from_opendal_error;
45
46cfg_if! {
47 if #[cfg(feature = "opendal-azdls")] {
48 mod azdls;
49 use azdls::*;
50 use opendal::services::AzdlsConfig;
51 }
52}
53
54cfg_if! {
55 if #[cfg(feature = "opendal-hf")] {
56 mod hf;
57 use hf::*;
58 use opendal::services::HfConfig;
59 }
60}
61
62cfg_if! {
63 if #[cfg(feature = "opendal-fs")] {
64 mod fs;
65 use fs::*;
66 }
67}
68
69cfg_if! {
70 if #[cfg(feature = "opendal-gcs")] {
71 mod gcs;
72 use gcs::*;
73 use opendal::services::GcsConfig;
74 }
75}
76
77cfg_if! {
78 if #[cfg(feature = "opendal-memory")] {
79 mod memory;
80 use memory::*;
81 }
82}
83
84cfg_if! {
85 if #[cfg(feature = "opendal-oss")] {
86 mod oss;
87 use opendal::services::OssConfig;
88 use oss::*;
89 }
90}
91
92cfg_if! {
93 if #[cfg(feature = "opendal-s3")] {
94 mod s3;
95 use opendal::services::S3Config;
96 pub use s3::*;
97 }
98}
99
100mod resolving;
101pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
102
/// Serializable factory that builds an [`OpenDalStorage`] for one OpenDAL service.
///
/// Each variant only exists when the matching `opendal-*` cargo feature is enabled;
/// `StorageFactory::build` turns a variant plus a [`StorageConfig`] into a storage.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorageFactory {
    #[cfg(feature = "opendal-memory")]
    /// In-memory storage.
    Memory,
    #[cfg(feature = "opendal-fs")]
    /// Local filesystem storage.
    Fs,
    #[cfg(feature = "opendal-s3")]
    /// S3 storage.
    S3 {
        #[serde(skip)]
        /// Optional custom AWS credential loader forwarded to the built storage.
        /// It is skipped by serde, so it is `None` after deserialization.
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    #[cfg(feature = "opendal-gcs")]
    /// Google Cloud Storage (`gs://` URLs).
    Gcs,
    #[cfg(feature = "opendal-oss")]
    /// OSS storage (`oss://` URLs).
    Oss,
    #[cfg(feature = "opendal-azdls")]
    /// Azure Data Lake Storage.
    Azdls,
    #[cfg(feature = "opendal-hf")]
    /// Storage for `hf` URLs.
    Hf,
}
135
136#[typetag::serde(name = "OpenDalStorageFactory")]
137impl StorageFactory for OpenDalStorageFactory {
138 #[allow(unused_variables)]
139 fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
140 match self {
141 #[cfg(feature = "opendal-memory")]
142 OpenDalStorageFactory::Memory => {
143 Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
144 }
145 #[cfg(feature = "opendal-fs")]
146 OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
147 #[cfg(feature = "opendal-s3")]
148 OpenDalStorageFactory::S3 {
149 customized_credential_load,
150 } => Ok(Arc::new(OpenDalStorage::S3 {
151 config: s3_config_parse(config.props().clone())?.into(),
152 customized_credential_load: customized_credential_load.clone(),
153 })),
154 #[cfg(feature = "opendal-gcs")]
155 OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
156 config: gcs_config_parse(config.props().clone())?.into(),
157 })),
158 #[cfg(feature = "opendal-oss")]
159 OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
160 config: oss_config_parse(config.props().clone())?.into(),
161 })),
162 #[cfg(feature = "opendal-azdls")]
163 OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls {
164 config: azdls_config_parse(config.props().clone())?.into(),
165 })),
166 #[cfg(feature = "opendal-hf")]
167 OpenDalStorageFactory::Hf => Ok(Arc::new(OpenDalStorage::Hf {
168 config: hf_config_parse(config.props().clone())?.into(),
169 })),
170 #[cfg(all(
171 not(feature = "opendal-memory"),
172 not(feature = "opendal-fs"),
173 not(feature = "opendal-s3"),
174 not(feature = "opendal-gcs"),
175 not(feature = "opendal-oss"),
176 not(feature = "opendal-azdls"),
177 not(feature = "opendal-hf"),
178 ))]
179 _ => Err(Error::new(
180 ErrorKind::FeatureUnsupported,
181 "No storage service has been enabled",
182 )),
183 }
184 }
185}
186
#[cfg(feature = "opendal-memory")]
/// Creates a fresh in-memory operator.
///
/// Serde uses this as the default for the skipped operator of
/// `OpenDalStorage::Memory`.
///
/// # Panics
///
/// Panics if the memory operator cannot be built.
fn default_memory_operator() -> Operator {
    memory_config_build()
        .unwrap_or_else(|err| panic!("Failed to create default memory operator: {err:?}"))
}
192
/// OpenDAL-backed storage, with one variant per supported service.
///
/// Each variant only exists when its `opendal-*` cargo feature is enabled.
/// Operators are built on demand from the stored configuration by
/// `create_operator`, except for `Memory`, which holds its operator directly.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorage {
    #[cfg(feature = "opendal-memory")]
    /// In-memory storage. The [`Operator`] is skipped by serde; deserialization
    /// builds a new one with `default_memory_operator`. NOTE(review): presumably
    /// this means a deserialized value does not share data with the original — confirm.
    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
    #[cfg(feature = "opendal-fs")]
    /// Local filesystem storage; an operator is built per call.
    LocalFs,
    #[cfg(feature = "opendal-s3")]
    /// S3 storage.
    S3 {
        /// Parsed S3 configuration, shared through `Arc` so clones are cheap.
        config: Arc<S3Config>,
        #[serde(skip)]
        /// Optional custom AWS credential loader. It is skipped by serde, so it
        /// is `None` after deserialization.
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    #[cfg(feature = "opendal-gcs")]
    /// Google Cloud Storage (`gs://` URLs).
    Gcs {
        /// Parsed GCS configuration.
        config: Arc<GcsConfig>,
    },
    #[cfg(feature = "opendal-oss")]
    /// OSS storage (`oss://` URLs).
    Oss {
        /// Parsed OSS configuration.
        config: Arc<OssConfig>,
    },
    #[cfg(feature = "opendal-azdls")]
    /// Azure Data Lake Storage.
    Azdls {
        /// Parsed ADLS configuration; paths are validated against it.
        config: Arc<AzdlsConfig>,
    },
    #[cfg(feature = "opendal-hf")]
    /// Storage for `hf` URLs.
    Hf {
        /// Parsed `hf` configuration.
        config: Arc<HfConfig>,
    },
}
248
249impl OpenDalStorage {
250 #[allow(unreachable_code, unused_variables)]
263 pub(crate) fn create_operator<'a>(
264 &self,
265 path: &'a impl AsRef<str>,
266 ) -> Result<(Operator, &'a str)> {
267 let path = path.as_ref();
268 let (operator, relative_path): (Operator, &str) = match self {
269 #[cfg(feature = "opendal-memory")]
270 OpenDalStorage::Memory(op) => {
271 if let Some(stripped) = path.strip_prefix("memory:/") {
272 (op.clone(), stripped)
273 } else {
274 (op.clone(), &path[1..])
275 }
276 }
277 #[cfg(feature = "opendal-fs")]
278 OpenDalStorage::LocalFs => {
279 let op = fs_config_build()?;
280 if let Some(stripped) = path.strip_prefix("file:/") {
281 (op, stripped)
282 } else {
283 (op, &path[1..])
284 }
285 }
286 #[cfg(feature = "opendal-s3")]
287 OpenDalStorage::S3 {
288 config,
289 customized_credential_load,
290 } => {
291 let op = s3_config_build(config, customized_credential_load, path)?;
292 let op_info = op.info();
293
294 let url = url::Url::parse(path).map_err(|e| {
297 Error::new(
298 ErrorKind::DataInvalid,
299 format!("Invalid s3 url: {path}: {e}"),
300 )
301 })?;
302 let prefix = format!("{}://{}/", url.scheme(), op_info.name());
303 if path.starts_with(&prefix) {
304 (op, &path[prefix.len()..])
305 } else {
306 return Err(Error::new(
307 ErrorKind::DataInvalid,
308 format!("Invalid s3 url: {path}, should start with {prefix}"),
309 ));
310 }
311 }
312 #[cfg(feature = "opendal-gcs")]
313 OpenDalStorage::Gcs { config } => {
314 let operator = gcs_config_build(config, path)?;
315 let prefix = format!("gs://{}/", operator.info().name());
316 if path.starts_with(&prefix) {
317 (operator, &path[prefix.len()..])
318 } else {
319 return Err(Error::new(
320 ErrorKind::DataInvalid,
321 format!("Invalid gcs url: {path}, should start with {prefix}"),
322 ));
323 }
324 }
325 #[cfg(feature = "opendal-oss")]
326 OpenDalStorage::Oss { config } => {
327 let op = oss_config_build(config, path)?;
328 let prefix = format!("oss://{}/", op.info().name());
329 if path.starts_with(&prefix) {
330 (op, &path[prefix.len()..])
331 } else {
332 return Err(Error::new(
333 ErrorKind::DataInvalid,
334 format!("Invalid oss url: {path}, should start with {prefix}"),
335 ));
336 }
337 }
338 #[cfg(feature = "opendal-azdls")]
339 OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?,
340 #[cfg(feature = "opendal-hf")]
341 OpenDalStorage::Hf { config } => hf_config_build(config, path)?,
342 #[cfg(all(
343 not(feature = "opendal-s3"),
344 not(feature = "opendal-fs"),
345 not(feature = "opendal-gcs"),
346 not(feature = "opendal-oss"),
347 not(feature = "opendal-azdls"),
348 not(feature = "opendal-hf"),
349 ))]
350 _ => {
351 return Err(Error::new(
352 ErrorKind::FeatureUnsupported,
353 "No storage service has been enabled",
354 ));
355 }
356 };
357
358 let operator = operator.layer(RetryLayer::new());
361 Ok((operator, relative_path))
362 }
363
364 fn batch_key_for_path(&self, path: &str) -> String {
369 match self {
370 #[cfg(feature = "opendal-hf")]
371 OpenDalStorage::Hf { .. } => hf_batch_key(path),
372 _ => url::Url::parse(path)
373 .ok()
374 .and_then(|u| u.host_str().map(|s| s.to_string()))
375 .unwrap_or_default(),
376 }
377 }
378
379 #[allow(unreachable_code, unused_variables)]
385 pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str> {
386 match self {
387 #[cfg(feature = "opendal-memory")]
388 OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
389 #[cfg(feature = "opendal-fs")]
390 OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
391 #[cfg(feature = "opendal-s3")]
392 OpenDalStorage::S3 { .. } => {
393 let url = url::Url::parse(path)?;
394 let bucket = url.host_str().ok_or_else(|| {
395 Error::new(
396 ErrorKind::DataInvalid,
397 format!("Invalid s3 url: {path}, missing bucket"),
398 )
399 })?;
400 let prefix = format!("{}://{}/", url.scheme(), bucket);
401 if path.starts_with(&prefix) {
402 Ok(&path[prefix.len()..])
403 } else {
404 Err(Error::new(
405 ErrorKind::DataInvalid,
406 format!("Invalid s3 url: {path}, should start with {prefix}"),
407 ))
408 }
409 }
410 #[cfg(feature = "opendal-gcs")]
411 OpenDalStorage::Gcs { .. } => {
412 let url = url::Url::parse(path)?;
413 let bucket = url.host_str().ok_or_else(|| {
414 Error::new(
415 ErrorKind::DataInvalid,
416 format!("Invalid gcs url: {path}, missing bucket"),
417 )
418 })?;
419 let prefix = format!("gs://{}/", bucket);
420 if path.starts_with(&prefix) {
421 Ok(&path[prefix.len()..])
422 } else {
423 Err(Error::new(
424 ErrorKind::DataInvalid,
425 format!("Invalid gcs url: {path}, should start with {prefix}"),
426 ))
427 }
428 }
429 #[cfg(feature = "opendal-oss")]
430 OpenDalStorage::Oss { .. } => {
431 let url = url::Url::parse(path)?;
432 let bucket = url.host_str().ok_or_else(|| {
433 Error::new(
434 ErrorKind::DataInvalid,
435 format!("Invalid oss url: {path}, missing bucket"),
436 )
437 })?;
438 let prefix = format!("oss://{}/", bucket);
439 if path.starts_with(&prefix) {
440 Ok(&path[prefix.len()..])
441 } else {
442 Err(Error::new(
443 ErrorKind::DataInvalid,
444 format!("Invalid oss url: {path}, should start with {prefix}"),
445 ))
446 }
447 }
448 #[cfg(feature = "opendal-azdls")]
449 OpenDalStorage::Azdls { config } => {
450 let azure_path = path.parse::<AzureStoragePath>()?;
451 match_path_with_config(&azure_path, config)?;
452 let relative_path_len = azure_path.path.len();
453 Ok(&path[path.len() - relative_path_len..])
454 }
455 #[cfg(feature = "opendal-hf")]
456 OpenDalStorage::Hf { .. } => {
457 let parsed = hf::HfUri::parse(path).ok_or_else(|| {
458 Error::new(ErrorKind::DataInvalid, format!("Invalid hf url: {path}"))
459 })?;
460 Ok(&path[path.len() - parsed.path.len()..])
461 }
462 #[cfg(all(
463 not(feature = "opendal-s3"),
464 not(feature = "opendal-fs"),
465 not(feature = "opendal-gcs"),
466 not(feature = "opendal-oss"),
467 not(feature = "opendal-azdls"),
468 not(feature = "opendal-hf"),
469 ))]
470 _ => Err(Error::new(
471 ErrorKind::FeatureUnsupported,
472 "No storage service has been enabled",
473 )),
474 }
475 }
476}
477
478#[typetag::serde(name = "OpenDalStorage")]
479#[async_trait]
480impl Storage for OpenDalStorage {
481 async fn exists(&self, path: &str) -> Result<bool> {
482 let (op, relative_path) = self.create_operator(&path)?;
483 Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
484 }
485
486 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
487 let (op, relative_path) = self.create_operator(&path)?;
488 let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
489 Ok(FileMetadata {
490 size: meta.content_length(),
491 })
492 }
493
494 async fn read(&self, path: &str) -> Result<Bytes> {
495 let (op, relative_path) = self.create_operator(&path)?;
496 Ok(op
497 .read(relative_path)
498 .await
499 .map_err(from_opendal_error)?
500 .to_bytes())
501 }
502
503 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
504 let (op, relative_path) = self.create_operator(&path)?;
505 Ok(Box::new(OpenDalReader(
506 op.reader(relative_path).await.map_err(from_opendal_error)?,
507 )))
508 }
509
510 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
511 let (op, relative_path) = self.create_operator(&path)?;
512 op.write(relative_path, bs)
513 .await
514 .map_err(from_opendal_error)?;
515 Ok(())
516 }
517
518 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
519 let (op, relative_path) = self.create_operator(&path)?;
520 Ok(Box::new(OpenDalWriter(
521 op.writer(relative_path).await.map_err(from_opendal_error)?,
522 )))
523 }
524
525 async fn delete(&self, path: &str) -> Result<()> {
526 let (op, relative_path) = self.create_operator(&path)?;
527 Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
528 }
529
530 async fn delete_prefix(&self, path: &str) -> Result<()> {
531 let (op, relative_path) = self.create_operator(&path)?;
532 let path = if relative_path.ends_with('/') {
533 relative_path.to_string()
534 } else {
535 format!("{relative_path}/")
536 };
537 Ok(op
538 .delete_with(&path)
539 .recursive(true)
540 .await
541 .map_err(from_opendal_error)?)
542 }
543
544 async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
545 let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
546
547 while let Some(path) = paths.next().await {
548 let bucket = self.batch_key_for_path(&path);
549
550 let (relative_path, deleter) = match deleters.entry(bucket) {
551 Entry::Occupied(entry) => {
552 (self.relativize_path(&path)?.to_string(), entry.into_mut())
553 }
554 Entry::Vacant(entry) => {
555 let (op, rel) = self.create_operator(&path)?;
556 let rel = rel.to_string();
557 let deleter = op.deleter().await.map_err(from_opendal_error)?;
558 (rel, entry.insert(deleter))
559 }
560 };
561
562 deleter
563 .delete(relative_path)
564 .await
565 .map_err(from_opendal_error)?;
566 }
567
568 for (_, mut deleter) in deleters {
569 deleter.close().await.map_err(from_opendal_error)?;
570 }
571
572 Ok(())
573 }
574
575 #[allow(unreachable_code, unused_variables)]
576 fn new_input(&self, path: &str) -> Result<InputFile> {
577 Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
578 }
579
580 #[allow(unreachable_code, unused_variables)]
581 fn new_output(&self, path: &str) -> Result<OutputFile> {
582 Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
583 }
584}
585
/// [`FileRead`] adapter wrapping an OpenDAL [`opendal::Reader`].
pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
592
593#[async_trait]
594impl FileRead for OpenDalReader {
595 async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
596 Ok(opendal::Reader::read(&self.0, range)
597 .await
598 .map_err(from_opendal_error)?
599 .to_bytes())
600 }
601}
602
/// [`FileWrite`] adapter wrapping an OpenDAL [`opendal::Writer`].
pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
605
606#[async_trait]
607impl FileWrite for OpenDalWriter {
608 async fn write(&mut self, bs: Bytes) -> Result<()> {
609 Ok(opendal::Writer::write(&mut self.0, bs)
610 .await
611 .map_err(from_opendal_error)?)
612 }
613
614 async fn close(&mut self) -> Result<()> {
615 let _ = opendal::Writer::close(&mut self.0)
616 .await
617 .map_err(from_opendal_error)?;
618 Ok(())
619 }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let scheme = default_memory_operator().info().scheme().to_string();
        assert_eq!(scheme, "memory");
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_relativize_path_memory() {
        let storage = OpenDalStorage::Memory(default_memory_operator());

        // Both the `memory:/` form and a bare absolute path are accepted.
        for input in ["memory:/path/to/file", "/path/to/file"] {
            assert_eq!(storage.relativize_path(input).unwrap(), "path/to/file");
        }
    }

    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_relativize_path_fs() {
        let storage = OpenDalStorage::LocalFs;

        // Both the `file:/` form and a bare absolute path are accepted.
        for input in ["file:/tmp/data/file.parquet", "/tmp/data/file.parquet"] {
            assert_eq!(
                storage.relativize_path(input).unwrap(),
                "tmp/data/file.parquet"
            );
        }
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3() {
        let storage = OpenDalStorage::S3 {
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        };

        // Every S3 scheme alias yields the same bucket-relative path.
        let schemes = ["s3", "s3a", "s3n", "minio"];
        for scheme in schemes {
            let url = format!("{scheme}://my-bucket/path/to/file.parquet");
            let relative = storage.relativize_path(&url).unwrap();
            assert_eq!(relative, "path/to/file.parquet");
        }
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        let relative = storage
            .relativize_path("gs://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs_invalid_scheme() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        let outcome = storage.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        let relative = storage
            .relativize_path("oss://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss_invalid_scheme() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        let outcome = storage.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls() {
        let config = AzdlsConfig {
            account_name: Some("myaccount".to_string()),
            endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
            ..Default::default()
        };
        let storage = OpenDalStorage::Azdls {
            config: Arc::new(config),
        };

        let relative = storage
            .relativize_path("abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "/path/to/file.parquet");
    }
}