1use std::any::{Any, TypeId};
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use opendal::Operator;
25use url::Url;
26
27use super::storage::Storage;
28use crate::{Error, ErrorKind, Result};
29
30#[derive(Clone, Debug)]
48pub struct FileIO {
49 builder: FileIOBuilder,
50
51 inner: Arc<Storage>,
52}
53
54impl FileIO {
55 pub fn into_builder(self) -> FileIOBuilder {
60 self.builder
61 }
62
63 pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
70 let url = Url::parse(path.as_ref())
71 .map_err(Error::from)
72 .or_else(|e| {
73 Url::from_file_path(path.as_ref()).map_err(|_| {
74 Error::new(
75 ErrorKind::DataInvalid,
76 "Input is neither a valid url nor path",
77 )
78 .with_context("input", path.as_ref().to_string())
79 .with_source(e)
80 })
81 })?;
82
83 Ok(FileIOBuilder::new(url.scheme()))
84 }
85
86 pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
92 let (op, relative_path) = self.inner.create_operator(&path)?;
93 Ok(op.delete(relative_path).await?)
94 }
95
96 pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
108 let (op, relative_path) = self.inner.create_operator(&path)?;
109 let path = if relative_path.ends_with('/') {
110 relative_path.to_string()
111 } else {
112 format!("{relative_path}/")
113 };
114 Ok(op.remove_all(&path).await?)
115 }
116
117 pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
123 let (op, relative_path) = self.inner.create_operator(&path)?;
124 Ok(op.exists(relative_path).await?)
125 }
126
127 pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
133 let (op, relative_path) = self.inner.create_operator(&path)?;
134 let path = path.as_ref().to_string();
135 let relative_path_pos = path.len() - relative_path.len();
136 Ok(InputFile {
137 op,
138 path,
139 relative_path_pos,
140 })
141 }
142
143 pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
149 let (op, relative_path) = self.inner.create_operator(&path)?;
150 let path = path.as_ref().to_string();
151 let relative_path_pos = path.len() - relative_path.len();
152 Ok(OutputFile {
153 op,
154 path,
155 relative_path_pos,
156 })
157 }
158}
159
/// Type-keyed bag of arbitrary values attached to a [`FileIOBuilder`].
///
/// At most one value is stored per concrete type; values are shared via `Arc`.
#[derive(Clone, Debug, Default)]
pub struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);

impl Extensions {
    /// Stores `ext`, replacing any previously stored value of the same type.
    pub fn add<T: Any + Send + Sync>(&mut self, ext: T) {
        self.0.insert(TypeId::of::<T>(), Arc::new(ext));
    }

    /// Merges all entries of `extensions` into `self`; on a type collision the
    /// incoming value wins.
    pub fn extend(&mut self, extensions: Extensions) {
        self.0.extend(extensions.0);
    }

