iceberg/io/
file_io.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::{Any, TypeId};
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use opendal::Operator;
25use url::Url;
26
27use super::storage::Storage;
28use crate::{Error, ErrorKind, Result};
29
30/// FileIO implementation, used to manipulate files in underlying storage.
31///
32/// # Note
33///
34/// All path passed to `FileIO` must be absolute path starting with scheme string used to construct `FileIO`.
35/// For example, if you construct `FileIO` with `s3a` scheme, then all path passed to `FileIO` must start with `s3a://`.
36///
37/// Supported storages:
38///
39/// | Storage            | Feature Flag      | Expected Path Format             | Schemes                       |
40/// |--------------------|-------------------|----------------------------------| ------------------------------|
41/// | Local file system  | `storage-fs`      | `file`                           | `file://path/to/file`         |
42/// | Memory             | `storage-memory`  | `memory`                         | `memory://path/to/file`       |
43/// | S3                 | `storage-s3`      | `s3`, `s3a`                      | `s3://<bucket>/path/to/file`  |
44/// | GCS                | `storage-gcs`     | `gs`, `gcs`                      | `gs://<bucket>/path/to/file`  |
45/// | OSS                | `storage-oss`     | `oss`                            | `oss://<bucket>/path/to/file` |
46/// | Azure Datalake     | `storage-azdls`   | `abfs`, `abfss`, `wasb`, `wasbs` | `abfs://<filesystem>@<account>.dfs.core.windows.net/path/to/file` or `wasb://<container>@<account>.blob.core.windows.net/path/to/file` |
47#[derive(Clone, Debug)]
48pub struct FileIO {
49    builder: FileIOBuilder,
50
51    inner: Arc<Storage>,
52}
53
54impl FileIO {
55    /// Convert FileIO into [`FileIOBuilder`] which used to build this FileIO.
56    ///
57    /// This function is useful when you want serialize and deserialize FileIO across
58    /// distributed systems.
59    pub fn into_builder(self) -> FileIOBuilder {
60        self.builder
61    }
62
63    /// Try to infer file io scheme from path. See [`FileIO`] for supported schemes.
64    ///
65    /// - If it's a valid url, for example `s3://bucket/a`, url scheme will be used, and the rest of the url will be ignored.
66    /// - If it's not a valid url, will try to detect if it's a file path.
67    ///
68    /// Otherwise will return parsing error.
69    pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
70        let url = Url::parse(path.as_ref())
71            .map_err(Error::from)
72            .or_else(|e| {
73                Url::from_file_path(path.as_ref()).map_err(|_| {
74                    Error::new(
75                        ErrorKind::DataInvalid,
76                        "Input is neither a valid url nor path",
77                    )
78                    .with_context("input", path.as_ref().to_string())
79                    .with_source(e)
80                })
81            })?;
82
83        Ok(FileIOBuilder::new(url.scheme()))
84    }
85
86    /// Deletes file.
87    ///
88    /// # Arguments
89    ///
90    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
91    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
92        let (op, relative_path) = self.inner.create_operator(&path)?;
93        Ok(op.delete(relative_path).await?)
94    }
95
96    /// Remove the path and all nested dirs and files recursively.
97    ///
98    /// # Arguments
99    ///
100    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
101    ///
102    /// # Behavior
103    ///
104    /// - If the path is a file or not exist, this function will be no-op.
105    /// - If the path is a empty directory, this function will remove the directory itself.
106    /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
107    pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
108        let (op, relative_path) = self.inner.create_operator(&path)?;
109        let path = if relative_path.ends_with('/') {
110            relative_path.to_string()
111        } else {
112            format!("{relative_path}/")
113        };
114        Ok(op.remove_all(&path).await?)
115    }
116
117    /// Check file exists.
118    ///
119    /// # Arguments
120    ///
121    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
122    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
123        let (op, relative_path) = self.inner.create_operator(&path)?;
124        Ok(op.exists(relative_path).await?)
125    }
126
127    /// Creates input file.
128    ///
129    /// # Arguments
130    ///
131    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
132    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
133        let (op, relative_path) = self.inner.create_operator(&path)?;
134        let path = path.as_ref().to_string();
135        let relative_path_pos = path.len() - relative_path.len();
136        Ok(InputFile {
137            op,
138            path,
139            relative_path_pos,
140        })
141    }
142
143    /// Creates output file.
144    ///
145    /// # Arguments
146    ///
147    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
148    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
149        let (op, relative_path) = self.inner.create_operator(&path)?;
150        let path = path.as_ref().to_string();
151        let relative_path_pos = path.len() - relative_path.len();
152        Ok(OutputFile {
153            op,
154            path,
155            relative_path_pos,
156        })
157    }
158}
159
/// Container for storing type-safe extensions used to configure underlying FileIO behavior.
///
/// Extensions are keyed by their concrete type, so at most one value per type is stored.
#[derive(Clone, Debug, Default)]
pub struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);

