iceberg/io/
file_io.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::{Any, TypeId};
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use opendal::Operator;
25use url::Url;
26
27use super::storage::Storage;
28use crate::{Error, ErrorKind, Result};
29
30/// FileIO implementation, used to manipulate files in underlying storage.
31///
32/// # Note
33///
34/// All path passed to `FileIO` must be absolute path starting with scheme string used to construct `FileIO`.
35/// For example, if you construct `FileIO` with `s3a` scheme, then all path passed to `FileIO` must start with `s3a://`.
36///
37/// Supported storages:
38///
39/// | Storage            | Feature Flag      | Expected Path Format             | Schemes                       |
40/// |--------------------|-------------------|----------------------------------| ------------------------------|
41/// | Local file system  | `storage-fs`      | `file`                           | `file://path/to/file`         |
42/// | Memory             | `storage-memory`  | `memory`                         | `memory://path/to/file`       |
43/// | S3                 | `storage-s3`      | `s3`, `s3a`                      | `s3://<bucket>/path/to/file`  |
44/// | GCS                | `storage-gcs`     | `gs`, `gcs`                      | `gs://<bucket>/path/to/file`  |
45/// | OSS                | `storage-oss`     | `oss`                            | `oss://<bucket>/path/to/file` |
46/// | Azure Datalake     | `storage-azdls`   | `abfs`, `abfss`, `wasb`, `wasbs` | `abfs://<filesystem>@<account>.dfs.core.windows.net/path/to/file` or `wasb://<container>@<account>.blob.core.windows.net/path/to/file` |
47#[derive(Clone, Debug)]
48pub struct FileIO {
49    builder: FileIOBuilder,
50
51    inner: Arc<Storage>,
52}
53
54impl FileIO {
55    /// Convert FileIO into [`FileIOBuilder`] which used to build this FileIO.
56    ///
57    /// This function is useful when you want serialize and deserialize FileIO across
58    /// distributed systems.
59    pub fn into_builder(self) -> FileIOBuilder {
60        self.builder
61    }
62
63    /// Try to infer file io scheme from path. See [`FileIO`] for supported schemes.
64    ///
65    /// - If it's a valid url, for example `s3://bucket/a`, url scheme will be used, and the rest of the url will be ignored.
66    /// - If it's not a valid url, will try to detect if it's a file path.
67    ///
68    /// Otherwise will return parsing error.
69    pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
70        let url = Url::parse(path.as_ref())
71            .map_err(Error::from)
72            .or_else(|e| {
73                Url::from_file_path(path.as_ref()).map_err(|_| {
74                    Error::new(
75                        ErrorKind::DataInvalid,
76                        "Input is neither a valid url nor path",
77                    )
78                    .with_context("input", path.as_ref().to_string())
79                    .with_source(e)
80                })
81            })?;
82
83        Ok(FileIOBuilder::new(url.scheme()))
84    }
85
86    /// Deletes file.
87    ///
88    /// # Arguments
89    ///
90    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
91    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
92        let (op, relative_path) = self.inner.create_operator(&path)?;
93        Ok(op.delete(relative_path).await?)
94    }
95
96    /// Remove the path and all nested dirs and files recursively.
97    ///
98    /// # Arguments
99    ///
100    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
101    #[deprecated(note = "use remove_dir_all instead", since = "0.4.0")]
102    pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> {
103        let (op, relative_path) = self.inner.create_operator(&path)?;
104        Ok(op.remove_all(relative_path).await?)
105    }
106
107    /// Remove the path and all nested dirs and files recursively.
108    ///
109    /// # Arguments
110    ///
111    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
112    ///
113    /// # Behavior
114    ///
115    /// - If the path is a file or not exist, this function will be no-op.
116    /// - If the path is a empty directory, this function will remove the directory itself.
117    /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
118    pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
119        let (op, relative_path) = self.inner.create_operator(&path)?;
120        let path = if relative_path.ends_with('/') {
121            relative_path.to_string()
122        } else {
123            format!("{relative_path}/")
124        };
125        Ok(op.remove_all(&path).await?)
126    }
127
128    /// Check file exists.
129    ///
130    /// # Arguments
131    ///
132    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
133    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
134        let (op, relative_path) = self.inner.create_operator(&path)?;
135        Ok(op.exists(relative_path).await?)
136    }
137
138    /// Creates input file.
139    ///
140    /// # Arguments
141    ///
142    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
143    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
144        let (op, relative_path) = self.inner.create_operator(&path)?;
145        let path = path.as_ref().to_string();
146        let relative_path_pos = path.len() - relative_path.len();
147        Ok(InputFile {
148            op,
149            path,
150            relative_path_pos,
151        })
152    }
153
154    /// Creates output file.
155    ///
156    /// # Arguments
157    ///
158    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
159    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
160        let (op, relative_path) = self.inner.create_operator(&path)?;
161        let path = path.as_ref().to_string();
162        let relative_path_pos = path.len() - relative_path.len();
163        Ok(OutputFile {
164            op,
165            path,
166            relative_path_pos,
167        })
168    }
169}
170
/// Container for storing type-safe extensions used to configure underlying FileIO behavior.
///
/// Extensions are keyed by their concrete type, so at most one value per type is stored.
#[derive(Clone, Debug, Default)]
pub struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);

