iceberg/io/
file_io.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::{Any, TypeId};
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use url::Url;
25
26use super::storage::{OpenDalStorage, Storage};
27use crate::{Error, ErrorKind, Result};
28
29/// FileIO implementation, used to manipulate files in underlying storage.
30///
31/// # Note
32///
33/// All path passed to `FileIO` must be absolute path starting with scheme string used to construct `FileIO`.
34/// For example, if you construct `FileIO` with `s3a` scheme, then all path passed to `FileIO` must start with `s3a://`.
35///
36/// Supported storages:
37///
38/// | Storage            | Feature Flag      | Expected Path Format             | Schemes                       |
39/// |--------------------|-------------------|----------------------------------| ------------------------------|
40/// | Local file system  | `storage-fs`      | `file`                           | `file://path/to/file`         |
41/// | Memory             | `storage-memory`  | `memory`                         | `memory://path/to/file`       |
42/// | S3                 | `storage-s3`      | `s3`, `s3a`                      | `s3://<bucket>/path/to/file`  |
43/// | GCS                | `storage-gcs`     | `gs`, `gcs`                      | `gs://<bucket>/path/to/file`  |
44/// | OSS                | `storage-oss`     | `oss`                            | `oss://<bucket>/path/to/file` |
45/// | Azure Datalake     | `storage-azdls`   | `abfs`, `abfss`, `wasb`, `wasbs` | `abfs://<filesystem>@<account>.dfs.core.windows.net/path/to/file` or `wasb://<container>@<account>.blob.core.windows.net/path/to/file` |
46#[derive(Clone, Debug)]
47pub struct FileIO {
48    builder: FileIOBuilder,
49
50    inner: Arc<OpenDalStorage>,
51}
52
53impl FileIO {
54    /// Convert FileIO into [`FileIOBuilder`] which used to build this FileIO.
55    ///
56    /// This function is useful when you want serialize and deserialize FileIO across
57    /// distributed systems.
58    pub fn into_builder(self) -> FileIOBuilder {
59        self.builder
60    }
61
62    /// Try to infer file io scheme from path. See [`FileIO`] for supported schemes.
63    ///
64    /// - If it's a valid url, for example `s3://bucket/a`, url scheme will be used, and the rest of the url will be ignored.
65    /// - If it's not a valid url, will try to detect if it's a file path.
66    ///
67    /// Otherwise will return parsing error.
68    pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
69        let url = Url::parse(path.as_ref())
70            .map_err(Error::from)
71            .or_else(|e| {
72                Url::from_file_path(path.as_ref()).map_err(|_| {
73                    Error::new(
74                        ErrorKind::DataInvalid,
75                        "Input is neither a valid url nor path",
76                    )
77                    .with_context("input", path.as_ref().to_string())
78                    .with_source(e)
79                })
80            })?;
81
82        Ok(FileIOBuilder::new(url.scheme()))
83    }
84
85    /// Deletes file.
86    ///
87    /// # Arguments
88    ///
89    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
90    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
91        self.inner.delete(path.as_ref()).await
92    }
93
94    /// Remove the path and all nested dirs and files recursively.
95    ///
96    /// # Arguments
97    ///
98    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
99    ///
100    /// # Behavior
101    ///
102    /// - If the path is a file or not exist, this function will be no-op.
103    /// - If the path is a empty directory, this function will remove the directory itself.
104    /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
105    pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
106        self.inner.delete_prefix(path.as_ref()).await
107    }
108
109    /// Check file exists.
110    ///
111    /// # Arguments
112    ///
113    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
114    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
115        self.inner.exists(path.as_ref()).await
116    }
117
118    /// Creates input file.
119    ///
120    /// # Arguments
121    ///
122    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
123    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
124        self.inner.new_input(path.as_ref())
125    }
126
127    /// Creates output file.
128    ///
129    /// # Arguments
130    ///
131    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
132    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
133        self.inner.new_output(path.as_ref())
134    }
135}
136
/// Container for storing type-safe extensions used to configure underlying FileIO behavior.
///
/// Extensions are keyed by their concrete type, so at most one value of any given type is
/// stored at a time.
#[derive(Clone, Debug, Default)]
pub struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);

impl Extensions {
    /// Adds an extension, replacing any previously stored extension of the same type `T`.
    pub fn add<T: Any + Send + Sync>(&mut self, ext: T) {
        self.0.insert(TypeId::of::<T>(), Arc::new(ext));
    }