    /// Returns a shared handle to the stored value of type `T`, or `None` if no
    /// value of that type was added.
    ///
    /// Only the bounds required by [`Extensions::add`] are required here, so every
    /// value that can be stored can also be retrieved. `T: Clone` is not needed
    /// because the result is an `Arc<T>`.
    pub fn get<T>(&self) -> Option<Arc<T>>
    where T: Any + Send + Sync {
        self.0
            .get(&TypeId::of::<T>())
            .and_then(|arc_any| Arc::clone(arc_any).downcast::<T>().ok())
    }
}
184
185#[derive(Clone, Debug)]
187pub struct FileIOBuilder {
188 scheme_str: Option<String>,
192 props: HashMap<String, String>,
194 extensions: Extensions,
196}
197
198impl FileIOBuilder {
199 pub fn new(scheme_str: impl ToString) -> Self {
202 Self {
203 scheme_str: Some(scheme_str.to_string()),
204 props: HashMap::default(),
205 extensions: Extensions::default(),
206 }
207 }
208
209 pub fn new_fs_io() -> Self {
211 Self {
212 scheme_str: None,
213 props: HashMap::default(),
214 extensions: Extensions::default(),
215 }
216 }
217
218 pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
222 (
223 self.scheme_str.unwrap_or_default(),
224 self.props,
225 self.extensions,
226 )
227 }
228
229 pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
231 self.props.insert(key.to_string(), value.to_string());
232 self
233 }
234
235 pub fn with_props(
237 mut self,
238 args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
239 ) -> Self {
240 self.props
241 .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
242 self
243 }
244
245 pub fn with_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
247 self.extensions.add(ext);
248 self
249 }
250
251 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
253 self.extensions.extend(extensions);
254 self
255 }
256
257 pub fn extension<T>(&self) -> Option<Arc<T>>
259 where T: 'static + Send + Sync + Clone {
260 self.extensions.get::<T>()
261 }
262
263 pub fn build(self) -> Result<FileIO> {
265 let storage = Storage::build(self.clone())?;
266 Ok(FileIO {
267 builder: self,
268 inner: Arc::new(storage),
269 })
270 }
271}
272
/// Metadata about a stored file, as returned by [`InputFile::metadata`].
///
/// `Copy` is intentionally not derived so that non-`Copy` fields can be added later
/// without a breaking change.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMetadata {
    /// Size of the file in bytes (the `content_length` reported by storage).
    pub size: u64,
}
280
281#[async_trait::async_trait]
287pub trait FileRead: Send + Sync + Unpin + 'static {
288 async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
292}
293
294#[async_trait::async_trait]
295impl FileRead for opendal::Reader {
296 async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
297 Ok(opendal::Reader::read(self, range).await?.to_bytes())
298 }
299}
300
301#[derive(Debug)]
303pub struct InputFile {
304 op: Operator,
305 path: String,
307 relative_path_pos: usize,
309}
310
311impl InputFile {
312 pub fn location(&self) -> &str {
314 &self.path
315 }
316
317 pub async fn exists(&self) -> crate::Result<bool> {
319 Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
320 }
321
322 pub async fn metadata(&self) -> crate::Result<FileMetadata> {
324 let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
325
326 Ok(FileMetadata {
327 size: meta.content_length(),
328 })
329 }
330
331 pub async fn read(&self) -> crate::Result<Bytes> {
335 Ok(self
336 .op
337 .read(&self.path[self.relative_path_pos..])
338 .await?
339 .to_bytes())
340 }
341
342 pub async fn reader(&self) -> crate::Result<impl FileRead + use<>> {
346 Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
347 }
348}
349
350#[async_trait::async_trait]
357pub trait FileWrite: Send + Unpin + 'static {
358 async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
362
363 async fn close(&mut self) -> crate::Result<()>;
367}
368
369#[async_trait::async_trait]
370impl FileWrite for opendal::Writer {
371 async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
372 Ok(opendal::Writer::write(self, bs).await?)
373 }
374
375 async fn close(&mut self) -> crate::Result<()> {
376 let _ = opendal::Writer::close(self).await?;
377 Ok(())
378 }
379}
380
381#[async_trait::async_trait]
382impl FileWrite for Box<dyn FileWrite> {
383 async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
384 self.as_mut().write(bs).await
385 }
386
387 async fn close(&mut self) -> crate::Result<()> {
388 self.as_mut().close().await
389 }
390}
391
392#[derive(Debug)]
394pub struct OutputFile {
395 op: Operator,
396 path: String,
398 relative_path_pos: usize,
400}
401
402impl OutputFile {
403 pub fn location(&self) -> &str {
405 &self.path
406 }
407
408 pub async fn exists(&self) -> Result<bool> {
410 Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
411 }
412
413 pub async fn delete(&self) -> Result<()> {
417 Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
418 }
419
420 pub fn to_input_file(self) -> InputFile {
422 InputFile {
423 op: self.op,
424 path: self.path,
425 relative_path_pos: self.relative_path_pos,
426 }
427 }
428
429 pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
436 let mut writer = self.writer().await?;
437 writer.write(bs).await?;
438 writer.close().await
439 }
440
441 pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
447 Ok(Box::new(
448 self.op.writer(&self.path[self.relative_path_pos..]).await?,
449 ))
450 }
451}
452
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};

    fn create_local_file_io() -> FileIO {
        FileIOBuilder::new_fs_io().build().unwrap()
    }

    /// Writes `s` to `path` with std I/O, creating parent directories as needed.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    /// Reads `path` back with std I/O, independently of `FileIO`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(&full_path, input_file.location());

        // Exercise the `InputFile` read path itself, not only the std-based helper.
        let metadata = input_file.metadata().await.unwrap();
        assert_eq!(content.len() as u64, metadata.size);
        assert_eq!(Bytes::from(content), input_file.read().await.unwrap());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // `remove_dir_all` treats its argument as a directory ("a.txt/"), so the file survives.
        file_io.remove_dir_all(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Removing a directory that does not exist is not an error.
        file_io.remove_dir_all("not_exists/").await.unwrap();

        // Removing the sub directory deletes its contents but leaves siblings alone.
        file_io.remove_dir_all(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.remove_dir_all(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(content.into()).await.unwrap();
        assert!(output_file.exists().await.unwrap());

        assert_eq!(&full_path, output_file.location());

        // Data written through `OutputFile` must be readable through the converted handle.
        let input_file = output_file.to_input_file();
        assert_eq!(&full_path, input_file.location());
        assert_eq!(Bytes::from(content), input_file.read().await.unwrap());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[test]
    fn test_create_file_from_path() {
        // Absolute local paths map to the `file` scheme.
        let io = FileIO::from_path("/tmp/a").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:/tmp/b").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:///tmp/c").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("s3://bucket/a").unwrap();
        assert_eq!("s3", io.scheme_str.unwrap().as_str());

        // Neither a URL nor an absolute path.
        let io = FileIO::from_path("tmp/||c");
        assert!(io.is_err());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIOBuilder::new("memory").build().unwrap();

        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }
}