1mod utils;
26
27use std::collections::HashMap;
28use std::collections::hash_map::Entry;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use cfg_if::cfg_if;
34use futures::StreamExt;
35use futures::stream::BoxStream;
36use iceberg::io::{
37 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38 StorageFactory,
39};
40use iceberg::{Error, ErrorKind, Result};
41use opendal::Operator;
42use opendal::layers::RetryLayer;
43use serde::{Deserialize, Serialize};
44use utils::from_opendal_error;
45
46cfg_if! {
47 if #[cfg(feature = "opendal-azdls")] {
48 mod azdls;
49 use azdls::*;
50 use opendal::services::AzdlsConfig;
51 }
52}
53
54cfg_if! {
55 if #[cfg(feature = "opendal-fs")] {
56 mod fs;
57 use fs::*;
58 }
59}
60
61cfg_if! {
62 if #[cfg(feature = "opendal-gcs")] {
63 mod gcs;
64 use gcs::*;
65 use opendal::services::GcsConfig;
66 }
67}
68
69cfg_if! {
70 if #[cfg(feature = "opendal-memory")] {
71 mod memory;
72 use memory::*;
73 }
74}
75
76cfg_if! {
77 if #[cfg(feature = "opendal-oss")] {
78 mod oss;
79 use opendal::services::OssConfig;
80 use oss::*;
81 }
82}
83
84cfg_if! {
85 if #[cfg(feature = "opendal-s3")] {
86 mod s3;
87 use opendal::services::S3Config;
88 pub use s3::*;
89 }
90}
91
92mod resolving;
93pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
94
/// Serializable factory that builds an [`OpenDalStorage`] for a single storage backend.
///
/// Each variant is compiled in only when its `opendal-*` cargo feature is enabled; see
/// the `StorageFactory::build` implementation for how each variant is turned into storage.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorageFactory {
    /// In-memory storage; `build` creates a new operator via `memory_config_build`.
    #[cfg(feature = "opendal-memory")]
    Memory,
    /// Local filesystem storage.
    #[cfg(feature = "opendal-fs")]
    Fs,
    /// S3 storage; `build` parses the S3 settings from the storage config properties.
    #[cfg(feature = "opendal-s3")]
    S3 {
        /// Optional custom AWS credential loader forwarded to the built storage.
        ///
        /// Marked `#[serde(skip)]`, so it is not serialized and is `None` after
        /// deserialization.
        #[serde(skip)]
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    /// Google Cloud Storage; `build` parses the GCS settings from the config properties.
    #[cfg(feature = "opendal-gcs")]
    Gcs,
    /// Alibaba Cloud OSS; `build` parses the OSS settings from the config properties.
    #[cfg(feature = "opendal-oss")]
    Oss,
    /// Azure Data Lake Storage; `build` parses the ADLS settings from the config properties.
    #[cfg(feature = "opendal-azdls")]
    Azdls,
}
124
125#[typetag::serde(name = "OpenDalStorageFactory")]
126impl StorageFactory for OpenDalStorageFactory {
127 #[allow(unused_variables)]
128 fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
129 match self {
130 #[cfg(feature = "opendal-memory")]
131 OpenDalStorageFactory::Memory => {
132 Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
133 }
134 #[cfg(feature = "opendal-fs")]
135 OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
136 #[cfg(feature = "opendal-s3")]
137 OpenDalStorageFactory::S3 {
138 customized_credential_load,
139 } => Ok(Arc::new(OpenDalStorage::S3 {
140 config: s3_config_parse(config.props().clone())?.into(),
141 customized_credential_load: customized_credential_load.clone(),
142 })),
143 #[cfg(feature = "opendal-gcs")]
144 OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
145 config: gcs_config_parse(config.props().clone())?.into(),
146 })),
147 #[cfg(feature = "opendal-oss")]
148 OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
149 config: oss_config_parse(config.props().clone())?.into(),
150 })),
151 #[cfg(feature = "opendal-azdls")]
152 OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls {
153 config: azdls_config_parse(config.props().clone())?.into(),
154 })),
155 #[cfg(all(
156 not(feature = "opendal-memory"),
157 not(feature = "opendal-fs"),
158 not(feature = "opendal-s3"),
159 not(feature = "opendal-gcs"),
160 not(feature = "opendal-oss"),
161 not(feature = "opendal-azdls"),
162 ))]
163 _ => Err(Error::new(
164 ErrorKind::FeatureUnsupported,
165 "No storage service has been enabled",
166 )),
167 }
168 }
169}
170
/// Builds the operator used for [`OpenDalStorage::Memory`] when it is deserialized.
///
/// The variant's operator is `#[serde(skip)]`ped, so serde calls this function (via the
/// variant's `default = ...` attribute) to supply a replacement operator.
///
/// # Panics
///
/// Panics if `memory_config_build` returns an error.
#[cfg(feature = "opendal-memory")]
fn default_memory_operator() -> Operator {
    match memory_config_build() {
        Ok(operator) => operator,
        Err(err) => panic!("Failed to create default memory operator: {err:?}"),
    }
}
176
/// OpenDAL-backed implementation of iceberg's [`Storage`] trait.
///
/// Each variant is compiled in only when its `opendal-*` cargo feature is enabled. An
/// operator for a concrete path is obtained through `OpenDalStorage::create_operator`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorage {
    /// In-memory storage holding an operator that is cloned for every operation
    /// (presumably sharing one underlying store — TODO confirm against OpenDAL docs).
    ///
    /// The operator is not serialized; on deserialization a new one is created by
    /// `default_memory_operator`, so previously written data is presumably not carried over.
    #[cfg(feature = "opendal-memory")]
    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
    /// Local filesystem storage; a new operator is built by `fs_config_build` per call.
    #[cfg(feature = "opendal-fs")]
    LocalFs,
    /// S3 storage.
    #[cfg(feature = "opendal-s3")]
    S3 {
        /// Parsed S3 configuration, shared (not copied) between clones via `Arc`.
        config: Arc<S3Config>,
        /// Optional custom AWS credential loader; skipped by serde, so `None` after
        /// deserialization.
        #[serde(skip)]
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    /// Google Cloud Storage.
    #[cfg(feature = "opendal-gcs")]
    Gcs {
        /// Parsed GCS configuration, shared between clones via `Arc`.
        config: Arc<GcsConfig>,
    },
    /// Alibaba Cloud OSS.
    #[cfg(feature = "opendal-oss")]
    Oss {
        /// Parsed OSS configuration, shared between clones via `Arc`.
        config: Arc<OssConfig>,
    },
    /// Azure Data Lake Storage.
    #[cfg(feature = "opendal-azdls")]
    Azdls {
        /// Parsed ADLS configuration, shared between clones via `Arc`.
        config: Arc<AzdlsConfig>,
    },
}
222
223impl OpenDalStorage {
224 #[allow(unreachable_code, unused_variables)]
237 pub(crate) fn create_operator<'a>(
238 &self,
239 path: &'a impl AsRef<str>,
240 ) -> Result<(Operator, &'a str)> {
241 let path = path.as_ref();
242 let (operator, relative_path): (Operator, &str) = match self {
243 #[cfg(feature = "opendal-memory")]
244 OpenDalStorage::Memory(op) => {
245 if let Some(stripped) = path.strip_prefix("memory:/") {
246 (op.clone(), stripped)
247 } else {
248 (op.clone(), &path[1..])
249 }
250 }
251 #[cfg(feature = "opendal-fs")]
252 OpenDalStorage::LocalFs => {
253 let op = fs_config_build()?;
254 if let Some(stripped) = path.strip_prefix("file:/") {
255 (op, stripped)
256 } else {
257 (op, &path[1..])
258 }
259 }
260 #[cfg(feature = "opendal-s3")]
261 OpenDalStorage::S3 {
262 config,
263 customized_credential_load,
264 } => {
265 let op = s3_config_build(config, customized_credential_load, path)?;
266 let op_info = op.info();
267
268 let url = url::Url::parse(path).map_err(|e| {
271 Error::new(
272 ErrorKind::DataInvalid,
273 format!("Invalid s3 url: {path}: {e}"),
274 )
275 })?;
276 let prefix = format!("{}://{}/", url.scheme(), op_info.name());
277 if path.starts_with(&prefix) {
278 (op, &path[prefix.len()..])
279 } else {
280 return Err(Error::new(
281 ErrorKind::DataInvalid,
282 format!("Invalid s3 url: {path}, should start with {prefix}"),
283 ));
284 }
285 }
286 #[cfg(feature = "opendal-gcs")]
287 OpenDalStorage::Gcs { config } => {
288 let operator = gcs_config_build(config, path)?;
289 let prefix = format!("gs://{}/", operator.info().name());
290 if path.starts_with(&prefix) {
291 (operator, &path[prefix.len()..])
292 } else {
293 return Err(Error::new(
294 ErrorKind::DataInvalid,
295 format!("Invalid gcs url: {path}, should start with {prefix}"),
296 ));
297 }
298 }
299 #[cfg(feature = "opendal-oss")]
300 OpenDalStorage::Oss { config } => {
301 let op = oss_config_build(config, path)?;
302 let prefix = format!("oss://{}/", op.info().name());
303 if path.starts_with(&prefix) {
304 (op, &path[prefix.len()..])
305 } else {
306 return Err(Error::new(
307 ErrorKind::DataInvalid,
308 format!("Invalid oss url: {path}, should start with {prefix}"),
309 ));
310 }
311 }
312 #[cfg(feature = "opendal-azdls")]
313 OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?,
314 #[cfg(all(
315 not(feature = "opendal-s3"),
316 not(feature = "opendal-fs"),
317 not(feature = "opendal-gcs"),
318 not(feature = "opendal-oss"),
319 not(feature = "opendal-azdls"),
320 ))]
321 _ => {
322 return Err(Error::new(
323 ErrorKind::FeatureUnsupported,
324 "No storage service has been enabled",
325 ));
326 }
327 };
328
329 let operator = operator.layer(RetryLayer::new());
332 Ok((operator, relative_path))
333 }
334
335 #[allow(unreachable_code, unused_variables)]
341 pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str> {
342 match self {
343 #[cfg(feature = "opendal-memory")]
344 OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
345 #[cfg(feature = "opendal-fs")]
346 OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
347 #[cfg(feature = "opendal-s3")]
348 OpenDalStorage::S3 { .. } => {
349 let url = url::Url::parse(path)?;
350 let bucket = url.host_str().ok_or_else(|| {
351 Error::new(
352 ErrorKind::DataInvalid,
353 format!("Invalid s3 url: {path}, missing bucket"),
354 )
355 })?;
356 let prefix = format!("{}://{}/", url.scheme(), bucket);
357 if path.starts_with(&prefix) {
358 Ok(&path[prefix.len()..])
359 } else {
360 Err(Error::new(
361 ErrorKind::DataInvalid,
362 format!("Invalid s3 url: {path}, should start with {prefix}"),
363 ))
364 }
365 }
366 #[cfg(feature = "opendal-gcs")]
367 OpenDalStorage::Gcs { .. } => {
368 let url = url::Url::parse(path)?;
369 let bucket = url.host_str().ok_or_else(|| {
370 Error::new(
371 ErrorKind::DataInvalid,
372 format!("Invalid gcs url: {path}, missing bucket"),
373 )
374 })?;
375 let prefix = format!("gs://{}/", bucket);
376 if path.starts_with(&prefix) {
377 Ok(&path[prefix.len()..])
378 } else {
379 Err(Error::new(
380 ErrorKind::DataInvalid,
381 format!("Invalid gcs url: {path}, should start with {prefix}"),
382 ))
383 }
384 }
385 #[cfg(feature = "opendal-oss")]
386 OpenDalStorage::Oss { .. } => {
387 let url = url::Url::parse(path)?;
388 let bucket = url.host_str().ok_or_else(|| {
389 Error::new(
390 ErrorKind::DataInvalid,
391 format!("Invalid oss url: {path}, missing bucket"),
392 )
393 })?;
394 let prefix = format!("oss://{}/", bucket);
395 if path.starts_with(&prefix) {
396 Ok(&path[prefix.len()..])
397 } else {
398 Err(Error::new(
399 ErrorKind::DataInvalid,
400 format!("Invalid oss url: {path}, should start with {prefix}"),
401 ))
402 }
403 }
404 #[cfg(feature = "opendal-azdls")]
405 OpenDalStorage::Azdls { config } => {
406 let azure_path = path.parse::<AzureStoragePath>()?;
407 match_path_with_config(&azure_path, config)?;
408 let relative_path_len = azure_path.path.len();
409 Ok(&path[path.len() - relative_path_len..])
410 }
411 #[cfg(all(
412 not(feature = "opendal-s3"),
413 not(feature = "opendal-fs"),
414 not(feature = "opendal-gcs"),
415 not(feature = "opendal-oss"),
416 not(feature = "opendal-azdls"),
417 ))]
418 _ => Err(Error::new(
419 ErrorKind::FeatureUnsupported,
420 "No storage service has been enabled",
421 )),
422 }
423 }
424}
425
426#[typetag::serde(name = "OpenDalStorage")]
427#[async_trait]
428impl Storage for OpenDalStorage {
429 async fn exists(&self, path: &str) -> Result<bool> {
430 let (op, relative_path) = self.create_operator(&path)?;
431 Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
432 }
433
434 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
435 let (op, relative_path) = self.create_operator(&path)?;
436 let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
437 Ok(FileMetadata {
438 size: meta.content_length(),
439 })
440 }
441
442 async fn read(&self, path: &str) -> Result<Bytes> {
443 let (op, relative_path) = self.create_operator(&path)?;
444 Ok(op
445 .read(relative_path)
446 .await
447 .map_err(from_opendal_error)?
448 .to_bytes())
449 }
450
451 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
452 let (op, relative_path) = self.create_operator(&path)?;
453 Ok(Box::new(OpenDalReader(
454 op.reader(relative_path).await.map_err(from_opendal_error)?,
455 )))
456 }
457
458 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
459 let (op, relative_path) = self.create_operator(&path)?;
460 op.write(relative_path, bs)
461 .await
462 .map_err(from_opendal_error)?;
463 Ok(())
464 }
465
466 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
467 let (op, relative_path) = self.create_operator(&path)?;
468 Ok(Box::new(OpenDalWriter(
469 op.writer(relative_path).await.map_err(from_opendal_error)?,
470 )))
471 }
472
473 async fn delete(&self, path: &str) -> Result<()> {
474 let (op, relative_path) = self.create_operator(&path)?;
475 Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
476 }
477
478 async fn delete_prefix(&self, path: &str) -> Result<()> {
479 let (op, relative_path) = self.create_operator(&path)?;
480 let path = if relative_path.ends_with('/') {
481 relative_path.to_string()
482 } else {
483 format!("{relative_path}/")
484 };
485 Ok(op
486 .delete_with(&path)
487 .recursive(true)
488 .await
489 .map_err(from_opendal_error)?)
490 }
491
492 async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
493 let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
494
495 while let Some(path) = paths.next().await {
496 let bucket = url::Url::parse(&path)
497 .ok()
498 .and_then(|u| u.host_str().map(|s| s.to_string()))
499 .unwrap_or_default();
500
501 let (relative_path, deleter) = match deleters.entry(bucket) {
502 Entry::Occupied(entry) => {
503 (self.relativize_path(&path)?.to_string(), entry.into_mut())
504 }
505 Entry::Vacant(entry) => {
506 let (op, rel) = self.create_operator(&path)?;
507 let rel = rel.to_string();
508 let deleter = op.deleter().await.map_err(from_opendal_error)?;
509 (rel, entry.insert(deleter))
510 }
511 };
512
513 deleter
514 .delete(relative_path)
515 .await
516 .map_err(from_opendal_error)?;
517 }
518
519 for (_, mut deleter) in deleters {
520 deleter.close().await.map_err(from_opendal_error)?;
521 }
522
523 Ok(())
524 }
525
526 #[allow(unreachable_code, unused_variables)]
527 fn new_input(&self, path: &str) -> Result<InputFile> {
528 Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
529 }
530
531 #[allow(unreachable_code, unused_variables)]
532 fn new_output(&self, path: &str) -> Result<OutputFile> {
533 Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
534 }
535}
536
/// Adapter exposing an [`opendal::Reader`] through iceberg's [`FileRead`] trait.
pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
543
544#[async_trait]
545impl FileRead for OpenDalReader {
546 async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
547 Ok(opendal::Reader::read(&self.0, range)
548 .await
549 .map_err(from_opendal_error)?
550 .to_bytes())
551 }
552}
553
/// Adapter exposing an [`opendal::Writer`] through iceberg's [`FileWrite`] trait.
pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
556
557#[async_trait]
558impl FileWrite for OpenDalWriter {
559 async fn write(&mut self, bs: Bytes) -> Result<()> {
560 Ok(opendal::Writer::write(&mut self.0, bs)
561 .await
562 .map_err(from_opendal_error)?)
563 }
564
565 async fn close(&mut self) -> Result<()> {
566 let _ = opendal::Writer::close(&mut self.0)
567 .await
568 .map_err(from_opendal_error)?;
569 Ok(())
570 }
571}
572
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "opendal-gcs")]
    fn default_gcs_storage() -> OpenDalStorage {
        OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        }
    }

    #[cfg(feature = "opendal-oss")]
    fn default_oss_storage() -> OpenDalStorage {
        OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        }
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let scheme = default_memory_operator().info().scheme().to_string();
        assert_eq!(scheme, "memory");
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_relativize_path_memory() {
        let storage = OpenDalStorage::Memory(default_memory_operator());

        // The scheme-qualified form and a bare absolute path resolve identically.
        for input in ["memory:/path/to/file", "/path/to/file"] {
            let relative = storage.relativize_path(input).unwrap();
            assert_eq!(relative, "path/to/file", "input: {input}");
        }
    }

    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_relativize_path_fs() {
        let storage = OpenDalStorage::LocalFs;

        for input in ["file:/tmp/data/file.parquet", "/tmp/data/file.parquet"] {
            let relative = storage.relativize_path(input).unwrap();
            assert_eq!(relative, "tmp/data/file.parquet", "input: {input}");
        }
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3() {
        let storage = OpenDalStorage::S3 {
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        };

        // Every S3 scheme alias is relativized against the bucket in the URL.
        for scheme in ["s3", "s3a", "s3n", "minio"] {
            let path = format!("{scheme}://my-bucket/path/to/file.parquet");
            let relative = storage.relativize_path(&path).unwrap();
            assert_eq!(relative, "path/to/file.parquet", "path: {path}");
        }
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs() {
        let relative = default_gcs_storage()
            .relativize_path("gs://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs_invalid_scheme() {
        let outcome = default_gcs_storage().relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss() {
        let relative = default_oss_storage()
            .relativize_path("oss://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss_invalid_scheme() {
        let outcome = default_oss_storage().relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls() {
        let config = AzdlsConfig {
            account_name: Some("myaccount".to_string()),
            endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
            ..Default::default()
        };
        let storage = OpenDalStorage::Azdls {
            config: Arc::new(config),
        };

        let relative = storage
            .relativize_path("abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
            .unwrap();
        // Unlike the bucket-based backends, the ADLS relative path keeps its leading slash.
        assert_eq!(relative, "/path/to/file.parquet");
    }
}