1use std::fs;
25use std::io::{Read, Seek, SeekFrom, Write};
26use std::ops::Range;
27use std::path::PathBuf;
28use std::sync::Arc;
29
30use async_trait::async_trait;
31use bytes::Bytes;
32use futures::StreamExt;
33use futures::stream::BoxStream;
34use serde::{Deserialize, Serialize};
35
36use crate::io::{
37 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
38 StorageFactory,
39};
40use crate::{Error, ErrorKind, Result};
41
42#[derive(Debug, Clone, Default, Serialize, Deserialize)]
55pub struct LocalFsStorage;
56
57impl LocalFsStorage {
58 pub fn new() -> Self {
60 Self
61 }
62
63 pub(crate) fn normalize_path(path: &str) -> PathBuf {
71 let path = if let Some(stripped) = path.strip_prefix("file://") {
72 if stripped.starts_with('/') {
74 stripped.to_string()
75 } else {
76 format!("/{stripped}")
77 }
78 } else if let Some(stripped) = path.strip_prefix("file:") {
79 if stripped.starts_with('/') {
81 stripped.to_string()
82 } else {
83 format!("/{stripped}")
84 }
85 } else {
86 path.to_string()
87 };
88 PathBuf::from(path)
89 }
90}
91
92#[async_trait]
93#[typetag::serde]
94impl Storage for LocalFsStorage {
95 async fn exists(&self, path: &str) -> Result<bool> {
96 let path = Self::normalize_path(path);
97 Ok(path.exists())
98 }
99
100 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
101 let path = Self::normalize_path(path);
102 let metadata = fs::metadata(&path).map_err(|e| {
103 Error::new(
104 ErrorKind::DataInvalid,
105 format!("Failed to get metadata for {}: {}", path.display(), e),
106 )
107 })?;
108 Ok(FileMetadata {
109 size: metadata.len(),
110 })
111 }
112
113 async fn read(&self, path: &str) -> Result<Bytes> {
114 let path = Self::normalize_path(path);
115 let content = fs::read(&path).map_err(|e| {
116 Error::new(
117 ErrorKind::DataInvalid,
118 format!("Failed to read file {}: {}", path.display(), e),
119 )
120 })?;
121 Ok(Bytes::from(content))
122 }
123
124 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
125 let path = Self::normalize_path(path);
126 let file = fs::File::open(&path).map_err(|e| {
127 Error::new(
128 ErrorKind::DataInvalid,
129 format!("Failed to open file {}: {}", path.display(), e),
130 )
131 })?;
132 Ok(Box::new(LocalFsFileRead::new(file)))
133 }
134
135 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
136 let path = Self::normalize_path(path);
137
138 if let Some(parent) = path.parent() {
140 fs::create_dir_all(parent).map_err(|e| {
141 Error::new(
142 ErrorKind::Unexpected,
143 format!("Failed to create directory {}: {}", parent.display(), e),
144 )
145 })?;
146 }
147
148 fs::write(&path, &bs).map_err(|e| {
149 Error::new(
150 ErrorKind::Unexpected,
151 format!("Failed to write file {}: {}", path.display(), e),
152 )
153 })?;
154 Ok(())
155 }
156
157 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
158 let path = Self::normalize_path(path);
159
160 if let Some(parent) = path.parent() {
162 fs::create_dir_all(parent).map_err(|e| {
163 Error::new(
164 ErrorKind::Unexpected,
165 format!("Failed to create directory {}: {}", parent.display(), e),
166 )
167 })?;
168 }
169
170 let file = fs::File::create(&path).map_err(|e| {
171 Error::new(
172 ErrorKind::Unexpected,
173 format!("Failed to create file {}: {}", path.display(), e),
174 )
175 })?;
176 Ok(Box::new(LocalFsFileWrite::new(file)))
177 }
178
179 async fn delete(&self, path: &str) -> Result<()> {
180 let path = Self::normalize_path(path);
181 if path.exists() {
182 fs::remove_file(&path).map_err(|e| {
183 Error::new(
184 ErrorKind::Unexpected,
185 format!("Failed to delete file {}: {}", path.display(), e),
186 )
187 })?;
188 }
189 Ok(())
190 }
191
192 async fn delete_prefix(&self, path: &str) -> Result<()> {
193 let path = Self::normalize_path(path);
194 if path.is_dir() {
195 fs::remove_dir_all(&path).map_err(|e| {
196 Error::new(
197 ErrorKind::Unexpected,
198 format!("Failed to delete directory {}: {}", path.display(), e),
199 )
200 })?;
201 }
202 Ok(())
203 }
204
205 async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
206 while let Some(path) = paths.next().await {
207 self.delete(&path).await?;
208 }
209 Ok(())
210 }
211
212 fn new_input(&self, path: &str) -> Result<InputFile> {
213 Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
214 }
215
216 fn new_output(&self, path: &str) -> Result<OutputFile> {
217 Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
218 }
219}
220
/// [`FileRead`] implementation backed by an already-opened local [`fs::File`].
///
/// The handle lives behind a mutex because a range read is a seek followed by a read on
/// the same handle, while [`FileRead::read`] only receives `&self`.
#[derive(Debug)]
pub struct LocalFsFileRead {
    file: std::sync::Mutex<fs::File>,
}

