1mod utils;
26
27use std::collections::HashMap;
28use std::collections::hash_map::Entry;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use cfg_if::cfg_if;
34use futures::StreamExt;
35use futures::stream::BoxStream;
36use iceberg::io::{
37 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38 StorageFactory,
39};
40use iceberg::{Error, ErrorKind, Result};
41use opendal::Operator;
42use opendal::layers::RetryLayer;
43use serde::{Deserialize, Serialize};
44use utils::from_opendal_error;
45
46cfg_if! {
47 if #[cfg(feature = "opendal-azdls")] {
48 mod azdls;
49 use azdls::*;
50 use opendal::services::AzdlsConfig;
51 }
52}
53
54cfg_if! {
55 if #[cfg(feature = "opendal-fs")] {
56 mod fs;
57 use fs::*;
58 }
59}
60
61cfg_if! {
62 if #[cfg(feature = "opendal-gcs")] {
63 mod gcs;
64 use gcs::*;
65 use opendal::services::GcsConfig;
66 }
67}
68
69cfg_if! {
70 if #[cfg(feature = "opendal-memory")] {
71 mod memory;
72 use memory::*;
73 }
74}
75
76cfg_if! {
77 if #[cfg(feature = "opendal-oss")] {
78 mod oss;
79 use opendal::services::OssConfig;
80 use oss::*;
81 }
82}
83
84cfg_if! {
85 if #[cfg(feature = "opendal-s3")] {
86 mod s3;
87 use opendal::services::S3Config;
88 pub use s3::*;
89 }
90}
91
92mod resolving;
93pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
94
95#[derive(Clone, Debug, Serialize, Deserialize)]
100pub enum OpenDalStorageFactory {
101 #[cfg(feature = "opendal-memory")]
103 Memory,
104 #[cfg(feature = "opendal-fs")]
106 Fs,
107 #[cfg(feature = "opendal-s3")]
109 S3 {
110 #[serde(skip)]
112 customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
113 },
114 #[cfg(feature = "opendal-gcs")]
116 Gcs,
117 #[cfg(feature = "opendal-oss")]
119 Oss,
120 #[cfg(feature = "opendal-azdls")]
122 Azdls,
123}
124
125#[typetag::serde(name = "OpenDalStorageFactory")]
126impl StorageFactory for OpenDalStorageFactory {
127 #[allow(unused_variables)]
128 fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
129 match self {
130 #[cfg(feature = "opendal-memory")]
131 OpenDalStorageFactory::Memory => {
132 Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
133 }
134 #[cfg(feature = "opendal-fs")]
135 OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
136 #[cfg(feature = "opendal-s3")]
137 OpenDalStorageFactory::S3 {
138 customized_credential_load,
139 } => Ok(Arc::new(OpenDalStorage::S3 {
140 config: s3_config_parse(config.props().clone())?.into(),
141 customized_credential_load: customized_credential_load.clone(),
142 })),
143 #[cfg(feature = "opendal-gcs")]
144 OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
145 config: gcs_config_parse(config.props().clone())?.into(),
146 })),
147 #[cfg(feature = "opendal-oss")]
148 OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
149 config: oss_config_parse(config.props().clone())?.into(),
150 })),
151 #[cfg(feature = "opendal-azdls")]
152 OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls {
153 config: azdls_config_parse(config.props().clone())?.into(),
154 })),
155 #[cfg(all(
156 not(feature = "opendal-memory"),
157 not(feature = "opendal-fs"),
158 not(feature = "opendal-s3"),
159 not(feature = "opendal-gcs"),
160 not(feature = "opendal-oss"),
161 not(feature = "opendal-azdls"),
162 ))]
163 _ => Err(Error::new(
164 ErrorKind::FeatureUnsupported,
165 "No storage service has been enabled",
166 )),
167 }
168 }
169}
170
/// Creates the memory operator used as the serde default for [`OpenDalStorage::Memory`].
///
/// # Panics
/// Panics when `memory_config_build` fails.
#[cfg(feature = "opendal-memory")]
fn default_memory_operator() -> Operator {
    let built = memory_config_build();
    built.expect("Failed to create default memory operator")
}
176
177#[derive(Clone, Debug, Serialize, Deserialize)]
179pub enum OpenDalStorage {
180 #[cfg(feature = "opendal-memory")]
182 Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
183 #[cfg(feature = "opendal-fs")]
185 LocalFs,
186 #[cfg(feature = "opendal-s3")]
191 S3 {
192 config: Arc<S3Config>,
194 #[serde(skip)]
196 customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
197 },
198 #[cfg(feature = "opendal-gcs")]
200 Gcs {
201 config: Arc<GcsConfig>,
203 },
204 #[cfg(feature = "opendal-oss")]
206 Oss {
207 config: Arc<OssConfig>,
209 },
210 #[cfg(feature = "opendal-azdls")]
217 Azdls {
218 config: Arc<AzdlsConfig>,
220 },
221}
222
223impl OpenDalStorage {
224 #[allow(unreachable_code, unused_variables)]
237 pub(crate) fn create_operator<'a>(
238 &self,
239 path: &'a impl AsRef<str>,
240 ) -> Result<(Operator, &'a str)> {
241 let path = path.as_ref();
242 let (operator, relative_path): (Operator, &str) = match self {
243 #[cfg(feature = "opendal-memory")]
244 OpenDalStorage::Memory(op) => {
245 if let Some(stripped) = path.strip_prefix("memory:/") {
246 (op.clone(), stripped)
247 } else {
248 (op.clone(), &path[1..])
249 }
250 }
251 #[cfg(feature = "opendal-fs")]
252 OpenDalStorage::LocalFs => {
253 let op = fs_config_build()?;
254 if let Some(stripped) = path.strip_prefix("file:/") {
255 (op, stripped)
256 } else {
257 (op, &path[1..])
258 }
259 }
260 #[cfg(feature = "opendal-s3")]
261 OpenDalStorage::S3 {
262 config,
263 customized_credential_load,
264 } => {
265 let op = s3_config_build(config, customized_credential_load, path)?;
266 let op_info = op.info();
267
268 let url = url::Url::parse(path).map_err(|e| {
271 Error::new(
272 ErrorKind::DataInvalid,
273 format!("Invalid s3 url: {path}: {e}"),
274 )
275 })?;
276 let prefix = format!("{}://{}/", url.scheme(), op_info.name());
277 if path.starts_with(&prefix) {
278 (op, &path[prefix.len()..])
279 } else {
280 return Err(Error::new(
281 ErrorKind::DataInvalid,
282 format!("Invalid s3 url: {path}, should start with {prefix}"),
283 ));
284 }
285 }
286 #[cfg(feature = "opendal-gcs")]
287 OpenDalStorage::Gcs { config } => {
288 let operator = gcs_config_build(config, path)?;
289 let prefix = format!("gs://{}/", operator.info().name());
290 if path.starts_with(&prefix) {
291 (operator, &path[prefix.len()..])
292 } else {
293 return Err(Error::new(
294 ErrorKind::DataInvalid,
295 format!("Invalid gcs url: {path}, should start with {prefix}"),
296 ));
297 }
298 }
299 #[cfg(feature = "opendal-oss")]
300 OpenDalStorage::Oss { config } => {
301 let op = oss_config_build(config, path)?;
302 let prefix = format!("oss://{}/", op.info().name());
303 if path.starts_with(&prefix) {
304 (op, &path[prefix.len()..])
305 } else {
306 return Err(Error::new(
307 ErrorKind::DataInvalid,
308 format!("Invalid oss url: {path}, should start with {prefix}"),
309 ));
310 }
311 }
312 #[cfg(feature = "opendal-azdls")]
313 OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?,
314 #[cfg(all(
315 not(feature = "opendal-s3"),
316 not(feature = "opendal-fs"),
317 not(feature = "opendal-gcs"),
318 not(feature = "opendal-oss"),
319 not(feature = "opendal-azdls"),
320 ))]
321 _ => {
322 return Err(Error::new(
323 ErrorKind::FeatureUnsupported,
324 "No storage service has been enabled",
325 ));
326 }
327 };
328
329 let operator = operator.layer(RetryLayer::new());
332 Ok((operator, relative_path))
333 }
334
335 #[allow(unreachable_code, unused_variables)]
341 pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str> {
342 match self {
343 #[cfg(feature = "opendal-memory")]
344 OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
345 #[cfg(feature = "opendal-fs")]
346 OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
347 #[cfg(feature = "opendal-s3")]
348 OpenDalStorage::S3 { .. } => {
349 let url = url::Url::parse(path)?;
350 let bucket = url.host_str().ok_or_else(|| {
351 Error::new(
352 ErrorKind::DataInvalid,
353 format!("Invalid s3 url: {path}, missing bucket"),
354 )
355 })?;
356 let prefix = format!("{}://{}/", url.scheme(), bucket);
357 if path.starts_with(&prefix) {
358 Ok(&path[prefix.len()..])
359 } else {
360 Err(Error::new(
361 ErrorKind::DataInvalid,
362 format!("Invalid s3 url: {path}, should start with {prefix}"),
363 ))
364 }
365 }
366 #[cfg(feature = "opendal-gcs")]
367 OpenDalStorage::Gcs { .. } => {
368 let url = url::Url::parse(path)?;
369 let bucket = url.host_str().ok_or_else(|| {
370 Error::new(
371 ErrorKind::DataInvalid,
372 format!("Invalid gcs url: {path}, missing bucket"),
373 )
374 })?;
375 let prefix = format!("gs://{}/", bucket);
376 if path.starts_with(&prefix) {
377 Ok(&path[prefix.len()..])
378 } else {
379 Err(Error::new(
380 ErrorKind::DataInvalid,
381 format!("Invalid gcs url: {path}, should start with {prefix}"),
382 ))
383 }
384 }
385 #[cfg(feature = "opendal-oss")]
386 OpenDalStorage::Oss { .. } => {
387 let url = url::Url::parse(path)?;
388 let bucket = url.host_str().ok_or_else(|| {
389 Error::new(
390 ErrorKind::DataInvalid,
391 format!("Invalid oss url: {path}, missing bucket"),
392 )
393 })?;
394 let prefix = format!("oss://{}/", bucket);
395 if path.starts_with(&prefix) {
396 Ok(&path[prefix.len()..])
397 } else {
398 Err(Error::new(
399 ErrorKind::DataInvalid,
400 format!("Invalid oss url: {path}, should start with {prefix}"),
401 ))
402 }
403 }
404 #[cfg(feature = "opendal-azdls")]
405 OpenDalStorage::Azdls { config } => {
406 let azure_path = path.parse::<AzureStoragePath>()?;
407 match_path_with_config(&azure_path, config)?;
408 let relative_path_len = azure_path.path.len();
409 Ok(&path[path.len() - relative_path_len..])
410 }
411 #[cfg(all(
412 not(feature = "opendal-s3"),
413 not(feature = "opendal-fs"),
414 not(feature = "opendal-gcs"),
415 not(feature = "opendal-oss"),
416 not(feature = "opendal-azdls"),
417 ))]
418 _ => Err(Error::new(
419 ErrorKind::FeatureUnsupported,
420 "No storage service has been enabled",
421 )),
422 }
423 }
424}
425
426#[typetag::serde(name = "OpenDalStorage")]
427#[async_trait]
428impl Storage for OpenDalStorage {
429 async fn exists(&self, path: &str) -> Result<bool> {
430 let (op, relative_path) = self.create_operator(&path)?;
431 Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
432 }
433
434 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
435 let (op, relative_path) = self.create_operator(&path)?;
436 let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
437 Ok(FileMetadata {
438 size: meta.content_length(),
439 })
440 }
441
442 async fn read(&self, path: &str) -> Result<Bytes> {
443 let (op, relative_path) = self.create_operator(&path)?;
444 Ok(op
445 .read(relative_path)
446 .await
447 .map_err(from_opendal_error)?
448 .to_bytes())
449 }
450
451 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
452 let (op, relative_path) = self.create_operator(&path)?;
453 Ok(Box::new(OpenDalReader(
454 op.reader(relative_path).await.map_err(from_opendal_error)?,
455 )))
456 }
457
458 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
459 let (op, relative_path) = self.create_operator(&path)?;
460 op.write(relative_path, bs)
461 .await
462 .map_err(from_opendal_error)?;
463 Ok(())
464 }
465
466 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
467 let (op, relative_path) = self.create_operator(&path)?;
468 Ok(Box::new(OpenDalWriter(
469 op.writer(relative_path).await.map_err(from_opendal_error)?,
470 )))
471 }
472
473 async fn delete(&self, path: &str) -> Result<()> {
474 let (op, relative_path) = self.create_operator(&path)?;
475 Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
476 }
477
478 async fn delete_prefix(&self, path: &str) -> Result<()> {
479 let (op, relative_path) = self.create_operator(&path)?;
480 let path = if relative_path.ends_with('/') {
481 relative_path.to_string()
482 } else {
483 format!("{relative_path}/")
484 };
485 Ok(op.remove_all(&path).await.map_err(from_opendal_error)?)
486 }
487
488 async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
489 let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
490
491 while let Some(path) = paths.next().await {
492 let bucket = url::Url::parse(&path)
493 .ok()
494 .and_then(|u| u.host_str().map(|s| s.to_string()))
495 .unwrap_or_default();
496
497 let (relative_path, deleter) = match deleters.entry(bucket) {
498 Entry::Occupied(entry) => {
499 (self.relativize_path(&path)?.to_string(), entry.into_mut())
500 }
501 Entry::Vacant(entry) => {
502 let (op, rel) = self.create_operator(&path)?;
503 let rel = rel.to_string();
504 let deleter = op.deleter().await.map_err(from_opendal_error)?;
505 (rel, entry.insert(deleter))
506 }
507 };
508
509 deleter
510 .delete(relative_path)
511 .await
512 .map_err(from_opendal_error)?;
513 }
514
515 for (_, mut deleter) in deleters {
516 deleter.close().await.map_err(from_opendal_error)?;
517 }
518
519 Ok(())
520 }
521
522 #[allow(unreachable_code, unused_variables)]
523 fn new_input(&self, path: &str) -> Result<InputFile> {
524 Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
525 }
526
527 #[allow(unreachable_code, unused_variables)]
528 fn new_output(&self, path: &str) -> Result<OutputFile> {
529 Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
530 }
531}
532
533pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
539
540#[async_trait]
541impl FileRead for OpenDalReader {
542 async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
543 Ok(opendal::Reader::read(&self.0, range)
544 .await
545 .map_err(from_opendal_error)?
546 .to_bytes())
547 }
548}
549
550pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
552
553#[async_trait]
554impl FileWrite for OpenDalWriter {
555 async fn write(&mut self, bs: Bytes) -> Result<()> {
556 Ok(opendal::Writer::write(&mut self.0, bs)
557 .await
558 .map_err(from_opendal_error)?)
559 }
560
561 async fn close(&mut self) -> Result<()> {
562 let _ = opendal::Writer::close(&mut self.0)
563 .await
564 .map_err(from_opendal_error)?;
565 Ok(())
566 }
567}
568
#[cfg(test)]
mod tests {
    use super::*;

