iceberg/io/opendal/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! OpenDAL-based storage implementation.
19
20use std::ops::Range;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24#[cfg(feature = "storage-azdls")]
25use azdls::AzureStorageScheme;
26use bytes::Bytes;
27use opendal::layers::RetryLayer;
28#[cfg(feature = "storage-azdls")]
29use opendal::services::AzdlsConfig;
30#[cfg(feature = "storage-gcs")]
31use opendal::services::GcsConfig;
32#[cfg(feature = "storage-oss")]
33use opendal::services::OssConfig;
34#[cfg(feature = "storage-s3")]
35use opendal::services::S3Config;
36use opendal::{Operator, Scheme};
37#[cfg(feature = "storage-s3")]
38pub use s3::CustomAwsCredentialLoader;
39use serde::{Deserialize, Serialize};
40
41use super::{
42    FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
43    StorageConfig, StorageFactory,
44};
45use crate::{Error, ErrorKind, Result};
46
47#[cfg(feature = "storage-azdls")]
48mod azdls;
49#[cfg(feature = "storage-fs")]
50mod fs;
51#[cfg(feature = "storage-gcs")]
52mod gcs;
53#[cfg(feature = "storage-memory")]
54mod memory;
55#[cfg(feature = "storage-oss")]
56mod oss;
57#[cfg(feature = "storage-s3")]
58mod s3;
59
60#[cfg(feature = "storage-azdls")]
61use azdls::*;
62#[cfg(feature = "storage-fs")]
63use fs::*;
64#[cfg(feature = "storage-gcs")]
65use gcs::*;
66#[cfg(feature = "storage-memory")]
67use memory::*;
68#[cfg(feature = "storage-oss")]
69use oss::*;
70#[cfg(feature = "storage-s3")]
71pub use s3::*;
72
/// OpenDAL-based storage factory.
///
/// Maps scheme to the corresponding OpenDalStorage storage variant.
///
/// TODO this is currently not used, we still use OpenDalStorage::build() for now
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorageFactory {
    /// Memory storage factory.
    #[cfg(feature = "storage-memory")]
    Memory,
    /// Local filesystem storage factory.
    #[cfg(feature = "storage-fs")]
    Fs,
    /// S3 storage factory.
    #[cfg(feature = "storage-s3")]
    S3 {
        /// Custom AWS credential loader.
        // Not serializable (see `#[serde(skip)]`); deserializes as `None`.
        #[serde(skip)]
        customized_credential_load: Option<CustomAwsCredentialLoader>,
    },
    /// GCS storage factory.
    #[cfg(feature = "storage-gcs")]
    Gcs,
    /// OSS storage factory.
    #[cfg(feature = "storage-oss")]
    Oss,
    /// Azure Data Lake Storage factory.
    #[cfg(feature = "storage-azdls")]
    Azdls {
        /// The configured Azure storage scheme.
        // Azdls accepts several schemes (abfs/abfss/wasb/wasbs), so the
        // concrete one is stored for later path validation.
        configured_scheme: AzureStorageScheme,
    },
}
106
#[typetag::serde(name = "OpenDalStorageFactory")]
impl StorageFactory for OpenDalStorageFactory {
    // `config` is only read by the feature-gated arms; without any storage
    // feature enabled it would otherwise trip the unused-variable lint.
    #[allow(unused_variables)]
    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
        match self {
            #[cfg(feature = "storage-memory")]
            OpenDalStorageFactory::Memory => {
                Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
            }
            #[cfg(feature = "storage-fs")]
            OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
            #[cfg(feature = "storage-s3")]
            OpenDalStorageFactory::S3 {
                customized_credential_load,
            } => Ok(Arc::new(OpenDalStorage::S3 {
                // NOTE(review): hardcodes "s3" while OpenDalStorage::build()
                // preserves the caller-supplied scheme string (which may be
                // "s3a") — confirm this is acceptable before this factory
                // replaces OpenDalStorage::build() per the TODO above.
                configured_scheme: "s3".to_string(),
                config: s3_config_parse(config.props().clone())?.into(),
                customized_credential_load: customized_credential_load.clone(),
            })),
            #[cfg(feature = "storage-gcs")]
            OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
                config: gcs_config_parse(config.props().clone())?.into(),
            })),
            #[cfg(feature = "storage-oss")]
            OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
                config: oss_config_parse(config.props().clone())?.into(),
            })),
            #[cfg(feature = "storage-azdls")]
            OpenDalStorageFactory::Azdls { configured_scheme } => {
                Ok(Arc::new(OpenDalStorage::Azdls {
                    configured_scheme: configured_scheme.clone(),
                    config: azdls_config_parse(config.props().clone())?.into(),
                }))
            }
            // Catch-all is only compiled when no storage feature is enabled;
            // with at least one feature the match above is exhaustive.
            #[cfg(all(
                not(feature = "storage-memory"),
                not(feature = "storage-fs"),
                not(feature = "storage-s3"),
                not(feature = "storage-gcs"),
                not(feature = "storage-oss"),
                not(feature = "storage-azdls"),
            ))]
            _ => Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "No storage service has been enabled",
            )),
        }
    }
}
156
/// Default memory operator for serde deserialization.
///
/// Used as the `#[serde(default = ...)]` source for the skipped operator
/// field of `OpenDalStorage::Memory`.
#[cfg(feature = "storage-memory")]
fn default_memory_operator() -> Operator {
    // A failure to build the in-memory backend indicates a bug rather than
    // a recoverable condition, hence the expect.
    memory_config_build().expect("Failed to create default memory operator")
}
162
/// OpenDAL-based storage implementation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OpenDalStorage {
    /// Memory storage variant.
    // The operator is not serialized; deserialization rebuilds a fresh
    // in-memory operator via `default_memory_operator`.
    #[cfg(feature = "storage-memory")]
    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
    /// Local filesystem storage variant.
    #[cfg(feature = "storage-fs")]
    LocalFs,
    /// S3 storage variant.
    #[cfg(feature = "storage-s3")]
    S3 {
        /// s3 storage could have `s3://` and `s3a://`.
        /// Storing the scheme string here to return the correct path.
        configured_scheme: String,
        /// S3 configuration.
        config: Arc<S3Config>,
        /// Custom AWS credential loader.
        // Not serializable; skipped and deserializes as `None`.
        #[serde(skip)]
        customized_credential_load: Option<CustomAwsCredentialLoader>,
    },
    /// GCS storage variant.
    #[cfg(feature = "storage-gcs")]
    Gcs {
        /// GCS configuration.
        config: Arc<GcsConfig>,
    },
    /// OSS storage variant.
    #[cfg(feature = "storage-oss")]
    Oss {
        /// OSS configuration.
        config: Arc<OssConfig>,
    },
    /// Azure Data Lake Storage variant.
    /// Expects paths of the form
    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
    #[cfg(feature = "storage-azdls")]
    // `private_interfaces`: AzureStorageScheme comes from the private
    // `azdls` module but appears in this public variant.
    #[allow(private_interfaces)]
    Azdls {
        /// The configured Azure storage scheme.
        /// Because Azdls accepts multiple possible schemes, we store the full
        /// passed scheme here to later validate schemes passed via paths.
        configured_scheme: AzureStorageScheme,
        /// Azure DLS configuration.
        config: Arc<AzdlsConfig>,
    },
}
211
impl OpenDalStorage {
    /// Convert iceberg config to opendal config.
    ///
    /// TODO Switch to use OpenDalStorageFactory::build()
    pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
        let (scheme_str, props, extensions) = file_io_builder.into_parts();
        // Touch `props`/`extensions` so builds with few storage features
        // enabled don't trip the unused-variable lint.
        let _ = (&props, &extensions);
        let scheme = Self::parse_scheme(&scheme_str)?;

