1use std::any::{Any, TypeId};
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use url::Url;
25
26use super::opendal::OpenDalStorage;
27use super::storage::Storage;
28use crate::{Error, ErrorKind, Result};
29
/// Entry point for file I/O.
///
/// Creates [`InputFile`]s and [`OutputFile`]s and performs path-level operations
/// (existence checks, deletion, deletion by prefix). Instances are produced by
/// [`FileIOBuilder::build`]. The storage backend is held behind an [`Arc`], so
/// clones of a `FileIO` share the same backend.
#[derive(Clone, Debug)]
pub struct FileIO {
    /// The builder this instance was created from; handed back by
    /// [`FileIO::into_builder`].
    builder: FileIOBuilder,

    /// Storage backend that every operation is delegated to.
    inner: Arc<OpenDalStorage>,
}
53
54impl FileIO {
55 pub fn into_builder(self) -> FileIOBuilder {
60 self.builder
61 }
62
63 pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
70 let url = Url::parse(path.as_ref())
71 .map_err(Error::from)
72 .or_else(|e| {
73 Url::from_file_path(path.as_ref()).map_err(|_| {
74 Error::new(
75 ErrorKind::DataInvalid,
76 "Input is neither a valid url nor path",
77 )
78 .with_context("input", path.as_ref().to_string())
79 .with_source(e)
80 })
81 })?;
82
83 Ok(FileIOBuilder::new(url.scheme()))
84 }
85
86 pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
92 self.inner.delete(path.as_ref()).await
93 }
94
95 pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
107 self.inner.delete_prefix(path.as_ref()).await
108 }
109
110 pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
116 self.inner.exists(path.as_ref()).await
117 }
118
119 pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
125 self.inner.new_input(path.as_ref())
126 }
127
128 pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
134 self.inner.new_output(path.as_ref())
135 }
136}
137
/// A type-keyed collection of arbitrary values ("extensions") that can be
/// attached to a [`FileIOBuilder`] and retrieved later by their type.
///
/// At most one value is kept per concrete type: adding a second value of the
/// same type replaces the first. Values are stored behind [`Arc`], so clones of
/// an `Extensions` share the stored values.
#[derive(Clone, Debug, Default)]
pub struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);

impl Extensions {
    /// Adds `ext`, replacing any previously added extension of the same type `T`.
    pub fn add<T: Any + Send + Sync>(&mut self, ext: T) {
        self.0.insert(TypeId::of::<T>(), Arc::new(ext));
    }

    /// Merges all extensions from `extensions` into `self`.
    ///
    /// For a type present in both, the value from `extensions` wins.
    pub fn extend(&mut self, extensions: Extensions) {
        self.0.extend(extensions.0);
    }

