iceberg/io/storage.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use opendal::layers::RetryLayer;
#[cfg(feature = "storage-azdls")]
use opendal::services::AzdlsConfig;
#[cfg(feature = "storage-gcs")]
use opendal::services::GcsConfig;
#[cfg(feature = "storage-oss")]
use opendal::services::OssConfig;
#[cfg(feature = "storage-s3")]
use opendal::services::S3Config;
use opendal::{Operator, Scheme};

#[cfg(feature = "storage-azdls")]
use super::AzureStorageScheme;
use super::{
    FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, StorageConfig,
};
#[cfg(feature = "storage-s3")]
use crate::io::CustomAwsCredentialLoader;
use crate::{Error, ErrorKind, Result};

/// Trait for storage operations in Iceberg.
///
/// The trait supports serialization via `typetag`, allowing storage instances to be
/// serialized and deserialized across process boundaries.
///
/// Third parties can implement this trait to provide custom storage backends.
///
/// # Implementing Custom Storage
///
/// To implement a custom storage backend:
///
/// 1. Create a struct that implements this trait
/// 2. Add the `#[typetag::serde]` attribute for serialization support
/// 3. Implement all required methods
///
/// # Example
///
/// ```rust,ignore
/// #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
/// struct MyStorage {
///     // custom fields
/// }
///
/// #[async_trait]
/// #[typetag::serde]
/// impl Storage for MyStorage {
///     async fn exists(&self, path: &str) -> Result<bool> {
///         // implementation
///         todo!()
///     }
///     // ... implement other methods
/// }
/// ```
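///
/// Any implementation can then be used through dynamic dispatch. The `copy`
/// helper below is a hypothetical sketch, not part of this crate's API:
///
/// ```rust,ignore
/// async fn copy(src: &dyn Storage, dst: &dyn Storage, path: &str) -> Result<()> {
///     let bytes = src.read(path).await?;
///     dst.write(path, bytes).await
/// }
/// ```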
///
/// # Note
///
/// This trait is under heavy development and is not used anywhere as of now.
/// Please DO NOT implement it.
// TODO: remove the note above when the trait is integrated with FileIO and Catalog.
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait Storage: Debug + Send + Sync {
    /// Check if a file exists at the given path.
    async fn exists(&self, path: &str) -> Result<bool>;

    /// Get metadata from an input path.
    async fn metadata(&self, path: &str) -> Result<FileMetadata>;

    /// Read bytes from a path.
    async fn read(&self, path: &str) -> Result<Bytes>;

    /// Get a [`FileRead`] from a path.
    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;

    /// Write bytes to an output path.
    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;

    /// Get a [`FileWrite`] from a path.
    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;

    /// Delete a file at the given path.
    async fn delete(&self, path: &str) -> Result<()>;

    /// Delete all files with the given prefix.
    async fn delete_prefix(&self, path: &str) -> Result<()>;

    /// Create a new input file for reading.
    fn new_input(&self, path: &str) -> Result<InputFile>;

    /// Create a new output file for writing.
    fn new_output(&self, path: &str) -> Result<OutputFile>;
}

/// Factory for creating Storage instances from configuration.
///
/// Implement this trait to provide custom storage backends. The factory pattern
/// allows for lazy initialization of storage instances and enables users to
/// inject custom storage implementations into catalogs.
///
/// # Example
///
/// ```rust,ignore
/// #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
/// struct MyCustomStorageFactory {
///     // custom configuration
/// }
///
/// #[typetag::serde]
/// impl StorageFactory for MyCustomStorageFactory {
///     fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
///         // Create and return custom storage implementation
///         todo!()
///     }
/// }
/// ```
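///
/// A factory is then invoked with a catalog's storage configuration. A
/// minimal sketch, where `storage_config` is an assumed [`StorageConfig`]
/// value:
///
/// ```rust,ignore
/// let factory = MyCustomStorageFactory { /* ... */ };
/// let storage: Arc<dyn Storage> = factory.build(&storage_config)?;
/// ```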
///
/// # Note
///
/// This trait is under heavy development and is not used anywhere as of now.
/// Please DO NOT implement it.
// TODO: remove the note above when the trait is integrated with FileIO and Catalog.
#[typetag::serde(tag = "type")]
pub trait StorageFactory: Debug + Send + Sync {
    /// Build a new Storage instance from the given configuration.
    ///
    /// # Arguments
    ///
    /// * `config` - The storage configuration containing scheme and properties
    ///
    /// # Returns
    ///
    /// A `Result` containing an `Arc<dyn Storage>` on success, or an error
    /// if the storage could not be created.
    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>>;
}

/// Carries all storage services supported by Iceberg.
#[derive(Debug)]
pub(crate) enum OpenDalStorage {
    #[cfg(feature = "storage-memory")]
    Memory(Operator),
    #[cfg(feature = "storage-fs")]
    LocalFs,
    /// Expects paths of the form `s3[a]://<bucket>/<path>`.
    #[cfg(feature = "storage-s3")]
    S3 {
        /// S3 storage accepts both the `s3://` and `s3a://` schemes.
        /// The scheme string is stored here so that the correct path can be
        /// returned later.
        configured_scheme: String,
        config: Arc<S3Config>,
        customized_credential_load: Option<CustomAwsCredentialLoader>,
    },
    #[cfg(feature = "storage-gcs")]
    Gcs { config: Arc<GcsConfig> },
    #[cfg(feature = "storage-oss")]
    Oss { config: Arc<OssConfig> },
    /// Expects paths of the form
    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
    #[cfg(feature = "storage-azdls")]
    Azdls {
        /// Because Azdls accepts multiple possible schemes, the full scheme
        /// passed at construction is stored here to later validate schemes
        /// passed via paths.
        configured_scheme: AzureStorageScheme,
        config: Arc<AzdlsConfig>,
    },
}

impl OpenDalStorage {
    /// Builds an [`OpenDalStorage`] from a [`FileIOBuilder`], converting the
    /// Iceberg configuration into the matching opendal service configuration.
    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
        let (scheme_str, props, extensions) = file_io_builder.into_parts();
        // Silence unused-variable warnings when no storage feature that needs
        // `props` or `extensions` is enabled.
        let _ = (&props, &extensions);
        let scheme = Self::parse_scheme(&scheme_str)?;

        match scheme {
            #[cfg(feature = "storage-memory")]
            Scheme::Memory => Ok(Self::Memory(super::memory_config_build()?)),
            #[cfg(feature = "storage-fs")]
            Scheme::Fs => Ok(Self::LocalFs),
            #[cfg(feature = "storage-s3")]
            Scheme::S3 => Ok(Self::S3 {
                configured_scheme: scheme_str,
                config: super::s3_config_parse(props)?.into(),
                customized_credential_load: extensions
                    .get::<CustomAwsCredentialLoader>()
                    .map(Arc::unwrap_or_clone),
            }),
            #[cfg(feature = "storage-gcs")]
            Scheme::Gcs => Ok(Self::Gcs {
                config: super::gcs_config_parse(props)?.into(),
            }),
            #[cfg(feature = "storage-oss")]
            Scheme::Oss => Ok(Self::Oss {
                config: super::oss_config_parse(props)?.into(),
            }),
            #[cfg(feature = "storage-azdls")]
            Scheme::Azdls => {
                let scheme = scheme_str.parse::<AzureStorageScheme>()?;
                Ok(Self::Azdls {
                    config: super::azdls_config_parse(props)?.into(),
                    configured_scheme: scheme,
                })
            }
            // Update the doc on [`FileIO`] when adding new schemes.
            _ => Err(Error::new(
                ErrorKind::FeatureUnsupported,
                format!("Constructing file io from scheme: {scheme} is not supported"),
            )),
        }
    }

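    // A minimal usage sketch (hypothetical; assumes the `storage-memory`
    // feature is enabled):
    //
    //     let storage = OpenDalStorage::build(FileIOBuilder::new("memory"))?;
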
    /// Creates an operator from a path.
    ///
    /// # Arguments
    ///
    /// * `path`: An *absolute* path starting with the scheme string used to construct [`FileIO`].
    ///
    /// # Returns
    ///
    /// The return value consists of two parts:
    ///
    /// * An [`opendal::Operator`] instance used to operate on the file.
    /// * The path relative to the root of the [`opendal::Operator`].
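    ///
    /// For example, with an S3-backed storage configured for the `s3` scheme,
    /// `s3://my-bucket/a/b.parquet` yields an operator rooted at the
    /// `my-bucket` bucket and the relative path `a/b.parquet` (bucket and key
    /// here are purely illustrative).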
    pub(crate) fn create_operator<'a>(
        &self,
        path: &'a impl AsRef<str>,
    ) -> crate::Result<(Operator, &'a str)> {
        let path = path.as_ref();
        // Silence the unused-variable warning when no storage feature is enabled.
        let _ = path;
        let (operator, relative_path): (Operator, &str) = match self {
            #[cfg(feature = "storage-memory")]
            OpenDalStorage::Memory(op) => {
                if let Some(stripped) = path.strip_prefix("memory:/") {
                    Ok::<_, crate::Error>((op.clone(), stripped))
                } else {
                    // Treat the path as `/<path>`; drop the leading slash to
                    // make it relative to the operator root.
                    Ok::<_, crate::Error>((op.clone(), &path[1..]))
                }
            }
            #[cfg(feature = "storage-fs")]
            OpenDalStorage::LocalFs => {
                let op = super::fs_config_build()?;

                if let Some(stripped) = path.strip_prefix("file:/") {
                    Ok::<_, crate::Error>((op, stripped))
                } else {
                    Ok::<_, crate::Error>((op, &path[1..]))
                }
            }
            #[cfg(feature = "storage-s3")]
            OpenDalStorage::S3 {
                configured_scheme,
                config,
                customized_credential_load,
            } => {
                let op = super::s3_config_build(config, customized_credential_load, path)?;
                let op_info = op.info();

                // Check prefix of s3 path.
                let prefix = format!("{}://{}/", configured_scheme, op_info.name());
                if path.starts_with(&prefix) {
                    Ok((op, &path[prefix.len()..]))
                } else {
                    Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Invalid s3 url: {path}, should start with {prefix}"),
                    ))
                }
            }
            #[cfg(feature = "storage-gcs")]
            OpenDalStorage::Gcs { config } => {
                let operator = super::gcs_config_build(config, path)?;
                let prefix = format!("gs://{}/", operator.info().name());
                if path.starts_with(&prefix) {
                    Ok((operator, &path[prefix.len()..]))
                } else {
                    Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Invalid gcs url: {path}, should start with {prefix}"),
                    ))
                }
            }
            #[cfg(feature = "storage-oss")]
            OpenDalStorage::Oss { config } => {
                let op = super::oss_config_build(config, path)?;

                // Check prefix of oss path.
                let prefix = format!("oss://{}/", op.info().name());
                if path.starts_with(&prefix) {
                    Ok((op, &path[prefix.len()..]))
                } else {
                    Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Invalid oss url: {path}, should start with {prefix}"),
                    ))
                }
            }
            #[cfg(feature = "storage-azdls")]
            OpenDalStorage::Azdls {
                configured_scheme,
                config,
            } => super::azdls_create_operator(path, config, configured_scheme),
            #[cfg(all(
                not(feature = "storage-s3"),
                not(feature = "storage-fs"),
                not(feature = "storage-gcs"),
                not(feature = "storage-oss"),
                not(feature = "storage-azdls"),
            ))]
            _ => Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "No storage service has been enabled",
            )),
        }?;

        // Transient errors are common for object stores; however, there's no
        // harm in retrying temporary failures for other storage backends as well.
        let operator = operator.layer(RetryLayer::new());

        Ok((operator, relative_path))
    }

    /// Parses a scheme string into an opendal [`Scheme`].
    fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
        match scheme {
            "memory" => Ok(Scheme::Memory),
            "file" | "" => Ok(Scheme::Fs),
            "s3" | "s3a" => Ok(Scheme::S3),
            "gs" | "gcs" => Ok(Scheme::Gcs),
            "oss" => Ok(Scheme::Oss),
            "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
            s => Ok(s.parse::<Scheme>()?),
        }
    }
}
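
// A minimal test sketch for `parse_scheme`; the aliases below are exactly the
// ones handled above, and the comparisons rely on opendal's `Scheme` deriving
// `PartialEq`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_scheme_maps_known_aliases() {
        assert_eq!(OpenDalStorage::parse_scheme("s3").unwrap(), Scheme::S3);
        assert_eq!(OpenDalStorage::parse_scheme("s3a").unwrap(), Scheme::S3);
        assert_eq!(OpenDalStorage::parse_scheme("gs").unwrap(), Scheme::Gcs);
        // An empty scheme falls back to the local filesystem.
        assert_eq!(OpenDalStorage::parse_scheme("").unwrap(), Scheme::Fs);
        assert_eq!(OpenDalStorage::parse_scheme("wasb").unwrap(), Scheme::Azdls);
    }
}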