        match scheme {
            #[cfg(feature = "storage-memory")]
            Scheme::Memory => Ok(Self::Memory(memory_config_build()?)),
            #[cfg(feature = "storage-fs")]
            Scheme::Fs => Ok(Self::LocalFs),
            #[cfg(feature = "storage-s3")]
            Scheme::S3 => Ok(Self::S3 {
                // Keep the caller's original scheme ("s3" or "s3a") so
                // returned paths round-trip with the caller's URIs.
                configured_scheme: scheme_str,
                config: s3_config_parse(props)?.into(),
                customized_credential_load: extensions
                    .get::<CustomAwsCredentialLoader>()
                    .map(Arc::unwrap_or_clone),
            }),
            #[cfg(feature = "storage-gcs")]
            Scheme::Gcs => Ok(Self::Gcs {
                config: gcs_config_parse(props)?.into(),
            }),
            #[cfg(feature = "storage-oss")]
            Scheme::Oss => Ok(Self::Oss {
                config: oss_config_parse(props)?.into(),
            }),
            #[cfg(feature = "storage-azdls")]
            Scheme::Azdls => {
                // Azdls maps several schemes (abfs/abfss/wasb/wasbs); keep
                // the concrete one to validate paths later.
                let scheme = scheme_str.parse::<AzureStorageScheme>()?;
                Ok(Self::Azdls {
                    config: azdls_config_parse(props)?.into(),
                    configured_scheme: scheme,
                })
            }
            // Update doc on [`FileIO`] when adding new schemes.
            _ => Err(Error::new(
                ErrorKind::FeatureUnsupported,
                format!("Constructing file io from scheme: {scheme} not supported now",),
            )),
        }
    }

    /// Creates operator from path.
    ///
    /// # Arguments
    ///
    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
    ///
    /// # Returns
    ///
    /// The return value consists of two parts:
    ///
    /// * An [`opendal::Operator`] instance used to operate on file.
    /// * Relative path to the root uri of [`opendal::Operator`].
    #[allow(unreachable_code, unused_variables)]
    pub(crate) fn create_operator<'a>(
        &self,
        path: &'a impl AsRef<str>,
    ) -> Result<(Operator, &'a str)> {
        let path = path.as_ref();
        let (operator, relative_path): (Operator, &str) = match self {
            #[cfg(feature = "storage-memory")]
            OpenDalStorage::Memory(op) => {
                // Accept both scheme-qualified (`memory:///x`) and bare
                // absolute (`/x`) paths.
                // NOTE(review): `&path[1..]` assumes a non-empty path; an
                // empty string would panic — confirm callers guarantee it.
                if let Some(stripped) = path.strip_prefix("memory:/") {
                    (op.clone(), stripped)
                } else {
                    (op.clone(), &path[1..])
                }
            }
            #[cfg(feature = "storage-fs")]
            OpenDalStorage::LocalFs => {
                let op = fs_config_build()?;
                // Same dual handling as Memory: `file:///x` or bare `/x`.
                if let Some(stripped) = path.strip_prefix("file:/") {
                    (op, stripped)
                } else {
                    (op, &path[1..])
                }
            }
            #[cfg(feature = "storage-s3")]
            OpenDalStorage::S3 {
                configured_scheme,
                config,
                customized_credential_load,
            } => {
                let op = s3_config_build(config, customized_credential_load, path)?;
                let op_info = op.info();

                // Check prefix of s3 path.
                // The prefix uses the configured scheme (s3/s3a) and the
                // operator's bucket name; the remainder is the object key.
                let prefix = format!("{}://{}/", configured_scheme, op_info.name());
                if path.starts_with(&prefix) {
                    (op, &path[prefix.len()..])
                } else {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Invalid s3 url: {path}, should start with {prefix}"),
                    ));
                }
            }
            #[cfg(feature = "storage-gcs")]
            OpenDalStorage::Gcs { config } => {
                let operator = gcs_config_build(config, path)?;
                let prefix = format!("gs://{}/", operator.info().name());
                if path.starts_with(&prefix) {
                    (operator, &path[prefix.len()..])
                } else {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Invalid gcs url: {path}, should start with {prefix}"),
                    ));
                }
            }
            #[cfg(feature = "storage-oss")]
            OpenDalStorage::Oss { config } => {
                let op = oss_config_build(config, path)?;
                let prefix = format!("oss://{}/", op.info().name());
                if path.starts_with(&prefix) {
                    (op, &path[prefix.len()..])
                } else {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Invalid oss url: {path}, should start with {prefix}"),
                    ));
                }
            }
            #[cfg(feature = "storage-azdls")]
            OpenDalStorage::Azdls {
                configured_scheme,
                config,
            } => azdls_create_operator(path, config, configured_scheme)?,
            // Catch-all compiled only when none of the listed features is
            // enabled.
            // NOTE(review): unlike OpenDalStorageFactory::build, this list
            // omits `storage-memory`; with only that feature enabled this
            // arm is compiled but unreachable — confirm intended.
            #[cfg(all(
                not(feature = "storage-s3"),
                not(feature = "storage-fs"),
                not(feature = "storage-gcs"),
                not(feature = "storage-oss"),
                not(feature = "storage-azdls"),
            ))]
            _ => {
                return Err(Error::new(
                    ErrorKind::FeatureUnsupported,
                    "No storage service has been enabled",
                ));
            }
        };

        // Transient errors are common for object stores; however there's no
        // harm in retrying temporary failures for other storage backends as well.
        let operator = operator.layer(RetryLayer::new());
        Ok((operator, relative_path))
    }

    /// Parse scheme.
    fn parse_scheme(scheme: &str) -> Result<Scheme> {
        match scheme {
            "memory" => Ok(Scheme::Memory),
            // An empty scheme (plain paths) defaults to the local filesystem.
            "file" | "" => Ok(Scheme::Fs),
            "s3" | "s3a" => Ok(Scheme::S3),
            "gs" | "gcs" => Ok(Scheme::Gcs),
            "oss" => Ok(Scheme::Oss),
            "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
            // Anything else is delegated to opendal's own scheme parsing.
            s => Ok(s.parse::<Scheme>()?),
        }
    }
}
379
380#[typetag::serde(name = "OpenDalStorage")]
381#[async_trait]
382impl Storage for OpenDalStorage {
383    async fn exists(&self, path: &str) -> Result<bool> {
384        let (op, relative_path) = self.create_operator(&path)?;
385        Ok(op.exists(relative_path).await?)
386    }
387
388    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
389        let (op, relative_path) = self.create_operator(&path)?;
390        let meta = op.stat(relative_path).await?;
391        Ok(FileMetadata {
392            size: meta.content_length(),
393        })
394    }
395
396    async fn read(&self, path: &str) -> Result<Bytes> {
397        let (op, relative_path) = self.create_operator(&path)?;
398        Ok(op.read(relative_path).await?.to_bytes())
399    }
400
401    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
402        let (op, relative_path) = self.create_operator(&path)?;
403        Ok(Box::new(op.reader(relative_path).await?))
404    }
405
406    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
407        let (op, relative_path) = self.create_operator(&path)?;
408        op.write(relative_path, bs).await?;
409        Ok(())
410    }
411
412    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
413        let (op, relative_path) = self.create_operator(&path)?;
414        Ok(Box::new(op.writer(relative_path).await?))
415    }
416
417    async fn delete(&self, path: &str) -> Result<()> {
418        let (op, relative_path) = self.create_operator(&path)?;
419        Ok(op.delete(relative_path).await?)
420    }
421
422    async fn delete_prefix(&self, path: &str) -> Result<()> {
423        let (op, relative_path) = self.create_operator(&path)?;
424        let path = if relative_path.ends_with('/') {
425            relative_path.to_string()
426        } else {
427            format!("{relative_path}/")
428        };
429        Ok(op.remove_all(&path).await?)
430    }
431
432    #[allow(unreachable_code, unused_variables)]
433    fn new_input(&self, path: &str) -> Result<InputFile> {
434        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
435    }
436
437    #[allow(unreachable_code, unused_variables)]
438    fn new_output(&self, path: &str) -> Result<OutputFile> {
439        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
440    }
441}
442
443// OpenDAL implementations for FileRead and FileWrite traits
444
445#[async_trait]
446impl FileRead for opendal::Reader {
447    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
448        Ok(opendal::Reader::read(self, range).await?.to_bytes())
449    }
450}
451
452#[async_trait]
453impl FileWrite for opendal::Writer {
454    async fn write(&mut self, bs: Bytes) -> Result<()> {
455        Ok(opendal::Writer::write(self, bs).await?)
456    }
457
458    async fn close(&mut self) -> Result<()> {
459        let _ = opendal::Writer::close(self).await?;
460        Ok(())
461    }
462}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// The serde-default operator must be backed by the memory service.
    #[cfg(feature = "storage-memory")]
    #[test]
    fn test_default_memory_operator() {
        let operator = default_memory_operator();
        let scheme = operator.info().scheme();
        assert_eq!(scheme.to_string(), "memory");
    }
}