iceberg/io/storage/
local_fs.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Local filesystem storage implementation for testing.
19//!
20//! This module provides a `LocalFsStorage` implementation that uses standard
21//! Rust filesystem operations. It is primarily intended for unit testing
22//! scenarios where tests need to read/write files on the local filesystem.
23
24use std::fs;
25use std::io::{Read, Seek, SeekFrom, Write};
26use std::ops::Range;
27use std::path::PathBuf;
28use std::sync::Arc;
29
30use async_trait::async_trait;
31use bytes::Bytes;
32use serde::{Deserialize, Serialize};
33
34use crate::io::{
35    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
36    StorageFactory,
37};
38use crate::{Error, ErrorKind, Result};
39
/// Local filesystem storage implementation.
///
/// This storage implementation uses standard Rust filesystem operations,
/// making it suitable for unit tests that need to read/write files on disk.
/// It is a unit struct and holds no state of its own.
///
/// # Path Normalization
///
/// The storage normalizes paths to handle various formats:
/// - `file:///path/to/file` -> `/path/to/file`
/// - `file://path/to/file` -> `/path/to/file`
/// - `file:/path/to/file` -> `/path/to/file`
/// - `/path/to/file` -> `/path/to/file`
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LocalFsStorage;
54
55impl LocalFsStorage {
56    /// Create a new `LocalFsStorage` instance.
57    pub fn new() -> Self {
58        Self
59    }
60
61    /// Normalize a path by removing scheme prefixes.
62    ///
63    /// This handles the following formats:
64    /// - `file:///path` -> `/path`
65    /// - `file://path` -> `/path` (treats as absolute)
66    /// - `file:/path` -> `/path`
67    /// - `/path` -> `/path`
68    pub(crate) fn normalize_path(path: &str) -> PathBuf {
69        let path = if let Some(stripped) = path.strip_prefix("file://") {
70            // file:///path -> /path or file://path -> /path
71            if stripped.starts_with('/') {
72                stripped.to_string()
73            } else {
74                format!("/{stripped}")
75            }
76        } else if let Some(stripped) = path.strip_prefix("file:") {
77            // file:/path -> /path
78            if stripped.starts_with('/') {
79                stripped.to_string()
80            } else {
81                format!("/{stripped}")
82            }
83        } else {
84            path.to_string()
85        };
86        PathBuf::from(path)
87    }
88}
89
90#[async_trait]
91#[typetag::serde]
92impl Storage for LocalFsStorage {
93    async fn exists(&self, path: &str) -> Result<bool> {
94        let path = Self::normalize_path(path);
95        Ok(path.exists())
96    }
97
98    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
99        let path = Self::normalize_path(path);
100        let metadata = fs::metadata(&path).map_err(|e| {
101            Error::new(
102                ErrorKind::DataInvalid,
103                format!("Failed to get metadata for {}: {}", path.display(), e),
104            )
105        })?;
106        Ok(FileMetadata {
107            size: metadata.len(),
108        })
109    }
110
111    async fn read(&self, path: &str) -> Result<Bytes> {
112        let path = Self::normalize_path(path);
113        let content = fs::read(&path).map_err(|e| {
114            Error::new(
115                ErrorKind::DataInvalid,
116                format!("Failed to read file {}: {}", path.display(), e),
117            )
118        })?;
119        Ok(Bytes::from(content))
120    }
121
122    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
123        let path = Self::normalize_path(path);
124        let file = fs::File::open(&path).map_err(|e| {
125            Error::new(
126                ErrorKind::DataInvalid,
127                format!("Failed to open file {}: {}", path.display(), e),
128            )
129        })?;
130        Ok(Box::new(LocalFsFileRead::new(file)))
131    }
132
133    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
134        let path = Self::normalize_path(path);
135
136        // Create parent directories if they don't exist
137        if let Some(parent) = path.parent() {
138            fs::create_dir_all(parent).map_err(|e| {
139                Error::new(
140                    ErrorKind::Unexpected,
141                    format!("Failed to create directory {}: {}", parent.display(), e),
142                )
143            })?;
144        }
145
146        fs::write(&path, &bs).map_err(|e| {
147            Error::new(
148                ErrorKind::Unexpected,
149                format!("Failed to write file {}: {}", path.display(), e),
150            )
151        })?;
152        Ok(())
153    }
154
155    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
156        let path = Self::normalize_path(path);
157
158        // Create parent directories if they don't exist
159        if let Some(parent) = path.parent() {
160            fs::create_dir_all(parent).map_err(|e| {
161                Error::new(
162                    ErrorKind::Unexpected,
163                    format!("Failed to create directory {}: {}", parent.display(), e),
164                )
165            })?;
166        }
167
168        let file = fs::File::create(&path).map_err(|e| {
169            Error::new(
170                ErrorKind::Unexpected,
171                format!("Failed to create file {}: {}", path.display(), e),
172            )
173        })?;
174        Ok(Box::new(LocalFsFileWrite::new(file)))
175    }
176
177    async fn delete(&self, path: &str) -> Result<()> {
178        let path = Self::normalize_path(path);
179        if path.exists() {
180            fs::remove_file(&path).map_err(|e| {
181                Error::new(
182                    ErrorKind::Unexpected,
183                    format!("Failed to delete file {}: {}", path.display(), e),
184                )
185            })?;
186        }
187        Ok(())
188    }
189
190    async fn delete_prefix(&self, path: &str) -> Result<()> {
191        let path = Self::normalize_path(path);
192        if path.is_dir() {
193            fs::remove_dir_all(&path).map_err(|e| {
194                Error::new(
195                    ErrorKind::Unexpected,
196                    format!("Failed to delete directory {}: {}", path.display(), e),
197                )
198            })?;
199        }
200        Ok(())
201    }
202
203    fn new_input(&self, path: &str) -> Result<InputFile> {
204        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
205    }
206
207    fn new_output(&self, path: &str) -> Result<OutputFile> {
208        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
209    }
210}
211
/// File reader for local filesystem storage.
///
/// Wraps an already-open [`fs::File`] in a [`std::sync::Mutex`] so that reads,
/// which need to seek and therefore mutate the handle, can be issued through a
/// shared reference.
#[derive(Debug)]
pub struct LocalFsFileRead {
    // Open handle; locked for the duration of each seek + read.
    file: std::sync::Mutex<fs::File>,
}
217
218impl LocalFsFileRead {
219    /// Create a new `LocalFsFileRead` with the given file.
220    pub fn new(file: fs::File) -> Self {
221        Self {
222            file: std::sync::Mutex::new(file),
223        }
224    }
225}
226
227#[async_trait]
228impl FileRead for LocalFsFileRead {
229    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
230        let mut file = self.file.lock().map_err(|e| {
231            Error::new(
232                ErrorKind::Unexpected,
233                format!("Failed to acquire file lock: {e}"),
234            )
235        })?;
236
237        file.seek(SeekFrom::Start(range.start)).map_err(|e| {
238            Error::new(
239                ErrorKind::DataInvalid,
240                format!("Failed to seek to position {}: {}", range.start, e),
241            )
242        })?;
243
244        let len = (range.end - range.start) as usize;
245        let mut buffer = vec![0u8; len];
246        file.read_exact(&mut buffer).map_err(|e| {
247            Error::new(
248                ErrorKind::DataInvalid,
249                format!("Failed to read {len} bytes: {e}"),
250            )
251        })?;
252
253        Ok(Bytes::from(buffer))
254    }
255}
256
/// File writer for local filesystem storage.
///
/// This struct implements `FileWrite` for writing to local files. The handle
/// is held in an `Option` so that closing can take it out; later writes or
/// closes then find `None` and are rejected.
#[derive(Debug)]
pub struct LocalFsFileWrite {
    // `Some` while the writer is open, `None` once `close` has taken the handle.
    file: Option<fs::File>,
}
264
265impl LocalFsFileWrite {
266    /// Create a new `LocalFsFileWrite` for the given file.
267    pub fn new(file: fs::File) -> Self {
268        Self { file: Some(file) }
269    }
270}
271
272#[async_trait]
273impl FileWrite for LocalFsFileWrite {
274    async fn write(&mut self, bs: Bytes) -> Result<()> {
275        let file = self
276            .file
277            .as_mut()
278            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Cannot write to closed file"))?;
279
280        file.write_all(&bs).map_err(|e| {
281            Error::new(
282                ErrorKind::Unexpected,
283                format!("Failed to write to file: {e}"),
284            )
285        })?;
286
287        Ok(())
288    }
289
290    async fn close(&mut self) -> Result<()> {
291        let file = self
292            .file
293            .take()
294            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "File already closed"))?;
295
296        file.sync_all()
297            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to sync file: {e}")))?;
298
299        Ok(())
300    }
301}
302
/// Factory for creating `LocalFsStorage` instances.
///
/// This factory implements `StorageFactory` and creates `LocalFsStorage`
/// instances for the "file" scheme. It is a unit struct with no state, and
/// its `build` implementation does not read the supplied configuration.
///
/// # Example
///
/// ```rust,ignore
/// use iceberg::io::{StorageConfig, StorageFactory, LocalFsStorageFactory};
///
/// let factory = LocalFsStorageFactory;
/// let config = StorageConfig::new();
/// let storage = factory.build(&config)?;
/// ```
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct LocalFsStorageFactory;
319
320#[typetag::serde]
321impl StorageFactory for LocalFsStorageFactory {
322    fn build(&self, _config: &StorageConfig) -> Result<Arc<dyn Storage>> {
323        Ok(Arc::new(LocalFsStorage::new()))
324    }
325}
326
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// Return the string form of `name` joined onto the temporary directory.
    fn path_in(dir: &TempDir, name: &str) -> String {
        dir.path().join(name).to_str().unwrap().to_string()
    }

    #[test]
    fn test_normalize_path() {
        // Every supported spelling must collapse to the same absolute path:
        // `file:///`, `file://` (no leading slash), `file:/`, and a bare path.
        let expected = PathBuf::from("/path/to/file");
        for input in [
            "file:///path/to/file",
            "file://path/to/file",
            "file:/path/to/file",
            "/path/to/file",
        ] {
            assert_eq!(LocalFsStorage::normalize_path(input), expected);
        }
    }

    #[tokio::test]
    async fn test_local_fs_storage_write_read() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let target = path_in(&dir, "test.txt");
        let payload = Bytes::from("Hello, World!");

        // Round-trip the payload through write and read.
        fs_storage.write(&target, payload.clone()).await.unwrap();
        let round_tripped = fs_storage.read(&target).await.unwrap();

        assert_eq!(round_tripped, payload);
    }

    #[tokio::test]
    async fn test_local_fs_storage_exists() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let target = path_in(&dir, "test.txt");

        // Nothing has been written yet.
        let before = fs_storage.exists(&target).await.unwrap();
        assert!(!before);

        fs_storage
            .write(&target, Bytes::from("test"))
            .await
            .unwrap();

        // The file must be visible after the write.
        let after = fs_storage.exists(&target).await.unwrap();
        assert!(after);
    }

    #[tokio::test]
    async fn test_local_fs_storage_metadata() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let target = path_in(&dir, "test.txt");
        let payload = Bytes::from("Hello, World!");

        fs_storage.write(&target, payload.clone()).await.unwrap();

        let info = fs_storage.metadata(&target).await.unwrap();
        assert_eq!(info.size, payload.len() as u64);
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let target = path_in(&dir, "test.txt");

        fs_storage
            .write(&target, Bytes::from("test"))
            .await
            .unwrap();
        assert!(fs_storage.exists(&target).await.unwrap());

        fs_storage.delete(&target).await.unwrap();
        assert!(!fs_storage.exists(&target).await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_prefix() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let subdir = dir.path().join("subdir");

        // Populate the subdirectory with two files.
        for (name, body) in [("file1.txt", "1"), ("file2.txt", "2")] {
            let file = subdir.join(name);
            fs_storage
                .write(file.to_str().unwrap(), Bytes::from(body))
                .await
                .unwrap();
        }

        // Removing the prefix must take the whole directory with it.
        fs_storage
            .delete_prefix(subdir.to_str().unwrap())
            .await
            .unwrap();

        assert!(!subdir.exists());
    }

    #[tokio::test]
    async fn test_local_fs_storage_reader() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let target = path_in(&dir, "test.txt");
        let payload = Bytes::from("Hello, World!");

        fs_storage.write(&target, payload.clone()).await.unwrap();

        let ranged = fs_storage.reader(&target).await.unwrap();

        // Full-length read returns everything.
        let whole = ranged.read(0..payload.len() as u64).await.unwrap();
        assert_eq!(whole, payload);

        // A prefix range returns just that prefix.
        let prefix = ranged.read(0..5).await.unwrap();
        assert_eq!(prefix, Bytes::from("Hello"));
    }

    #[tokio::test]
    async fn test_local_fs_storage_writer() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let target = path_in(&dir, "test.txt");

        // Stream the content in two chunks, then close.
        let mut sink = fs_storage.writer(&target).await.unwrap();
        for chunk in ["Hello, ", "World!"] {
            sink.write(Bytes::from(chunk)).await.unwrap();
        }
        sink.close().await.unwrap();

        let written = fs_storage.read(&target).await.unwrap();
        assert_eq!(written, Bytes::from("Hello, World!"));
    }

    #[tokio::test]
    async fn test_local_fs_file_write_double_close() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let target = path_in(&dir, "test.txt");

        let mut sink = fs_storage.writer(&target).await.unwrap();
        sink.write(Bytes::from("test")).await.unwrap();
        sink.close().await.unwrap();

        // Closing a second time must be rejected.
        let second_close = sink.close().await;
        assert!(second_close.is_err());
    }

    #[tokio::test]
    async fn test_local_fs_file_write_after_close() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let target = path_in(&dir, "test.txt");

        let mut sink = fs_storage.writer(&target).await.unwrap();
        sink.close().await.unwrap();

        // Writing to a closed writer must be rejected.
        let late_write = sink.write(Bytes::from("test")).await;
        assert!(late_write.is_err());
    }

    #[test]
    fn test_local_fs_storage_factory() {
        let config = StorageConfig::new();
        let built = LocalFsStorageFactory.build(&config).unwrap();

        // The Debug output identifies the concrete storage type.
        let rendered = format!("{built:?}");
        assert!(rendered.contains("LocalFsStorage"));
    }

    #[tokio::test]
    async fn test_local_fs_creates_parent_directories() {
        let dir = TempDir::new().unwrap();
        let fs_storage = LocalFsStorage::new();
        let nested = dir.path().join("a/b/c/test.txt");

        // Writing into a not-yet-existing directory tree must create it.
        fs_storage
            .write(nested.to_str().unwrap(), Bytes::from("test"))
            .await
            .unwrap();

        assert!(nested.exists());
    }
}
537}