iceberg/io/
file_io.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::{Any, TypeId};
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use url::Url;
25
26use super::opendal::OpenDalStorage;
27use super::storage::Storage;
28use crate::{Error, ErrorKind, Result};
29
30/// FileIO implementation, used to manipulate files in underlying storage.
31///
32/// # Note
33///
34/// All path passed to `FileIO` must be absolute path starting with scheme string used to construct `FileIO`.
35/// For example, if you construct `FileIO` with `s3a` scheme, then all path passed to `FileIO` must start with `s3a://`.
36///
37/// Supported storages:
38///
39/// | Storage            | Feature Flag      | Expected Path Format             | Schemes                       |
40/// |--------------------|-------------------|----------------------------------| ------------------------------|
41/// | Local file system  | `storage-fs`      | `file`                           | `file://path/to/file`         |
42/// | Memory             | `storage-memory`  | `memory`                         | `memory://path/to/file`       |
43/// | S3                 | `storage-s3`      | `s3`, `s3a`                      | `s3://<bucket>/path/to/file`  |
44/// | GCS                | `storage-gcs`     | `gs`, `gcs`                      | `gs://<bucket>/path/to/file`  |
45/// | OSS                | `storage-oss`     | `oss`                            | `oss://<bucket>/path/to/file` |
46/// | Azure Datalake     | `storage-azdls`   | `abfs`, `abfss`, `wasb`, `wasbs` | `abfs://<filesystem>@<account>.dfs.core.windows.net/path/to/file` or `wasb://<container>@<account>.blob.core.windows.net/path/to/file` |
47#[derive(Clone, Debug)]
48pub struct FileIO {
49    builder: FileIOBuilder,
50
51    inner: Arc<OpenDalStorage>,
52}
53
54impl FileIO {
55    /// Convert FileIO into [`FileIOBuilder`] which used to build this FileIO.
56    ///
57    /// This function is useful when you want serialize and deserialize FileIO across
58    /// distributed systems.
59    pub fn into_builder(self) -> FileIOBuilder {
60        self.builder
61    }
62
63    /// Try to infer file io scheme from path. See [`FileIO`] for supported schemes.
64    ///
65    /// - If it's a valid url, for example `s3://bucket/a`, url scheme will be used, and the rest of the url will be ignored.
66    /// - If it's not a valid url, will try to detect if it's a file path.
67    ///
68    /// Otherwise will return parsing error.
69    pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
70        let url = Url::parse(path.as_ref())
71            .map_err(Error::from)
72            .or_else(|e| {
73                Url::from_file_path(path.as_ref()).map_err(|_| {
74                    Error::new(
75                        ErrorKind::DataInvalid,
76                        "Input is neither a valid url nor path",
77                    )
78                    .with_context("input", path.as_ref().to_string())
79                    .with_source(e)
80                })
81            })?;
82
83        Ok(FileIOBuilder::new(url.scheme()))
84    }
85
86    /// Deletes file.
87    ///
88    /// # Arguments
89    ///
90    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
91    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
92        self.inner.delete(path.as_ref()).await
93    }
94
95    /// Remove the path and all nested dirs and files recursively.
96    ///
97    /// # Arguments
98    ///
99    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
100    ///
101    /// # Behavior
102    ///
103    /// - If the path is a file or not exist, this function will be no-op.
104    /// - If the path is a empty directory, this function will remove the directory itself.
105    /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
106    pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
107        self.inner.delete_prefix(path.as_ref()).await
108    }
109
110    /// Check file exists.
111    ///
112    /// # Arguments
113    ///
114    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
115    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
116        self.inner.exists(path.as_ref()).await
117    }
118
119    /// Creates input file.
120    ///
121    /// # Arguments
122    ///
123    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
124    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
125        self.inner.new_input(path.as_ref())
126    }
127
128    /// Creates output file.
129    ///
130    /// # Arguments
131    ///
132    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
133    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
134        self.inner.new_output(path.as_ref())
135    }
136}
137
/// Container for storing type-safe extensions used to configure underlying FileIO behavior.
///
/// Extensions are keyed by their [`TypeId`], so at most one value is kept per concrete type.
#[derive(Clone, Debug, Default)]
pub struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);

impl Extensions {
    /// Adds an extension, replacing any previously stored extension of the same type `T`.
    pub fn add<T: Any + Send + Sync>(&mut self, ext: T) {
        self.0.insert(TypeId::of::<T>(), Arc::new(ext));
    }

    /// Extends the current set of extensions with another set of extensions.
    ///
    /// When both sets hold an extension of the same type, the one from `extensions` wins.
    pub fn extend(&mut self, extensions: Extensions) {
        self.0.extend(extensions.0);
    }

