iceberg_storage_opendal/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! OpenDAL-based storage implementation for Apache Iceberg.
19//!
20//! This crate provides [`OpenDalStorage`] and [`OpenDalStorageFactory`],
21//! which implement the [`Storage`](iceberg::io::Storage) and
22//! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` crate
23//! using [OpenDAL](https://opendal.apache.org/) as the backend.
24
25mod utils;
26
27use std::collections::HashMap;
28use std::collections::hash_map::Entry;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use cfg_if::cfg_if;
34use futures::StreamExt;
35use futures::stream::BoxStream;
36use iceberg::io::{
37    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38    StorageFactory,
39};
40use iceberg::{Error, ErrorKind, Result};
41use opendal::Operator;
42use opendal::layers::RetryLayer;
43use serde::{Deserialize, Serialize};
44use utils::from_opendal_error;
45
46cfg_if! {
47    if #[cfg(feature = "opendal-azdls")] {
48        mod azdls;
49        use azdls::*;
50        use opendal::services::AzdlsConfig;
51    }
52}
53
54cfg_if! {
55    if #[cfg(feature = "opendal-hf")] {
56        mod hf;
57        use hf::*;
58        use opendal::services::HfConfig;
59    }
60}
61
62cfg_if! {
63    if #[cfg(feature = "opendal-fs")] {
64        mod fs;
65        use fs::*;
66    }
67}
68
69cfg_if! {
70    if #[cfg(feature = "opendal-gcs")] {
71        mod gcs;
72        use gcs::*;
73        use opendal::services::GcsConfig;
74    }
75}
76
77cfg_if! {
78    if #[cfg(feature = "opendal-memory")] {
79        mod memory;
80        use memory::*;
81    }
82}
83
84cfg_if! {
85    if #[cfg(feature = "opendal-oss")] {
86        mod oss;
87        use opendal::services::OssConfig;
88        use oss::*;
89    }
90}
91
92cfg_if! {
93    if #[cfg(feature = "opendal-s3")] {
94        mod s3;
95        use opendal::services::S3Config;
96        pub use s3::*;
97    }
98}
99
100mod resolving;
101pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
102
103/// OpenDAL-based storage factory.
104///
105/// Maps scheme to the corresponding OpenDalStorage storage variant.
106/// Use this factory with `FileIOBuilder::new(factory)` to create FileIO instances.
107#[derive(Clone, Debug, Serialize, Deserialize)]
108pub enum OpenDalStorageFactory {
109    /// Memory storage factory.
110    #[cfg(feature = "opendal-memory")]
111    Memory,
112    /// Local filesystem storage factory.
113    #[cfg(feature = "opendal-fs")]
114    Fs,
115    /// S3 storage factory.
116    #[cfg(feature = "opendal-s3")]
117    S3 {
118        /// Custom AWS credential loader.
119        #[serde(skip)]
120        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
121    },
122    /// GCS storage factory.
123    #[cfg(feature = "opendal-gcs")]
124    Gcs,
125    /// OSS storage factory.
126    #[cfg(feature = "opendal-oss")]
127    Oss,
128    /// Azure Data Lake Storage factory.
129    #[cfg(feature = "opendal-azdls")]
130    Azdls,
131    /// HuggingFace Hub storage factory.
132    #[cfg(feature = "opendal-hf")]
133    Hf,
134}
135
136#[typetag::serde(name = "OpenDalStorageFactory")]
137impl StorageFactory for OpenDalStorageFactory {
138    #[allow(unused_variables)]
139    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
140        match self {
141            #[cfg(feature = "opendal-memory")]
142            OpenDalStorageFactory::Memory => {
143                Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
144            }
145            #[cfg(feature = "opendal-fs")]
146            OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
147            #[cfg(feature = "opendal-s3")]
148            OpenDalStorageFactory::S3 {
149                customized_credential_load,
150            } => Ok(Arc::new(OpenDalStorage::S3 {
151                config: s3_config_parse(config.props().clone())?.into(),
152                customized_credential_load: customized_credential_load.clone(),
153            })),
154            #[cfg(feature = "opendal-gcs")]
155            OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
156                config: gcs_config_parse(config.props().clone())?.into(),
157            })),
158            #[cfg(feature = "opendal-oss")]
159            OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
160                config: oss_config_parse(config.props().clone())?.into(),
161            })),
162            #[cfg(feature = "opendal-azdls")]
163            OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls {
164                config: azdls_config_parse(config.props().clone())?.into(),
165            })),
166            #[cfg(feature = "opendal-hf")]
167            OpenDalStorageFactory::Hf => Ok(Arc::new(OpenDalStorage::Hf {
168                config: hf_config_parse(config.props().clone())?.into(),
169            })),
170            #[cfg(all(
171                not(feature = "opendal-memory"),
172                not(feature = "opendal-fs"),
173                not(feature = "opendal-s3"),
174                not(feature = "opendal-gcs"),
175                not(feature = "opendal-oss"),
176                not(feature = "opendal-azdls"),
177                not(feature = "opendal-hf"),
178            ))]
179            _ => Err(Error::new(
180                ErrorKind::FeatureUnsupported,
181                "No storage service has been enabled",
182            )),
183        }
184    }
185}
186
/// Builds the operator used to fill [`OpenDalStorage::Memory`] when deserializing.
///
/// The operator itself is `#[serde(skip)]`, so serde calls this as the field's
/// `default` instead of restoring a serialized value.
///
/// # Panics
///
/// Panics if `memory_config_build` returns an error. A serde `default` function
/// has no way to report failure.
#[cfg(feature = "opendal-memory")]
fn default_memory_operator() -> Operator {
    let built = memory_config_build();
    built.expect("Failed to create default memory operator")
}
192
193/// OpenDAL-based storage implementation.
194#[derive(Clone, Debug, Serialize, Deserialize)]
195pub enum OpenDalStorage {
196    /// Memory storage variant.
197    #[cfg(feature = "opendal-memory")]
198    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
199    /// Local filesystem storage variant.
200    #[cfg(feature = "opendal-fs")]
201    LocalFs,
202    /// S3 storage variant.
203    ///
204    /// Accepts any S3-family URL (`s3://`, `s3a://`, `s3n://`); the scheme is
205    /// derived from the path at call time.
206    #[cfg(feature = "opendal-s3")]
207    S3 {
208        /// S3 configuration.
209        config: Arc<S3Config>,
210        /// Custom AWS credential loader.
211        #[serde(skip)]
212        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
213    },
214    /// GCS storage variant.
215    #[cfg(feature = "opendal-gcs")]
216    Gcs {
217        /// GCS configuration.
218        config: Arc<GcsConfig>,
219    },
220    /// OSS storage variant.
221    #[cfg(feature = "opendal-oss")]
222    Oss {
223        /// OSS configuration.
224        config: Arc<OssConfig>,
225    },
226    /// Azure Data Lake Storage variant.
227    ///
228    /// Accepts paths of the form
229    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
230    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
231    /// The scheme is derived from the path at call time.
232    #[cfg(feature = "opendal-azdls")]
233    Azdls {
234        /// Azure DLS configuration.
235        config: Arc<AzdlsConfig>,
236    },
237    /// HuggingFace Hub storage variant.
238    ///
239    /// Accepts paths of the form
240    /// `hf://<repo_type>/<owner>/<repo>[@<revision>]/<path_in_repo>`,
241    /// where `<repo_type>` must be one of `models`, `datasets`, `spaces`, or `buckets`.
242    #[cfg(feature = "opendal-hf")]
243    Hf {
244        /// HuggingFace Hub configuration (token + endpoint).
245        config: Arc<HfConfig>,
246    },
247}
248
249impl OpenDalStorage {
250    /// Creates operator from path.
251    ///
252    /// # Arguments
253    ///
254    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`](iceberg::io::FileIO).
255    ///
256    /// # Returns
257    ///
258    /// The return value consists of two parts:
259    ///
260    /// * An [`opendal::Operator`] instance used to operate on file.
261    /// * Relative path to the root uri of [`opendal::Operator`].
262    #[allow(unreachable_code, unused_variables)]
263    pub(crate) fn create_operator<'a>(
264        &self,
265        path: &'a impl AsRef<str>,
266    ) -> Result<(Operator, &'a str)> {
267        let path = path.as_ref();
268        let (operator, relative_path): (Operator, &str) = match self {
269            #[cfg(feature = "opendal-memory")]
270            OpenDalStorage::Memory(op) => {
271                if let Some(stripped) = path.strip_prefix("memory:/") {
272                    (op.clone(), stripped)
273                } else {
274                    (op.clone(), &path[1..])
275                }
276            }
277            #[cfg(feature = "opendal-fs")]
278            OpenDalStorage::LocalFs => {
279                let op = fs_config_build()?;
280                if let Some(stripped) = path.strip_prefix("file:/") {
281                    (op, stripped)
282                } else {
283                    (op, &path[1..])
284                }
285            }
286            #[cfg(feature = "opendal-s3")]
287            OpenDalStorage::S3 {
288                config,
289                customized_credential_load,
290            } => {
291                let op = s3_config_build(config, customized_credential_load, path)?;
292                let op_info = op.info();
293
294                // Use the URL scheme in the path for prefix matching. This enables
295                // use of S3-compatible storage backends using custom schemes (e.g., `minio://`, `r2://`).
296                let url = url::Url::parse(path).map_err(|e| {
297                    Error::new(
298                        ErrorKind::DataInvalid,
299                        format!("Invalid s3 url: {path}: {e}"),
300                    )
301                })?;
302                let prefix = format!("{}://{}/", url.scheme(), op_info.name());
303                if path.starts_with(&prefix) {
304                    (op, &path[prefix.len()..])
305                } else {
306                    return Err(Error::new(
307                        ErrorKind::DataInvalid,
308                        format!("Invalid s3 url: {path}, should start with {prefix}"),
309                    ));
310                }
311            }
312            #[cfg(feature = "opendal-gcs")]
313            OpenDalStorage::Gcs { config } => {
314                let operator = gcs_config_build(config, path)?;
315                let prefix = format!("gs://{}/", operator.info().name());
316                if path.starts_with(&prefix) {
317                    (operator, &path[prefix.len()..])
318                } else {
319                    return Err(Error::new(
320                        ErrorKind::DataInvalid,
321                        format!("Invalid gcs url: {path}, should start with {prefix}"),
322                    ));
323                }
324            }
325            #[cfg(feature = "opendal-oss")]
326            OpenDalStorage::Oss { config } => {
327                let op = oss_config_build(config, path)?;
328                let prefix = format!("oss://{}/", op.info().name());
329                if path.starts_with(&prefix) {
330                    (op, &path[prefix.len()..])
331                } else {
332                    return Err(Error::new(
333                        ErrorKind::DataInvalid,
334                        format!("Invalid oss url: {path}, should start with {prefix}"),
335                    ));
336                }
337            }
338            #[cfg(feature = "opendal-azdls")]
339            OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?,
340            #[cfg(feature = "opendal-hf")]
341            OpenDalStorage::Hf { config } => hf_config_build(config, path)?,
342            #[cfg(all(
343                not(feature = "opendal-s3"),
344                not(feature = "opendal-fs"),
345                not(feature = "opendal-gcs"),
346                not(feature = "opendal-oss"),
347                not(feature = "opendal-azdls"),
348                not(feature = "opendal-hf"),
349            ))]
350            _ => {
351                return Err(Error::new(
352                    ErrorKind::FeatureUnsupported,
353                    "No storage service has been enabled",
354                ));
355            }
356        };
357
358        // Transient errors are common for object stores; however there's no
359        // harm in retrying temporary failures for other storage backends as well.
360        let operator = operator.layer(RetryLayer::new());
361        Ok((operator, relative_path))
362    }
363
364    /// Returns a cache key used by `delete_stream` to group paths by storage operator.
365    ///
366    /// For most backends the URL host (bucket name) is sufficient. For HF the host
367    /// encodes the repo type, not the repo identity, so a more specific key is used.
368    fn batch_key_for_path(&self, path: &str) -> String {
369        match self {
370            #[cfg(feature = "opendal-hf")]
371            OpenDalStorage::Hf { .. } => hf_batch_key(path),
372            _ => url::Url::parse(path)
373                .ok()
374                .and_then(|u| u.host_str().map(|s| s.to_string()))
375                .unwrap_or_default(),
376        }
377    }
378
379    /// Extracts the relative path from an absolute path without building an operator.
380    ///
381    /// This is a lightweight alternative to [`create_operator`](Self::create_operator) for cases
382    /// where only the relative path is needed (e.g. bulk deletes where the operator is already
383    /// available).
384    #[allow(unreachable_code, unused_variables)]
385    pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str> {
386        match self {
387            #[cfg(feature = "opendal-memory")]
388            OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
389            #[cfg(feature = "opendal-fs")]
390            OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
391            #[cfg(feature = "opendal-s3")]
392            OpenDalStorage::S3 { .. } => {
393                let url = url::Url::parse(path)?;
394                let bucket = url.host_str().ok_or_else(|| {
395                    Error::new(
396                        ErrorKind::DataInvalid,
397                        format!("Invalid s3 url: {path}, missing bucket"),
398                    )
399                })?;
400                let prefix = format!("{}://{}/", url.scheme(), bucket);
401                if path.starts_with(&prefix) {
402                    Ok(&path[prefix.len()..])
403                } else {
404                    Err(Error::new(
405                        ErrorKind::DataInvalid,
406                        format!("Invalid s3 url: {path}, should start with {prefix}"),
407                    ))
408                }
409            }
410            #[cfg(feature = "opendal-gcs")]
411            OpenDalStorage::Gcs { .. } => {
412                let url = url::Url::parse(path)?;
413                let bucket = url.host_str().ok_or_else(|| {
414                    Error::new(
415                        ErrorKind::DataInvalid,
416                        format!("Invalid gcs url: {path}, missing bucket"),
417                    )
418                })?;
419                let prefix = format!("gs://{}/", bucket);
420                if path.starts_with(&prefix) {
421                    Ok(&path[prefix.len()..])
422                } else {
423                    Err(Error::new(
424                        ErrorKind::DataInvalid,
425                        format!("Invalid gcs url: {path}, should start with {prefix}"),
426                    ))
427                }
428            }
429            #[cfg(feature = "opendal-oss")]
430            OpenDalStorage::Oss { .. } => {
431                let url = url::Url::parse(path)?;
432                let bucket = url.host_str().ok_or_else(|| {
433                    Error::new(
434                        ErrorKind::DataInvalid,
435                        format!("Invalid oss url: {path}, missing bucket"),
436                    )
437                })?;
438                let prefix = format!("oss://{}/", bucket);
439                if path.starts_with(&prefix) {
440                    Ok(&path[prefix.len()..])
441                } else {
442                    Err(Error::new(
443                        ErrorKind::DataInvalid,
444                        format!("Invalid oss url: {path}, should start with {prefix}"),
445                    ))
446                }
447            }
448            #[cfg(feature = "opendal-azdls")]
449            OpenDalStorage::Azdls { config } => {
450                let azure_path = path.parse::<AzureStoragePath>()?;
451                match_path_with_config(&azure_path, config)?;
452                let relative_path_len = azure_path.path.len();
453                Ok(&path[path.len() - relative_path_len..])
454            }
455            #[cfg(feature = "opendal-hf")]
456            OpenDalStorage::Hf { .. } => {
457                let parsed = hf::HfUri::parse(path).ok_or_else(|| {
458                    Error::new(ErrorKind::DataInvalid, format!("Invalid hf url: {path}"))
459                })?;
460                Ok(&path[path.len() - parsed.path.len()..])
461            }
462            #[cfg(all(
463                not(feature = "opendal-s3"),
464                not(feature = "opendal-fs"),
465                not(feature = "opendal-gcs"),
466                not(feature = "opendal-oss"),
467                not(feature = "opendal-azdls"),
468                not(feature = "opendal-hf"),
469            ))]
470            _ => Err(Error::new(
471                ErrorKind::FeatureUnsupported,
472                "No storage service has been enabled",
473            )),
474        }
475    }
476}
477
478#[typetag::serde(name = "OpenDalStorage")]
479#[async_trait]
480impl Storage for OpenDalStorage {
481    async fn exists(&self, path: &str) -> Result<bool> {
482        let (op, relative_path) = self.create_operator(&path)?;
483        Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
484    }
485
486    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
487        let (op, relative_path) = self.create_operator(&path)?;
488        let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
489        Ok(FileMetadata {
490            size: meta.content_length(),
491        })
492    }
493
494    async fn read(&self, path: &str) -> Result<Bytes> {
495        let (op, relative_path) = self.create_operator(&path)?;
496        Ok(op
497            .read(relative_path)
498            .await
499            .map_err(from_opendal_error)?
500            .to_bytes())
501    }
502
503    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
504        let (op, relative_path) = self.create_operator(&path)?;
505        Ok(Box::new(OpenDalReader(
506            op.reader(relative_path).await.map_err(from_opendal_error)?,
507        )))
508    }
509
510    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
511        let (op, relative_path) = self.create_operator(&path)?;
512        op.write(relative_path, bs)
513            .await
514            .map_err(from_opendal_error)?;
515        Ok(())
516    }
517
518    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
519        let (op, relative_path) = self.create_operator(&path)?;
520        Ok(Box::new(OpenDalWriter(
521            op.writer(relative_path).await.map_err(from_opendal_error)?,
522        )))
523    }
524
525    async fn delete(&self, path: &str) -> Result<()> {
526        let (op, relative_path) = self.create_operator(&path)?;
527        Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
528    }
529
530    async fn delete_prefix(&self, path: &str) -> Result<()> {
531        let (op, relative_path) = self.create_operator(&path)?;
532        let path = if relative_path.ends_with('/') {
533            relative_path.to_string()
534        } else {
535            format!("{relative_path}/")
536        };
537        Ok(op
538            .delete_with(&path)
539            .recursive(true)
540            .await
541            .map_err(from_opendal_error)?)
542    }
543
544    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
545        let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
546
547        while let Some(path) = paths.next().await {
548            let bucket = self.batch_key_for_path(&path);
549
550            let (relative_path, deleter) = match deleters.entry(bucket) {
551                Entry::Occupied(entry) => {
552                    (self.relativize_path(&path)?.to_string(), entry.into_mut())
553                }
554                Entry::Vacant(entry) => {
555                    let (op, rel) = self.create_operator(&path)?;
556                    let rel = rel.to_string();
557                    let deleter = op.deleter().await.map_err(from_opendal_error)?;
558                    (rel, entry.insert(deleter))
559                }
560            };
561
562            deleter
563                .delete(relative_path)
564                .await
565                .map_err(from_opendal_error)?;
566        }
567
568        for (_, mut deleter) in deleters {
569            deleter.close().await.map_err(from_opendal_error)?;
570        }
571
572        Ok(())
573    }
574
575    #[allow(unreachable_code, unused_variables)]
576    fn new_input(&self, path: &str) -> Result<InputFile> {
577        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
578    }
579
580    #[allow(unreachable_code, unused_variables)]
581    fn new_output(&self, path: &str) -> Result<OutputFile> {
582        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
583    }
584}
585
586// Newtype wrappers for opendal types to satisfy orphan rules.
587// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's
588// Reader/Writer since neither trait nor type is defined in this crate.
589
590/// Wrapper around `opendal::Reader` that implements `FileRead`.
591pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
592
593#[async_trait]
594impl FileRead for OpenDalReader {
595    async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
596        Ok(opendal::Reader::read(&self.0, range)
597            .await
598            .map_err(from_opendal_error)?
599            .to_bytes())
600    }
601}
602
603/// Wrapper around `opendal::Writer` that implements `FileWrite`.
604pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
605
606#[async_trait]
607impl FileWrite for OpenDalWriter {
608    async fn write(&mut self, bs: Bytes) -> Result<()> {
609        Ok(opendal::Writer::write(&mut self.0, bs)
610            .await
611            .map_err(from_opendal_error)?)
612    }
613
614    async fn close(&mut self) -> Result<()> {
615        let _ = opendal::Writer::close(&mut self.0)
616            .await
617            .map_err(from_opendal_error)?;
618        Ok(())
619    }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `storage` relativizes `input` to exactly `expected`.
    // Unused when no feature whose tests call it is enabled.
    #[allow(dead_code)]
    fn assert_relative(storage: &OpenDalStorage, input: &str, expected: &str) {
        match storage.relativize_path(input) {
            Ok(actual) => assert_eq!(actual, expected, "wrong relative path for {input:?}"),
            Err(err) => panic!("relativize_path({input:?}) failed: {err:?}"),
        }
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let info = default_memory_operator().info();
        assert_eq!(info.scheme().to_string(), "memory");
    }

    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_relativize_path_memory() {
        let storage = OpenDalStorage::Memory(default_memory_operator());

        // A scheme-qualified path, then a bare absolute path where only the
        // leading slash is stripped.
        for input in ["memory:/path/to/file", "/path/to/file"] {
            assert_relative(&storage, input, "path/to/file");
        }
    }

    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_relativize_path_fs() {
        let storage = OpenDalStorage::LocalFs;

        for input in ["file:/tmp/data/file.parquet", "/tmp/data/file.parquet"] {
            assert_relative(&storage, input, "tmp/data/file.parquet");
        }
    }

    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3() {
        let storage = OpenDalStorage::S3 {
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        };

        // The path's own scheme is used for prefix matching. That covers every
        // S3-family scheme and custom S3-compatible ones such as `minio://`.
        for scheme in ["s3", "s3a", "s3n", "minio"] {
            let input = format!("{scheme}://my-bucket/path/to/file.parquet");
            assert_relative(&storage, &input, "path/to/file.parquet");
        }
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        assert_relative(
            &storage,
            "gs://my-bucket/path/to/file.parquet",
            "path/to/file.parquet",
        );
    }

    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs_invalid_scheme() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        let result = storage.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(result.is_err(), "gcs storage must reject an s3:// path");
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        assert_relative(
            &storage,
            "oss://my-bucket/path/to/file.parquet",
            "path/to/file.parquet",
        );
    }

    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss_invalid_scheme() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        let result = storage.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(result.is_err(), "oss storage must reject an s3:// path");
    }

    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls() {
        let config = AzdlsConfig {
            account_name: Some("myaccount".to_string()),
            endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
            ..Default::default()
        };
        let storage = OpenDalStorage::Azdls {
            config: Arc::new(config),
        };

        assert_relative(
            &storage,
            "abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
            "/path/to/file.parquet",
        );
    }
}