1use std::any::{Any, TypeId};
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use url::Url;
25
26use super::storage::{OpenDalStorage, Storage};
27use crate::{Error, ErrorKind, Result};
28
/// Entry point for file operations (deleting, existence checks, opening input
/// and output files) against the storage backend selected by a [`FileIOBuilder`].
///
/// Build one with [`FileIOBuilder::build`], or derive a builder from a path or
/// URL with [`FileIO::from_path`].
#[derive(Clone, Debug)]
pub struct FileIO {
    /// The builder this `FileIO` was built from; handed back by `into_builder`.
    builder: FileIOBuilder,

    /// Storage backend built from `builder`. Because it is held in an `Arc`,
    /// cloning a `FileIO` shares this backend instance instead of rebuilding it.
    inner: Arc<OpenDalStorage>,
}
52
53impl FileIO {
54 pub fn into_builder(self) -> FileIOBuilder {
59 self.builder
60 }
61
62 pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
69 let url = Url::parse(path.as_ref())
70 .map_err(Error::from)
71 .or_else(|e| {
72 Url::from_file_path(path.as_ref()).map_err(|_| {
73 Error::new(
74 ErrorKind::DataInvalid,
75 "Input is neither a valid url nor path",
76 )
77 .with_context("input", path.as_ref().to_string())
78 .with_source(e)
79 })
80 })?;
81
82 Ok(FileIOBuilder::new(url.scheme()))
83 }
84
85 pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
91 self.inner.delete(path.as_ref()).await
92 }
93
94 pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
106 self.inner.delete_prefix(path.as_ref()).await
107 }
108
109 pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
115 self.inner.exists(path.as_ref()).await
116 }
117
118 pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
124 self.inner.new_input(path.as_ref())
125 }
126
127 pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
133 self.inner.new_output(path.as_ref())
134 }
135}
136
/// Type-keyed collection of arbitrary extra values attached to a `FileIOBuilder`.
///
/// At most one value is kept per concrete type `T`; storing another value of the
/// same type replaces the previous one. Values are shared behind `Arc`, so
/// cloning an `Extensions` does not clone the stored values.
#[derive(Clone, Debug, Default)]
pub struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);

impl Extensions {
    /// Stores `ext`, replacing any previously stored value of the same type `T`.
    pub fn add<T: Any + Send + Sync>(&mut self, ext: T) {
        self.0.insert(TypeId::of::<T>(), Arc::new(ext));
    }

    /// Merges every entry of `extensions` into `self`.
    ///
    /// When both contain a value of the same type, the incoming one wins.
    pub fn extend(&mut self, extensions: Extensions) {
        self.0.extend(extensions.0);
    }

