iceberg_storage_opendal/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! OpenDAL-based storage implementation for Apache Iceberg.
19//!
20//! This crate provides [`OpenDalStorage`] and [`OpenDalStorageFactory`],
21//! which implement the [`Storage`](iceberg::io::Storage) and
22//! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` crate
23//! using [OpenDAL](https://opendal.apache.org/) as the backend.
24
25mod utils;
26
27use std::collections::HashMap;
28use std::collections::hash_map::Entry;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use cfg_if::cfg_if;
34use futures::StreamExt;
35use futures::stream::BoxStream;
36use iceberg::io::{
37    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38    StorageFactory,
39};
40use iceberg::{Error, ErrorKind, Result};
41use opendal::Operator;
42use opendal::layers::RetryLayer;
43use serde::{Deserialize, Serialize};
44use utils::from_opendal_error;
45
46cfg_if! {
47    if #[cfg(feature = "opendal-azdls")] {
48        mod azdls;
49        use azdls::*;
50        use opendal::services::AzdlsConfig;
51    }
52}
53
54cfg_if! {
55    if #[cfg(feature = "opendal-fs")] {
56        mod fs;
57        use fs::*;
58    }
59}
60
61cfg_if! {
62    if #[cfg(feature = "opendal-gcs")] {
63        mod gcs;
64        use gcs::*;
65        use opendal::services::GcsConfig;
66    }
67}
68
69cfg_if! {
70    if #[cfg(feature = "opendal-memory")] {
71        mod memory;
72        use memory::*;
73    }
74}
75
76cfg_if! {
77    if #[cfg(feature = "opendal-oss")] {
78        mod oss;
79        use opendal::services::OssConfig;
80        use oss::*;
81    }
82}
83
84cfg_if! {
85    if #[cfg(feature = "opendal-s3")] {
86        mod s3;
87        use opendal::services::S3Config;
88        pub use s3::*;
89    }
90}
91
92mod resolving;
93pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
94
95/// OpenDAL-based storage factory.
96///
97/// Maps scheme to the corresponding OpenDalStorage storage variant.
98/// Use this factory with `FileIOBuilder::new(factory)` to create FileIO instances.
99#[derive(Clone, Debug, Serialize, Deserialize)]
100pub enum OpenDalStorageFactory {
101    /// Memory storage factory.
102    #[cfg(feature = "opendal-memory")]
103    Memory,
104    /// Local filesystem storage factory.
105    #[cfg(feature = "opendal-fs")]
106    Fs,
107    /// S3 storage factory.
108    #[cfg(feature = "opendal-s3")]
109    S3 {
110        /// Custom AWS credential loader.
111        #[serde(skip)]
112        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
113    },
114    /// GCS storage factory.
115    #[cfg(feature = "opendal-gcs")]
116    Gcs,
117    /// OSS storage factory.
118    #[cfg(feature = "opendal-oss")]
119    Oss,
120    /// Azure Data Lake Storage factory.
121    #[cfg(feature = "opendal-azdls")]
122    Azdls,
123}
124
125#[typetag::serde(name = "OpenDalStorageFactory")]
126impl StorageFactory for OpenDalStorageFactory {
127    #[allow(unused_variables)]
128    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
129        match self {
130            #[cfg(feature = "opendal-memory")]
131            OpenDalStorageFactory::Memory => {
132                Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
133            }
134            #[cfg(feature = "opendal-fs")]
135            OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
136            #[cfg(feature = "opendal-s3")]
137            OpenDalStorageFactory::S3 {
138                customized_credential_load,
139            } => Ok(Arc::new(OpenDalStorage::S3 {
140                config: s3_config_parse(config.props().clone())?.into(),
141                customized_credential_load: customized_credential_load.clone(),
142            })),
143            #[cfg(feature = "opendal-gcs")]
144            OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
145                config: gcs_config_parse(config.props().clone())?.into(),
146            })),
147            #[cfg(feature = "opendal-oss")]
148            OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
149                config: oss_config_parse(config.props().clone())?.into(),
150            })),
151            #[cfg(feature = "opendal-azdls")]
152            OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls {
153                config: azdls_config_parse(config.props().clone())?.into(),
154            })),
155            #[cfg(all(
156                not(feature = "opendal-memory"),
157                not(feature = "opendal-fs"),
158                not(feature = "opendal-s3"),
159                not(feature = "opendal-gcs"),
160                not(feature = "opendal-oss"),
161                not(feature = "opendal-azdls"),
162            ))]
163            _ => Err(Error::new(
164                ErrorKind::FeatureUnsupported,
165                "No storage service has been enabled",
166            )),
167        }
168    }
169}
170
171/// Default memory operator for serde deserialization.
172#[cfg(feature = "opendal-memory")]
173fn default_memory_operator() -> Operator {
174    memory_config_build().expect("Failed to create default memory operator")
175}
176
177/// OpenDAL-based storage implementation.
178#[derive(Clone, Debug, Serialize, Deserialize)]
179pub enum OpenDalStorage {
180    /// Memory storage variant.
181    #[cfg(feature = "opendal-memory")]
182    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
183    /// Local filesystem storage variant.
184    #[cfg(feature = "opendal-fs")]
185    LocalFs,
186    /// S3 storage variant.
187    ///
188    /// Accepts any S3-family URL (`s3://`, `s3a://`, `s3n://`); the scheme is
189    /// derived from the path at call time.
190    #[cfg(feature = "opendal-s3")]
191    S3 {
192        /// S3 configuration.
193        config: Arc<S3Config>,
194        /// Custom AWS credential loader.
195        #[serde(skip)]
196        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
197    },
198    /// GCS storage variant.
199    #[cfg(feature = "opendal-gcs")]
200    Gcs {
201        /// GCS configuration.
202        config: Arc<GcsConfig>,
203    },
204    /// OSS storage variant.
205    #[cfg(feature = "opendal-oss")]
206    Oss {
207        /// OSS configuration.
208        config: Arc<OssConfig>,
209    },
210    /// Azure Data Lake Storage variant.
211    ///
212    /// Accepts paths of the form
213    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
214    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
215    /// The scheme is derived from the path at call time.
216    #[cfg(feature = "opendal-azdls")]
217    Azdls {
218        /// Azure DLS configuration.
219        config: Arc<AzdlsConfig>,
220    },
221}
222
223impl OpenDalStorage {
224    /// Creates operator from path.
225    ///
226    /// # Arguments
227    ///
228    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`](iceberg::io::FileIO).
229    ///
230    /// # Returns
231    ///
232    /// The return value consists of two parts:
233    ///
234    /// * An [`opendal::Operator`] instance used to operate on file.
235    /// * Relative path to the root uri of [`opendal::Operator`].
236    #[allow(unreachable_code, unused_variables)]
237    pub(crate) fn create_operator<'a>(
238        &self,
239        path: &'a impl AsRef<str>,
240    ) -> Result<(Operator, &'a str)> {
241        let path = path.as_ref();
242        let (operator, relative_path): (Operator, &str) = match self {
243            #[cfg(feature = "opendal-memory")]
244            OpenDalStorage::Memory(op) => {
245                if let Some(stripped) = path.strip_prefix("memory:/") {
246                    (op.clone(), stripped)
247                } else {
248                    (op.clone(), &path[1..])
249                }
250            }
251            #[cfg(feature = "opendal-fs")]
252            OpenDalStorage::LocalFs => {
253                let op = fs_config_build()?;
254                if let Some(stripped) = path.strip_prefix("file:/") {
255                    (op, stripped)
256                } else {
257                    (op, &path[1..])
258                }
259            }
260            #[cfg(feature = "opendal-s3")]
261            OpenDalStorage::S3 {
262                config,
263                customized_credential_load,
264            } => {
265                let op = s3_config_build(config, customized_credential_load, path)?;
266                let op_info = op.info();
267
268                // Use the URL scheme in the path for prefix matching. This enables
269                // use of S3-compatible storage backends using custom schemes (e.g., `minio://`, `r2://`).
270                let url = url::Url::parse(path).map_err(|e| {
271                    Error::new(
272                        ErrorKind::DataInvalid,
273                        format!("Invalid s3 url: {path}: {e}"),
274                    )
275                })?;
276                let prefix = format!("{}://{}/", url.scheme(), op_info.name());
277                if path.starts_with(&prefix) {
278                    (op, &path[prefix.len()..])
279                } else {
280                    return Err(Error::new(
281                        ErrorKind::DataInvalid,
282                        format!("Invalid s3 url: {path}, should start with {prefix}"),
283                    ));
284                }
285            }
286            #[cfg(feature = "opendal-gcs")]
287            OpenDalStorage::Gcs { config } => {
288                let operator = gcs_config_build(config, path)?;
289                let prefix = format!("gs://{}/", operator.info().name());
290                if path.starts_with(&prefix) {
291                    (operator, &path[prefix.len()..])
292                } else {
293                    return Err(Error::new(
294                        ErrorKind::DataInvalid,
295                        format!("Invalid gcs url: {path}, should start with {prefix}"),
296                    ));
297                }
298            }
299            #[cfg(feature = "opendal-oss")]
300            OpenDalStorage::Oss { config } => {
301                let op = oss_config_build(config, path)?;
302                let prefix = format!("oss://{}/", op.info().name());
303                if path.starts_with(&prefix) {
304                    (op, &path[prefix.len()..])
305                } else {
306                    return Err(Error::new(
307                        ErrorKind::DataInvalid,
308                        format!("Invalid oss url: {path}, should start with {prefix}"),
309                    ));
310                }
311            }
312            #[cfg(feature = "opendal-azdls")]
313            OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?,
314            #[cfg(all(
315                not(feature = "opendal-s3"),
316                not(feature = "opendal-fs"),
317                not(feature = "opendal-gcs"),
318                not(feature = "opendal-oss"),
319                not(feature = "opendal-azdls"),
320            ))]
321            _ => {
322                return Err(Error::new(
323                    ErrorKind::FeatureUnsupported,
324                    "No storage service has been enabled",
325                ));
326            }
327        };
328
329        // Transient errors are common for object stores; however there's no
330        // harm in retrying temporary failures for other storage backends as well.
331        let operator = operator.layer(RetryLayer::new());
332        Ok((operator, relative_path))
333    }
334
335    /// Extracts the relative path from an absolute path without building an operator.
336    ///
337    /// This is a lightweight alternative to [`create_operator`](Self::create_operator) for cases
338    /// where only the relative path is needed (e.g. bulk deletes where the operator is already
339    /// available).
340    #[allow(unreachable_code, unused_variables)]
341    pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str> {
342        match self {
343            #[cfg(feature = "opendal-memory")]
344            OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
345            #[cfg(feature = "opendal-fs")]
346            OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
347            #[cfg(feature = "opendal-s3")]
348            OpenDalStorage::S3 { .. } => {
349                let url = url::Url::parse(path)?;
350                let bucket = url.host_str().ok_or_else(|| {
351                    Error::new(
352                        ErrorKind::DataInvalid,
353                        format!("Invalid s3 url: {path}, missing bucket"),
354                    )
355                })?;
356                let prefix = format!("{}://{}/", url.scheme(), bucket);
357                if path.starts_with(&prefix) {
358                    Ok(&path[prefix.len()..])
359                } else {
360                    Err(Error::new(
361                        ErrorKind::DataInvalid,
362                        format!("Invalid s3 url: {path}, should start with {prefix}"),
363                    ))
364                }
365            }
366            #[cfg(feature = "opendal-gcs")]
367            OpenDalStorage::Gcs { .. } => {
368                let url = url::Url::parse(path)?;
369                let bucket = url.host_str().ok_or_else(|| {
370                    Error::new(
371                        ErrorKind::DataInvalid,
372                        format!("Invalid gcs url: {path}, missing bucket"),
373                    )
374                })?;
375                let prefix = format!("gs://{}/", bucket);
376                if path.starts_with(&prefix) {
377                    Ok(&path[prefix.len()..])
378                } else {
379                    Err(Error::new(
380                        ErrorKind::DataInvalid,
381                        format!("Invalid gcs url: {path}, should start with {prefix}"),
382                    ))
383                }
384            }
385            #[cfg(feature = "opendal-oss")]
386            OpenDalStorage::Oss { .. } => {
387                let url = url::Url::parse(path)?;
388                let bucket = url.host_str().ok_or_else(|| {
389                    Error::new(
390                        ErrorKind::DataInvalid,
391                        format!("Invalid oss url: {path}, missing bucket"),
392                    )
393                })?;
394                let prefix = format!("oss://{}/", bucket);
395                if path.starts_with(&prefix) {
396                    Ok(&path[prefix.len()..])
397                } else {
398                    Err(Error::new(
399                        ErrorKind::DataInvalid,
400                        format!("Invalid oss url: {path}, should start with {prefix}"),
401                    ))
402                }
403            }
404            #[cfg(feature = "opendal-azdls")]
405            OpenDalStorage::Azdls { config } => {
406                let azure_path = path.parse::<AzureStoragePath>()?;
407                match_path_with_config(&azure_path, config)?;
408                let relative_path_len = azure_path.path.len();
409                Ok(&path[path.len() - relative_path_len..])
410            }
411            #[cfg(all(
412                not(feature = "opendal-s3"),
413                not(feature = "opendal-fs"),
414                not(feature = "opendal-gcs"),
415                not(feature = "opendal-oss"),
416                not(feature = "opendal-azdls"),
417            ))]
418            _ => Err(Error::new(
419                ErrorKind::FeatureUnsupported,
420                "No storage service has been enabled",
421            )),
422        }
423    }
424}
425
426#[typetag::serde(name = "OpenDalStorage")]
427#[async_trait]
428impl Storage for OpenDalStorage {
429    async fn exists(&self, path: &str) -> Result<bool> {
430        let (op, relative_path) = self.create_operator(&path)?;
431        Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
432    }
433
434    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
435        let (op, relative_path) = self.create_operator(&path)?;
436        let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
437        Ok(FileMetadata {
438            size: meta.content_length(),
439        })
440    }
441
442    async fn read(&self, path: &str) -> Result<Bytes> {
443        let (op, relative_path) = self.create_operator(&path)?;
444        Ok(op
445            .read(relative_path)
446            .await
447            .map_err(from_opendal_error)?
448            .to_bytes())
449    }
450
451    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
452        let (op, relative_path) = self.create_operator(&path)?;
453        Ok(Box::new(OpenDalReader(
454            op.reader(relative_path).await.map_err(from_opendal_error)?,
455        )))
456    }
457
458    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
459        let (op, relative_path) = self.create_operator(&path)?;
460        op.write(relative_path, bs)
461            .await
462            .map_err(from_opendal_error)?;
463        Ok(())
464    }
465
466    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
467        let (op, relative_path) = self.create_operator(&path)?;
468        Ok(Box::new(OpenDalWriter(
469            op.writer(relative_path).await.map_err(from_opendal_error)?,
470        )))
471    }
472
473    async fn delete(&self, path: &str) -> Result<()> {
474        let (op, relative_path) = self.create_operator(&path)?;
475        Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
476    }
477
478    async fn delete_prefix(&self, path: &str) -> Result<()> {
479        let (op, relative_path) = self.create_operator(&path)?;
480        let path = if relative_path.ends_with('/') {
481            relative_path.to_string()
482        } else {
483            format!("{relative_path}/")
484        };
485        Ok(op
486            .delete_with(&path)
487            .recursive(true)
488            .await
489            .map_err(from_opendal_error)?)
490    }
491
492    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
493        let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
494
495        while let Some(path) = paths.next().await {
496            let bucket = url::Url::parse(&path)
497                .ok()
498                .and_then(|u| u.host_str().map(|s| s.to_string()))
499                .unwrap_or_default();
500
501            let (relative_path, deleter) = match deleters.entry(bucket) {
502                Entry::Occupied(entry) => {
503                    (self.relativize_path(&path)?.to_string(), entry.into_mut())
504                }
505                Entry::Vacant(entry) => {
506                    let (op, rel) = self.create_operator(&path)?;
507                    let rel = rel.to_string();
508                    let deleter = op.deleter().await.map_err(from_opendal_error)?;
509                    (rel, entry.insert(deleter))
510                }
511            };
512
513            deleter
514                .delete(relative_path)
515                .await
516                .map_err(from_opendal_error)?;
517        }
518
519        for (_, mut deleter) in deleters {
520            deleter.close().await.map_err(from_opendal_error)?;
521        }
522
523        Ok(())
524    }
525
526    #[allow(unreachable_code, unused_variables)]
527    fn new_input(&self, path: &str) -> Result<InputFile> {
528        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
529    }
530
531    #[allow(unreachable_code, unused_variables)]
532    fn new_output(&self, path: &str) -> Result<OutputFile> {
533        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
534    }
535}
536
537// Newtype wrappers for opendal types to satisfy orphan rules.
538// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's
539// Reader/Writer since neither trait nor type is defined in this crate.
540
541/// Wrapper around `opendal::Reader` that implements `FileRead`.
542pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
543
544#[async_trait]
545impl FileRead for OpenDalReader {
546    async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
547        Ok(opendal::Reader::read(&self.0, range)
548            .await
549            .map_err(from_opendal_error)?
550            .to_bytes())
551    }
552}
553
554/// Wrapper around `opendal::Writer` that implements `FileWrite`.
555pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
556
557#[async_trait]
558impl FileWrite for OpenDalWriter {
559    async fn write(&mut self, bs: Bytes) -> Result<()> {
560        Ok(opendal::Writer::write(&mut self.0, bs)
561            .await
562            .map_err(from_opendal_error)?)
563    }
564
565    async fn close(&mut self) -> Result<()> {
566        let _ = opendal::Writer::close(&mut self.0)
567            .await
568            .map_err(from_opendal_error)?;
569        Ok(())
570    }
571}
572
573#[cfg(test)]
574mod tests {
575    use super::*;
576
577    #[cfg(feature = "opendal-memory")]
578    #[test]
579    fn test_default_memory_operator() {
580        let op = default_memory_operator();
581        assert_eq!(op.info().scheme().to_string(), "memory");
582    }
583
584    #[cfg(feature = "opendal-memory")]
585    #[test]
586    fn test_relativize_path_memory() {
587        let storage = OpenDalStorage::Memory(default_memory_operator());
588
589        assert_eq!(
590            storage.relativize_path("memory:/path/to/file").unwrap(),
591            "path/to/file"
592        );
593        // Without the scheme prefix, falls back to stripping the leading slash
594        assert_eq!(
595            storage.relativize_path("/path/to/file").unwrap(),
596            "path/to/file"
597        );
598    }
599
600    #[cfg(feature = "opendal-fs")]
601    #[test]
602    fn test_relativize_path_fs() {
603        let storage = OpenDalStorage::LocalFs;
604
605        assert_eq!(
606            storage
607                .relativize_path("file:/tmp/data/file.parquet")
608                .unwrap(),
609            "tmp/data/file.parquet"
610        );
611        assert_eq!(
612            storage.relativize_path("/tmp/data/file.parquet").unwrap(),
613            "tmp/data/file.parquet"
614        );
615    }
616
617    #[cfg(feature = "opendal-s3")]
618    #[test]
619    fn test_relativize_path_s3() {
620        let storage = OpenDalStorage::S3 {
621            config: Arc::new(S3Config::default()),
622            customized_credential_load: None,
623        };
624
625        // All S3-family schemes are accepted by the same storage instance.
626        // Custom schemes for S3-compatible stores (e.g., `minio://`) are also
627        // accepted because the path's scheme is used as-is for prefix matching.
628        for scheme in ["s3", "s3a", "s3n", "minio"] {
629            assert_eq!(
630                storage
631                    .relativize_path(&format!("{scheme}://my-bucket/path/to/file.parquet"))
632                    .unwrap(),
633                "path/to/file.parquet"
634            );
635        }
636    }
637
638    #[cfg(feature = "opendal-gcs")]
639    #[test]
640    fn test_relativize_path_gcs() {
641        let storage = OpenDalStorage::Gcs {
642            config: Arc::new(GcsConfig::default()),
643        };
644
645        assert_eq!(
646            storage
647                .relativize_path("gs://my-bucket/path/to/file.parquet")
648                .unwrap(),
649            "path/to/file.parquet"
650        );
651    }
652
653    #[cfg(feature = "opendal-gcs")]
654    #[test]
655    fn test_relativize_path_gcs_invalid_scheme() {
656        let storage = OpenDalStorage::Gcs {
657            config: Arc::new(GcsConfig::default()),
658        };
659
660        assert!(
661            storage
662                .relativize_path("s3://my-bucket/path/to/file.parquet")
663                .is_err()
664        );
665    }
666
667    #[cfg(feature = "opendal-oss")]
668    #[test]
669    fn test_relativize_path_oss() {
670        let storage = OpenDalStorage::Oss {
671            config: Arc::new(OssConfig::default()),
672        };
673
674        assert_eq!(
675            storage
676                .relativize_path("oss://my-bucket/path/to/file.parquet")
677                .unwrap(),
678            "path/to/file.parquet"
679        );
680    }
681
682    #[cfg(feature = "opendal-oss")]
683    #[test]
684    fn test_relativize_path_oss_invalid_scheme() {
685        let storage = OpenDalStorage::Oss {
686            config: Arc::new(OssConfig::default()),
687        };
688
689        assert!(
690            storage
691                .relativize_path("s3://my-bucket/path/to/file.parquet")
692                .is_err()
693        );
694    }
695
696    #[cfg(feature = "opendal-azdls")]
697    #[test]
698    fn test_relativize_path_azdls() {
699        let storage = OpenDalStorage::Azdls {
700            config: Arc::new(AzdlsConfig {
701                account_name: Some("myaccount".to_string()),
702                endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
703                ..Default::default()
704            }),
705        };
706
707        assert_eq!(
708            storage
709                .relativize_path("abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet")
710                .unwrap(),
711            "/path/to/file.parquet"
712        );
713    }
714}