    /// Fetches the extension of type `T`, or `None` if no extension of that type is stored.
    ///
    /// The returned [`Arc`] points at the stored value itself, so `T` does not have to be
    /// `Clone`: anything that can be passed to [`Extensions::add`] can be fetched again.
    pub fn get<T>(&self) -> Option<Arc<T>>
    where T: Any + Send + Sync {
        self.0
            .get(&TypeId::of::<T>())
            .cloned()
            // The map is keyed by `TypeId::of::<T>()`, so the downcast only fails if the
            // entry is missing; `ok()` folds that into `None` either way.
            .and_then(|ext| ext.downcast::<T>().ok())
    }
}
162
163/// Builder for [`FileIO`].
164#[derive(Clone, Debug)]
165pub struct FileIOBuilder {
166    /// This is used to infer scheme of operator.
167    ///
168    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) will build a local file io.
169    scheme_str: Option<String>,
170    /// Arguments for operator.
171    props: HashMap<String, String>,
172    /// Optional extensions to configure the underlying FileIO behavior.
173    extensions: Extensions,
174}
175
176impl FileIOBuilder {
177    /// Creates a new builder with scheme.
178    /// See [`FileIO`] for supported schemes.
179    pub fn new(scheme_str: impl ToString) -> Self {
180        Self {
181            scheme_str: Some(scheme_str.to_string()),
182            props: HashMap::default(),
183            extensions: Extensions::default(),
184        }
185    }
186
187    /// Creates a new builder for local file io.
188    pub fn new_fs_io() -> Self {
189        Self {
190            scheme_str: None,
191            props: HashMap::default(),
192            extensions: Extensions::default(),
193        }
194    }
195
196    /// Fetch the scheme string.
197    ///
198    /// The scheme_str will be empty if it's None.
199    pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
200        (
201            self.scheme_str.unwrap_or_default(),
202            self.props,
203            self.extensions,
204        )
205    }
206
207    /// Add argument for operator.
208    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
209        self.props.insert(key.to_string(), value.to_string());
210        self
211    }
212
213    /// Add argument for operator.
214    pub fn with_props(
215        mut self,
216        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
217    ) -> Self {
218        self.props
219            .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
220        self
221    }
222
223    /// Add an extension to the file IO builder.
224    pub fn with_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
225        self.extensions.add(ext);
226        self
227    }
228
229    /// Adds multiple extensions to the file IO builder.
230    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
231        self.extensions.extend(extensions);
232        self
233    }
234
235    /// Fetch an extension from the file IO builder.
236    pub fn extension<T>(&self) -> Option<Arc<T>>
237    where T: 'static + Send + Sync + Clone {
238        self.extensions.get::<T>()
239    }
240
241    /// Builds [`FileIO`].
242    pub fn build(self) -> Result<FileIO> {
243        let storage = OpenDalStorage::build(self.clone())?;
244        Ok(FileIO {
245            builder: self,
246            inner: Arc::new(storage),
247        })
248    }
249}
250
/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file.
    pub size: u64,
}
258
/// Trait for reading a file.
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileRead: Send + Sync + Unpin + 'static {
    /// Reads the part of the file content selected by `range` and returns it as [`Bytes`].
    ///
    /// TODO: we can support reading non-contiguous bytes in the future.
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
271
272/// Input file is used for reading from files.
273#[derive(Debug)]
274pub struct InputFile {
275    storage: Arc<dyn Storage>,
276    // Absolute path of file.
277    path: String,
278}
279
280impl InputFile {
281    /// Creates a new input file.
282    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
283        Self { storage, path }
284    }
285
286    /// Absolute path to root uri.
287    pub fn location(&self) -> &str {
288        &self.path
289    }
290
291    /// Check if file exists.
292    pub async fn exists(&self) -> crate::Result<bool> {
293        self.storage.exists(&self.path).await
294    }
295
296    /// Fetch and returns metadata of file.
297    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
298        self.storage.metadata(&self.path).await
299    }
300
301    /// Read and returns whole content of file.
302    ///
303    /// For continuous reading, use [`Self::reader`] instead.
304    pub async fn read(&self) -> crate::Result<Bytes> {
305        self.storage.read(&self.path).await
306    }
307
308    /// Creates [`FileRead`] for continuous reading.
309    ///
310    /// For one-time reading, use [`Self::read`] instead.
311    pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
312        self.storage.reader(&self.path).await
313    }
314}
315
/// Trait for writing a file.
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
    /// Writes the bytes `bs` to the file.
    ///
    /// TODO: we can support writing non-contiguous bytes in the future.
    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;

    /// Closes the file.
    ///
    /// Calling close on a closed file will generate an error.
    async fn close(&mut self) -> crate::Result<()>;
}
334
335/// Output file is used for writing to files..
336#[derive(Debug)]
337pub struct OutputFile {
338    storage: Arc<dyn Storage>,
339    // Absolute path of file.
340    path: String,
341}
342
343impl OutputFile {
344    /// Creates a new output file.
345    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
346        Self { storage, path }
347    }
348
349    /// Relative path to root uri.
350    pub fn location(&self) -> &str {
351        &self.path
352    }
353
354    /// Checks if file exists.
355    pub async fn exists(&self) -> Result<bool> {
356        self.storage.exists(&self.path).await
357    }
358
359    /// Deletes file.
360    ///
361    /// If the file does not exist, it will not return error.
362    pub async fn delete(&self) -> Result<()> {
363        self.storage.delete(&self.path).await
364    }
365
366    /// Converts into [`InputFile`].
367    pub fn to_input_file(self) -> InputFile {
368        InputFile {
369            storage: self.storage,
370            path: self.path,
371        }
372    }
373
374    /// Create a new output file with given bytes.
375    ///
376    /// # Notes
377    ///
378    /// Calling `write` will overwrite the file if it exists.
379    /// For continuous writing, use [`Self::writer`].
380    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
381        self.storage.write(&self.path, bs).await
382    }
383
384    /// Creates output file for continuous writing.
385    ///
386    /// # Notes
387    ///
388    /// For one-time writing, use [`Self::write`] instead.
389    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
390        self.storage.writer(&self.path).await
391    }
392}
393
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};

    /// Payload written to and read back from the files used by these tests.
    const CONTENT: &str = "Iceberg loves rust.";

    /// Builds a `FileIO` through `FileIOBuilder::new_fs_io`.
    fn create_local_file_io() -> FileIO {
        FileIOBuilder::new_fs_io().build().unwrap()
    }

    /// Returns `<dir>/<name>` as a string.
    fn path_in(dir: &TempDir, name: &str) -> String {
        format!("{}/{}", dir.path().to_str().unwrap(), name)
    }

    /// Writes `s` to `path`, creating the parent directories first.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        let path = path.as_ref();
        let parent = path.parent().unwrap();
        create_dir_all(parent).unwrap();

        let mut file = File::create(path).unwrap();
        write!(file, "{s}").unwrap();
    }

    /// Reads the whole file at `path` into a string.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let file = File::open(path).unwrap();
        let mut reader = AllowStdIo::new(file);
        let mut buf = String::new();
        reader.read_to_string(&mut buf).await.unwrap();
        buf
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "a.txt");
        write_to_file(CONTENT, &full_path);

        let input_file = create_local_file_io().new_input(&full_path).unwrap();
        assert!(input_file.exists().await.unwrap());
        // The location is reported exactly as it was passed in.
        assert_eq!(full_path, input_file.location());

        let read_content = read_from_file(full_path).await;
        assert_eq!(CONTENT, read_content.as_str());
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = path_in(&tmp_dir, "a.txt");
        let sub_dir_path = path_in(&tmp_dir, "sub");
        let b_path = format!("{sub_dir_path}/b.txt");
        let c_path = format!("{sub_dir_path}/c.txt");
        for path in [&a_path, &b_path, &c_path] {
            write_to_file(CONTENT, path);
        }

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Removing a file by prefix should be a no-op.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Removing a dir that does not exist should be a no-op.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // Removing a dir should remove all files in it, and nothing outside it.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "a.txt");

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "a.txt");

        let output_file = create_local_file_io().new_output(&full_path).unwrap();
        assert!(!output_file.exists().await.unwrap());

        output_file.write(Bytes::from(CONTENT)).await.unwrap();
        assert_eq!(full_path, output_file.location());

        let read_content = read_from_file(full_path).await;
        assert_eq!(CONTENT, read_content.as_str());
    }

    #[test]
    fn test_create_file_from_path() {
        let cases = [
            ("/tmp/a", "file"),
            ("file:/tmp/b", "file"),
            ("file:///tmp/c", "file"),
            ("s3://bucket/a", "s3"),
        ];
        for (input, expected_scheme) in cases {
            let builder = FileIO::from_path(input).unwrap();
            assert_eq!(Some(expected_scheme), builder.scheme_str.as_deref());
        }

        // Neither a url nor an absolute file path.
        assert!(FileIO::from_path("tmp/||c").is_err());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIOBuilder::new("memory").build().unwrap();

        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        io.new_output(&path)
            .unwrap()
            .write("test".into())
            .await
            .unwrap();
        assert!(io.exists(&path).await.unwrap());

        let content = io.new_input(&path).unwrap().read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }
}