Skip to main content

iceberg_storage_opendal/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! OpenDAL-based storage implementation for Apache Iceberg.
19//!
20//! This crate provides [`OpenDalStorage`] and [`OpenDalStorageFactory`],
21//! which implement the [`Storage`](iceberg::io::Storage) and
22//! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` crate
23//! using [OpenDAL](https://opendal.apache.org/) as the backend.
24
25mod utils;
26
27use std::collections::HashMap;
28use std::collections::hash_map::Entry;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use cfg_if::cfg_if;
34use futures::StreamExt;
35use futures::stream::BoxStream;
36use iceberg::io::{
37    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38    StorageFactory,
39};
40use iceberg::{Error, ErrorKind, Result};
41use opendal::Operator;
42use opendal::layers::{RetryLayer, TimeoutLayer};
43use serde::{Deserialize, Serialize};
44use utils::from_opendal_error;
45
46cfg_if! {
47    if #[cfg(feature = "opendal-azdls")] {
48        mod azdls;
49        use azdls::*;
50        use opendal::services::AzdlsConfig;
51    }
52}
53
54cfg_if! {
55    if #[cfg(feature = "opendal-hf")] {
56        mod hf;
57        use hf::*;
58        use opendal::services::HfConfig;
59    }
60}
61
62cfg_if! {
63    if #[cfg(feature = "opendal-fs")] {
64        mod fs;
65        use fs::*;
66    }
67}
68
69cfg_if! {
70    if #[cfg(feature = "opendal-gcs")] {
71        mod gcs;
72        use gcs::*;
73        use opendal::services::GcsConfig;
74    }
75}
76
77cfg_if! {
78    if #[cfg(feature = "opendal-memory")] {
79        mod memory;
80        use memory::*;
81    }
82}
83
84cfg_if! {
85    if #[cfg(feature = "opendal-oss")] {
86        mod oss;
87        use opendal::services::OssConfig;
88        use oss::*;
89    }
90}
91
92cfg_if! {
93    if #[cfg(feature = "opendal-s3")] {
94        mod s3;
95        use opendal::services::S3Config;
96        pub use s3::*;
97    }
98}
99
100mod resolving;
101pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
102
/// OpenDAL-based storage factory.
///
/// Each variant names one backend; [`StorageFactory::build`] turns it into the
/// matching [`OpenDalStorage`] variant.
/// Use this factory with `FileIOBuilder::new(factory)` to create FileIO instances.
///
/// Every variant is gated behind its own `opendal-*` cargo feature, so the set
/// of available variants depends on how the crate was compiled.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorageFactory {
    /// Memory storage factory.
    #[cfg(feature = "opendal-memory")]
    Memory,
    /// Local filesystem storage factory.
    #[cfg(feature = "opendal-fs")]
    Fs,
    /// S3 storage factory.
    #[cfg(feature = "opendal-s3")]
    S3 {
        /// Custom AWS credential loader.
        ///
        /// Marked `#[serde(skip)]`: it is never serialized and comes back as
        /// `None` after deserialization.
        #[serde(skip)]
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    /// GCS storage factory.
    #[cfg(feature = "opendal-gcs")]
    Gcs,
    /// OSS storage factory.
    #[cfg(feature = "opendal-oss")]
    Oss,
    /// Azure Data Lake Storage factory.
    #[cfg(feature = "opendal-azdls")]
    Azdls,
    /// HuggingFace Hub storage factory.
    #[cfg(feature = "opendal-hf")]
    Hf,
}
135
136#[typetag::serde(name = "OpenDalStorageFactory")]
137impl StorageFactory for OpenDalStorageFactory {
138    #[allow(unused_variables)]
139    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
140        match self {
141            #[cfg(feature = "opendal-memory")]
142            OpenDalStorageFactory::Memory => {
143                Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
144            }
145            #[cfg(feature = "opendal-fs")]
146            OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
147            #[cfg(feature = "opendal-s3")]
148            OpenDalStorageFactory::S3 {
149                customized_credential_load,
150            } => Ok(Arc::new(OpenDalStorage::S3 {
151                config: s3_config_parse(config.props().clone())?.into(),
152                customized_credential_load: customized_credential_load.clone(),
153            })),
154            #[cfg(feature = "opendal-gcs")]
155            OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
156                config: gcs_config_parse(config.props().clone())?.into(),
157            })),
158            #[cfg(feature = "opendal-oss")]
159            OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
160                config: oss_config_parse(config.props().clone())?.into(),
161            })),
162            #[cfg(feature = "opendal-azdls")]
163            OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls {
164                config: azdls_config_parse(config.props().clone())?.into(),
165            })),
166            #[cfg(feature = "opendal-hf")]
167            OpenDalStorageFactory::Hf => Ok(Arc::new(OpenDalStorage::Hf {
168                config: hf_config_parse(config.props().clone())?.into(),
169            })),
170            #[cfg(all(
171                not(feature = "opendal-memory"),
172                not(feature = "opendal-fs"),
173                not(feature = "opendal-s3"),
174                not(feature = "opendal-gcs"),
175                not(feature = "opendal-oss"),
176                not(feature = "opendal-azdls"),
177                not(feature = "opendal-hf"),
178            ))]
179            _ => Err(Error::new(
180                ErrorKind::FeatureUnsupported,
181                "No storage service has been enabled",
182            )),
183        }
184    }
185}
186
/// Supplies the operator serde uses for the skipped field of
/// [`OpenDalStorage::Memory`] during deserialization.
///
/// # Panics
///
/// Panics if `memory_config_build` returns an error.
#[cfg(feature = "opendal-memory")]
fn default_memory_operator() -> Operator {
    let built = memory_config_build();
    built.expect("Failed to create default memory operator")
}
192
/// OpenDAL-based storage implementation.
///
/// Each variant is gated behind its own `opendal-*` cargo feature. Variants
/// that need configuration hold it behind an [`Arc`], so cloning the storage
/// does not copy the configuration itself.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorage {
    /// Memory storage variant.
    ///
    /// The wrapped operator is skipped by serde; on deserialization it is
    /// replaced by the operator returned from `default_memory_operator`.
    #[cfg(feature = "opendal-memory")]
    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
    /// Local filesystem storage variant.
    #[cfg(feature = "opendal-fs")]
    LocalFs,
    /// S3 storage variant.
    ///
    /// Accepts any S3-family URL (`s3://`, `s3a://`, `s3n://`); the scheme is
    /// derived from the path at call time.
    #[cfg(feature = "opendal-s3")]
    S3 {
        /// S3 configuration.
        config: Arc<S3Config>,
        /// Custom AWS credential loader.
        ///
        /// Marked `#[serde(skip)]`: it is never serialized and comes back as
        /// `None` after deserialization.
        #[serde(skip)]
        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
    },
    /// GCS storage variant.
    ///
    /// Paths are expected to start with `gs://<bucket>/`.
    #[cfg(feature = "opendal-gcs")]
    Gcs {
        /// GCS configuration.
        config: Arc<GcsConfig>,
    },
    /// OSS storage variant.
    ///
    /// Paths are expected to start with `oss://<bucket>/`.
    #[cfg(feature = "opendal-oss")]
    Oss {
        /// OSS configuration.
        config: Arc<OssConfig>,
    },
    /// Azure Data Lake Storage variant.
    ///
    /// Accepts paths of the form
    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
    /// The scheme is derived from the path at call time.
    #[cfg(feature = "opendal-azdls")]
    Azdls {
        /// Azure DLS configuration.
        config: Arc<AzdlsConfig>,
    },
    /// HuggingFace Hub storage variant.
    ///
    /// Accepts paths of the form
    /// `hf://<repo_type>/<owner>/<repo>[@<revision>]/<path_in_repo>`,
    /// where `<repo_type>` must be one of `models`, `datasets`, `spaces`, or `buckets`.
    #[cfg(feature = "opendal-hf")]
    Hf {
        /// HuggingFace Hub configuration (token + endpoint).
        config: Arc<HfConfig>,
    },
}
248
249impl OpenDalStorage {
250    /// Creates operator from path.
251    ///
252    /// # Arguments
253    ///
254    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`](iceberg::io::FileIO).
255    ///
256    /// # Returns
257    ///
258    /// The return value consists of two parts:
259    ///
260    /// * An [`opendal::Operator`] instance used to operate on file.
261    /// * Relative path to the root uri of [`opendal::Operator`].
262    #[allow(unreachable_code, unused_variables)]
263    pub(crate) fn create_operator<'a>(
264        &self,
265        path: &'a impl AsRef<str>,
266    ) -> Result<(Operator, &'a str)> {
267        let path = path.as_ref();
268        let (operator, relative_path): (Operator, &str) = match self {
269            #[cfg(feature = "opendal-memory")]
270            OpenDalStorage::Memory(op) => {
271                if let Some(stripped) = path.strip_prefix("memory:/") {
272                    (op.clone(), stripped)
273                } else {
274                    (op.clone(), &path[1..])
275                }
276            }
277            #[cfg(feature = "opendal-fs")]
278            OpenDalStorage::LocalFs => {
279                let op = fs_config_build()?;
280                if let Some(stripped) = path.strip_prefix("file:/") {
281                    (op, stripped)
282                } else {
283                    (op, &path[1..])
284                }
285            }
286            #[cfg(feature = "opendal-s3")]
287            OpenDalStorage::S3 {
288                config,
289                customized_credential_load,
290            } => {
291                let op = s3_config_build(config, customized_credential_load, path)?;
292                let op_info = op.info();
293
294                // Use the URL scheme in the path for prefix matching. This enables
295                // use of S3-compatible storage backends using custom schemes (e.g., `minio://`, `r2://`).
296                let url = url::Url::parse(path).map_err(|e| {
297                    Error::new(
298                        ErrorKind::DataInvalid,
299                        format!("Invalid s3 url: {path}: {e}"),
300                    )
301                })?;
302                let prefix = format!("{}://{}/", url.scheme(), op_info.name());
303                if path.starts_with(&prefix) {
304                    (op, &path[prefix.len()..])
305                } else {
306                    return Err(Error::new(
307                        ErrorKind::DataInvalid,
308                        format!("Invalid s3 url: {path}, should start with {prefix}"),
309                    ));
310                }
311            }
312            #[cfg(feature = "opendal-gcs")]
313            OpenDalStorage::Gcs { config } => {
314                let operator = gcs_config_build(config, path)?;
315                let prefix = format!("gs://{}/", operator.info().name());
316                if path.starts_with(&prefix) {
317                    (operator, &path[prefix.len()..])
318                } else {
319                    return Err(Error::new(
320                        ErrorKind::DataInvalid,
321                        format!("Invalid gcs url: {path}, should start with {prefix}"),
322                    ));
323                }
324            }
325            #[cfg(feature = "opendal-oss")]
326            OpenDalStorage::Oss { config } => {
327                let op = oss_config_build(config, path)?;
328                let prefix = format!("oss://{}/", op.info().name());
329                if path.starts_with(&prefix) {
330                    (op, &path[prefix.len()..])
331                } else {
332                    return Err(Error::new(
333                        ErrorKind::DataInvalid,
334                        format!("Invalid oss url: {path}, should start with {prefix}"),
335                    ));
336                }
337            }
338            #[cfg(feature = "opendal-azdls")]
339            OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?,
340            #[cfg(feature = "opendal-hf")]
341            OpenDalStorage::Hf { config } => hf_config_build(config, path)?,
342            #[cfg(all(
343                not(feature = "opendal-s3"),
344                not(feature = "opendal-fs"),
345                not(feature = "opendal-gcs"),
346                not(feature = "opendal-oss"),
347                not(feature = "opendal-azdls"),
348                not(feature = "opendal-hf"),
349            ))]
350            _ => {
351                return Err(Error::new(
352                    ErrorKind::FeatureUnsupported,
353                    "No storage service has been enabled",
354                ));
355            }
356        };
357
358        // Apply observability/resilience layers. TimeoutLayer must be
359        // inside RetryLayer so each retry attempt is independently
360        // bounded — without a per-attempt timeout, a future parked on a
361        // silently dropped TCP connection never produces an `Err` and
362        // RetryLayer cannot retry, leaving the caller hung indefinitely.
363        // See: https://opendal.apache.org/docs/rust/opendal/layers/struct.TimeoutLayer.html
364        //
365        // Transient errors are common for object stores; we retry temporary
366        // failures with exponential backoff. The retry behavior also
367        // benefits non-object-store backends.
368        let operator = operator.layer(TimeoutLayer::new()).layer(RetryLayer::new());
369        Ok((operator, relative_path))
370    }
371
372    /// Returns a cache key used by `delete_stream` to group paths by storage operator.
373    ///
374    /// For most backends the URL host (bucket name) is sufficient. For HF the host
375    /// encodes the repo type, not the repo identity, so a more specific key is used.
376    fn batch_key_for_path(&self, path: &str) -> String {
377        match self {
378            #[cfg(feature = "opendal-hf")]
379            OpenDalStorage::Hf { .. } => hf_batch_key(path),
380            _ => url::Url::parse(path)
381                .ok()
382                .and_then(|u| u.host_str().map(|s| s.to_string()))
383                .unwrap_or_default(),
384        }
385    }
386
387    /// Extracts the relative path from an absolute path without building an operator.
388    ///
389    /// This is a lightweight alternative to [`create_operator`](Self::create_operator) for cases
390    /// where only the relative path is needed (e.g. bulk deletes where the operator is already
391    /// available).
392    #[allow(unreachable_code, unused_variables)]
393    pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str> {
394        match self {
395            #[cfg(feature = "opendal-memory")]
396            OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
397            #[cfg(feature = "opendal-fs")]
398            OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
399            #[cfg(feature = "opendal-s3")]
400            OpenDalStorage::S3 { .. } => {
401                let url = url::Url::parse(path)?;
402                let bucket = url.host_str().ok_or_else(|| {
403                    Error::new(
404                        ErrorKind::DataInvalid,
405                        format!("Invalid s3 url: {path}, missing bucket"),
406                    )
407                })?;
408                let prefix = format!("{}://{}/", url.scheme(), bucket);
409                if path.starts_with(&prefix) {
410                    Ok(&path[prefix.len()..])
411                } else {
412                    Err(Error::new(
413                        ErrorKind::DataInvalid,
414                        format!("Invalid s3 url: {path}, should start with {prefix}"),
415                    ))
416                }
417            }
418            #[cfg(feature = "opendal-gcs")]
419            OpenDalStorage::Gcs { .. } => {
420                let url = url::Url::parse(path)?;
421                let bucket = url.host_str().ok_or_else(|| {
422                    Error::new(
423                        ErrorKind::DataInvalid,
424                        format!("Invalid gcs url: {path}, missing bucket"),
425                    )
426                })?;
427                let prefix = format!("gs://{}/", bucket);
428                if path.starts_with(&prefix) {
429                    Ok(&path[prefix.len()..])
430                } else {
431                    Err(Error::new(
432                        ErrorKind::DataInvalid,
433                        format!("Invalid gcs url: {path}, should start with {prefix}"),
434                    ))
435                }
436            }
437            #[cfg(feature = "opendal-oss")]
438            OpenDalStorage::Oss { .. } => {
439                let url = url::Url::parse(path)?;
440                let bucket = url.host_str().ok_or_else(|| {
441                    Error::new(
442                        ErrorKind::DataInvalid,
443                        format!("Invalid oss url: {path}, missing bucket"),
444                    )
445                })?;
446                let prefix = format!("oss://{}/", bucket);
447                if path.starts_with(&prefix) {
448                    Ok(&path[prefix.len()..])
449                } else {
450                    Err(Error::new(
451                        ErrorKind::DataInvalid,
452                        format!("Invalid oss url: {path}, should start with {prefix}"),
453                    ))
454                }
455            }
456            #[cfg(feature = "opendal-azdls")]
457            OpenDalStorage::Azdls { config } => {
458                let azure_path = path.parse::<AzureStoragePath>()?;
459                match_path_with_config(&azure_path, config)?;
460                let relative_path_len = azure_path.path.len();
461                Ok(&path[path.len() - relative_path_len..])
462            }
463            #[cfg(feature = "opendal-hf")]
464            OpenDalStorage::Hf { .. } => {
465                let parsed = hf::HfUri::parse(path).ok_or_else(|| {
466                    Error::new(ErrorKind::DataInvalid, format!("Invalid hf url: {path}"))
467                })?;
468                Ok(&path[path.len() - parsed.path.len()..])
469            }
470            #[cfg(all(
471                not(feature = "opendal-s3"),
472                not(feature = "opendal-fs"),
473                not(feature = "opendal-gcs"),
474                not(feature = "opendal-oss"),
475                not(feature = "opendal-azdls"),
476                not(feature = "opendal-hf"),
477            ))]
478            _ => Err(Error::new(
479                ErrorKind::FeatureUnsupported,
480                "No storage service has been enabled",
481            )),
482        }
483    }
484}
485
486#[typetag::serde(name = "OpenDalStorage")]
487#[async_trait]
488impl Storage for OpenDalStorage {
489    async fn exists(&self, path: &str) -> Result<bool> {
490        let (op, relative_path) = self.create_operator(&path)?;
491        Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
492    }
493
494    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
495        let (op, relative_path) = self.create_operator(&path)?;
496        let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
497        Ok(FileMetadata {
498            size: meta.content_length(),
499        })
500    }
501
502    async fn read(&self, path: &str) -> Result<Bytes> {
503        let (op, relative_path) = self.create_operator(&path)?;
504        Ok(op
505            .read(relative_path)
506            .await
507            .map_err(from_opendal_error)?
508            .to_bytes())
509    }
510
511    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
512        let (op, relative_path) = self.create_operator(&path)?;
513        Ok(Box::new(OpenDalReader(
514            op.reader(relative_path).await.map_err(from_opendal_error)?,
515        )))
516    }
517
518    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
519        let (op, relative_path) = self.create_operator(&path)?;
520        op.write(relative_path, bs)
521            .await
522            .map_err(from_opendal_error)?;
523        Ok(())
524    }
525
526    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
527        let (op, relative_path) = self.create_operator(&path)?;
528        Ok(Box::new(OpenDalWriter(
529            op.writer(relative_path).await.map_err(from_opendal_error)?,
530        )))
531    }
532
533    async fn delete(&self, path: &str) -> Result<()> {
534        let (op, relative_path) = self.create_operator(&path)?;
535        Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
536    }
537
538    async fn delete_prefix(&self, path: &str) -> Result<()> {
539        let (op, relative_path) = self.create_operator(&path)?;
540        let path = if relative_path.ends_with('/') {
541            relative_path.to_string()
542        } else {
543            format!("{relative_path}/")
544        };
545        Ok(op
546            .delete_with(&path)
547            .recursive(true)
548            .await
549            .map_err(from_opendal_error)?)
550    }
551
552    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
553        let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
554
555        while let Some(path) = paths.next().await {
556            let bucket = self.batch_key_for_path(&path);
557
558            let (relative_path, deleter) = match deleters.entry(bucket) {
559                Entry::Occupied(entry) => {
560                    (self.relativize_path(&path)?.to_string(), entry.into_mut())
561                }
562                Entry::Vacant(entry) => {
563                    let (op, rel) = self.create_operator(&path)?;
564                    let rel = rel.to_string();
565                    let deleter = op.deleter().await.map_err(from_opendal_error)?;
566                    (rel, entry.insert(deleter))
567                }
568            };
569
570            deleter
571                .delete(relative_path)
572                .await
573                .map_err(from_opendal_error)?;
574        }
575
576        for (_, mut deleter) in deleters {
577            deleter.close().await.map_err(from_opendal_error)?;
578        }
579
580        Ok(())
581    }
582
583    #[allow(unreachable_code, unused_variables)]
584    fn new_input(&self, path: &str) -> Result<InputFile> {
585        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
586    }
587
588    #[allow(unreachable_code, unused_variables)]
589    fn new_output(&self, path: &str) -> Result<OutputFile> {
590        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
591    }
592}
593
594// Newtype wrappers for opendal types to satisfy orphan rules.
595// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's
596// Reader/Writer since neither trait nor type is defined in this crate.
597
598/// Wrapper around `opendal::Reader` that implements `FileRead`.
599pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
600
601#[async_trait]
602impl FileRead for OpenDalReader {
603    async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
604        Ok(opendal::Reader::read(&self.0, range)
605            .await
606            .map_err(from_opendal_error)?
607            .to_bytes())
608    }
609}
610
611/// Wrapper around `opendal::Writer` that implements `FileWrite`.
612pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
613
614#[async_trait]
615impl FileWrite for OpenDalWriter {
616    async fn write(&mut self, bs: Bytes) -> Result<()> {
617        Ok(opendal::Writer::write(&mut self.0, bs)
618            .await
619            .map_err(from_opendal_error)?)
620    }
621
622    async fn close(&mut self) -> Result<()> {
623        let _ = opendal::Writer::close(&mut self.0)
624            .await
625            .map_err(from_opendal_error)?;
626        Ok(())
627    }
628}
629
#[cfg(test)]
mod tests {
    use super::*;