impl Extensions {
    /// Adds an extension, replacing any previously added extension of the same type `T`.
    pub fn add<T: Any + Send + Sync>(&mut self, ext: T) {
        self.0.insert(TypeId::of::<T>(), Arc::new(ext));
    }

    /// Extends the current set of extensions with another set of extensions.
    ///
    /// When both sets contain an extension of the same type, the one from `extensions` wins.
    pub fn extend(&mut self, extensions: Extensions) {
        self.0.extend(extensions.0);
    }

    /// Fetches the extension of type `T`, if one has been added.
    ///
    /// The returned [`Arc`] shares the stored value rather than copying it, so `T` does not
    /// need to implement `Clone`. `Arc::downcast` only requires `Any + Send + Sync`.
    pub fn get<T>(&self) -> Option<Arc<T>>
    where T: 'static + Send + Sync {
        let type_id = TypeId::of::<T>();
        self.0
            .get(&type_id)
            .and_then(|arc_any| Arc::clone(arc_any).downcast::<T>().ok())
    }
}
184
185/// Builder for [`FileIO`].
186#[derive(Clone, Debug)]
187pub struct FileIOBuilder {
188    /// This is used to infer scheme of operator.
189    ///
190    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) will build a local file io.
191    scheme_str: Option<String>,
192    /// Arguments for operator.
193    props: HashMap<String, String>,
194    /// Optional extensions to configure the underlying FileIO behavior.
195    extensions: Extensions,
196}
197
198impl FileIOBuilder {
199    /// Creates a new builder with scheme.
200    /// See [`FileIO`] for supported schemes.
201    pub fn new(scheme_str: impl ToString) -> Self {
202        Self {
203            scheme_str: Some(scheme_str.to_string()),
204            props: HashMap::default(),
205            extensions: Extensions::default(),
206        }
207    }
208
209    /// Creates a new builder for local file io.
210    pub fn new_fs_io() -> Self {
211        Self {
212            scheme_str: None,
213            props: HashMap::default(),
214            extensions: Extensions::default(),
215        }
216    }
217
218    /// Fetch the scheme string.
219    ///
220    /// The scheme_str will be empty if it's None.
221    pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
222        (
223            self.scheme_str.unwrap_or_default(),
224            self.props,
225            self.extensions,
226        )
227    }
228
229    /// Add argument for operator.
230    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
231        self.props.insert(key.to_string(), value.to_string());
232        self
233    }
234
235    /// Add argument for operator.
236    pub fn with_props(
237        mut self,
238        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
239    ) -> Self {
240        self.props
241            .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
242        self
243    }
244
245    /// Add an extension to the file IO builder.
246    pub fn with_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
247        self.extensions.add(ext);
248        self
249    }
250
251    /// Adds multiple extensions to the file IO builder.
252    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
253        self.extensions.extend(extensions);
254        self
255    }
256
257    /// Fetch an extension from the file IO builder.
258    pub fn extension<T>(&self) -> Option<Arc<T>>
259    where T: 'static + Send + Sync + Clone {
260        self.extensions.get::<T>()
261    }
262
263    /// Builds [`FileIO`].
264    pub fn build(self) -> Result<FileIO> {
265        let storage = Storage::build(self.clone())?;
266        Ok(FileIO {
267            builder: self,
268            inner: Arc::new(storage),
269        })
270    }
271}
272
/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file, as reported by the storage's content length.
    pub size: u64,
}
280
281/// Trait for reading file.
282///
283/// # TODO
284/// It's possible for us to remove the async_trait, but we need to figure
285/// out how to handle the object safety.
286#[async_trait::async_trait]
287pub trait FileRead: Send + Sync + Unpin + 'static {
288    /// Read file content with given range.
289    ///
290    /// TODO: we can support reading non-contiguous bytes in the future.
291    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
292}
293
294#[async_trait::async_trait]
295impl FileRead for opendal::Reader {
296    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
297        Ok(opendal::Reader::read(self, range).await?.to_bytes())
298    }
299}
300
301/// Input file is used for reading from files.
302#[derive(Debug)]
303pub struct InputFile {
304    op: Operator,
305    // Absolution path of file.
306    path: String,
307    // Relative path of file to uri, starts at [`relative_path_pos`]
308    relative_path_pos: usize,
309}
310
311impl InputFile {
312    /// Absolute path to root uri.
313    pub fn location(&self) -> &str {
314        &self.path
315    }
316
317    /// Check if file exists.
318    pub async fn exists(&self) -> crate::Result<bool> {
319        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
320    }
321
322    /// Fetch and returns metadata of file.
323    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
324        let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
325
326        Ok(FileMetadata {
327            size: meta.content_length(),
328        })
329    }
330
331    /// Read and returns whole content of file.
332    ///
333    /// For continuous reading, use [`Self::reader`] instead.
334    pub async fn read(&self) -> crate::Result<Bytes> {
335        Ok(self
336            .op
337            .read(&self.path[self.relative_path_pos..])
338            .await?
339            .to_bytes())
340    }
341
342    /// Creates [`FileRead`] for continuous reading.
343    ///
344    /// For one-time reading, use [`Self::read`] instead.
345    pub async fn reader(&self) -> crate::Result<impl FileRead + use<>> {
346        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
347    }
348}
349
350/// Trait for writing file.
351///
352/// # TODO
353///
354/// It's possible for us to remove the async_trait, but we need to figure
355/// out how to handle the object safety.
356#[async_trait::async_trait]
357pub trait FileWrite: Send + Unpin + 'static {
358    /// Write bytes to file.
359    ///
360    /// TODO: we can support writing non-contiguous bytes in the future.
361    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
362
363    /// Close file.
364    ///
365    /// Calling close on closed file will generate an error.
366    async fn close(&mut self) -> crate::Result<()>;
367}
368
369#[async_trait::async_trait]
370impl FileWrite for opendal::Writer {
371    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
372        Ok(opendal::Writer::write(self, bs).await?)
373    }
374
375    async fn close(&mut self) -> crate::Result<()> {
376        let _ = opendal::Writer::close(self).await?;
377        Ok(())
378    }
379}
380
381#[async_trait::async_trait]
382impl FileWrite for Box<dyn FileWrite> {
383    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
384        self.as_mut().write(bs).await
385    }
386
387    async fn close(&mut self) -> crate::Result<()> {
388        self.as_mut().close().await
389    }
390}
391
392/// Output file is used for writing to files..
393#[derive(Debug)]
394pub struct OutputFile {
395    op: Operator,
396    // Absolution path of file.
397    path: String,
398    // Relative path of file to uri, starts at [`relative_path_pos`]
399    relative_path_pos: usize,
400}
401
402impl OutputFile {
403    /// Relative path to root uri.
404    pub fn location(&self) -> &str {
405        &self.path
406    }
407
408    /// Checks if file exists.
409    pub async fn exists(&self) -> Result<bool> {
410        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
411    }
412
413    /// Deletes file.
414    ///
415    /// If the file does not exist, it will not return error.
416    pub async fn delete(&self) -> Result<()> {
417        Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
418    }
419
420    /// Converts into [`InputFile`].
421    pub fn to_input_file(self) -> InputFile {
422        InputFile {
423            op: self.op,
424            path: self.path,
425            relative_path_pos: self.relative_path_pos,
426        }
427    }
428
429    /// Create a new output file with given bytes.
430    ///
431    /// # Notes
432    ///
433    /// Calling `write` will overwrite the file if it exists.
434    /// For continuous writing, use [`Self::writer`].
435    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
436        let mut writer = self.writer().await?;
437        writer.write(bs).await?;
438        writer.close().await
439    }
440
441    /// Creates output file for continuous writing.
442    ///
443    /// # Notes
444    ///
445    /// For one-time writing, use [`Self::write`] instead.
446    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
447        Ok(Box::new(
448            self.op.writer(&self.path[self.relative_path_pos..]).await?,
449        ))
450    }
451}
452
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder, FileRead};

    fn create_local_file_io() -> FileIO {
        FileIOBuilder::new_fs_io().build().unwrap()
    }

    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        // `location` reports the path exactly as it was passed in.
        assert_eq!(&full_path, input_file.location());

        // Read back through `InputFile` itself instead of bypassing it with std I/O.
        assert_eq!(
            content.len() as u64,
            input_file.metadata().await.unwrap().size
        );
        let read_content = input_file.read().await.unwrap();
        assert_eq!(Bytes::from(content), read_content);

        // Ranged reads go through the `FileRead` reader.
        let reader = input_file.reader().await.unwrap();
        assert_eq!(Bytes::from("Iceberg"), reader.read(0..7).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a file should be no-op.
        file_io.remove_dir_all(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a not exist dir should be no-op.
        file_io.remove_dir_all("not_exists/").await.unwrap();

        // Remove a dir should remove all files in it.
        file_io.remove_dir_all(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.remove_dir_all(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(content.into()).await.unwrap();

        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[test]
    fn test_create_file_from_path() {
        let io = FileIO::from_path("/tmp/a").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:/tmp/b").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:///tmp/c").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("s3://bucket/a").unwrap();
        assert_eq!("s3", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("tmp/||c");
        assert!(io.is_err());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIOBuilder::new("memory").build().unwrap();

        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }
}