    /// Returns a shared handle to the stored value of type `T`, or `None` if no
    /// value of that type has been added.
    ///
    /// `T` needs only the bounds that [`Extensions::add`] accepts. No `Clone` is
    /// required because the value is returned behind an `Arc`, not copied. This
    /// means every value that can be added can also be retrieved.
    pub fn get<T>(&self) -> Option<Arc<T>>
    where T: 'static + Send + Sync {
        self.0
            .get(&TypeId::of::<T>())
            .and_then(|stored| Arc::clone(stored).downcast::<T>().ok())
    }
}
161
/// Builder for [`FileIO`]: collects a storage scheme, string properties and
/// typed [`Extensions`], then constructs the storage backend in
/// [`FileIOBuilder::build`].
#[derive(Clone, Debug)]
pub struct FileIOBuilder {
    /// Storage scheme such as `"s3"`, `"file"` or `"memory"`.
    /// `None` when created via `new_fs_io`; `into_parts` reports that as `""`.
    scheme_str: Option<String>,
    /// String key/value properties handed to the storage backend when building.
    props: HashMap<String, String>,
    /// Typed extra values; presumably consulted by the storage backend when
    /// building (it receives a clone of the whole builder) — TODO confirm.
    extensions: Extensions,
}
174
175impl FileIOBuilder {
176 pub fn new(scheme_str: impl ToString) -> Self {
179 Self {
180 scheme_str: Some(scheme_str.to_string()),
181 props: HashMap::default(),
182 extensions: Extensions::default(),
183 }
184 }
185
186 pub fn new_fs_io() -> Self {
188 Self {
189 scheme_str: None,
190 props: HashMap::default(),
191 extensions: Extensions::default(),
192 }
193 }
194
195 pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
199 (
200 self.scheme_str.unwrap_or_default(),
201 self.props,
202 self.extensions,
203 )
204 }
205
206 pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
208 self.props.insert(key.to_string(), value.to_string());
209 self
210 }
211
212 pub fn with_props(
214 mut self,
215 args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
216 ) -> Self {
217 self.props
218 .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
219 self
220 }
221
222 pub fn with_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
224 self.extensions.add(ext);
225 self
226 }
227
228 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
230 self.extensions.extend(extensions);
231 self
232 }
233
234 pub fn extension<T>(&self) -> Option<Arc<T>>
236 where T: 'static + Send + Sync + Clone {
237 self.extensions.get::<T>()
238 }
239
240 pub fn build(self) -> Result<FileIO> {
242 let storage = OpenDalStorage::build(self.clone())?;
243 Ok(FileIO {
244 builder: self,
245 inner: Arc::new(storage),
246 })
247 }
248}
249
/// Metadata about a file, as reported by the storage backend.
///
/// Derives the common value-type traits so callers can log, copy and compare it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMetadata {
    /// Size of the file (presumably in bytes, as reported by the backend).
    pub size: u64,
}
257
/// Random-access reader over a file's contents.
///
/// `InputFile::reader` returns implementations boxed as `Box<dyn FileRead>`.
/// The `Send + Sync + Unpin + 'static` supertraits make that boxing possible and,
/// presumably, let a reader be shared across async tasks.
#[async_trait::async_trait]
pub trait FileRead: Send + Sync + Unpin + 'static {
    /// Reads and returns the content covered by `range` (half-open, as with any
    /// `Range`).
    ///
    /// NOTE(review): presumably `range` is a byte-offset range within the file;
    /// behavior for ranges past the end depends on the implementation — confirm.
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
270
/// Handle to a file to read, identified by `path` within a storage backend.
///
/// Constructing the handle with `InputFile::new` performs no I/O; the
/// `exists`, `metadata`, `read` and `reader` methods query the backend.
#[derive(Debug)]
pub struct InputFile {
    /// Storage backend the file is read from.
    storage: Arc<dyn Storage>,
    /// Location of the file, exactly as passed to `InputFile::new`.
    path: String,
}
278
279impl InputFile {
280 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
282 Self { storage, path }
283 }
284
285 pub fn location(&self) -> &str {
287 &self.path
288 }
289
290 pub async fn exists(&self) -> crate::Result<bool> {
292 self.storage.exists(&self.path).await
293 }
294
295 pub async fn metadata(&self) -> crate::Result<FileMetadata> {
297 self.storage.metadata(&self.path).await
298 }
299
300 pub async fn read(&self) -> crate::Result<Bytes> {
304 self.storage.read(&self.path).await
305 }
306
307 pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
311 self.storage.reader(&self.path).await
312 }
313}
314
/// Writer for a file's contents, fed in chunks through repeated `write` calls.
///
/// `OutputFile::writer` returns implementations boxed as `Box<dyn FileWrite>`.
/// Unlike [`FileRead`], implementations need not be `Sync`, and both methods
/// take `&mut self`.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
    /// Writes the chunk `bs`.
    ///
    /// NOTE(review): presumably chunks are appended in call order — confirm
    /// against the implementations.
    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;

    /// Closes the writer.
    ///
    /// NOTE(review): presumably this flushes/commits the written data and must be
    /// called before the file is complete; behavior on a second `close` is
    /// implementation-defined — confirm.
    async fn close(&mut self) -> crate::Result<()>;
}
333
/// Handle to a file to write, identified by `path` within a storage backend.
///
/// Constructing the handle with `OutputFile::new` performs no I/O. Use `write`
/// or `writer` to produce content, and `to_input_file` to read it back.
#[derive(Debug)]
pub struct OutputFile {
    /// Storage backend the file is written to.
    storage: Arc<dyn Storage>,
    /// Location of the file, exactly as passed to `OutputFile::new`.
    path: String,
}
341
342impl OutputFile {
343 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
345 Self { storage, path }
346 }
347
348 pub fn location(&self) -> &str {
350 &self.path
351 }
352
353 pub async fn exists(&self) -> Result<bool> {
355 self.storage.exists(&self.path).await
356 }
357
358 pub async fn delete(&self) -> Result<()> {
362 self.storage.delete(&self.path).await
363 }
364
365 pub fn to_input_file(self) -> InputFile {
367 InputFile {
368 storage: self.storage,
369 path: self.path,
370 }
371 }
372
373 pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
380 self.storage.write(&self.path, bs).await
381 }
382
383 pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
389 self.storage.writer(&self.path).await
390 }
391}
392
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all, read_to_string};
    use std::io::Write;
    use std::path::Path;

    use bytes::Bytes;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};

    /// Builds a `FileIO` with no explicit scheme, as used for local files.
    fn create_local_file_io() -> FileIO {
        FileIOBuilder::new_fs_io().build().unwrap()
    }

    /// Writes `s` to `path` with `std::fs`, creating missing parent directories.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    /// Reads `path` with `std::fs`, bypassing `FileIO`, so assertions check what
    /// actually reached the disk.
    fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        read_to_string(path).unwrap()
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(&full_path, input_file.location());
        // Reading through `FileIO` must return what was written with `std::fs`.
        assert_eq!(input_file.read().await.unwrap(), Bytes::from(content));

        let read_content = read_from_file(full_path);

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Using a plain file path as a prefix must leave that file in place.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Deleting a prefix that matches nothing is not an error.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // Deleting the sub-directory prefix removes its files but nothing outside it.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        // Deleting a missing file, or a prefix matching nothing, succeeds.
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(content.into()).await.unwrap();
        assert!(output_file.exists().await.unwrap());

        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path);

        assert_eq!(content, &read_content);
    }

    #[test]
    fn test_create_file_from_path() {
        let io = FileIO::from_path("/tmp/a").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:/tmp/b").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:///tmp/c").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("s3://bucket/a").unwrap();
        assert_eq!("s3", io.scheme_str.unwrap().as_str());

        // Relative and not a URL: rejected.
        let io = FileIO::from_path("tmp/||c");
        assert!(io.is_err());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIOBuilder::new("memory").build().unwrap();

        // The temp dir only supplies a unique absolute path for the in-memory
        // backend. Bind the guard so the directory is not deleted mid-statement.
        let tmp_dir = TempDir::new().unwrap();
        let path = format!("{}/1.txt", tmp_dir.path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }
}