    /// The default memory operator reports the `memory` scheme.
    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let op = default_memory_operator();
        let scheme = op.info().scheme().to_string();
        assert_eq!(scheme, "memory");
    }

    /// Memory paths are relativized with and without the `memory:/` prefix.
    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_relativize_path_memory() {
        let storage = OpenDalStorage::Memory(default_memory_operator());

        let with_scheme = storage.relativize_path("memory:/path/to/file").unwrap();
        assert_eq!(with_scheme, "path/to/file");

        // Without the scheme prefix, falls back to stripping the leading slash
        let without_scheme = storage.relativize_path("/path/to/file").unwrap();
        assert_eq!(without_scheme, "path/to/file");
    }

    /// Local filesystem paths are relativized with and without the `file:/` prefix.
    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_relativize_path_fs() {
        let storage = OpenDalStorage::LocalFs;

        let with_scheme = storage
            .relativize_path("file:/tmp/data/file.parquet")
            .unwrap();
        assert_eq!(with_scheme, "tmp/data/file.parquet");

        let without_scheme = storage.relativize_path("/tmp/data/file.parquet").unwrap();
        assert_eq!(without_scheme, "tmp/data/file.parquet");
    }

    /// Every S3-style scheme yields the same relative path.
    #[cfg(feature = "opendal-s3")]
    #[test]
    fn test_relativize_path_s3() {
        let storage = OpenDalStorage::S3 {
            config: Arc::new(S3Config::default()),
            customized_credential_load: None,
        };

        // All S3-family schemes are accepted by the same storage instance.
        // Custom schemes for S3-compatible stores (e.g., `minio://`) are also
        // accepted because the path's scheme is used as-is for prefix matching.
        for scheme in ["s3", "s3a", "s3n", "minio"] {
            let absolute = format!("{scheme}://my-bucket/path/to/file.parquet");
            let relative = storage.relativize_path(&absolute).unwrap();
            assert_eq!(relative, "path/to/file.parquet");
        }
    }

    /// A `gs://` path is relativized against its bucket.
    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        let relative = storage
            .relativize_path("gs://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    /// GCS storage rejects a path with a non-`gs` scheme.
    #[cfg(feature = "opendal-gcs")]
    #[test]
    fn test_relativize_path_gcs_invalid_scheme() {
        let storage = OpenDalStorage::Gcs {
            config: Arc::new(GcsConfig::default()),
        };

        let outcome = storage.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    /// An `oss://` path is relativized against its bucket.
    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        let relative = storage
            .relativize_path("oss://my-bucket/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "path/to/file.parquet");
    }

    /// OSS storage rejects a path with a non-`oss` scheme.
    #[cfg(feature = "opendal-oss")]
    #[test]
    fn test_relativize_path_oss_invalid_scheme() {
        let storage = OpenDalStorage::Oss {
            config: Arc::new(OssConfig::default()),
        };

        let outcome = storage.relativize_path("s3://my-bucket/path/to/file.parquet");
        assert!(outcome.is_err());
    }

    /// An `abfss://` path keeps its leading slash when relativized.
    #[cfg(feature = "opendal-azdls")]
    #[test]
    fn test_relativize_path_azdls() {
        let config = AzdlsConfig {
            account_name: Some("myaccount".to_string()),
            endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
            ..Default::default()
        };
        let storage = OpenDalStorage::Azdls {
            config: Arc::new(config),
        };

        let relative = storage
            .relativize_path("abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
            .unwrap();
        assert_eq!(relative, "/path/to/file.parquet");
    }
}