// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use opendal::layers::RetryLayer;
#[cfg(feature = "storage-azdls")]
use opendal::services::AzdlsConfig;
#[cfg(feature = "storage-gcs")]
use opendal::services::GcsConfig;
#[cfg(feature = "storage-oss")]
use opendal::services::OssConfig;
#[cfg(feature = "storage-s3")]
use opendal::services::S3Config;
use opendal::{Operator, Scheme};

#[cfg(feature = "storage-azdls")]
use super::AzureStorageScheme;
use super::{
    FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, StorageConfig,
};
#[cfg(feature = "storage-s3")]
use crate::io::CustomAwsCredentialLoader;
use crate::{Error, ErrorKind, Result};

/// Trait for storage operations in Iceberg.
///
/// The trait supports serialization via `typetag`, allowing storage instances to be
/// serialized and deserialized across process boundaries.
///
/// Third-party implementations can implement this trait to provide custom storage backends.
///
/// # Implementing Custom Storage
///
/// To implement a custom storage backend:
///
/// 1. Create a struct that implements this trait
/// 2. Add the `#[typetag::serde]` attribute for serialization support
/// 3. Implement all required methods
///
/// # Example
///
/// ```rust,ignore
/// #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
/// struct MyStorage {
///     // custom fields
/// }
///
/// #[async_trait]
/// #[typetag::serde]
/// impl Storage for MyStorage {
///     async fn exists(&self, path: &str) -> Result<bool> {
///         // implementation
///         todo!()
///     }
///     // ... implement other methods
/// }
/// ```
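///
/// # Serialization
///
/// Because the trait carries `#[typetag::serde]`, a boxed instance can
/// round-trip through any serde format, with the concrete type recorded in a
/// `"type"` tag. A minimal sketch, assuming the hypothetical `MyStorage`
/// above and a `serde_json` dependency:
///
/// ```rust,ignore
/// let storage: Box<dyn Storage> = Box::new(MyStorage { /* ... */ });
/// let json = serde_json::to_string(&storage)?;            // includes the "type" tag
/// let restored: Box<dyn Storage> = serde_json::from_str(&json)?;
/// ```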
///
/// TODO: remove the note below once the trait is integrated with FileIO and Catalog.
///
/// # NOTE
///
/// This trait is under heavy development and is not used anywhere as of now.
/// Please DO NOT implement it.
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait Storage: Debug + Send + Sync {
    /// Check if a file exists at the given path.
    async fn exists(&self, path: &str) -> Result<bool>;

    /// Get metadata from an input path.
    async fn metadata(&self, path: &str) -> Result<FileMetadata>;

    /// Read bytes from a path.
    async fn read(&self, path: &str) -> Result<Bytes>;

    /// Get a [`FileRead`] from a path.
    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;

    /// Write bytes to an output path.
    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;

    /// Get a [`FileWrite`] from a path.
    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;

    /// Delete a file at the given path.
    async fn delete(&self, path: &str) -> Result<()>;

    /// Delete all files with the given prefix.
    async fn delete_prefix(&self, path: &str) -> Result<()>;

    /// Create a new input file for reading.
    fn new_input(&self, path: &str) -> Result<InputFile>;

    /// Create a new output file for writing.
    fn new_output(&self, path: &str) -> Result<OutputFile>;
}

/// Factory for creating [`Storage`] instances from configuration.
///
/// Implement this trait to provide custom storage backends. The factory pattern
/// allows for lazy initialization of storage instances and enables users to
/// inject custom storage implementations into catalogs.
///
/// # Example
///
/// ```rust,ignore
/// #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
/// struct MyCustomStorageFactory {
///     // custom configuration
/// }
///
/// #[typetag::serde]
/// impl StorageFactory for MyCustomStorageFactory {
///     fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
///         // Create and return custom storage implementation
///         todo!()
///     }
/// }
/// ```
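///
/// A factory is then driven with a [`StorageConfig`] at runtime. A minimal
/// usage sketch, assuming the hypothetical factory above and an
/// already-constructed `config`:
///
/// ```rust,ignore
/// let factory = MyCustomStorageFactory { /* ... */ };
/// let storage: Arc<dyn Storage> = factory.build(&config)?;
/// ```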
///
/// TODO: remove the note below once the trait is integrated with FileIO and Catalog.
///
/// # NOTE
///
/// This trait is under heavy development and is not used anywhere as of now.
/// Please DO NOT implement it.
#[typetag::serde(tag = "type")]
pub trait StorageFactory: Debug + Send + Sync {
    /// Build a new [`Storage`] instance from the given configuration.
    ///
    /// # Arguments
    ///
    /// * `config` - The storage configuration containing scheme and properties
    ///
    /// # Returns
    ///
    /// A `Result` containing an `Arc<dyn Storage>` on success, or an error
    /// if the storage could not be created.
    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>>;
}

/// Carries all supported storage services in Iceberg.
#[derive(Debug)]
pub(crate) enum OpenDalStorage {
    #[cfg(feature = "storage-memory")]
    Memory(Operator),
    #[cfg(feature = "storage-fs")]
    LocalFs,
    /// Expects paths of the form `s3[a]://<bucket>/<path>`.
    #[cfg(feature = "storage-s3")]
    S3 {
        /// S3 paths may use either `s3://` or `s3a://`. The configured scheme
        /// string is stored here so paths can be returned with the correct scheme.
        configured_scheme: String,
        config: Arc<S3Config>,
        customized_credential_load: Option<CustomAwsCredentialLoader>,
    },
    #[cfg(feature = "storage-gcs")]
    Gcs { config: Arc<GcsConfig> },
    #[cfg(feature = "storage-oss")]
    Oss { config: Arc<OssConfig> },
    /// Expects paths of the form
    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
    #[cfg(feature = "storage-azdls")]
    Azdls {
        /// Because Azdls accepts multiple possible schemes, the full configured
        /// scheme is stored here to later validate schemes passed in via paths.
        configured_scheme: AzureStorageScheme,
        config: Arc<AzdlsConfig>,
    },
}

impl OpenDalStorage {
    /// Builds a storage instance from a [`FileIOBuilder`], converting the
    /// Iceberg configuration into the corresponding OpenDAL configuration.
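    ///
    /// A minimal sketch of constructing a storage from a builder; the `"memory"`
    /// scheme is shown as an example and requires the `storage-memory` feature:
    ///
    /// ```rust,ignore
    /// let builder = FileIOBuilder::new("memory");
    /// let storage = OpenDalStorage::build(builder)?;
    /// ```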
    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
        let (scheme_str, props, extensions) = file_io_builder.into_parts();
        // Suppress unused-variable warnings when the enabled feature set does
        // not consume `props` or `extensions`.
        let _ = (&props, &extensions);
        let scheme = Self::parse_scheme(&scheme_str)?;

        match scheme {
            #[cfg(feature = "storage-memory")]
            Scheme::Memory => Ok(Self::Memory(super::memory_config_build()?)),
            #[cfg(feature = "storage-fs")]
            Scheme::Fs => Ok(Self::LocalFs),
            #[cfg(feature = "storage-s3")]
            Scheme::S3 => Ok(Self::S3 {
                configured_scheme: scheme_str,
                config: super::s3_config_parse(props)?.into(),
                customized_credential_load: extensions
                    .get::<CustomAwsCredentialLoader>()
                    .map(Arc::unwrap_or_clone),
            }),
            #[cfg(feature = "storage-gcs")]
            Scheme::Gcs => Ok(Self::Gcs {
                config: super::gcs_config_parse(props)?.into(),
            }),
            #[cfg(feature = "storage-oss")]
            Scheme::Oss => Ok(Self::Oss {
                config: super::oss_config_parse(props)?.into(),
            }),
            #[cfg(feature = "storage-azdls")]
            Scheme::Azdls => {
                let scheme = scheme_str.parse::<AzureStorageScheme>()?;
                Ok(Self::Azdls {
                    config: super::azdls_config_parse(props)?.into(),
                    configured_scheme: scheme,
                })
            }
            // Update doc on [`FileIO`] when adding new schemes.
            _ => Err(Error::new(
                ErrorKind::FeatureUnsupported,
                format!("Constructing file io from scheme `{scheme}` is not supported"),
            )),
        }
    }

    /// Creates an operator from a path.
    ///
    /// # Arguments
    ///
    /// * `path`: an *absolute* path starting with the scheme string used to construct [`FileIO`].
    ///
    /// # Returns
    ///
    /// The return value consists of two parts:
    ///
    /// * An [`opendal::Operator`] instance used to operate on the file.
    /// * The path relative to the root URI of the [`opendal::Operator`].
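    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an S3-backed instance; the bucket and object
    /// names are hypothetical:
    ///
    /// ```rust,ignore
    /// let (op, relative) = storage.create_operator(&"s3://bucket/warehouse/data.parquet")?;
    /// assert_eq!(relative, "warehouse/data.parquet");
    /// let bytes = op.read(relative).await?;
    /// ```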
    pub(crate) fn create_operator<'a>(
        &self,
        path: &'a impl AsRef<str>,
    ) -> crate::Result<(Operator, &'a str)> {
        let path = path.as_ref();
        // Suppress the unused-variable warning when no storage feature is enabled.
        let _ = path;
        let (operator, relative_path): (Operator, &str) = match self {
            #[cfg(feature = "storage-memory")]
            OpenDalStorage::Memory(op) => {
                if let Some(stripped) = path.strip_prefix("memory:/") {
                    Ok::<_, crate::Error>((op.clone(), stripped))
                } else {
                    Ok::<_, crate::Error>((op.clone(), &path[1..]))
                }
            }
            #[cfg(feature = "storage-fs")]
            OpenDalStorage::LocalFs => {
                let op = super::fs_config_build()?;

                if let Some(stripped) = path.strip_prefix("file:/") {
                    Ok::<_, crate::Error>((op, stripped))
                } else {
                    Ok::<_, crate::Error>((op, &path[1..]))
                }
            }
            #[cfg(feature = "storage-s3")]
            OpenDalStorage::S3 {
                configured_scheme,
                config,
                customized_credential_load,
            } => {
                let op = super::s3_config_build(config, customized_credential_load, path)?;
                let op_info = op.info();

                // Check prefix of s3 path.
                let prefix = format!("{}://{}/", configured_scheme, op_info.name());
                if path.starts_with(&prefix) {
                    Ok((op, &path[prefix.len()..]))
                } else {
                    Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Invalid s3 url: {path}, should start with {prefix}"),
                    ))
                }
            }
            #[cfg(feature = "storage-gcs")]
            OpenDalStorage::Gcs { config } => {
                let operator = super::gcs_config_build(config, path)?;
                let prefix = format!("gs://{}/", operator.info().name());
                if path.starts_with(&prefix) {
                    Ok((operator, &path[prefix.len()..]))
                } else {
                    Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Invalid gcs url: {path}, should start with {prefix}"),
                    ))
                }
            }
            #[cfg(feature = "storage-oss")]
            OpenDalStorage::Oss { config } => {
                let op = super::oss_config_build(config, path)?;

                // Check prefix of oss path.
                let prefix = format!("oss://{}/", op.info().name());
                if path.starts_with(&prefix) {
                    Ok((op, &path[prefix.len()..]))
                } else {
                    Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Invalid oss url: {path}, should start with {prefix}"),
                    ))
                }
            }
            #[cfg(feature = "storage-azdls")]
            OpenDalStorage::Azdls {
                configured_scheme,
                config,
            } => super::azdls_create_operator(path, config, configured_scheme),
            #[cfg(all(
                not(feature = "storage-s3"),
                not(feature = "storage-fs"),
                not(feature = "storage-gcs"),
                not(feature = "storage-oss"),
                not(feature = "storage-azdls"),
            ))]
            _ => Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "No storage service has been enabled",
            )),
        }?;

        // Transient errors are common for object stores; however, there's no
        // harm in retrying temporary failures for other storage backends as well.
        let operator = operator.layer(RetryLayer::new());

        Ok((operator, relative_path))
    }

    /// Parses a scheme string into an OpenDAL [`Scheme`].
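    ///
    /// For example, both `s3` and `s3a` map to [`Scheme::S3`], and an empty
    /// scheme falls back to the local filesystem:
    ///
    /// ```rust,ignore
    /// assert!(matches!(OpenDalStorage::parse_scheme("s3a")?, Scheme::S3));
    /// assert!(matches!(OpenDalStorage::parse_scheme("")?, Scheme::Fs));
    /// ```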
    fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
        match scheme {
            "memory" => Ok(Scheme::Memory),
            "file" | "" => Ok(Scheme::Fs),
            "s3" | "s3a" => Ok(Scheme::S3),
            "gs" | "gcs" => Ok(Scheme::Gcs),
            "oss" => Ok(Scheme::Oss),
            "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
            s => Ok(s.parse::<Scheme>()?),
        }
    }
}