// iceberg_storage_opendal/lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! OpenDAL-based storage implementation for Apache Iceberg.
19//!
20//! This crate provides [`OpenDalStorage`] and [`OpenDalStorageFactory`],
21//! which implement the [`Storage`](iceberg::io::Storage) and
22//! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` crate
23//! using [OpenDAL](https://opendal.apache.org/) as the backend.
24
25mod utils;
26
27use std::collections::HashMap;
28use std::collections::hash_map::Entry;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use cfg_if::cfg_if;
34use futures::StreamExt;
35use futures::stream::BoxStream;
36use iceberg::io::{
37    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38    StorageFactory,
39};
40use iceberg::{Error, ErrorKind, Result};
41use opendal::Operator;
42use opendal::layers::RetryLayer;
43use serde::{Deserialize, Serialize};
44use utils::from_opendal_error;
45
46cfg_if! {
47    if #[cfg(feature = "opendal-azdls")] {
48        mod azdls;
49        use azdls::AzureStorageScheme;
50        use azdls::*;
51        use opendal::services::AzdlsConfig;
52    }
53}
54
55cfg_if! {
56    if #[cfg(feature = "opendal-fs")] {
57        mod fs;
58        use fs::*;
59    }
60}
61
62cfg_if! {
63    if #[cfg(feature = "opendal-gcs")] {
64        mod gcs;
65        use gcs::*;
66        use opendal::services::GcsConfig;
67    }
68}
69
70cfg_if! {
71    if #[cfg(feature = "opendal-memory")] {
72        mod memory;
73        use memory::*;
74    }
75}
76
77cfg_if! {
78    if #[cfg(feature = "opendal-oss")] {
79        mod oss;
80        use opendal::services::OssConfig;
81        use oss::*;
82    }
83}
84
85cfg_if! {
86    if #[cfg(feature = "opendal-s3")] {
87        mod s3;
88        use opendal::services::S3Config;
89        pub use s3::*;
90    }
91}
92
93mod resolving;
94pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
95
/// OpenDAL-based storage factory.
///
/// Maps scheme to the corresponding OpenDalStorage storage variant.
/// Use this factory with `FileIOBuilder::new(factory)` to create FileIO instances.
///
/// Variants are feature-gated; only the backends enabled at compile time exist.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorageFactory {
    /// Memory storage factory.
    #[cfg(feature = "opendal-memory")]
    Memory,
    /// Local filesystem storage factory.
    #[cfg(feature = "opendal-fs")]
    Fs,
    /// S3 storage factory.
    #[cfg(feature = "opendal-s3")]
    S3 {
        /// s3 storage could have `s3://` and `s3a://`.
        /// Storing the scheme string here to return the correct path.
        configured_scheme: String,
        /// Custom AWS credential loader.
        ///
        /// Not (de)serializable: a deserialized factory always has `None` here.
        #[serde(skip)]
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    /// GCS storage factory.
    #[cfg(feature = "opendal-gcs")]
    Gcs,
    /// OSS storage factory.
    #[cfg(feature = "opendal-oss")]
    Oss,
    /// Azure Data Lake Storage factory.
    #[cfg(feature = "opendal-azdls")]
    Azdls {
        /// The configured Azure storage scheme.
        configured_scheme: AzureStorageScheme,
    },
}
131
#[typetag::serde(name = "OpenDalStorageFactory")]
impl StorageFactory for OpenDalStorageFactory {
    /// Builds the [`OpenDalStorage`] matching this factory variant, parsing the
    /// backend-specific configuration out of `config.props()`.
    ///
    /// # Errors
    ///
    /// Fails if property parsing for the selected backend fails, or with
    /// [`ErrorKind::FeatureUnsupported`] when the crate was built with no
    /// storage feature enabled.
    // `config` is unused by the Memory/Fs arms (and when no feature is
    // enabled), hence the allow.
    #[allow(unused_variables)]
    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
        match self {
            #[cfg(feature = "opendal-memory")]
            OpenDalStorageFactory::Memory => {
                Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
            }
            #[cfg(feature = "opendal-fs")]
            OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
            #[cfg(feature = "opendal-s3")]
            OpenDalStorageFactory::S3 {
                configured_scheme,
                customized_credential_load,
            } => Ok(Arc::new(OpenDalStorage::S3 {
                configured_scheme: configured_scheme.clone(),
                config: s3_config_parse(config.props().clone())?.into(),
                customized_credential_load: customized_credential_load.clone(),
            })),
            #[cfg(feature = "opendal-gcs")]
            OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
                config: gcs_config_parse(config.props().clone())?.into(),
            })),
            #[cfg(feature = "opendal-oss")]
            OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
                config: oss_config_parse(config.props().clone())?.into(),
            })),
            #[cfg(feature = "opendal-azdls")]
            OpenDalStorageFactory::Azdls { configured_scheme } => {
                Ok(Arc::new(OpenDalStorage::Azdls {
                    configured_scheme: configured_scheme.clone(),
                    config: azdls_config_parse(config.props().clone())?.into(),
                }))
            }
            // Compiled only when every storage feature is disabled; otherwise
            // this arm would be an unreachable pattern.
            #[cfg(all(
                not(feature = "opendal-memory"),
                not(feature = "opendal-fs"),
                not(feature = "opendal-s3"),
                not(feature = "opendal-gcs"),
                not(feature = "opendal-oss"),
                not(feature = "opendal-azdls"),
            ))]
            _ => Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "No storage service has been enabled",
            )),
        }
    }
}
182
/// Default memory operator for serde deserialization.
///
/// Used as the `#[serde(default = ...)]` value for the skipped `Operator`
/// field of [`OpenDalStorage::Memory`], so a deserialized value gets a fresh
/// (empty) in-memory backend.
///
/// # Panics
///
/// Panics if the in-memory operator cannot be built.
#[cfg(feature = "opendal-memory")]
fn default_memory_operator() -> Operator {
    memory_config_build().expect("Failed to create default memory operator")
}
188
/// OpenDAL-based storage implementation.
///
/// Each variant carries the configuration needed to build an
/// [`opendal::Operator`] for its backend on demand (see `create_operator`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorage {
    /// Memory storage variant.
    ///
    /// The operator itself is not serializable; deserialization substitutes a
    /// fresh default in-memory operator (previous contents are not restored).
    #[cfg(feature = "opendal-memory")]
    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
    /// Local filesystem storage variant.
    #[cfg(feature = "opendal-fs")]
    LocalFs,
    /// S3 storage variant.
    #[cfg(feature = "opendal-s3")]
    S3 {
        /// s3 storage could have `s3://` and `s3a://`.
        /// Storing the scheme string here to return the correct path.
        configured_scheme: String,
        /// S3 configuration.
        config: Arc<S3Config>,
        /// Custom AWS credential loader.
        ///
        /// Not (de)serializable: a deserialized storage always has `None` here.
        #[serde(skip)]
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    /// GCS storage variant.
    #[cfg(feature = "opendal-gcs")]
    Gcs {
        /// GCS configuration.
        config: Arc<GcsConfig>,
    },
    /// OSS storage variant.
    #[cfg(feature = "opendal-oss")]
    Oss {
        /// OSS configuration.
        config: Arc<OssConfig>,
    },
    /// Azure Data Lake Storage variant.
    /// Expects paths of the form
    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
    #[cfg(feature = "opendal-azdls")]
    #[allow(private_interfaces)]
    Azdls {
        /// The configured Azure storage scheme.
        /// Because Azdls accepts multiple possible schemes, we store the full
        /// passed scheme here to later validate schemes passed via paths.
        configured_scheme: AzureStorageScheme,
        /// Azure DLS configuration.
        config: Arc<AzdlsConfig>,
    },
}
237
238impl OpenDalStorage {
239    /// Creates operator from path.
240    ///
241    /// # Arguments
242    ///
243    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`](iceberg::io::FileIO).
244    ///
245    /// # Returns
246    ///
247    /// The return value consists of two parts:
248    ///
249    /// * An [`opendal::Operator`] instance used to operate on file.
250    /// * Relative path to the root uri of [`opendal::Operator`].
251    #[allow(unreachable_code, unused_variables)]
252    pub(crate) fn create_operator<'a>(
253        &self,
254        path: &'a impl AsRef<str>,
255    ) -> Result<(Operator, &'a str)> {
256        let path = path.as_ref();
257        let (operator, relative_path): (Operator, &str) = match self {
258            #[cfg(feature = "opendal-memory")]
259            OpenDalStorage::Memory(op) => {
260                if let Some(stripped) = path.strip_prefix("memory:/") {
261                    (op.clone(), stripped)
262                } else {
263                    (op.clone(), &path[1..])
264                }
265            }
266            #[cfg(feature = "opendal-fs")]
267            OpenDalStorage::LocalFs => {
268                let op = fs_config_build()?;
269                if let Some(stripped) = path.strip_prefix("file:/") {
270                    (op, stripped)
271                } else {
272                    (op, &path[1..])
273                }
274            }
275            #[cfg(feature = "opendal-s3")]
276            OpenDalStorage::S3 {
277                configured_scheme,
278                config,
279                customized_credential_load,
280            } => {
281                let op = s3_config_build(config, customized_credential_load, path)?;
282                let op_info = op.info();
283
284                // Check prefix of s3 path.
285                let prefix = format!("{}://{}/", configured_scheme, op_info.name());
286                if path.starts_with(&prefix) {
287                    (op, &path[prefix.len()..])
288                } else {
289                    return Err(Error::new(
290                        ErrorKind::DataInvalid,
291                        format!("Invalid s3 url: {path}, should start with {prefix}"),
292                    ));
293                }
294            }
295            #[cfg(feature = "opendal-gcs")]
296            OpenDalStorage::Gcs { config } => {
297                let operator = gcs_config_build(config, path)?;
298                let prefix = format!("gs://{}/", operator.info().name());
299                if path.starts_with(&prefix) {
300                    (operator, &path[prefix.len()..])
301                } else {
302                    return Err(Error::new(
303                        ErrorKind::DataInvalid,
304                        format!("Invalid gcs url: {path}, should start with {prefix}"),
305                    ));
306                }
307            }
308            #[cfg(feature = "opendal-oss")]
309            OpenDalStorage::Oss { config } => {
310                let op = oss_config_build(config, path)?;
311                let prefix = format!("oss://{}/", op.info().name());
312                if path.starts_with(&prefix) {
313                    (op, &path[prefix.len()..])
314                } else {
315                    return Err(Error::new(
316                        ErrorKind::DataInvalid,
317                        format!("Invalid oss url: {path}, should start with {prefix}"),
318                    ));
319                }
320            }
321            #[cfg(feature = "opendal-azdls")]
322            OpenDalStorage::Azdls {
323                configured_scheme,
324                config,
325            } => azdls_create_operator(path, config, configured_scheme)?,
326            #[cfg(all(
327                not(feature = "opendal-s3"),
328                not(feature = "opendal-fs"),
329                not(feature = "opendal-gcs"),
330                not(feature = "opendal-oss"),
331                not(feature = "opendal-azdls"),
332            ))]
333            _ => {
334                return Err(Error::new(
335                    ErrorKind::FeatureUnsupported,
336                    "No storage service has been enabled",
337                ));
338            }
339        };
340
341        // Transient errors are common for object stores; however there's no
342        // harm in retrying temporary failures for other storage backends as well.
343        let operator = operator.layer(RetryLayer::new());
344        Ok((operator, relative_path))
345    }
346
347    /// Extracts the relative path from an absolute path without building an operator.
348    ///
349    /// This is a lightweight alternative to [`create_operator`](Self::create_operator) for cases
350    /// where only the relative path is needed (e.g. bulk deletes where the operator is already
351    /// available).
352    #[allow(unreachable_code, unused_variables)]
353    pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str> {
354        match self {
355            #[cfg(feature = "opendal-memory")]
356            OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
357            #[cfg(feature = "opendal-fs")]
358            OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
359            #[cfg(feature = "opendal-s3")]
360            OpenDalStorage::S3 {
361                configured_scheme, ..
362            } => {
363                let url = url::Url::parse(path)?;
364                let bucket = url.host_str().ok_or_else(|| {
365                    Error::new(
366                        ErrorKind::DataInvalid,
367                        format!("Invalid s3 url: {path}, missing bucket"),
368                    )
369                })?;
370                let prefix = format!("{}://{}/", configured_scheme, bucket);
371                if path.starts_with(&prefix) {
372                    Ok(&path[prefix.len()..])
373                } else {
374                    Err(Error::new(
375                        ErrorKind::DataInvalid,
376                        format!("Invalid s3 url: {path}, should start with {prefix}"),
377                    ))
378                }
379            }
380            #[cfg(feature = "opendal-gcs")]
381            OpenDalStorage::Gcs { .. } => {
382                let url = url::Url::parse(path)?;
383                let bucket = url.host_str().ok_or_else(|| {
384                    Error::new(
385                        ErrorKind::DataInvalid,
386                        format!("Invalid gcs url: {path}, missing bucket"),
387                    )
388                })?;
389                let prefix = format!("gs://{}/", bucket);
390                if path.starts_with(&prefix) {
391                    Ok(&path[prefix.len()..])
392                } else {
393                    Err(Error::new(
394                        ErrorKind::DataInvalid,
395                        format!("Invalid gcs url: {path}, should start with {prefix}"),
396                    ))
397                }
398            }
399            #[cfg(feature = "opendal-oss")]
400            OpenDalStorage::Oss { .. } => {
401                let url = url::Url::parse(path)?;
402                let bucket = url.host_str().ok_or_else(|| {
403                    Error::new(
404                        ErrorKind::DataInvalid,
405                        format!("Invalid oss url: {path}, missing bucket"),
406                    )
407                })?;
408                let prefix = format!("oss://{}/", bucket);
409                if path.starts_with(&prefix) {
410                    Ok(&path[prefix.len()..])
411                } else {
412                    Err(Error::new(
413                        ErrorKind::DataInvalid,
414                        format!("Invalid oss url: {path}, should start with {prefix}"),
415                    ))
416                }
417            }
418            #[cfg(feature = "opendal-azdls")]
419            OpenDalStorage::Azdls {
420                configured_scheme,
421                config,
422            } => {
423                let azure_path = path.parse::<AzureStoragePath>()?;
424                match_path_with_config(&azure_path, config, configured_scheme)?;
425                let relative_path_len = azure_path.path.len();
426                Ok(&path[path.len() - relative_path_len..])
427            }
428            #[cfg(all(
429                not(feature = "opendal-s3"),
430                not(feature = "opendal-fs"),
431                not(feature = "opendal-gcs"),
432                not(feature = "opendal-oss"),
433                not(feature = "opendal-azdls"),
434            ))]
435            _ => Err(Error::new(
436                ErrorKind::FeatureUnsupported,
437                "No storage service has been enabled",
438            )),
439        }
440    }
441}
442
443#[typetag::serde(name = "OpenDalStorage")]
444#[async_trait]
445impl Storage for OpenDalStorage {
446    async fn exists(&self, path: &str) -> Result<bool> {
447        let (op, relative_path) = self.create_operator(&path)?;
448        Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
449    }
450
451    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
452        let (op, relative_path) = self.create_operator(&path)?;
453        let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
454        Ok(FileMetadata {
455            size: meta.content_length(),
456        })
457    }
458
459    async fn read(&self, path: &str) -> Result<Bytes> {
460        let (op, relative_path) = self.create_operator(&path)?;
461        Ok(op
462            .read(relative_path)
463            .await
464            .map_err(from_opendal_error)?
465            .to_bytes())
466    }
467
468    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
469        let (op, relative_path) = self.create_operator(&path)?;
470        Ok(Box::new(OpenDalReader(
471            op.reader(relative_path).await.map_err(from_opendal_error)?,
472        )))
473    }
474
475    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
476        let (op, relative_path) = self.create_operator(&path)?;
477        op.write(relative_path, bs)
478            .await
479            .map_err(from_opendal_error)?;
480        Ok(())
481    }
482
483    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
484        let (op, relative_path) = self.create_operator(&path)?;
485        Ok(Box::new(OpenDalWriter(
486            op.writer(relative_path).await.map_err(from_opendal_error)?,
487        )))
488    }
489
490    async fn delete(&self, path: &str) -> Result<()> {
491        let (op, relative_path) = self.create_operator(&path)?;
492        Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
493    }
494
495    async fn delete_prefix(&self, path: &str) -> Result<()> {
496        let (op, relative_path) = self.create_operator(&path)?;
497        let path = if relative_path.ends_with('/') {
498            relative_path.to_string()
499        } else {
500            format!("{relative_path}/")
501        };
502        Ok(op.remove_all(&path).await.map_err(from_opendal_error)?)
503    }
504
505    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
506        let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
507
508        while let Some(path) = paths.next().await {
509            let bucket = url::Url::parse(&path)
510                .ok()
511                .and_then(|u| u.host_str().map(|s| s.to_string()))
512                .unwrap_or_default();
513
514            let (relative_path, deleter) = match deleters.entry(bucket) {
515                Entry::Occupied(entry) => {
516                    (self.relativize_path(&path)?.to_string(), entry.into_mut())
517                }
518                Entry::Vacant(entry) => {
519                    let (op, rel) = self.create_operator(&path)?;
520                    let rel = rel.to_string();
521                    let deleter = op.deleter().await.map_err(from_opendal_error)?;
522                    (rel, entry.insert(deleter))
523                }
524            };
525
526            deleter
527                .delete(relative_path)
528                .await
529                .map_err(from_opendal_error)?;
530        }
531
532        for (_, mut deleter) in deleters {
533            deleter.close().await.map_err(from_opendal_error)?;
534        }
535
536        Ok(())
537    }
538
539    #[allow(unreachable_code, unused_variables)]
540    fn new_input(&self, path: &str) -> Result<InputFile> {
541        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
542    }
543
544    #[allow(unreachable_code, unused_variables)]
545    fn new_output(&self, path: &str) -> Result<OutputFile> {
546        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
547    }
548}
549
550// Newtype wrappers for opendal types to satisfy orphan rules.
551// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's
552// Reader/Writer since neither trait nor type is defined in this crate.
553
/// Wrapper around `opendal::Reader` that implements `FileRead`.
///
/// Newtype needed because the orphan rule forbids implementing iceberg's
/// `FileRead` directly on `opendal::Reader`.
pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
556
557#[async_trait]
558impl FileRead for OpenDalReader {
559    async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
560        Ok(opendal::Reader::read(&self.0, range)
561            .await
562            .map_err(from_opendal_error)?
563            .to_bytes())
564    }
565}
566
/// Wrapper around `opendal::Writer` that implements `FileWrite`.
///
/// Newtype needed because the orphan rule forbids implementing iceberg's
/// `FileWrite` directly on `opendal::Writer`.
pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
569
570#[async_trait]
571impl FileWrite for OpenDalWriter {
572    async fn write(&mut self, bs: Bytes) -> Result<()> {
573        Ok(opendal::Writer::write(&mut self.0, bs)
574            .await
575            .map_err(from_opendal_error)?)
576    }
577
578    async fn close(&mut self) -> Result<()> {
579        let _ = opendal::Writer::close(&mut self.0)
580            .await
581            .map_err(from_opendal_error)?;
582        Ok(())
583    }
584}
585
#[cfg(test)]
mod tests {
    use super::*;

    // Each test is gated on the feature of the backend it exercises, so this
    // module compiles no matter which storage features are enabled.

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let op = default_memory_operator();
        assert_eq!(op.info().scheme().to_string(), "memory");
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_relativize_path_memory() {
        let storage = OpenDalStorage::Memory(default_memory_operator());

        assert_eq!(
            storage.relativize_path("memory:/path/to/file").unwrap(),
            "path/to/file"
        );
        // Without the scheme prefix, falls back to stripping the leading slash
        assert_eq!(
            storage.relativize_path("/path/to/file").unwrap(),
            "path/to/file"
        );
    }

    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_relativize_path_fs() {
        let storage = OpenDalStorage::LocalFs;

        assert_eq!(
            storage
                .relativize_path("file:/tmp/data/file.parquet")
                .unwrap(),
            "tmp/data/file.parquet"
        );
        // Bare absolute path: only the leading '/' is stripped.
        assert_eq!(
            storage.relativize_path("/tmp/data/file.parquet").unwrap(),
            "tmp/data/file.parquet"
        );
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3() {
        let storage = OpenDalStorage::S3 {
            configured_scheme: "s3".to_string(),
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        };

        assert_eq!(
            storage
                .relativize_path("s3://my-bucket/path/to/file.parquet")
                .unwrap(),
            "path/to/file.parquet"
        );

        // s3a scheme
        let storage_s3a = OpenDalStorage::S3 {
            configured_scheme: "s3a".to_string(),
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        };
        assert_eq!(
            storage_s3a
                .relativize_path("s3a://my-bucket/path/to/file.parquet")
                .unwrap(),
            "path/to/file.parquet"
        );
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3_scheme_mismatch() {
        let storage = OpenDalStorage::S3 {
            configured_scheme: "s3".to_string(),
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        };

        // Scheme mismatch should error
        assert!(
            storage
                .relativize_path("s3a://my-bucket/path/to/file.parquet")
                .is_err()
        );
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        assert_eq!(
            storage
                .relativize_path("gs://my-bucket/path/to/file.parquet")
                .unwrap(),
            "path/to/file.parquet"
        );
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs_invalid_scheme() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        // A non-gs scheme must be rejected.
        assert!(
            storage
                .relativize_path("s3://my-bucket/path/to/file.parquet")
                .is_err()
        );
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        assert_eq!(
            storage
                .relativize_path("oss://my-bucket/path/to/file.parquet")
                .unwrap(),
            "path/to/file.parquet"
        );
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss_invalid_scheme() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        // A non-oss scheme must be rejected.
        assert!(
            storage
                .relativize_path("s3://my-bucket/path/to/file.parquet")
                .is_err()
        );
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls() {
        let storage = OpenDalStorage::Azdls {
            configured_scheme: AzureStorageScheme::Abfss,
            config: Arc::new(AzdlsConfig {
                account_name: Some("myaccount".to_string()),
                endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
                ..Default::default()
            }),
        };

        // Note: the Azdls relative path keeps its leading '/'.
        assert_eq!(
            storage
                .relativize_path("abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
                .unwrap(),
            "/path/to/file.parquet"
        );
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls_scheme_mismatch() {
        let storage = OpenDalStorage::Azdls {
            configured_scheme: AzureStorageScheme::Abfss,
            config: Arc::new(AzdlsConfig {
                account_name: Some("myaccount".to_string()),
                endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
                ..Default::default()
            }),
        };

        // wasbs scheme doesn't match configured abfss
        assert!(
            storage
                .relativize_path("wasbs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
                .is_err()
        );
    }
}