1use std::fs;
25use std::io::{Read, Seek, SeekFrom, Write};
26use std::ops::Range;
27use std::path::PathBuf;
28use std::sync::Arc;
29
30use async_trait::async_trait;
31use bytes::Bytes;
32use serde::{Deserialize, Serialize};
33
34use crate::io::{
35 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
36 StorageFactory,
37};
38use crate::{Error, ErrorKind, Result};
39
40#[derive(Debug, Clone, Default, Serialize, Deserialize)]
53pub struct LocalFsStorage;
54
55impl LocalFsStorage {
56 pub fn new() -> Self {
58 Self
59 }
60
61 pub(crate) fn normalize_path(path: &str) -> PathBuf {
69 let path = if let Some(stripped) = path.strip_prefix("file://") {
70 if stripped.starts_with('/') {
72 stripped.to_string()
73 } else {
74 format!("/{stripped}")
75 }
76 } else if let Some(stripped) = path.strip_prefix("file:") {
77 if stripped.starts_with('/') {
79 stripped.to_string()
80 } else {
81 format!("/{stripped}")
82 }
83 } else {
84 path.to_string()
85 };
86 PathBuf::from(path)
87 }
88}
89
90#[async_trait]
91#[typetag::serde]
92impl Storage for LocalFsStorage {
93 async fn exists(&self, path: &str) -> Result<bool> {
94 let path = Self::normalize_path(path);
95 Ok(path.exists())
96 }
97
98 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
99 let path = Self::normalize_path(path);
100 let metadata = fs::metadata(&path).map_err(|e| {
101 Error::new(
102 ErrorKind::DataInvalid,
103 format!("Failed to get metadata for {}: {}", path.display(), e),
104 )
105 })?;
106 Ok(FileMetadata {
107 size: metadata.len(),
108 })
109 }
110
111 async fn read(&self, path: &str) -> Result<Bytes> {
112 let path = Self::normalize_path(path);
113 let content = fs::read(&path).map_err(|e| {
114 Error::new(
115 ErrorKind::DataInvalid,
116 format!("Failed to read file {}: {}", path.display(), e),
117 )
118 })?;
119 Ok(Bytes::from(content))
120 }
121
122 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
123 let path = Self::normalize_path(path);
124 let file = fs::File::open(&path).map_err(|e| {
125 Error::new(
126 ErrorKind::DataInvalid,
127 format!("Failed to open file {}: {}", path.display(), e),
128 )
129 })?;
130 Ok(Box::new(LocalFsFileRead::new(file)))
131 }
132
133 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
134 let path = Self::normalize_path(path);
135
136 if let Some(parent) = path.parent() {
138 fs::create_dir_all(parent).map_err(|e| {
139 Error::new(
140 ErrorKind::Unexpected,
141 format!("Failed to create directory {}: {}", parent.display(), e),
142 )
143 })?;
144 }
145
146 fs::write(&path, &bs).map_err(|e| {
147 Error::new(
148 ErrorKind::Unexpected,
149 format!("Failed to write file {}: {}", path.display(), e),
150 )
151 })?;
152 Ok(())
153 }
154
155 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
156 let path = Self::normalize_path(path);
157
158 if let Some(parent) = path.parent() {
160 fs::create_dir_all(parent).map_err(|e| {
161 Error::new(
162 ErrorKind::Unexpected,
163 format!("Failed to create directory {}: {}", parent.display(), e),
164 )
165 })?;
166 }
167
168 let file = fs::File::create(&path).map_err(|e| {
169 Error::new(
170 ErrorKind::Unexpected,
171 format!("Failed to create file {}: {}", path.display(), e),
172 )
173 })?;
174 Ok(Box::new(LocalFsFileWrite::new(file)))
175 }
176
177 async fn delete(&self, path: &str) -> Result<()> {
178 let path = Self::normalize_path(path);
179 if path.exists() {
180 fs::remove_file(&path).map_err(|e| {
181 Error::new(
182 ErrorKind::Unexpected,
183 format!("Failed to delete file {}: {}", path.display(), e),
184 )
185 })?;
186 }
187 Ok(())
188 }
189
190 async fn delete_prefix(&self, path: &str) -> Result<()> {
191 let path = Self::normalize_path(path);
192 if path.is_dir() {
193 fs::remove_dir_all(&path).map_err(|e| {
194 Error::new(
195 ErrorKind::Unexpected,
196 format!("Failed to delete directory {}: {}", path.display(), e),
197 )
198 })?;
199 }
200 Ok(())
201 }
202
203 fn new_input(&self, path: &str) -> Result<InputFile> {
204 Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
205 }
206
207 fn new_output(&self, path: &str) -> Result<OutputFile> {
208 Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
209 }
210}
211
212#[derive(Debug)]
214pub struct LocalFsFileRead {
215 file: std::sync::Mutex<fs::File>,
216}
217
218impl LocalFsFileRead {
219 pub fn new(file: fs::File) -> Self {
221 Self {
222 file: std::sync::Mutex::new(file),
223 }
224 }
225}
226
227#[async_trait]
228impl FileRead for LocalFsFileRead {
229 async fn read(&self, range: Range<u64>) -> Result<Bytes> {
230 let mut file = self.file.lock().map_err(|e| {
231 Error::new(
232 ErrorKind::Unexpected,
233 format!("Failed to acquire file lock: {e}"),
234 )
235 })?;
236
237 file.seek(SeekFrom::Start(range.start)).map_err(|e| {
238 Error::new(
239 ErrorKind::DataInvalid,
240 format!("Failed to seek to position {}: {}", range.start, e),
241 )
242 })?;
243
244 let len = (range.end - range.start) as usize;
245 let mut buffer = vec![0u8; len];
246 file.read_exact(&mut buffer).map_err(|e| {
247 Error::new(
248 ErrorKind::DataInvalid,
249 format!("Failed to read {len} bytes: {e}"),
250 )
251 })?;
252
253 Ok(Bytes::from(buffer))
254 }
255}
256
257#[derive(Debug)]
261pub struct LocalFsFileWrite {
262 file: Option<fs::File>,
263}
264
265impl LocalFsFileWrite {
266 pub fn new(file: fs::File) -> Self {
268 Self { file: Some(file) }
269 }
270}
271
272#[async_trait]
273impl FileWrite for LocalFsFileWrite {
274 async fn write(&mut self, bs: Bytes) -> Result<()> {
275 let file = self
276 .file
277 .as_mut()
278 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Cannot write to closed file"))?;
279
280 file.write_all(&bs).map_err(|e| {
281 Error::new(
282 ErrorKind::Unexpected,
283 format!("Failed to write to file: {e}"),
284 )
285 })?;
286
287 Ok(())
288 }
289
290 async fn close(&mut self) -> Result<()> {
291 let file = self
292 .file
293 .take()
294 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "File already closed"))?;
295
296 file.sync_all()
297 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to sync file: {e}")))?;
298
299 Ok(())
300 }
301}
302
303#[derive(Clone, Debug, Default, Serialize, Deserialize)]
318pub struct LocalFsStorageFactory;
319
320#[typetag::serde]
321impl StorageFactory for LocalFsStorageFactory {
322 fn build(&self, _config: &StorageConfig) -> Result<Arc<dyn Storage>> {
323 Ok(Arc::new(LocalFsStorage::new()))
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use tempfile::TempDir;
330
331 use super::*;
332
333 #[test]
334 fn test_normalize_path() {
335 assert_eq!(
337 LocalFsStorage::normalize_path("file:///path/to/file"),
338 PathBuf::from("/path/to/file")
339 );
340
341 assert_eq!(
343 LocalFsStorage::normalize_path("file://path/to/file"),
344 PathBuf::from("/path/to/file")
345 );
346
347 assert_eq!(
349 LocalFsStorage::normalize_path("file:/path/to/file"),
350 PathBuf::from("/path/to/file")
351 );
352
353 assert_eq!(
355 LocalFsStorage::normalize_path("/path/to/file"),
356 PathBuf::from("/path/to/file")
357 );
358 }
359
360 #[tokio::test]
361 async fn test_local_fs_storage_write_read() {
362 let tmp_dir = TempDir::new().unwrap();
363 let storage = LocalFsStorage::new();
364 let path = tmp_dir.path().join("test.txt");
365 let path_str = path.to_str().unwrap();
366 let content = Bytes::from("Hello, World!");
367
368 storage.write(path_str, content.clone()).await.unwrap();
370
371 let read_content = storage.read(path_str).await.unwrap();
373 assert_eq!(read_content, content);
374 }
375
376 #[tokio::test]
377 async fn test_local_fs_storage_exists() {
378 let tmp_dir = TempDir::new().unwrap();
379 let storage = LocalFsStorage::new();
380 let path = tmp_dir.path().join("test.txt");
381 let path_str = path.to_str().unwrap();
382
383 assert!(!storage.exists(path_str).await.unwrap());
385
386 storage.write(path_str, Bytes::from("test")).await.unwrap();
388
389 assert!(storage.exists(path_str).await.unwrap());
391 }
392
393 #[tokio::test]
394 async fn test_local_fs_storage_metadata() {
395 let tmp_dir = TempDir::new().unwrap();
396 let storage = LocalFsStorage::new();
397 let path = tmp_dir.path().join("test.txt");
398 let path_str = path.to_str().unwrap();
399 let content = Bytes::from("Hello, World!");
400
401 storage.write(path_str, content.clone()).await.unwrap();
402
403 let metadata = storage.metadata(path_str).await.unwrap();
404 assert_eq!(metadata.size, content.len() as u64);
405 }
406
407 #[tokio::test]
408 async fn test_local_fs_storage_delete() {
409 let tmp_dir = TempDir::new().unwrap();
410 let storage = LocalFsStorage::new();
411 let path = tmp_dir.path().join("test.txt");
412 let path_str = path.to_str().unwrap();
413
414 storage.write(path_str, Bytes::from("test")).await.unwrap();
415 assert!(storage.exists(path_str).await.unwrap());
416
417 storage.delete(path_str).await.unwrap();
418 assert!(!storage.exists(path_str).await.unwrap());
419 }
420
421 #[tokio::test]
422 async fn test_local_fs_storage_delete_prefix() {
423 let tmp_dir = TempDir::new().unwrap();
424 let storage = LocalFsStorage::new();
425 let dir_path = tmp_dir.path().join("subdir");
426 let file1 = dir_path.join("file1.txt");
427 let file2 = dir_path.join("file2.txt");
428
429 storage
431 .write(file1.to_str().unwrap(), Bytes::from("1"))
432 .await
433 .unwrap();
434 storage
435 .write(file2.to_str().unwrap(), Bytes::from("2"))
436 .await
437 .unwrap();
438
439 storage
441 .delete_prefix(dir_path.to_str().unwrap())
442 .await
443 .unwrap();
444
445 assert!(!dir_path.exists());
447 }
448
449 #[tokio::test]
450 async fn test_local_fs_storage_reader() {
451 let tmp_dir = TempDir::new().unwrap();
452 let storage = LocalFsStorage::new();
453 let path = tmp_dir.path().join("test.txt");
454 let path_str = path.to_str().unwrap();
455 let content = Bytes::from("Hello, World!");
456
457 storage.write(path_str, content.clone()).await.unwrap();
458
459 let reader = storage.reader(path_str).await.unwrap();
460 let read_content = reader.read(0..content.len() as u64).await.unwrap();
461 assert_eq!(read_content, content);
462
463 let partial = reader.read(0..5).await.unwrap();
465 assert_eq!(partial, Bytes::from("Hello"));
466 }
467
468 #[tokio::test]
469 async fn test_local_fs_storage_writer() {
470 let tmp_dir = TempDir::new().unwrap();
471 let storage = LocalFsStorage::new();
472 let path = tmp_dir.path().join("test.txt");
473 let path_str = path.to_str().unwrap();
474
475 let mut writer = storage.writer(path_str).await.unwrap();
476 writer.write(Bytes::from("Hello, ")).await.unwrap();
477 writer.write(Bytes::from("World!")).await.unwrap();
478 writer.close().await.unwrap();
479
480 let content = storage.read(path_str).await.unwrap();
481 assert_eq!(content, Bytes::from("Hello, World!"));
482 }
483
484 #[tokio::test]
485 async fn test_local_fs_file_write_double_close() {
486 let tmp_dir = TempDir::new().unwrap();
487 let storage = LocalFsStorage::new();
488 let path = tmp_dir.path().join("test.txt");
489 let path_str = path.to_str().unwrap();
490
491 let mut writer = storage.writer(path_str).await.unwrap();
492 writer.write(Bytes::from("test")).await.unwrap();
493 writer.close().await.unwrap();
494
495 let result = writer.close().await;
497 assert!(result.is_err());
498 }
499
500 #[tokio::test]
501 async fn test_local_fs_file_write_after_close() {
502 let tmp_dir = TempDir::new().unwrap();
503 let storage = LocalFsStorage::new();
504 let path = tmp_dir.path().join("test.txt");
505 let path_str = path.to_str().unwrap();
506
507 let mut writer = storage.writer(path_str).await.unwrap();
508 writer.close().await.unwrap();
509
510 let result = writer.write(Bytes::from("test")).await;
512 assert!(result.is_err());
513 }
514
515 #[test]
516 fn test_local_fs_storage_factory() {
517 let factory = LocalFsStorageFactory;
518 let config = StorageConfig::new();
519 let storage = factory.build(&config).unwrap();
520
521 assert!(format!("{storage:?}").contains("LocalFsStorage"));
523 }
524
525 #[tokio::test]
526 async fn test_local_fs_creates_parent_directories() {
527 let tmp_dir = TempDir::new().unwrap();
528 let storage = LocalFsStorage::new();
529 let path = tmp_dir.path().join("a/b/c/test.txt");
530 let path_str = path.to_str().unwrap();
531
532 storage.write(path_str, Bytes::from("test")).await.unwrap();
534
535 assert!(path.exists());
536 }
537}