1use std::ops::Range;
19use std::sync::{Arc, OnceLock};
20
21use bytes::Bytes;
22use futures::{Stream, StreamExt};
23
24use super::storage::{
25 LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
26};
27use crate::Result;
28
29#[derive(Clone, Debug)]
63pub struct FileIO {
64 config: StorageConfig,
66 factory: Arc<dyn StorageFactory>,
68 storage: Arc<OnceLock<Arc<dyn Storage>>>,
70}
71
72impl FileIO {
73 pub fn new_with_memory() -> Self {
77 Self {
78 config: StorageConfig::new(),
79 factory: Arc::new(MemoryStorageFactory),
80 storage: Arc::new(OnceLock::new()),
81 }
82 }
83
84 pub fn new_with_fs() -> Self {
88 Self {
89 config: StorageConfig::new(),
90 factory: Arc::new(LocalFsStorageFactory),
91 storage: Arc::new(OnceLock::new()),
92 }
93 }
94
95 pub fn config(&self) -> &StorageConfig {
97 &self.config
98 }
99
100 fn get_storage(&self) -> Result<Arc<dyn Storage>> {
105 if let Some(storage) = self.storage.get() {
107 return Ok(storage.clone());
108 }
109
110 let storage = self.factory.build(&self.config)?;
112
113 let _ = self.storage.set(storage.clone());
115
116 Ok(self.storage.get().unwrap().clone())
118 }
119
120 pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
126 self.get_storage()?.delete(path.as_ref()).await
127 }
128
129 pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
141 self.get_storage()?.delete_prefix(path.as_ref()).await
142 }
143
144 pub async fn delete_stream(
150 &self,
151 paths: impl Stream<Item = String> + Send + 'static,
152 ) -> Result<()> {
153 self.get_storage()?.delete_stream(paths.boxed()).await
154 }
155
156 pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
162 self.get_storage()?.exists(path.as_ref()).await
163 }
164
165 pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
171 self.get_storage()?.new_input(path.as_ref())
172 }
173
174 pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
180 self.get_storage()?.new_output(path.as_ref())
181 }
182}
183
184#[derive(Clone, Debug)]
189pub struct FileIOBuilder {
190 factory: Arc<dyn StorageFactory>,
192 config: StorageConfig,
194}
195
196impl FileIOBuilder {
197 pub fn new(factory: Arc<dyn StorageFactory>) -> Self {
199 Self {
200 factory,
201 config: StorageConfig::new(),
202 }
203 }
204
205 pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
207 self.config = self.config.with_prop(key.to_string(), value.to_string());
208 self
209 }
210
211 pub fn with_props(
213 mut self,
214 args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
215 ) -> Self {
216 self.config = self
217 .config
218 .with_props(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
219 self
220 }
221
222 pub fn config(&self) -> &StorageConfig {
224 &self.config
225 }
226
227 pub fn build(self) -> FileIO {
229 FileIO {
230 config: self.config,
231 factory: self.factory,
232 storage: Arc::new(OnceLock::new()),
233 }
234 }
235}
236
/// Metadata about a stored file, as returned by [`InputFile::metadata`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMetadata {
    /// The file size as reported by the storage (presumably in bytes).
    pub size: u64,
}
244
245#[async_trait::async_trait]
251pub trait FileRead: Send + Sync + Unpin + 'static {
252 async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
256}
257
258#[async_trait::async_trait]
259impl<T: AsRef<dyn FileRead> + Send + Sync + Unpin + 'static> FileRead for T {
260 async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
261 self.as_ref().read(range).await
262 }
263}
264
265#[derive(Debug)]
267pub struct InputFile {
268 storage: Arc<dyn Storage>,
269 path: String,
271}
272
273impl InputFile {
274 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
276 Self { storage, path }
277 }
278
279 pub fn location(&self) -> &str {
281 &self.path
282 }
283
284 pub async fn exists(&self) -> crate::Result<bool> {
286 self.storage.exists(&self.path).await
287 }
288
289 pub async fn metadata(&self) -> crate::Result<FileMetadata> {
291 self.storage.metadata(&self.path).await
292 }
293
294 pub async fn read(&self) -> crate::Result<Bytes> {
298 self.storage.read(&self.path).await
299 }
300
301 pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
305 self.storage.reader(&self.path).await
306 }
307}
308
309#[async_trait::async_trait]
316pub trait FileWrite: Send + Unpin + 'static {
317 async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
321
322 async fn close(&mut self) -> crate::Result<()>;
326}
327
328#[derive(Debug)]
330pub struct OutputFile {
331 storage: Arc<dyn Storage>,
332 path: String,
334}
335
336impl OutputFile {
337 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
339 Self { storage, path }
340 }
341
342 pub fn location(&self) -> &str {
344 &self.path
345 }
346
347 pub async fn exists(&self) -> Result<bool> {
349 self.storage.exists(&self.path).await
350 }
351
352 pub async fn delete(&self) -> Result<()> {
356 self.storage.delete(&self.path).await
357 }
358
359 pub fn to_input_file(self) -> InputFile {
361 InputFile {
362 storage: self.storage,
363 path: self.path,
364 }
365 }
366
367 pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
374 self.storage.write(&self.path, bs).await
375 }
376
377 pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
383 self.storage.writer(&self.path).await
384 }
385}
386
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};
    use crate::io::{LocalFsStorageFactory, MemoryStorageFactory};

    fn create_local_file_io() -> FileIO {
        FileIO::new_with_fs()
    }

    /// Writes `s` to `path` with plain std I/O, creating parent dirs first.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    /// Reads `path` back with plain std I/O, independently of `FileIO`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(&full_path, input_file.location());

        // Read through `InputFile` so the test exercises the FileIO read path,
        // and directly from disk as an independent cross-check.
        assert_eq!(input_file.read().await.unwrap(), Bytes::from(content));
        let read_content = read_from_file(full_path).await;
        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // `delete_prefix` on a plain file path leaves the file in place.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // `delete_prefix` on a missing directory succeeds as a no-op.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // `delete_prefix` on a directory removes everything inside it only.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(content.into()).await.unwrap();

        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIO::new_with_memory();

        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }

    // The builder tests are synchronous, so they do not need a tokio runtime.
    #[test]
    fn test_file_io_builder_with_props() {
        let factory = Arc::new(MemoryStorageFactory);
        let file_io = FileIOBuilder::new(factory)
            .with_prop("key1", "value1")
            .with_prop("key2", "value2")
            .build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }

    #[test]
    fn test_file_io_builder_with_multiple_props() {
        let factory = Arc::new(LocalFsStorageFactory);
        let props = vec![("key1", "value1"), ("key2", "value2")];
        let file_io = FileIOBuilder::new(factory).with_props(props).build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }
}