impl LocalFsFileRead {
    /// Wraps `file` so it can serve range reads.
    pub fn new(file: fs::File) -> Self {
        let guarded = std::sync::Mutex::new(file);
        Self { file: guarded }
    }
}
235
236#[async_trait]
237impl FileRead for LocalFsFileRead {
238 async fn read(&self, range: Range<u64>) -> Result<Bytes> {
239 let mut file = self.file.lock().map_err(|e| {
240 Error::new(
241 ErrorKind::Unexpected,
242 format!("Failed to acquire file lock: {e}"),
243 )
244 })?;
245
246 file.seek(SeekFrom::Start(range.start)).map_err(|e| {
247 Error::new(
248 ErrorKind::DataInvalid,
249 format!("Failed to seek to position {}: {}", range.start, e),
250 )
251 })?;
252
253 let len = (range.end - range.start) as usize;
254 let mut buffer = vec![0u8; len];
255 file.read_exact(&mut buffer).map_err(|e| {
256 Error::new(
257 ErrorKind::DataInvalid,
258 format!("Failed to read {len} bytes: {e}"),
259 )
260 })?;
261
262 Ok(Bytes::from(buffer))
263 }
264}
265
/// [`FileWrite`] implementation that streams bytes into a local [`fs::File`].
///
/// The handle is stored as an `Option`: it is `Some` while the writer is open and is
/// taken out (leaving `None`) by `close`, which is how later writes and repeated closes
/// are detected.
#[derive(Debug)]
pub struct LocalFsFileWrite {
    file: Option<fs::File>,
}