    /// Returns the extension of type `T`, if one was added.
    ///
    /// The value comes back as a shared [`Arc`], so `T` does not need to be
    /// `Clone`: any type accepted by [`Extensions::add`] can be retrieved here.
    pub fn get<T>(&self) -> Option<Arc<T>>
    where T: Any + Send + Sync {
        self.0
            .get(&TypeId::of::<T>())
            .and_then(|arc_any| Arc::clone(arc_any).downcast::<T>().ok())
    }
}
162
/// Builder for [`FileIO`].
///
/// Collects a storage scheme, string key/value properties and typed
/// [`Extensions`], then produces a [`FileIO`] via [`FileIOBuilder::build`].
#[derive(Clone, Debug)]
pub struct FileIOBuilder {
    /// Storage scheme such as `"s3"` or `"memory"`; `None` for the local
    /// filesystem builder created by [`FileIOBuilder::new_fs_io`].
    scheme_str: Option<String>,
    /// Key/value properties handed to the storage backend when building.
    props: HashMap<String, String>,
    /// Typed extensions handed to the storage backend when building.
    extensions: Extensions,
}
175
176impl FileIOBuilder {
177 pub fn new(scheme_str: impl ToString) -> Self {
180 Self {
181 scheme_str: Some(scheme_str.to_string()),
182 props: HashMap::default(),
183 extensions: Extensions::default(),
184 }
185 }
186
187 pub fn new_fs_io() -> Self {
189 Self {
190 scheme_str: None,
191 props: HashMap::default(),
192 extensions: Extensions::default(),
193 }
194 }
195
196 pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
200 (
201 self.scheme_str.unwrap_or_default(),
202 self.props,
203 self.extensions,
204 )
205 }
206
207 pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
209 self.props.insert(key.to_string(), value.to_string());
210 self
211 }
212
213 pub fn with_props(
215 mut self,
216 args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
217 ) -> Self {
218 self.props
219 .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
220 self
221 }
222
223 pub fn with_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
225 self.extensions.add(ext);
226 self
227 }
228
229 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
231 self.extensions.extend(extensions);
232 self
233 }
234
235 pub fn extension<T>(&self) -> Option<Arc<T>>
237 where T: 'static + Send + Sync + Clone {
238 self.extensions.get::<T>()
239 }
240
241 pub fn build(self) -> Result<FileIO> {
243 let storage = OpenDalStorage::build(self.clone())?;
244 Ok(FileIO {
245 builder: self,
246 inner: Arc::new(storage),
247 })
248 }
249}
250
/// Metadata of a file, as returned by [`InputFile::metadata`].
pub struct FileMetadata {
    /// Size of the file as reported by the storage backend.
    pub size: u64,
}
258
/// Reader that fetches arbitrary ranges of a single file.
///
/// Returned by [`InputFile::reader`]. Implementations must be `Send + Sync +
/// Unpin + 'static` so they can be boxed and shared across tasks.
#[async_trait::async_trait]
pub trait FileRead: Send + Sync + Unpin + 'static {
    /// Reads `range` of the file and returns the data.
    // NOTE(review): presumably `range` is in byte offsets (end-exclusive, like
    // `Range`) — confirm against the storage backend.
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
271
/// Handle to a file that can be read.
///
/// Obtained from [`FileIO::new_input`] or [`OutputFile::to_input_file`].
#[derive(Debug)]
pub struct InputFile {
    /// Storage backend used for every operation on this file.
    storage: Arc<dyn Storage>,
    /// Location of the file; returned by [`InputFile::location`].
    path: String,
}
279
280impl InputFile {
281 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
283 Self { storage, path }
284 }
285
286 pub fn location(&self) -> &str {
288 &self.path
289 }
290
291 pub async fn exists(&self) -> crate::Result<bool> {
293 self.storage.exists(&self.path).await
294 }
295
296 pub async fn metadata(&self) -> crate::Result<FileMetadata> {
298 self.storage.metadata(&self.path).await
299 }
300
301 pub async fn read(&self) -> crate::Result<Bytes> {
305 self.storage.read(&self.path).await
306 }
307
308 pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
312 self.storage.reader(&self.path).await
313 }
314}
315
/// Writer that accepts a file's data in successive chunks.
///
/// Returned by [`OutputFile::writer`]. Implementations must be `Send + Unpin +
/// 'static` so they can be boxed and moved across tasks.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
    /// Writes the chunk `bs` to the file.
    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;

    /// Closes the writer.
    // NOTE(review): presumably written data is only guaranteed to be persisted
    // once `close` returns `Ok` — confirm against the storage backend.
    async fn close(&mut self) -> crate::Result<()>;
}
334
/// Handle to a file that can be written.
///
/// Obtained from [`FileIO::new_output`]; convert it into a readable handle with
/// [`OutputFile::to_input_file`].
#[derive(Debug)]
pub struct OutputFile {
    /// Storage backend used for every operation on this file.
    storage: Arc<dyn Storage>,
    /// Location of the file; returned by [`OutputFile::location`].
    path: String,
}
342
343impl OutputFile {
344 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
346 Self { storage, path }
347 }
348
349 pub fn location(&self) -> &str {
351 &self.path
352 }
353
354 pub async fn exists(&self) -> Result<bool> {
356 self.storage.exists(&self.path).await
357 }
358
359 pub async fn delete(&self) -> Result<()> {
363 self.storage.delete(&self.path).await
364 }
365
366 pub fn to_input_file(self) -> InputFile {
368 InputFile {
369 storage: self.storage,
370 path: self.path,
371 }
372 }
373
374 pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
381 self.storage.write(&self.path, bs).await
382 }
383
384 pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
390 self.storage.writer(&self.path).await
391 }
392}
393
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};

    /// Builds a `FileIO` backed by the local filesystem.
    fn create_local_file_io() -> FileIO {
        FileIOBuilder::new_fs_io().build().unwrap()
    }

    /// Writes `s` to `path` with std I/O, creating parent directories first.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    /// Reads `path` with std I/O, independently of `FileIO`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(&full_path, input_file.location());

        // Exercise the InputFile read paths, not just the on-disk contents.
        assert_eq!(
            content.len() as u64,
            input_file.metadata().await.unwrap().size
        );
        assert_eq!(Bytes::from(content), input_file.read().await.unwrap());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // A prefix that names a regular file does not delete that file.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // A prefix that does not exist is not an error.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // A directory prefix removes everything beneath it, and only that.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(content.into()).await.unwrap();
        assert!(output_file.exists().await.unwrap());

        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[test]
    fn test_create_file_from_path() {
        let io = FileIO::from_path("/tmp/a").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:/tmp/b").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:///tmp/c").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("s3://bucket/a").unwrap();
        assert_eq!("s3", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("tmp/||c");
        assert!(io.is_err());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIOBuilder::new("memory").build().unwrap();

        // Keep the TempDir alive for the whole test rather than dropping it as
        // a temporary right after formatting the path.
        let tmp_dir = TempDir::new().unwrap();
        let path = format!("{}/1.txt", tmp_dir.path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }
}