iceberg/io/storage/
local_fs.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Local filesystem storage implementation for testing.
//!
//! This module provides a `LocalFsStorage` implementation built on standard
//! Rust filesystem operations (`std::fs`). It is primarily intended for unit
//! tests that need to read and write files on the local filesystem; note that
//! the I/O is performed synchronously, even inside the `async` methods.
23
24use std::fs;
25use std::io::{Read, Seek, SeekFrom, Write};
26use std::ops::Range;
27use std::path::PathBuf;
28use std::sync::Arc;
29
30use async_trait::async_trait;
31use bytes::Bytes;
32use futures::StreamExt;
33use futures::stream::BoxStream;
34use serde::{Deserialize, Serialize};
35
36use crate::io::{
37    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38    StorageFactory,
39};
40use crate::{Error, ErrorKind, Result};
41
42/// Local filesystem storage implementation.
43///
44/// This storage implementation uses standard Rust filesystem operations,
45/// making it suitable for unit tests that need to read/write files on disk.
46///
47/// # Path Normalization
48///
49/// The storage normalizes paths to handle various formats:
50/// - `file:///path/to/file` -> `/path/to/file`
51/// - `file:/path/to/file` -> `/path/to/file`
52/// - `/path/to/file` -> `/path/to/file`
53/// ```
54#[derive(Debug, Clone, Default, Serialize, Deserialize)]
55pub struct LocalFsStorage;
56
57impl LocalFsStorage {
58    /// Create a new `LocalFsStorage` instance.
59    pub fn new() -> Self {
60        Self
61    }
62
63    /// Normalize a path by removing scheme prefixes.
64    ///
65    /// This handles the following formats:
66    /// - `file:///path` -> `/path`
67    /// - `file://path` -> `/path` (treats as absolute)
68    /// - `file:/path` -> `/path`
69    /// - `/path` -> `/path`
70    pub(crate) fn normalize_path(path: &str) -> PathBuf {
71        let path = if let Some(stripped) = path.strip_prefix("file://") {
72            // file:///path -> /path or file://path -> /path
73            if stripped.starts_with('/') {
74                stripped.to_string()
75            } else {
76                format!("/{stripped}")
77            }
78        } else if let Some(stripped) = path.strip_prefix("file:") {
79            // file:/path -> /path
80            if stripped.starts_with('/') {
81                stripped.to_string()
82            } else {
83                format!("/{stripped}")
84            }
85        } else {
86            path.to_string()
87        };
88        PathBuf::from(path)
89    }
90}
91
92#[async_trait]
93#[typetag::serde]
94impl Storage for LocalFsStorage {
95    async fn exists(&self, path: &str) -> Result<bool> {
96        let path = Self::normalize_path(path);
97        Ok(path.exists())
98    }
99
100    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
101        let path = Self::normalize_path(path);
102        let metadata = fs::metadata(&path).map_err(|e| {
103            Error::new(
104                ErrorKind::DataInvalid,
105                format!("Failed to get metadata for {}: {}", path.display(), e),
106            )
107        })?;
108        Ok(FileMetadata {
109            size: metadata.len(),
110        })
111    }
112
113    async fn read(&self, path: &str) -> Result<Bytes> {
114        let path = Self::normalize_path(path);
115        let content = fs::read(&path).map_err(|e| {
116            Error::new(
117                ErrorKind::DataInvalid,
118                format!("Failed to read file {}: {}", path.display(), e),
119            )
120        })?;
121        Ok(Bytes::from(content))
122    }
123
124    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
125        let path = Self::normalize_path(path);
126        let file = fs::File::open(&path).map_err(|e| {
127            Error::new(
128                ErrorKind::DataInvalid,
129                format!("Failed to open file {}: {}", path.display(), e),
130            )
131        })?;
132        Ok(Box::new(LocalFsFileRead::new(file)))
133    }
134
135    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
136        let path = Self::normalize_path(path);
137
138        // Create parent directories if they don't exist
139        if let Some(parent) = path.parent() {
140            fs::create_dir_all(parent).map_err(|e| {
141                Error::new(
142                    ErrorKind::Unexpected,
143                    format!("Failed to create directory {}: {}", parent.display(), e),
144                )
145            })?;
146        }
147
148        fs::write(&path, &bs).map_err(|e| {
149            Error::new(
150                ErrorKind::Unexpected,
151                format!("Failed to write file {}: {}", path.display(), e),
152            )
153        })?;
154        Ok(())
155    }
156
157    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
158        let path = Self::normalize_path(path);
159
160        // Create parent directories if they don't exist
161        if let Some(parent) = path.parent() {
162            fs::create_dir_all(parent).map_err(|e| {
163                Error::new(
164                    ErrorKind::Unexpected,
165                    format!("Failed to create directory {}: {}", parent.display(), e),
166                )
167            })?;
168        }
169
170        let file = fs::File::create(&path).map_err(|e| {
171            Error::new(
172                ErrorKind::Unexpected,
173                format!("Failed to create file {}: {}", path.display(), e),
174            )
175        })?;
176        Ok(Box::new(LocalFsFileWrite::new(file)))
177    }
178
179    async fn delete(&self, path: &str) -> Result<()> {
180        let path = Self::normalize_path(path);
181        if path.exists() {
182            fs::remove_file(&path).map_err(|e| {
183                Error::new(
184                    ErrorKind::Unexpected,
185                    format!("Failed to delete file {}: {}", path.display(), e),
186                )
187            })?;
188        }
189        Ok(())
190    }
191
192    async fn delete_prefix(&self, path: &str) -> Result<()> {
193        let path = Self::normalize_path(path);
194        if path.is_dir() {
195            fs::remove_dir_all(&path).map_err(|e| {
196                Error::new(
197                    ErrorKind::Unexpected,
198                    format!("Failed to delete directory {}: {}", path.display(), e),
199                )
200            })?;
201        }
202        Ok(())
203    }
204
205    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
206        while let Some(path) = paths.next().await {
207            self.delete(&path).await?;
208        }
209        Ok(())
210    }
211
212    fn new_input(&self, path: &str) -> Result<InputFile> {
213        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
214    }
215
216    fn new_output(&self, path: &str) -> Result<OutputFile> {
217        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
218    }
219}
220
/// Random-access reader over a file on the local filesystem.
///
/// The handle sits behind a mutex because serving a range requires a seek
/// followed by a read, while `FileRead::read` only receives `&self`.
#[derive(Debug)]
pub struct LocalFsFileRead {
    /// Open handle, locked for the duration of each seek + read pair.
    file: std::sync::Mutex<fs::File>,
}

