1use std::any::{Any, TypeId};
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use opendal::Operator;
25use url::Url;
26
27use super::storage::Storage;
28use crate::{Error, ErrorKind, Result};
29
30#[derive(Clone, Debug)]
48pub struct FileIO {
49 builder: FileIOBuilder,
50
51 inner: Arc<Storage>,
52}
53
54impl FileIO {
55 pub fn into_builder(self) -> FileIOBuilder {
60 self.builder
61 }
62
63 pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
70 let url = Url::parse(path.as_ref())
71 .map_err(Error::from)
72 .or_else(|e| {
73 Url::from_file_path(path.as_ref()).map_err(|_| {
74 Error::new(
75 ErrorKind::DataInvalid,
76 "Input is neither a valid url nor path",
77 )
78 .with_context("input", path.as_ref().to_string())
79 .with_source(e)
80 })
81 })?;
82
83 Ok(FileIOBuilder::new(url.scheme()))
84 }
85
86 pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
92 let (op, relative_path) = self.inner.create_operator(&path)?;
93 Ok(op.delete(relative_path).await?)
94 }
95
96 #[deprecated(note = "use remove_dir_all instead", since = "0.4.0")]
102 pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> {
103 let (op, relative_path) = self.inner.create_operator(&path)?;
104 Ok(op.remove_all(relative_path).await?)
105 }
106
107 pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
119 let (op, relative_path) = self.inner.create_operator(&path)?;
120 let path = if relative_path.ends_with('/') {
121 relative_path.to_string()
122 } else {
123 format!("{relative_path}/")
124 };
125 Ok(op.remove_all(&path).await?)
126 }
127
128 pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
134 let (op, relative_path) = self.inner.create_operator(&path)?;
135 Ok(op.exists(relative_path).await?)
136 }
137
138 pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
144 let (op, relative_path) = self.inner.create_operator(&path)?;
145 let path = path.as_ref().to_string();
146 let relative_path_pos = path.len() - relative_path.len();
147 Ok(InputFile {
148 op,
149 path,
150 relative_path_pos,
151 })
152 }
153
154 pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
160 let (op, relative_path) = self.inner.create_operator(&path)?;
161 let path = path.as_ref().to_string();
162 let relative_path_pos = path.len() - relative_path.len();
163 Ok(OutputFile {
164 op,
165 path,
166 relative_path_pos,
167 })
168 }
169}
170
/// A type-indexed bag of extension values: holds at most one value per concrete type.
///
/// Values are stored behind an [`Arc`], so cloning an `Extensions` shares the stored
/// values instead of copying them.
#[derive(Clone, Debug, Default)]
pub struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);

impl Extensions {
    /// Stores `ext`, replacing any previously stored value of the same type `T`.
    pub fn add<T: Any + Send + Sync>(&mut self, ext: T) {
        self.0.insert(TypeId::of::<T>(), Arc::new(ext));
    }

    /// Merges every entry of `extensions` into `self`.
    ///
    /// When both contain a value of the same type, the one from `extensions` wins.
    pub fn extend(&mut self, extensions: Extensions) {
        self.0.extend(extensions.0);
    }

