1use std::ops::Range;
19use std::sync::{Arc, OnceLock};
20
21use bytes::Bytes;
22use futures::{Stream, StreamExt};
23
24use super::storage::{
25 LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
26};
27use crate::Result;
28
29#[derive(Clone, Debug)]
63pub struct FileIO {
64 config: StorageConfig,
66 factory: Arc<dyn StorageFactory>,
68 storage: Arc<OnceLock<Arc<dyn Storage>>>,
70}
71
72impl FileIO {
73 pub fn new_with_memory() -> Self {
77 Self {
78 config: StorageConfig::new(),
79 factory: Arc::new(MemoryStorageFactory),
80 storage: Arc::new(OnceLock::new()),
81 }
82 }
83
84 pub fn new_with_fs() -> Self {
88 Self {
89 config: StorageConfig::new(),
90 factory: Arc::new(LocalFsStorageFactory),
91 storage: Arc::new(OnceLock::new()),
92 }
93 }
94
95 pub fn config(&self) -> &StorageConfig {
97 &self.config
98 }
99
100 fn get_storage(&self) -> Result<Arc<dyn Storage>> {
105 if let Some(storage) = self.storage.get() {
107 return Ok(storage.clone());
108 }
109
110 let storage = self.factory.build(&self.config)?;
112
113 let _ = self.storage.set(storage.clone());
115
116 Ok(self.storage.get().unwrap().clone())
118 }
119
120 pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
126 self.get_storage()?.delete(path.as_ref()).await
127 }
128
129 pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
141 self.get_storage()?.delete_prefix(path.as_ref()).await
142 }
143
144 pub async fn delete_stream(
150 &self,
151 paths: impl Stream<Item = String> + Send + 'static,
152 ) -> Result<()> {
153 self.get_storage()?.delete_stream(paths.boxed()).await
154 }
155
156 pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
162 self.get_storage()?.exists(path.as_ref()).await
163 }
164
165 pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
171 self.get_storage()?.new_input(path.as_ref())
172 }
173
174 pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
180 self.get_storage()?.new_output(path.as_ref())
181 }
182}
183
184#[derive(Clone, Debug)]
189pub struct FileIOBuilder {
190 factory: Arc<dyn StorageFactory>,
192 config: StorageConfig,
194}
195
196impl FileIOBuilder {
197 pub fn new(factory: Arc<dyn StorageFactory>) -> Self {
199 Self {
200 factory,
201 config: StorageConfig::new(),
202 }
203 }
204
205 pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
207 self.config = self.config.with_prop(key.to_string(), value.to_string());
208 self
209 }
210
211 pub fn with_props(
213 mut self,
214 args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
215 ) -> Self {
216 self.config = self
217 .config
218 .with_props(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
219 self
220 }
221
222 pub fn config(&self) -> &StorageConfig {
224 &self.config
225 }
226
227 pub fn build(self) -> FileIO {
229 FileIO {
230 config: self.config,
231 factory: self.factory,
232 storage: Arc::new(OnceLock::new()),
233 }
234 }
235}
236
/// Metadata describing a file.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so values can be logged, copied and
/// compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file.
    pub size: u64,
}
244
245#[async_trait::async_trait]
251pub trait FileRead: Send + Sync + Unpin + 'static {
252 async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
256}
257
258#[derive(Debug)]
260pub struct InputFile {
261 storage: Arc<dyn Storage>,
262 path: String,
264}
265
266impl InputFile {
267 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
269 Self { storage, path }
270 }
271
272 pub fn location(&self) -> &str {
274 &self.path
275 }
276
277 pub async fn exists(&self) -> crate::Result<bool> {
279 self.storage.exists(&self.path).await
280 }
281
282 pub async fn metadata(&self) -> crate::Result<FileMetadata> {
284 self.storage.metadata(&self.path).await
285 }
286
287 pub async fn read(&self) -> crate::Result<Bytes> {
291 self.storage.read(&self.path).await
292 }
293
294 pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
298 self.storage.reader(&self.path).await
299 }
300}
301
302#[async_trait::async_trait]
309pub trait FileWrite: Send + Unpin + 'static {
310 async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
314
315 async fn close(&mut self) -> crate::Result<()>;
319}
320
321#[derive(Debug)]
323pub struct OutputFile {
324 storage: Arc<dyn Storage>,
325 path: String,
327}
328
329impl OutputFile {
330 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
332 Self { storage, path }
333 }
334
335 pub fn location(&self) -> &str {
337 &self.path
338 }
339
340 pub async fn exists(&self) -> Result<bool> {
342 self.storage.exists(&self.path).await
343 }
344
345 pub async fn delete(&self) -> Result<()> {
349 self.storage.delete(&self.path).await
350 }
351
352 pub fn to_input_file(self) -> InputFile {
354 InputFile {
355 storage: self.storage,
356 path: self.path,
357 }
358 }
359
360 pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
367 self.storage.write(&self.path, bs).await
368 }
369
370 pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
376 self.storage.writer(&self.path).await
377 }
378}
379
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};
    use crate::io::{LocalFsStorageFactory, MemoryStorageFactory};

    /// Payload written by the local-filesystem tests.
    const SAMPLE_TEXT: &str = "Iceberg loves rust.";

    /// Returns a `FileIO` backed by the local filesystem.
    fn create_local_file_io() -> FileIO {
        FileIO::new_with_fs()
    }

    /// Joins `name` onto the temp directory's path using a `/` separator.
    fn path_in(dir: &TempDir, name: &str) -> String {
        format!("{}/{}", dir.path().to_str().unwrap(), name)
    }

    /// Writes `s` to `path`, creating parent directories first.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        let target = path.as_ref();
        create_dir_all(target.parent().unwrap()).unwrap();
        let mut file = File::create(target).unwrap();
        file.write_all(s.as_bytes()).unwrap();
    }

    /// Reads the whole file at `path` into a `String`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let file = File::open(path).unwrap();
        let mut reader = AllowStdIo::new(file);
        let mut text = String::new();
        reader.read_to_string(&mut text).await.unwrap();
        text
    }

    /// Asserts that `file_io`'s configuration maps `key` to `expected`.
    fn assert_prop(file_io: &FileIO, key: &str, expected: &str) {
        assert_eq!(file_io.config().get(key), Some(&expected.to_string()));
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "a.txt");
        write_to_file(SAMPLE_TEXT, &full_path);

        let input_file = create_local_file_io().new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(full_path.as_str(), input_file.location());

        let read_content = read_from_file(full_path).await;
        assert_eq!(SAMPLE_TEXT, read_content.as_str());
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = path_in(&tmp_dir, "a.txt");
        let sub_dir_path = path_in(&tmp_dir, "sub");
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        for file_path in [&a_path, &b_path, &c_path] {
            write_to_file(SAMPLE_TEXT, file_path);
        }

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // `delete_prefix` on a plain file leaves that file in place.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // `delete_prefix` on a missing directory succeeds.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // `delete_prefix` on a directory removes the files inside it only.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "a.txt");

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());

        let delete_result = file_io.delete(&full_path).await;
        assert!(delete_result.is_ok());

        let delete_prefix_result = file_io.delete_prefix(&full_path).await;
        assert!(delete_prefix_result.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "a.txt");

        let output_file = create_local_file_io().new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(Bytes::from(SAMPLE_TEXT)).await.unwrap();

        assert_eq!(full_path.as_str(), output_file.location());

        let read_content = read_from_file(full_path).await;
        assert_eq!(SAMPLE_TEXT, read_content.as_str());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIO::new_with_memory();

        // The temp dir is dropped at the end of this statement; only its path
        // string is used as a key for the in-memory storage.
        let path = path_in(&TempDir::new().unwrap(), "1.txt");

        let output_file = io.new_output(&path).unwrap();
        output_file.write(Bytes::from("test")).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_io_builder_with_props() {
        let file_io = FileIOBuilder::new(Arc::new(MemoryStorageFactory))
            .with_prop("key1", "value1")
            .with_prop("key2", "value2")
            .build();

        assert_prop(&file_io, "key1", "value1");
        assert_prop(&file_io, "key2", "value2");
    }

    #[tokio::test]
    async fn test_file_io_builder_with_multiple_props() {
        let props = vec![("key1", "value1"), ("key2", "value2")];
        let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory))
            .with_props(props)
            .build();

        assert_prop(&file_io, "key1", "value1");
        assert_prop(&file_io, "key2", "value2");
    }
}