impl LocalFsFileRead {
    /// Wrap an already-opened `file` so it can serve ranged reads.
    pub fn new(file: fs::File) -> Self {
        let file = std::sync::Mutex::from(file);
        Self { file }
    }
}
235
236#[async_trait]
237impl FileRead for LocalFsFileRead {
238    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
239        let mut file = self.file.lock().map_err(|e| {
240            Error::new(
241                ErrorKind::Unexpected,
242                format!("Failed to acquire file lock: {e}"),
243            )
244        })?;
245
246        file.seek(SeekFrom::Start(range.start)).map_err(|e| {
247            Error::new(
248                ErrorKind::DataInvalid,
249                format!("Failed to seek to position {}: {}", range.start, e),
250            )
251        })?;
252
253        let len = (range.end - range.start) as usize;
254        let mut buffer = vec![0u8; len];
255        file.read_exact(&mut buffer).map_err(|e| {
256            Error::new(
257                ErrorKind::DataInvalid,
258                format!("Failed to read {len} bytes: {e}"),
259            )
260        })?;
261
262        Ok(Bytes::from(buffer))
263    }
264}
265
/// Sequential writer that appends byte chunks to a local file.
///
/// The handle is kept in an `Option` so that closing can take ownership of
/// it; once it is gone, further writes and closes are rejected.
#[derive(Debug)]
pub struct LocalFsFileWrite {
    /// `Some` while the writer is open, `None` after it has been closed.
    file: Option<fs::File>,
}