    /// Extends the current set of extensions with another set of extensions.
    ///
    /// For a type present in both sets, the entry from `extensions` wins.
    pub fn extend(&mut self, extensions: Extensions) {
        self.0.extend(extensions.0);
    }

    /// Fetches the extension of type `T`, if one has been added.
    ///
    /// Only `Any + Send + Sync` is required, which is the same bound as [`Extensions::add`].
    /// As a result, every extension that can be stored can also be retrieved. The returned
    /// `Arc` shares the stored value; the value itself is never cloned.
    pub fn get<T>(&self) -> Option<Arc<T>>
    where T: Any + Send + Sync {
        self.0
            .get(&TypeId::of::<T>())
            // The key is `TypeId::of::<T>()`, so the downcast is expected to succeed.
            .and_then(|ext| Arc::clone(ext).downcast::<T>().ok())
    }
}
161
162/// Builder for [`FileIO`].
163#[derive(Clone, Debug)]
164pub struct FileIOBuilder {
165    /// This is used to infer scheme of operator.
166    ///
167    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) will build a local file io.
168    scheme_str: Option<String>,
169    /// Arguments for operator.
170    props: HashMap<String, String>,
171    /// Optional extensions to configure the underlying FileIO behavior.
172    extensions: Extensions,
173}
174
175impl FileIOBuilder {
176    /// Creates a new builder with scheme.
177    /// See [`FileIO`] for supported schemes.
178    pub fn new(scheme_str: impl ToString) -> Self {
179        Self {
180            scheme_str: Some(scheme_str.to_string()),
181            props: HashMap::default(),
182            extensions: Extensions::default(),
183        }
184    }
185
186    /// Creates a new builder for local file io.
187    pub fn new_fs_io() -> Self {
188        Self {
189            scheme_str: None,
190            props: HashMap::default(),
191            extensions: Extensions::default(),
192        }
193    }
194
195    /// Fetch the scheme string.
196    ///
197    /// The scheme_str will be empty if it's None.
198    pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
199        (
200            self.scheme_str.unwrap_or_default(),
201            self.props,
202            self.extensions,
203        )
204    }
205
206    /// Add argument for operator.
207    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
208        self.props.insert(key.to_string(), value.to_string());
209        self
210    }
211
212    /// Add argument for operator.
213    pub fn with_props(
214        mut self,
215        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
216    ) -> Self {
217        self.props
218            .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
219        self
220    }
221
222    /// Add an extension to the file IO builder.
223    pub fn with_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
224        self.extensions.add(ext);
225        self
226    }
227
228    /// Adds multiple extensions to the file IO builder.
229    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
230        self.extensions.extend(extensions);
231        self
232    }
233
234    /// Fetch an extension from the file IO builder.
235    pub fn extension<T>(&self) -> Option<Arc<T>>
236    where T: 'static + Send + Sync + Clone {
237        self.extensions.get::<T>()
238    }
239
240    /// Builds [`FileIO`].
241    pub fn build(self) -> Result<FileIO> {
242        let storage = OpenDalStorage::build(self.clone())?;
243        Ok(FileIO {
244            builder: self,
245            inner: Arc::new(storage),
246        })
247    }
248}
249
/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file.
    pub size: u64,
}
257
258/// Trait for reading file.
259///
260/// # TODO
261/// It's possible for us to remove the async_trait, but we need to figure
262/// out how to handle the object safety.
263#[async_trait::async_trait]
264pub trait FileRead: Send + Sync + Unpin + 'static {
265    /// Read file content with given range.
266    ///
267    /// TODO: we can support reading non-contiguous bytes in the future.
268    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
269}
270
271/// Input file is used for reading from files.
272#[derive(Debug)]
273pub struct InputFile {
274    storage: Arc<dyn Storage>,
275    // Absolute path of file.
276    path: String,
277}
278
279impl InputFile {
280    /// Creates a new input file.
281    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
282        Self { storage, path }
283    }
284
285    /// Absolute path to root uri.
286    pub fn location(&self) -> &str {
287        &self.path
288    }
289
290    /// Check if file exists.
291    pub async fn exists(&self) -> crate::Result<bool> {
292        self.storage.exists(&self.path).await
293    }
294
295    /// Fetch and returns metadata of file.
296    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
297        self.storage.metadata(&self.path).await
298    }
299
300    /// Read and returns whole content of file.
301    ///
302    /// For continuous reading, use [`Self::reader`] instead.
303    pub async fn read(&self) -> crate::Result<Bytes> {
304        self.storage.read(&self.path).await
305    }
306
307    /// Creates [`FileRead`] for continuous reading.
308    ///
309    /// For one-time reading, use [`Self::read`] instead.
310    pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
311        self.storage.reader(&self.path).await
312    }
313}
314
315/// Trait for writing file.
316///
317/// # TODO
318///
319/// It's possible for us to remove the async_trait, but we need to figure
320/// out how to handle the object safety.
321#[async_trait::async_trait]
322pub trait FileWrite: Send + Unpin + 'static {
323    /// Write bytes to file.
324    ///
325    /// TODO: we can support writing non-contiguous bytes in the future.
326    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
327
328    /// Close file.
329    ///
330    /// Calling close on closed file will generate an error.
331    async fn close(&mut self) -> crate::Result<()>;
332}
333
334/// Output file is used for writing to files..
335#[derive(Debug)]
336pub struct OutputFile {
337    storage: Arc<dyn Storage>,
338    // Absolute path of file.
339    path: String,
340}
341
342impl OutputFile {
343    /// Creates a new output file.
344    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
345        Self { storage, path }
346    }
347
348    /// Relative path to root uri.
349    pub fn location(&self) -> &str {
350        &self.path
351    }
352
353    /// Checks if file exists.
354    pub async fn exists(&self) -> Result<bool> {
355        self.storage.exists(&self.path).await
356    }
357
358    /// Deletes file.
359    ///
360    /// If the file does not exist, it will not return error.
361    pub async fn delete(&self) -> Result<()> {
362        self.storage.delete(&self.path).await
363    }
364
365    /// Converts into [`InputFile`].
366    pub fn to_input_file(self) -> InputFile {
367        InputFile {
368            storage: self.storage,
369            path: self.path,
370        }
371    }
372
373    /// Create a new output file with given bytes.
374    ///
375    /// # Notes
376    ///
377    /// Calling `write` will overwrite the file if it exists.
378    /// For continuous writing, use [`Self::writer`].
379    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
380        self.storage.write(&self.path, bs).await
381    }
382
383    /// Creates output file for continuous writing.
384    ///
385    /// # Notes
386    ///
387    /// For one-time writing, use [`Self::write`] instead.
388    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
389        self.storage.writer(&self.path).await
390    }
391}
392
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{Extensions, FileIO, FileIOBuilder};

    fn create_local_file_io() -> FileIO {
        FileIOBuilder::new_fs_io().build().unwrap()
    }

    /// Writes `s` to `path`, creating parent directories first.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    /// Reads the whole file at `path` into a `String`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        // The location is reported exactly as it was passed to `new_input`.
        assert_eq!(&full_path, input_file.location());
        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Removing a file via `delete_prefix` should be a no-op.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Removing a directory that does not exist should be a no-op.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // Removing a directory should remove all files in it.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        {
            output_file.write(content.into()).await.unwrap();
        }

        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[test]
    fn test_create_file_from_path() {
        let io = FileIO::from_path("/tmp/a").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:/tmp/b").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:///tmp/c").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("s3://bucket/a").unwrap();
        assert_eq!("s3", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("tmp/||c");
        assert!(io.is_err());
    }

    #[test]
    fn test_builder_extensions_and_parts() {
        let mut extensions = Extensions::default();
        extensions.add(7u32);
        let builder = FileIOBuilder::new("memory")
            .with_extension(String::from("ext"))
            .with_extensions(extensions)
            .with_prop("k", "v");

        assert_eq!(Some(7), builder.extension::<u32>().map(|v| *v));
        assert_eq!("ext", builder.extension::<String>().unwrap().as_str());
        assert!(builder.extension::<u64>().is_none());

        let (scheme, props, _) = builder.into_parts();
        assert_eq!("memory", scheme);
        assert_eq!(Some("v"), props.get("k").map(String::as_str));

        let (scheme, props, _) = FileIOBuilder::new_fs_io().into_parts();
        assert_eq!("", scheme);
        assert!(props.is_empty());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIOBuilder::new("memory").build().unwrap();

        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }
}
546        assert!(!io.exists(&path).await.unwrap());
547    }
548}