impl Extensions {
    /// Adds an extension, replacing any previously stored extension of the same type `T`.
    pub fn add<T: Any + Send + Sync>(&mut self, ext: T) {
        self.0.insert(TypeId::of::<T>(), Arc::new(ext));
    }

    /// Extends the current set of extensions with another set of extensions.
    ///
    /// When both sets hold a value of the same type, the value from `extensions` wins.
    pub fn extend(&mut self, extensions: Extensions) {
        self.0.extend(extensions.0);
    }

    /// Fetches the extension of type `T`, or `None` if no value of that type was added.
    ///
    /// The value is returned behind a shared [`Arc`], so `T` does not have to be `Clone`;
    /// any type accepted by [`Extensions::add`] can be fetched back.
    pub fn get<T>(&self) -> Option<Arc<T>>
    where T: Any + Send + Sync {
        self.0
            .get(&TypeId::of::<T>())
            .cloned()
            .and_then(|ext| ext.downcast::<T>().ok())
    }
}
195
196/// Builder for [`FileIO`].
197#[derive(Clone, Debug)]
198pub struct FileIOBuilder {
199    /// This is used to infer scheme of operator.
200    ///
201    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) will build a local file io.
202    scheme_str: Option<String>,
203    /// Arguments for operator.
204    props: HashMap<String, String>,
205    /// Optional extensions to configure the underlying FileIO behavior.
206    extensions: Extensions,
207}
208
209impl FileIOBuilder {
210    /// Creates a new builder with scheme.
211    /// See [`FileIO`] for supported schemes.
212    pub fn new(scheme_str: impl ToString) -> Self {
213        Self {
214            scheme_str: Some(scheme_str.to_string()),
215            props: HashMap::default(),
216            extensions: Extensions::default(),
217        }
218    }
219
220    /// Creates a new builder for local file io.
221    pub fn new_fs_io() -> Self {
222        Self {
223            scheme_str: None,
224            props: HashMap::default(),
225            extensions: Extensions::default(),
226        }
227    }
228
229    /// Fetch the scheme string.
230    ///
231    /// The scheme_str will be empty if it's None.
232    pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
233        (
234            self.scheme_str.unwrap_or_default(),
235            self.props,
236            self.extensions,
237        )
238    }
239
240    /// Add argument for operator.
241    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
242        self.props.insert(key.to_string(), value.to_string());
243        self
244    }
245
246    /// Add argument for operator.
247    pub fn with_props(
248        mut self,
249        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
250    ) -> Self {
251        self.props
252            .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
253        self
254    }
255
256    /// Add an extension to the file IO builder.
257    pub fn with_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
258        self.extensions.add(ext);
259        self
260    }
261
262    /// Adds multiple extensions to the file IO builder.
263    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
264        self.extensions.extend(extensions);
265        self
266    }
267
268    /// Fetch an extension from the file IO builder.
269    pub fn extension<T>(&self) -> Option<Arc<T>>
270    where T: 'static + Send + Sync + Clone {
271        self.extensions.get::<T>()
272    }
273
274    /// Builds [`FileIO`].
275    pub fn build(self) -> Result<FileIO> {
276        let storage = Storage::build(self.clone())?;
277        Ok(FileIO {
278            builder: self,
279            inner: Arc::new(storage),
280        })
281    }
282}
283
/// The struct that represents the metadata of a file, as reported by the underlying storage.
///
/// TODO: we can add last modified time, content type, etc. in the future.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file, in bytes (the storage's reported content length).
    pub size: u64,
}
291
/// Trait for reading file content by range.
///
/// The `Send + Sync + Unpin + 'static` bounds let an implementor be moved and shared across
/// threads and stored without borrowing from anything shorter-lived.
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileRead: Send + Sync + Unpin + 'static {
    /// Read file content with given range.
    ///
    /// `range` is a half-open range (`start..end`) of offsets into the file; the OpenDAL-backed
    /// implementation forwards it unchanged to `opendal::Reader::read`.
    ///
    /// TODO: we can support reading non-contiguous bytes in the future.
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
305
306#[async_trait::async_trait]
307impl FileRead for opendal::Reader {
308    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
309        Ok(opendal::Reader::read(self, range).await?.to_bytes())
310    }
311}
312
313/// Input file is used for reading from files.
314#[derive(Debug)]
315pub struct InputFile {
316    op: Operator,
317    // Absolution path of file.
318    path: String,
319    // Relative path of file to uri, starts at [`relative_path_pos`]
320    relative_path_pos: usize,
321}
322
323impl InputFile {
324    /// Absolute path to root uri.
325    pub fn location(&self) -> &str {
326        &self.path
327    }
328
329    /// Check if file exists.
330    pub async fn exists(&self) -> crate::Result<bool> {
331        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
332    }
333
334    /// Fetch and returns metadata of file.
335    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
336        let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
337
338        Ok(FileMetadata {
339            size: meta.content_length(),
340        })
341    }
342
343    /// Read and returns whole content of file.
344    ///
345    /// For continuous reading, use [`Self::reader`] instead.
346    pub async fn read(&self) -> crate::Result<Bytes> {
347        Ok(self
348            .op
349            .read(&self.path[self.relative_path_pos..])
350            .await?
351            .to_bytes())
352    }
353
354    /// Creates [`FileRead`] for continuous reading.
355    ///
356    /// For one-time reading, use [`Self::read`] instead.
357    pub async fn reader(&self) -> crate::Result<impl FileRead + use<>> {
358        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
359    }
360}
361
/// Trait for writing file.
///
/// Requires `Send + Unpin + 'static` but, unlike [`FileRead`], not `Sync`: every method takes
/// `&mut self`, so a writer can be moved between threads but need not be shareable.
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
    /// Write bytes to file.
    ///
    /// TODO: we can support writing non-contiguous bytes in the future.
    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;

    /// Close file.
    ///
    /// Call this once after the final `write`; [`OutputFile::write`] follows that pattern.
    /// Calling close on closed file will generate an error.
    async fn close(&mut self) -> crate::Result<()>;
}
380
381#[async_trait::async_trait]
382impl FileWrite for opendal::Writer {
383    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
384        Ok(opendal::Writer::write(self, bs).await?)
385    }
386
387    async fn close(&mut self) -> crate::Result<()> {
388        let _ = opendal::Writer::close(self).await?;
389        Ok(())
390    }
391}
392
393#[async_trait::async_trait]
394impl FileWrite for Box<dyn FileWrite> {
395    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
396        self.as_mut().write(bs).await
397    }
398
399    async fn close(&mut self) -> crate::Result<()> {
400        self.as_mut().close().await
401    }
402}
403
404/// Output file is used for writing to files..
405#[derive(Debug)]
406pub struct OutputFile {
407    op: Operator,
408    // Absolution path of file.
409    path: String,
410    // Relative path of file to uri, starts at [`relative_path_pos`]
411    relative_path_pos: usize,
412}
413
414impl OutputFile {
415    /// Relative path to root uri.
416    pub fn location(&self) -> &str {
417        &self.path
418    }
419
420    /// Checks if file exists.
421    pub async fn exists(&self) -> Result<bool> {
422        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
423    }
424
425    /// Deletes file.
426    ///
427    /// If the file does not exist, it will not return error.
428    pub async fn delete(&self) -> Result<()> {
429        Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
430    }
431
432    /// Converts into [`InputFile`].
433    pub fn to_input_file(self) -> InputFile {
434        InputFile {
435            op: self.op,
436            path: self.path,
437            relative_path_pos: self.relative_path_pos,
438        }
439    }
440
441    /// Create a new output file with given bytes.
442    ///
443    /// # Notes
444    ///
445    /// Calling `write` will overwrite the file if it exists.
446    /// For continuous writing, use [`Self::writer`].
447    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
448        let mut writer = self.writer().await?;
449        writer.write(bs).await?;
450        writer.close().await
451    }
452
453    /// Creates output file for continuous writing.
454    ///
455    /// # Notes
456    ///
457    /// For one-time writing, use [`Self::write`] instead.
458    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
459        Ok(Box::new(
460            self.op.writer(&self.path[self.relative_path_pos..]).await?,
461        ))
462    }
463}
464
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};

    fn create_local_file_io() -> FileIO {
        FileIOBuilder::new_fs_io().build().unwrap()
    }

    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        // `location` reports the path exactly as it was passed to `new_input`.
        assert_eq!(&full_path, input_file.location());
        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();

        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a file should be no-op.
        file_io.remove_dir_all(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a not exist dir should be no-op.
        file_io.remove_dir_all("not_exists/").await.unwrap();

        // Remove a dir should remove all files in it.
        file_io.remove_dir_all(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.remove_dir_all(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(content.into()).await.unwrap();

        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[test]
    fn test_create_file_from_path() {
        let io = FileIO::from_path("/tmp/a").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:/tmp/b").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:///tmp/c").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("s3://bucket/a").unwrap();
        assert_eq!("s3", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("tmp/||c");
        assert!(io.is_err());
    }

    #[test]
    fn test_builder_into_parts() {
        let (scheme, props, _) = FileIOBuilder::new("s3")
            .with_prop("a", 1)
            .with_props([("b", "2"), ("a", "3")])
            .into_parts();
        assert_eq!("s3", scheme);
        assert_eq!(Some("3"), props.get("a").map(String::as_str));
        assert_eq!(Some("2"), props.get("b").map(String::as_str));

        let (scheme, props, _) = FileIOBuilder::new_fs_io().into_parts();
        assert!(scheme.is_empty());
        assert!(props.is_empty());
    }

    #[test]
    fn test_builder_extensions() {
        let builder = FileIOBuilder::new("memory").with_extension(42u32);
        assert_eq!(Some(42), builder.extension::<u32>().map(|v| *v));
        assert!(builder.extension::<u64>().is_none());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIOBuilder::new("memory").build().unwrap();

        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }
}