iceberg/io/
file_io.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Range;
19use std::sync::{Arc, OnceLock};
20
21use bytes::Bytes;
22use futures::{Stream, StreamExt};
23
24use super::storage::{
25    LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
26};
27use crate::Result;
28
29/// FileIO implementation, used to manipulate files in underlying storage.
30///
31/// FileIO wraps a `dyn Storage` with lazy initialization via `StorageFactory`.
32/// The storage is created on first use and cached for subsequent operations.
33///
34/// # Note
35///
36/// All paths passed to `FileIO` must be absolute paths starting with the scheme string
37/// appropriate for the storage backend being used.
38///
39/// This crate provides native support for local filesystem (`file://`) and
40/// memory (`memory://`) storage. For extensive storage backend support (S3, GCS,
41/// OSS, Azure, etc.), use the
42/// [`iceberg-storage-opendal`](https://crates.io/crates/iceberg-storage-opendal) crate.
43///
44/// # Example
45///
46/// ```rust,ignore
47/// use iceberg::io::{FileIO, FileIOBuilder};
48/// use iceberg::io::{LocalFsStorageFactory, MemoryStorageFactory};
49/// use std::sync::Arc;
50///
51/// // Create FileIO with memory storage for testing
52/// let file_io = FileIO::new_with_memory();
53///
54/// // Create FileIO with local filesystem storage
55/// let file_io = FileIO::new_with_fs();
56///
57/// // Create FileIO with custom factory
58/// let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory))
59///     .with_prop("key", "value")
60///     .build();
61/// ```
62#[derive(Clone, Debug)]
63pub struct FileIO {
64    /// Storage configuration containing properties
65    config: StorageConfig,
66    /// Factory for creating storage instances
67    factory: Arc<dyn StorageFactory>,
68    /// Cached storage instance (lazily initialized)
69    storage: Arc<OnceLock<Arc<dyn Storage>>>,
70}
71
72impl FileIO {
73    /// Create a new FileIO backed by in-memory storage.
74    ///
75    /// This is useful for testing scenarios where persistent storage is not needed.
76    pub fn new_with_memory() -> Self {
77        Self {
78            config: StorageConfig::new(),
79            factory: Arc::new(MemoryStorageFactory),
80            storage: Arc::new(OnceLock::new()),
81        }
82    }
83
84    /// Create a new FileIO backed by local filesystem storage.
85    ///
86    /// This is useful for local development and testing with real files.
87    pub fn new_with_fs() -> Self {
88        Self {
89            config: StorageConfig::new(),
90            factory: Arc::new(LocalFsStorageFactory),
91            storage: Arc::new(OnceLock::new()),
92        }
93    }
94
95    /// Get the storage configuration.
96    pub fn config(&self) -> &StorageConfig {
97        &self.config
98    }
99
100    /// Get or create the storage instance.
101    ///
102    /// The factory is invoked on first access and the result is cached
103    /// for all subsequent operations.
104    fn get_storage(&self) -> Result<Arc<dyn Storage>> {
105        // Check if already initialized
106        if let Some(storage) = self.storage.get() {
107            return Ok(storage.clone());
108        }
109
110        // Build the storage
111        let storage = self.factory.build(&self.config)?;
112
113        // Try to set it (another thread might have set it first)
114        let _ = self.storage.set(storage.clone());
115
116        // Return whatever is in the cell (either ours or another thread's)
117        Ok(self.storage.get().unwrap().clone())
118    }
119
120    /// Deletes file.
121    ///
122    /// # Arguments
123    ///
124    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
125    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
126        self.get_storage()?.delete(path.as_ref()).await
127    }
128
129    /// Remove the path and all nested dirs and files recursively.
130    ///
131    /// # Arguments
132    ///
133    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
134    ///
135    /// # Behavior
136    ///
137    /// - If the path is a file or not exist, this function will be no-op.
138    /// - If the path is a empty directory, this function will remove the directory itself.
139    /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
140    pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
141        self.get_storage()?.delete_prefix(path.as_ref()).await
142    }
143
144    /// Delete multiple files from a stream of paths.
145    ///
146    /// # Arguments
147    ///
148    /// * paths: A stream of absolute paths starting with the scheme string used to construct [`FileIO`].
149    pub async fn delete_stream(
150        &self,
151        paths: impl Stream<Item = String> + Send + 'static,
152    ) -> Result<()> {
153        self.get_storage()?.delete_stream(paths.boxed()).await
154    }
155
156    /// Check file exists.
157    ///
158    /// # Arguments
159    ///
160    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
161    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
162        self.get_storage()?.exists(path.as_ref()).await
163    }
164
165    /// Creates input file.
166    ///
167    /// # Arguments
168    ///
169    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
170    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
171        self.get_storage()?.new_input(path.as_ref())
172    }
173
174    /// Creates output file.
175    ///
176    /// # Arguments
177    ///
178    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
179    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
180        self.get_storage()?.new_output(path.as_ref())
181    }
182}
183
184/// Builder for [`FileIO`].
185///
186/// The builder accepts an explicit `StorageFactory` and configuration properties.
187/// Storage is lazily initialized on first use.
188#[derive(Clone, Debug)]
189pub struct FileIOBuilder {
190    /// Factory for creating storage instances
191    factory: Arc<dyn StorageFactory>,
192    /// Storage configuration
193    config: StorageConfig,
194}
195
196impl FileIOBuilder {
197    /// Creates a new builder with the given storage factory.
198    pub fn new(factory: Arc<dyn StorageFactory>) -> Self {
199        Self {
200            factory,
201            config: StorageConfig::new(),
202        }
203    }
204
205    /// Add a configuration property.
206    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
207        self.config = self.config.with_prop(key.to_string(), value.to_string());
208        self
209    }
210
211    /// Add multiple configuration properties.
212    pub fn with_props(
213        mut self,
214        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
215    ) -> Self {
216        self.config = self
217            .config
218            .with_props(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
219        self
220    }
221
222    /// Get the storage configuration.
223    pub fn config(&self) -> &StorageConfig {
224        &self.config
225    }
226
227    /// Builds [`FileIO`].
228    pub fn build(self) -> FileIO {
229        FileIO {
230            config: self.config,
231            factory: self.factory,
232            storage: Arc::new(OnceLock::new()),
233        }
234    }
235}
236
/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file.
    pub size: u64,
}
244
245/// Trait for reading file.
246///
247/// # TODO
248/// It's possible for us to remove the async_trait, but we need to figure
249/// out how to handle the object safety.
250#[async_trait::async_trait]
251pub trait FileRead: Send + Sync + Unpin + 'static {
252    /// Read file content with given range.
253    ///
254    /// TODO: we can support reading non-contiguous bytes in the future.
255    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
256}
257
258#[async_trait::async_trait]
259impl<T: AsRef<dyn FileRead> + Send + Sync + Unpin + 'static> FileRead for T {
260    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
261        self.as_ref().read(range).await
262    }
263}
264
265/// Input file is used for reading from files.
266#[derive(Debug)]
267pub struct InputFile {
268    storage: Arc<dyn Storage>,
269    // Absolute path of file.
270    path: String,
271}
272
273impl InputFile {
274    /// Creates a new input file.
275    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
276        Self { storage, path }
277    }
278
279    /// Absolute path to root uri.
280    pub fn location(&self) -> &str {
281        &self.path
282    }
283
284    /// Check if file exists.
285    pub async fn exists(&self) -> crate::Result<bool> {
286        self.storage.exists(&self.path).await
287    }
288
289    /// Fetch and returns metadata of file.
290    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
291        self.storage.metadata(&self.path).await
292    }
293
294    /// Read and returns whole content of file.
295    ///
296    /// For continuous reading, use [`Self::reader`] instead.
297    pub async fn read(&self) -> crate::Result<Bytes> {
298        self.storage.read(&self.path).await
299    }
300
301    /// Creates [`FileRead`] for continuous reading.
302    ///
303    /// For one-time reading, use [`Self::read`] instead.
304    pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
305        self.storage.reader(&self.path).await
306    }
307}
308
309/// Trait for writing file.
310///
311/// # TODO
312///
313/// It's possible for us to remove the async_trait, but we need to figure
314/// out how to handle the object safety.
315#[async_trait::async_trait]
316pub trait FileWrite: Send + Unpin + 'static {
317    /// Write bytes to file.
318    ///
319    /// TODO: we can support writing non-contiguous bytes in the future.
320    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
321
322    /// Close file.
323    ///
324    /// Calling close on closed file will generate an error.
325    async fn close(&mut self) -> crate::Result<()>;
326}
327
328/// Output file is used for writing to files..
329#[derive(Debug)]
330pub struct OutputFile {
331    storage: Arc<dyn Storage>,
332    // Absolute path of file.
333    path: String,
334}
335
336impl OutputFile {
337    /// Creates a new output file.
338    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
339        Self { storage, path }
340    }
341
342    /// Relative path to root uri.
343    pub fn location(&self) -> &str {
344        &self.path
345    }
346
347    /// Checks if file exists.
348    pub async fn exists(&self) -> Result<bool> {
349        self.storage.exists(&self.path).await
350    }
351
352    /// Deletes file.
353    ///
354    /// If the file does not exist, it will not return error.
355    pub async fn delete(&self) -> Result<()> {
356        self.storage.delete(&self.path).await
357    }
358
359    /// Converts into [`InputFile`].
360    pub fn to_input_file(self) -> InputFile {
361        InputFile {
362            storage: self.storage,
363            path: self.path,
364        }
365    }
366
367    /// Create a new output file with given bytes.
368    ///
369    /// # Notes
370    ///
371    /// Calling `write` will overwrite the file if it exists.
372    /// For continuous writing, use [`Self::writer`].
373    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
374        self.storage.write(&self.path, bs).await
375    }
376
377    /// Creates output file for continuous writing.
378    ///
379    /// # Notes
380    ///
381    /// For one-time writing, use [`Self::write`] instead.
382    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
383        self.storage.writer(&self.path).await
384    }
385}
386
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};
    use crate::io::{LocalFsStorageFactory, MemoryStorageFactory};

    /// Returns a `FileIO` backed by the local filesystem.
    fn create_local_file_io() -> FileIO {
        FileIO::new_with_fs()
    }

    /// Writes `s` to `path` with the standard library, creating parent
    /// directories as needed.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    /// Reads the whole file at `path` into a `String`, bypassing `FileIO`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(&full_path, input_file.location());

        // Read through the `InputFile` itself so the test exercises the code
        // under test, not just the standard library.
        let read_via_input = input_file.read().await.unwrap();
        assert_eq!(read_via_input, Bytes::from(content));

        let read_content = read_from_file(full_path).await;
        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{sub_dir_path}/b.txt");
        let c_path = format!("{sub_dir_path}/c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a file should be no-op.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a not exist dir should be no-op.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // Remove a dir should remove all files in it.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(content.into()).await.unwrap();

        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIO::new_with_memory();

        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_io_builder_with_props() {
        let factory = Arc::new(MemoryStorageFactory);
        let file_io = FileIOBuilder::new(factory)
            .with_prop("key1", "value1")
            .with_prop("key2", "value2")
            .build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }

    #[tokio::test]
    async fn test_file_io_builder_with_multiple_props() {
        let factory = Arc::new(LocalFsStorageFactory);
        let props = vec![("key1", "value1"), ("key2", "value2")];
        let file_io = FileIOBuilder::new(factory).with_props(props).build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }
}