    /// Returns a shared handle to the stored value of type `T`, or `None` if there is none.
    ///
    /// The value is never cloned (only its `Arc` is), so `T` does not need to be `Clone`.
    pub fn get<T>(&self) -> Option<Arc<T>>
    where T: 'static + Send + Sync {
        self.0
            .get(&TypeId::of::<T>())
            .and_then(|stored| Arc::clone(stored).downcast::<T>().ok())
    }
}
195
196#[derive(Clone, Debug)]
198pub struct FileIOBuilder {
199 scheme_str: Option<String>,
203 props: HashMap<String, String>,
205 extensions: Extensions,
207}
208
209impl FileIOBuilder {
210 pub fn new(scheme_str: impl ToString) -> Self {
213 Self {
214 scheme_str: Some(scheme_str.to_string()),
215 props: HashMap::default(),
216 extensions: Extensions::default(),
217 }
218 }
219
220 pub fn new_fs_io() -> Self {
222 Self {
223 scheme_str: None,
224 props: HashMap::default(),
225 extensions: Extensions::default(),
226 }
227 }
228
229 pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
233 (
234 self.scheme_str.unwrap_or_default(),
235 self.props,
236 self.extensions,
237 )
238 }
239
240 pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
242 self.props.insert(key.to_string(), value.to_string());
243 self
244 }
245
246 pub fn with_props(
248 mut self,
249 args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
250 ) -> Self {
251 self.props
252 .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
253 self
254 }
255
256 pub fn with_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
258 self.extensions.add(ext);
259 self
260 }
261
262 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
264 self.extensions.extend(extensions);
265 self
266 }
267
268 pub fn extension<T>(&self) -> Option<Arc<T>>
270 where T: 'static + Send + Sync + Clone {
271 self.extensions.get::<T>()
272 }
273
274 pub fn build(self) -> Result<FileIO> {
276 let storage = Storage::build(self.clone())?;
277 Ok(FileIO {
278 builder: self,
279 inner: Arc::new(storage),
280 })
281 }
282}
283
/// Metadata of a file, as reported by the storage backend.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FileMetadata {
    /// Size of the file content in bytes.
    pub size: u64,
}
291
#[async_trait::async_trait]
/// Random-access reader over a file's content, as returned by `InputFile::reader`.
///
/// Implemented in this module for `opendal::Reader`. Implementors must be
/// `Send + Sync + Unpin + 'static`.
pub trait FileRead: Send + Sync + Unpin + 'static {
    /// Reads the content in the half-open `range` and returns it as [`Bytes`].
    ///
    /// For the `opendal::Reader` implementation, the range is forwarded to
    /// `opendal::Reader::read` unchanged.
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
305
306#[async_trait::async_trait]
307impl FileRead for opendal::Reader {
308 async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
309 Ok(opendal::Reader::read(self, range).await?.to_bytes())
310 }
311}
312
313#[derive(Debug)]
315pub struct InputFile {
316 op: Operator,
317 path: String,
319 relative_path_pos: usize,
321}
322
323impl InputFile {
324 pub fn location(&self) -> &str {
326 &self.path
327 }
328
329 pub async fn exists(&self) -> crate::Result<bool> {
331 Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
332 }
333
334 pub async fn metadata(&self) -> crate::Result<FileMetadata> {
336 let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
337
338 Ok(FileMetadata {
339 size: meta.content_length(),
340 })
341 }
342
343 pub async fn read(&self) -> crate::Result<Bytes> {
347 Ok(self
348 .op
349 .read(&self.path[self.relative_path_pos..])
350 .await?
351 .to_bytes())
352 }
353
354 pub async fn reader(&self) -> crate::Result<impl FileRead + use<>> {
358 Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
359 }
360}
361
#[async_trait::async_trait]
/// Writer for file content, as returned (boxed) by `OutputFile::writer`.
///
/// Implemented in this module for `opendal::Writer` and for `Box<dyn FileWrite>`, so a
/// boxed writer can be used wherever a `FileWrite` is expected.
pub trait FileWrite: Send + Unpin + 'static {
    /// Writes `bs` to the file.
    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;

    /// Finishes writing. `OutputFile::write` calls this once after its final `write`.
    async fn close(&mut self) -> crate::Result<()>;
}
380
381#[async_trait::async_trait]
382impl FileWrite for opendal::Writer {
383 async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
384 Ok(opendal::Writer::write(self, bs).await?)
385 }
386
387 async fn close(&mut self) -> crate::Result<()> {
388 let _ = opendal::Writer::close(self).await?;
389 Ok(())
390 }
391}
392
393#[async_trait::async_trait]
394impl FileWrite for Box<dyn FileWrite> {
395 async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
396 self.as_mut().write(bs).await
397 }
398
399 async fn close(&mut self) -> crate::Result<()> {
400 self.as_mut().close().await
401 }
402}
403
404#[derive(Debug)]
406pub struct OutputFile {
407 op: Operator,
408 path: String,
410 relative_path_pos: usize,
412}
413
414impl OutputFile {
415 pub fn location(&self) -> &str {
417 &self.path
418 }
419
420 pub async fn exists(&self) -> Result<bool> {
422 Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
423 }
424
425 pub async fn delete(&self) -> Result<()> {
429 Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
430 }
431
432 pub fn to_input_file(self) -> InputFile {
434 InputFile {
435 op: self.op,
436 path: self.path,
437 relative_path_pos: self.relative_path_pos,
438 }
439 }
440
441 pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
448 let mut writer = self.writer().await?;
449 writer.write(bs).await?;
450 writer.close().await
451 }
452
453 pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
459 Ok(Box::new(
460 self.op.writer(&self.path[self.relative_path_pos..]).await?,
461 ))
462 }
463}
464
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder, FileRead};

    fn create_local_file_io() -> FileIO {
        FileIOBuilder::new_fs_io().build().unwrap()
    }

    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(&full_path, input_file.location());

        // Exercise the `InputFile` read APIs themselves, not just the raw filesystem.
        assert_eq!(
            input_file.metadata().await.unwrap().size,
            content.len() as u64
        );
        assert_eq!(input_file.read().await.unwrap(), Bytes::from(content));
        let reader = input_file.reader().await.unwrap();
        assert_eq!(reader.read(0..7).await.unwrap(), Bytes::from("Iceberg"));

        let read_content = read_from_file(full_path).await;
        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // `remove_dir_all` always treats its argument as a directory (a trailing `/` is
        // appended), so pointing it at a regular file must not remove that file.
        file_io.remove_dir_all(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Removing a directory that does not exist is not an error.
        file_io.remove_dir_all("not_exists/").await.unwrap();

        // Removing a directory removes everything under it, and nothing outside it.
        file_io.remove_dir_all(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.remove_dir_all(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(content.into()).await.unwrap();
        assert!(output_file.exists().await.unwrap());

        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[test]
    fn test_create_file_from_path() {
        let io = FileIO::from_path("/tmp/a").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:/tmp/b").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:///tmp/c").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("s3://bucket/a").unwrap();
        assert_eq!("s3", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("tmp/||c");
        assert!(io.is_err());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIOBuilder::new("memory").build().unwrap();

        // Only used to produce a unique-looking path; keep it alive for the whole test.
        let tmp_dir = TempDir::new().unwrap();
        let path = format!("{}/1.txt", tmp_dir.path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }
}