    /// The default memory operator reports the `memory` scheme.
    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let operator = default_memory_operator();
        let scheme = operator.info().scheme().to_string();
        assert_eq!(scheme, "memory");
    }

    /// Memory paths are relativized both with and without the `memory:/` prefix.
    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_relativize_path_memory() {
        let store = OpenDalStorage::Memory(default_memory_operator());

        let with_scheme = store.relativize_path("memory:/path/to/file").unwrap();
        assert_eq!(with_scheme, "path/to/file");

        let without_scheme = store.relativize_path("/path/to/file").unwrap();
        assert_eq!(without_scheme, "path/to/file");
    }

    /// Local paths are relativized both with and without the `file:/` prefix.
    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_relativize_path_fs() {
        let store = OpenDalStorage::LocalFs;

        let with_scheme = store
            .relativize_path("file:/tmp/data/file.parquet")
            .unwrap();
        assert_eq!(with_scheme, "tmp/data/file.parquet");

        let without_scheme = store.relativize_path("/tmp/data/file.parquet").unwrap();
        assert_eq!(without_scheme, "tmp/data/file.parquet");
    }

    /// S3 paths are relativized regardless of which scheme the URL uses.
    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3() {
        let store = OpenDalStorage::S3 {
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        };

        for scheme in ["s3", "s3a", "s3n", "minio"] {
            let location = format!("{scheme}://my-bucket/path/to/file.parquet");
            let relative = store.relativize_path(&location).unwrap();
            assert_eq!(relative, "path/to/file.parquet");
        }
    }

    /// A `gs://` path is relativized to the part after the bucket.
    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs() {
        let store = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        let relative = store
            .relativize_path("gs://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    /// A non-`gs://` scheme is rejected by the GCS storage.
    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs_invalid_scheme() {
        let store = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        let outcome = store.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    /// An `oss://` path is relativized to the part after the bucket.
    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss() {
        let store = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        let relative = store
            .relativize_path("oss://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    /// A non-`oss://` scheme is rejected by the OSS storage.
    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss_invalid_scheme() {
        let store = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        let outcome = store.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    /// An `abfss://` path keeps its leading `/` in the relative part.
    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls() {
        let azdls_config = AzdlsConfig {
            account_name: Some("myaccount".to_string()),
            endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
            ..Default::default()
        };
        let store = OpenDalStorage::Azdls {
            config: Arc::new(azdls_config),
        };

        let relative = store
            .relativize_path("abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "/path/to/file.parquet");
    }
}