impl LocalFsFileWrite {
    /// Wraps a freshly created file; the writer starts in the open state.
    pub fn new(file: fs::File) -> Self {
        let open_handle = Some(file);
        Self { file: open_handle }
    }
}
280
281#[async_trait]
282impl FileWrite for LocalFsFileWrite {
283 async fn write(&mut self, bs: Bytes) -> Result<()> {
284 let file = self
285 .file
286 .as_mut()
287 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Cannot write to closed file"))?;
288
289 file.write_all(&bs).map_err(|e| {
290 Error::new(
291 ErrorKind::Unexpected,
292 format!("Failed to write to file: {e}"),
293 )
294 })?;
295
296 Ok(())
297 }
298
299 async fn close(&mut self) -> Result<()> {
300 let file = self
301 .file
302 .take()
303 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "File already closed"))?;
304
305 file.sync_all()
306 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to sync file: {e}")))?;
307
308 Ok(())
309 }
310}
311
312#[derive(Clone, Debug, Default, Serialize, Deserialize)]
327pub struct LocalFsStorageFactory;
328
329#[typetag::serde]
330impl StorageFactory for LocalFsStorageFactory {
331 fn build(&self, _config: &StorageConfig) -> Result<Arc<dyn Storage>> {
332 Ok(Arc::new(LocalFsStorage::new()))
333 }
334}
335
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    #[test]
    fn test_normalize_path() {
        // `file:///` URI carrying an absolute path.
        assert_eq!(
            LocalFsStorage::normalize_path("file:///path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // `file://` without a leading slash is still treated as an absolute path.
        assert_eq!(
            LocalFsStorage::normalize_path("file://path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Single-slash `file:` form.
        assert_eq!(
            LocalFsStorage::normalize_path("file:/path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Bare `file:` prefix with no slash at all.
        assert_eq!(
            LocalFsStorage::normalize_path("file:path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Plain paths (absolute or relative) pass through unchanged.
        assert_eq!(
            LocalFsStorage::normalize_path("/path/to/file"),
            PathBuf::from("/path/to/file")
        );
        assert_eq!(
            LocalFsStorage::normalize_path("relative/file"),
            PathBuf::from("relative/file")
        );
    }

    #[tokio::test]
    async fn test_local_fs_storage_write_read() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();
        let content = Bytes::from("Hello, World!");

        storage.write(path_str, content.clone()).await.unwrap();

        let read_content = storage.read(path_str).await.unwrap();
        assert_eq!(read_content, content);
    }

    #[tokio::test]
    async fn test_local_fs_storage_exists() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        assert!(!storage.exists(path_str).await.unwrap());

        storage.write(path_str, Bytes::from("test")).await.unwrap();

        assert!(storage.exists(path_str).await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_storage_metadata() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();
        let content = Bytes::from("Hello, World!");

        storage.write(path_str, content.clone()).await.unwrap();

        let metadata = storage.metadata(path_str).await.unwrap();
        assert_eq!(metadata.size, content.len() as u64);
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        storage.write(path_str, Bytes::from("test")).await.unwrap();
        assert!(storage.exists(path_str).await.unwrap());

        storage.delete(path_str).await.unwrap();
        assert!(!storage.exists(path_str).await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_missing_file_is_ok() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("missing.txt");

        // Deleting something that is not there must be a no-op, not an error.
        storage.delete(path.to_str().unwrap()).await.unwrap();
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_prefix() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let dir_path = tmp_dir.path().join("subdir");
        let file1 = dir_path.join("file1.txt");
        let file2 = dir_path.join("file2.txt");

        storage
            .write(file1.to_str().unwrap(), Bytes::from("1"))
            .await
            .unwrap();
        storage
            .write(file2.to_str().unwrap(), Bytes::from("2"))
            .await
            .unwrap();

        storage
            .delete_prefix(dir_path.to_str().unwrap())
            .await
            .unwrap();

        assert!(!dir_path.exists());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_prefix_missing_dir_is_ok() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let dir_path = tmp_dir.path().join("missing_dir");

        storage
            .delete_prefix(dir_path.to_str().unwrap())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_local_fs_storage_reader() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();
        let content = Bytes::from("Hello, World!");

        storage.write(path_str, content.clone()).await.unwrap();

        let reader = storage.reader(path_str).await.unwrap();
        let read_content = reader.read(0..content.len() as u64).await.unwrap();
        assert_eq!(read_content, content);

        let partial = reader.read(0..5).await.unwrap();
        assert_eq!(partial, Bytes::from("Hello"));
    }

    #[tokio::test]
    async fn test_local_fs_storage_reader_edge_ranges() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        storage.write(path_str, Bytes::from("Hello")).await.unwrap();
        let reader = storage.reader(path_str).await.unwrap();

        // An empty range yields no bytes.
        assert!(reader.read(2..2).await.unwrap().is_empty());

        // A range running past the end of the file is an error, not a short read.
        assert!(reader.read(3..10).await.is_err());
    }

    #[tokio::test]
    async fn test_local_fs_storage_missing_file_reads_fail() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("missing.txt");
        let path_str = path.to_str().unwrap();

        assert!(storage.read(path_str).await.is_err());
        assert!(storage.metadata(path_str).await.is_err());
        assert!(storage.reader(path_str).await.is_err());
    }

    #[tokio::test]
    async fn test_local_fs_storage_writer() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        let mut writer = storage.writer(path_str).await.unwrap();
        writer.write(Bytes::from("Hello, ")).await.unwrap();
        writer.write(Bytes::from("World!")).await.unwrap();
        writer.close().await.unwrap();

        let content = storage.read(path_str).await.unwrap();
        assert_eq!(content, Bytes::from("Hello, World!"));
    }

    #[tokio::test]
    async fn test_local_fs_file_write_double_close() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        let mut writer = storage.writer(path_str).await.unwrap();
        writer.write(Bytes::from("test")).await.unwrap();
        writer.close().await.unwrap();

        let result = writer.close().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_local_fs_file_write_after_close() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        let mut writer = storage.writer(path_str).await.unwrap();
        writer.close().await.unwrap();

        let result = writer.write(Bytes::from("test")).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_local_fs_storage_factory() {
        let factory = LocalFsStorageFactory;
        let config = StorageConfig::new();
        let storage = factory.build(&config).unwrap();

        assert!(format!("{storage:?}").contains("LocalFsStorage"));
    }

    #[tokio::test]
    async fn test_local_fs_creates_parent_directories() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("a/b/c/test.txt");
        let path_str = path.to_str().unwrap();

        storage.write(path_str, Bytes::from("test")).await.unwrap();

        assert!(path.exists());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_stream() {
        use futures::stream;

        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();

        let file1 = tmp_dir.path().join("file1.txt");
        let file2 = tmp_dir.path().join("file2.txt");
        let file3 = tmp_dir.path().join("file3.txt");

        storage
            .write(file1.to_str().unwrap(), Bytes::from("1"))
            .await
            .unwrap();
        storage
            .write(file2.to_str().unwrap(), Bytes::from("2"))
            .await
            .unwrap();
        storage
            .write(file3.to_str().unwrap(), Bytes::from("3"))
            .await
            .unwrap();

        assert!(storage.exists(file1.to_str().unwrap()).await.unwrap());
        assert!(storage.exists(file2.to_str().unwrap()).await.unwrap());
        assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());

        // Only the first two files are streamed for deletion.
        let paths = vec![
            file1.to_str().unwrap().to_string(),
            file2.to_str().unwrap().to_string(),
        ];
        let path_stream = stream::iter(paths).boxed();
        storage.delete_stream(path_stream).await.unwrap();

        assert!(!storage.exists(file1.to_str().unwrap()).await.unwrap());
        assert!(!storage.exists(file2.to_str().unwrap()).await.unwrap());

        // The file that was not in the stream is untouched.
        assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_stream_empty() {
        use futures::stream;

        let storage = LocalFsStorage::new();

        let path_stream = stream::iter(Vec::<String>::new()).boxed();
        storage.delete_stream(path_stream).await.unwrap();
    }
}