impl LocalFsFileWrite {
    /// Wrap an already-created `file`; the writer starts out open.
    pub fn new(file: fs::File) -> Self {
        let file = Some(file);
        Self { file }
    }
}
280
281#[async_trait]
282impl FileWrite for LocalFsFileWrite {
283    async fn write(&mut self, bs: Bytes) -> Result<()> {
284        let file = self
285            .file
286            .as_mut()
287            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Cannot write to closed file"))?;
288
289        file.write_all(&bs).map_err(|e| {
290            Error::new(
291                ErrorKind::Unexpected,
292                format!("Failed to write to file: {e}"),
293            )
294        })?;
295
296        Ok(())
297    }
298
299    async fn close(&mut self) -> Result<()> {
300        let file = self
301            .file
302            .take()
303            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "File already closed"))?;
304
305        file.sync_all()
306            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to sync file: {e}")))?;
307
308        Ok(())
309    }
310}
311
312/// Factory for creating `LocalFsStorage` instances.
313///
314/// This factory implements `StorageFactory` and creates `LocalFsStorage`
315/// instances for the "file" scheme.
316///
317/// # Example
318///
319/// ```rust,ignore
320/// use iceberg::io::{StorageConfig, StorageFactory, LocalFsStorageFactory};
321///
322/// let factory = LocalFsStorageFactory;
323/// let config = StorageConfig::new();
324/// let storage = factory.build(&config)?;
325/// ```
326#[derive(Clone, Debug, Default, Serialize, Deserialize)]
327pub struct LocalFsStorageFactory;
328
329#[typetag::serde]
330impl StorageFactory for LocalFsStorageFactory {
331    fn build(&self, _config: &StorageConfig) -> Result<Arc<dyn Storage>> {
332        Ok(Arc::new(LocalFsStorage::new()))
333    }
334}
335
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    #[test]
    fn test_normalize_path() {
        // Test file:/// prefix
        assert_eq!(
            LocalFsStorage::normalize_path("file:///path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Test file:// prefix (without leading slash in path)
        assert_eq!(
            LocalFsStorage::normalize_path("file://path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Test file:/ prefix
        assert_eq!(
            LocalFsStorage::normalize_path("file:/path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Test file: prefix followed by a relative path (treated as absolute)
        assert_eq!(
            LocalFsStorage::normalize_path("file:path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Test bare path
        assert_eq!(
            LocalFsStorage::normalize_path("/path/to/file"),
            PathBuf::from("/path/to/file")
        );
    }

    #[tokio::test]
    async fn test_local_fs_storage_write_read() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();
        let content = Bytes::from("Hello, World!");

        // Write
        storage.write(path_str, content.clone()).await.unwrap();

        // Read
        let read_content = storage.read(path_str).await.unwrap();
        assert_eq!(read_content, content);
    }

    /// Paths given as `file://` URIs must resolve to the same file as the
    /// corresponding bare absolute path.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_local_fs_storage_file_uri_round_trip() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("uri.txt");
        let uri = format!("file://{}", path.to_str().unwrap());
        let content = Bytes::from("via uri");

        storage.write(&uri, content.clone()).await.unwrap();

        assert!(path.exists());
        assert!(storage.exists(&uri).await.unwrap());
        assert_eq!(storage.read(&uri).await.unwrap(), content);
        assert_eq!(
            storage.read(path.to_str().unwrap()).await.unwrap(),
            content
        );
    }

    #[tokio::test]
    async fn test_local_fs_storage_exists() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        // File doesn't exist initially
        assert!(!storage.exists(path_str).await.unwrap());

        // Write file
        storage.write(path_str, Bytes::from("test")).await.unwrap();

        // File exists now
        assert!(storage.exists(path_str).await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_storage_metadata() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();
        let content = Bytes::from("Hello, World!");

        storage.write(path_str, content.clone()).await.unwrap();

        let metadata = storage.metadata(path_str).await.unwrap();
        assert_eq!(metadata.size, content.len() as u64);
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        storage.write(path_str, Bytes::from("test")).await.unwrap();
        assert!(storage.exists(path_str).await.unwrap());

        storage.delete(path_str).await.unwrap();
        assert!(!storage.exists(path_str).await.unwrap());
    }

    /// Deleting a file that does not exist is a no-op, not an error.
    #[tokio::test]
    async fn test_local_fs_storage_delete_missing_file() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("missing.txt");

        storage.delete(path.to_str().unwrap()).await.unwrap();
        assert!(!path.exists());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_prefix() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let dir_path = tmp_dir.path().join("subdir");
        let file1 = dir_path.join("file1.txt");
        let file2 = dir_path.join("file2.txt");

        // Create files in subdirectory
        storage
            .write(file1.to_str().unwrap(), Bytes::from("1"))
            .await
            .unwrap();
        storage
            .write(file2.to_str().unwrap(), Bytes::from("2"))
            .await
            .unwrap();

        // Delete prefix (directory)
        storage
            .delete_prefix(dir_path.to_str().unwrap())
            .await
            .unwrap();

        // Directory should be deleted
        assert!(!dir_path.exists());
    }

    /// Deleting a prefix that does not name an existing directory succeeds.
    #[tokio::test]
    async fn test_local_fs_storage_delete_prefix_missing_dir() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let dir_path = tmp_dir.path().join("missing_dir");

        storage
            .delete_prefix(dir_path.to_str().unwrap())
            .await
            .unwrap();
        assert!(!dir_path.exists());
    }

    #[tokio::test]
    async fn test_local_fs_storage_reader() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();
        let content = Bytes::from("Hello, World!");

        storage.write(path_str, content.clone()).await.unwrap();

        let reader = storage.reader(path_str).await.unwrap();
        let read_content = reader.read(0..content.len() as u64).await.unwrap();
        assert_eq!(read_content, content);

        // Test partial read
        let partial = reader.read(0..5).await.unwrap();
        assert_eq!(partial, Bytes::from("Hello"));

        // Test read from a non-zero offset
        let tail = reader.read(7..12).await.unwrap();
        assert_eq!(tail, Bytes::from("World"));

        // An empty range yields no bytes
        let empty = reader.read(3..3).await.unwrap();
        assert!(empty.is_empty());
    }

    /// Requesting bytes past the end of the file is an error, not a short read.
    #[tokio::test]
    async fn test_local_fs_storage_reader_past_eof() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        storage
            .write(path_str, Bytes::from("short"))
            .await
            .unwrap();

        let reader = storage.reader(path_str).await.unwrap();
        assert!(reader.read(0..100).await.is_err());
    }

    #[tokio::test]
    async fn test_local_fs_storage_writer() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        let mut writer = storage.writer(path_str).await.unwrap();
        writer.write(Bytes::from("Hello, ")).await.unwrap();
        writer.write(Bytes::from("World!")).await.unwrap();
        writer.close().await.unwrap();

        let content = storage.read(path_str).await.unwrap();
        assert_eq!(content, Bytes::from("Hello, World!"));
    }

    #[tokio::test]
    async fn test_local_fs_file_write_double_close() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        let mut writer = storage.writer(path_str).await.unwrap();
        writer.write(Bytes::from("test")).await.unwrap();
        writer.close().await.unwrap();

        // Second close should fail
        let result = writer.close().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_local_fs_file_write_after_close() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        let mut writer = storage.writer(path_str).await.unwrap();
        writer.close().await.unwrap();

        // Write after close should fail
        let result = writer.write(Bytes::from("test")).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_local_fs_storage_factory() {
        let factory = LocalFsStorageFactory;
        let config = StorageConfig::new();
        let storage = factory.build(&config).unwrap();

        // Verify we got a valid storage instance
        assert!(format!("{storage:?}").contains("LocalFsStorage"));
    }

    #[tokio::test]
    async fn test_local_fs_creates_parent_directories() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("a/b/c/test.txt");
        let path_str = path.to_str().unwrap();

        // Write should create parent directories
        storage.write(path_str, Bytes::from("test")).await.unwrap();

        assert!(path.exists());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_stream() {
        use futures::stream;

        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();

        // Create multiple files
        let file1 = tmp_dir.path().join("file1.txt");
        let file2 = tmp_dir.path().join("file2.txt");
        let file3 = tmp_dir.path().join("file3.txt");

        storage
            .write(file1.to_str().unwrap(), Bytes::from("1"))
            .await
            .unwrap();
        storage
            .write(file2.to_str().unwrap(), Bytes::from("2"))
            .await
            .unwrap();
        storage
            .write(file3.to_str().unwrap(), Bytes::from("3"))
            .await
            .unwrap();

        // Verify files exist
        assert!(storage.exists(file1.to_str().unwrap()).await.unwrap());
        assert!(storage.exists(file2.to_str().unwrap()).await.unwrap());
        assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());

        // Delete multiple files using stream
        let paths = vec![
            file1.to_str().unwrap().to_string(),
            file2.to_str().unwrap().to_string(),
        ];
        let path_stream = stream::iter(paths).boxed();
        storage.delete_stream(path_stream).await.unwrap();

        // Verify deleted files no longer exist
        assert!(!storage.exists(file1.to_str().unwrap()).await.unwrap());
        assert!(!storage.exists(file2.to_str().unwrap()).await.unwrap());

        // Verify file3 still exists
        assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_stream_empty() {
        use futures::stream;

        let storage = LocalFsStorage::new();

        // Delete with empty stream should succeed
        let path_stream = stream::iter(Vec::<String>::new()).boxed();
        storage.delete_stream(path_stream).await.unwrap();
    }
}