iceberg/io/
file_io.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Range;
19use std::sync::{Arc, OnceLock};
20
21use bytes::Bytes;
22use futures::{Stream, StreamExt};
23
24use super::storage::{
25    LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
26};
27use crate::Result;
28
/// FileIO implementation, used to manipulate files in underlying storage.
///
/// FileIO wraps a `dyn Storage` with lazy initialization via `StorageFactory`.
/// The storage is created on first use and cached for subsequent operations.
///
/// The cached storage cell is held behind an `Arc`, so clones of a `FileIO`
/// share the same cell: once any clone has initialized the storage, all of
/// them observe that instance.
///
/// # Note
///
/// All paths passed to `FileIO` must be absolute paths starting with the scheme string
/// appropriate for the storage backend being used.
///
/// This crate provides native support for local filesystem (`file://`) and
/// memory (`memory://`) storage. For extensive storage backend support (S3, GCS,
/// OSS, Azure, etc.), use the
/// [`iceberg-storage-opendal`](https://crates.io/crates/iceberg-storage-opendal) crate.
///
/// # Example
///
/// ```rust,ignore
/// use iceberg::io::{FileIO, FileIOBuilder};
/// use iceberg::io::{LocalFsStorageFactory, MemoryStorageFactory};
/// use std::sync::Arc;
///
/// // Create FileIO with memory storage for testing
/// let file_io = FileIO::new_with_memory();
///
/// // Create FileIO with local filesystem storage
/// let file_io = FileIO::new_with_fs();
///
/// // Create FileIO with custom factory
/// let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory))
///     .with_prop("key", "value")
///     .build();
/// ```
#[derive(Clone, Debug)]
pub struct FileIO {
    /// Storage configuration containing properties; handed to the factory
    /// when the storage is built.
    config: StorageConfig,
    /// Factory for creating storage instances
    factory: Arc<dyn StorageFactory>,
    /// Cached storage instance (lazily initialized).
    ///
    /// Wrapped in an `Arc` so that clones of this `FileIO` share one cell.
    storage: Arc<OnceLock<Arc<dyn Storage>>>,
}
71
72impl FileIO {
73    /// Create a new FileIO backed by in-memory storage.
74    ///
75    /// This is useful for testing scenarios where persistent storage is not needed.
76    pub fn new_with_memory() -> Self {
77        Self {
78            config: StorageConfig::new(),
79            factory: Arc::new(MemoryStorageFactory),
80            storage: Arc::new(OnceLock::new()),
81        }
82    }
83
84    /// Create a new FileIO backed by local filesystem storage.
85    ///
86    /// This is useful for local development and testing with real files.
87    pub fn new_with_fs() -> Self {
88        Self {
89            config: StorageConfig::new(),
90            factory: Arc::new(LocalFsStorageFactory),
91            storage: Arc::new(OnceLock::new()),
92        }
93    }
94
95    /// Get the storage configuration.
96    pub fn config(&self) -> &StorageConfig {
97        &self.config
98    }
99
100    /// Get or create the storage instance.
101    ///
102    /// The factory is invoked on first access and the result is cached
103    /// for all subsequent operations.
104    fn get_storage(&self) -> Result<Arc<dyn Storage>> {
105        // Check if already initialized
106        if let Some(storage) = self.storage.get() {
107            return Ok(storage.clone());
108        }
109
110        // Build the storage
111        let storage = self.factory.build(&self.config)?;
112
113        // Try to set it (another thread might have set it first)
114        let _ = self.storage.set(storage.clone());
115
116        // Return whatever is in the cell (either ours or another thread's)
117        Ok(self.storage.get().unwrap().clone())
118    }
119
120    /// Deletes file.
121    ///
122    /// # Arguments
123    ///
124    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
125    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
126        self.get_storage()?.delete(path.as_ref()).await
127    }
128
129    /// Remove the path and all nested dirs and files recursively.
130    ///
131    /// # Arguments
132    ///
133    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
134    ///
135    /// # Behavior
136    ///
137    /// - If the path is a file or not exist, this function will be no-op.
138    /// - If the path is a empty directory, this function will remove the directory itself.
139    /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
140    pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
141        self.get_storage()?.delete_prefix(path.as_ref()).await
142    }
143
144    /// Delete multiple files from a stream of paths.
145    ///
146    /// # Arguments
147    ///
148    /// * paths: A stream of absolute paths starting with the scheme string used to construct [`FileIO`].
149    pub async fn delete_stream(
150        &self,
151        paths: impl Stream<Item = String> + Send + 'static,
152    ) -> Result<()> {
153        self.get_storage()?.delete_stream(paths.boxed()).await
154    }
155
156    /// Check file exists.
157    ///
158    /// # Arguments
159    ///
160    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
161    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
162        self.get_storage()?.exists(path.as_ref()).await
163    }
164
165    /// Creates input file.
166    ///
167    /// # Arguments
168    ///
169    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
170    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
171        self.get_storage()?.new_input(path.as_ref())
172    }
173
174    /// Creates output file.
175    ///
176    /// # Arguments
177    ///
178    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
179    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
180        self.get_storage()?.new_output(path.as_ref())
181    }
182}
183
/// Builder for [`FileIO`].
///
/// The builder accepts an explicit `StorageFactory` and configuration properties.
/// Storage is lazily initialized on first use: building the [`FileIO`] only
/// stores the factory and configuration, it does not invoke the factory.
#[derive(Clone, Debug)]
pub struct FileIOBuilder {
    /// Factory for creating storage instances
    factory: Arc<dyn StorageFactory>,
    /// Storage configuration accumulated through `with_prop` / `with_props`
    config: StorageConfig,
}
195
196impl FileIOBuilder {
197    /// Creates a new builder with the given storage factory.
198    pub fn new(factory: Arc<dyn StorageFactory>) -> Self {
199        Self {
200            factory,
201            config: StorageConfig::new(),
202        }
203    }
204
205    /// Add a configuration property.
206    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
207        self.config = self.config.with_prop(key.to_string(), value.to_string());
208        self
209    }
210
211    /// Add multiple configuration properties.
212    pub fn with_props(
213        mut self,
214        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
215    ) -> Self {
216        self.config = self
217            .config
218            .with_props(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
219        self
220    }
221
222    /// Get the storage configuration.
223    pub fn config(&self) -> &StorageConfig {
224        &self.config
225    }
226
227    /// Builds [`FileIO`].
228    pub fn build(self) -> FileIO {
229        FileIO {
230            config: self.config,
231            factory: self.factory,
232            storage: Arc::new(OnceLock::new()),
233        }
234    }
235}
236
/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file.
    pub size: u64,
}
244
/// Trait for reading file.
///
/// Implementors must be `Send + Sync + Unpin + 'static`; reads take `&self`,
/// so a single reader can serve several range requests.
///
/// # TODO
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileRead: Send + Sync + Unpin + 'static {
    /// Read file content with given range.
    ///
    /// `range` selects the part of the file to return (presumably byte
    /// offsets into the file; confirm against the implementors).
    ///
    /// TODO: we can support reading non-contiguous bytes in the future.
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
257
/// Input file is used for reading from files.
///
/// It pairs a shared storage handle with the path of one file; every
/// operation on the input file is forwarded to that storage.
#[derive(Debug)]
pub struct InputFile {
    /// Storage that performs the actual operations.
    storage: Arc<dyn Storage>,
    /// Absolute path of file.
    path: String,
}
265
266impl InputFile {
267    /// Creates a new input file.
268    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
269        Self { storage, path }
270    }
271
272    /// Absolute path to root uri.
273    pub fn location(&self) -> &str {
274        &self.path
275    }
276
277    /// Check if file exists.
278    pub async fn exists(&self) -> crate::Result<bool> {
279        self.storage.exists(&self.path).await
280    }
281
282    /// Fetch and returns metadata of file.
283    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
284        self.storage.metadata(&self.path).await
285    }
286
287    /// Read and returns whole content of file.
288    ///
289    /// For continuous reading, use [`Self::reader`] instead.
290    pub async fn read(&self) -> crate::Result<Bytes> {
291        self.storage.read(&self.path).await
292    }
293
294    /// Creates [`FileRead`] for continuous reading.
295    ///
296    /// For one-time reading, use [`Self::read`] instead.
297    pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
298        self.storage.reader(&self.path).await
299    }
300}
301
/// Trait for writing file.
///
/// Implementors must be `Send + Unpin + 'static`; both methods take
/// `&mut self`, so a writer is driven by one caller at a time.
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
    /// Write bytes to file.
    ///
    /// TODO: we can support writing non-contiguous bytes in the future.
    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;

    /// Close file.
    ///
    /// Calling close on closed file will generate an error.
    async fn close(&mut self) -> crate::Result<()>;
}
320
/// Output file is used for writing to files.
///
/// It pairs a shared storage handle with the path of one file; every
/// operation on the output file is forwarded to that storage.
#[derive(Debug)]
pub struct OutputFile {
    /// Storage that performs the actual operations.
    storage: Arc<dyn Storage>,
    /// Absolute path of file.
    path: String,
}
328
329impl OutputFile {
330    /// Creates a new output file.
331    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
332        Self { storage, path }
333    }
334
335    /// Relative path to root uri.
336    pub fn location(&self) -> &str {
337        &self.path
338    }
339
340    /// Checks if file exists.
341    pub async fn exists(&self) -> Result<bool> {
342        self.storage.exists(&self.path).await
343    }
344
345    /// Deletes file.
346    ///
347    /// If the file does not exist, it will not return error.
348    pub async fn delete(&self) -> Result<()> {
349        self.storage.delete(&self.path).await
350    }
351
352    /// Converts into [`InputFile`].
353    pub fn to_input_file(self) -> InputFile {
354        InputFile {
355            storage: self.storage,
356            path: self.path,
357        }
358    }
359
360    /// Create a new output file with given bytes.
361    ///
362    /// # Notes
363    ///
364    /// Calling `write` will overwrite the file if it exists.
365    /// For continuous writing, use [`Self::writer`].
366    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
367        self.storage.write(&self.path, bs).await
368    }
369
370    /// Creates output file for continuous writing.
371    ///
372    /// # Notes
373    ///
374    /// For one-time writing, use [`Self::write`] instead.
375    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
376        self.storage.writer(&self.path).await
377    }
378}
379
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};
    use crate::io::{LocalFsStorageFactory, MemoryStorageFactory};

    /// Returns a `FileIO` backed by the local filesystem.
    fn create_local_file_io() -> FileIO {
        FileIO::new_with_fs()
    }

    /// Writes `s` to `path` with `std::fs`, creating parent directories as needed.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    /// Reads the whole file at `path` with `std::fs`, bypassing `FileIO`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(&full_path, input_file.location());

        // Read through the `InputFile` itself so the storage read path is
        // actually exercised, not only the std::fs helper below.
        assert_eq!(Bytes::from(content), input_file.read().await.unwrap());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a file should be no-op.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a not exist dir should be no-op.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // Remove a dir should remove all files in it.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        {
            output_file.write(content.into()).await.unwrap();
        }

        assert_eq!(&full_path, output_file.location());

        // Verify with std::fs that the bytes really landed on disk.
        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIO::new_with_memory();

        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_io_builder_with_props() {
        let factory = Arc::new(MemoryStorageFactory);
        let file_io = FileIOBuilder::new(factory)
            .with_prop("key1", "value1")
            .with_prop("key2", "value2")
            .build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }

    #[tokio::test]
    async fn test_file_io_builder_with_multiple_props() {
        let factory = Arc::new(LocalFsStorageFactory);
        let props = vec![("key1", "value1"), ("key2", "value2")];
        let file_io = FileIOBuilder::new(factory).with_props(props).build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }
}