Skip to main content

iceberg/io/
file_io.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Range;
19use std::sync::{Arc, OnceLock};
20
21use bytes::Bytes;
22
23use super::storage::{
24    LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
25};
26use crate::Result;
27
28/// FileIO implementation, used to manipulate files in underlying storage.
29///
30/// FileIO wraps a `dyn Storage` with lazy initialization via `StorageFactory`.
31/// The storage is created on first use and cached for subsequent operations.
32///
33/// # Note
34///
35/// All paths passed to `FileIO` must be absolute paths starting with the scheme string
36/// appropriate for the storage backend being used.
37///
38/// This crate provides native support for local filesystem (`file://`) and
39/// memory (`memory://`) storage. For extensive storage backend support (S3, GCS,
40/// OSS, Azure, etc.), use the
41/// [`iceberg-storage-opendal`](https://crates.io/crates/iceberg-storage-opendal) crate.
42///
43/// # Example
44///
45/// ```rust,ignore
46/// use iceberg::io::{FileIO, FileIOBuilder};
47/// use iceberg::io::{LocalFsStorageFactory, MemoryStorageFactory};
48/// use std::sync::Arc;
49///
50/// // Create FileIO with memory storage for testing
51/// let file_io = FileIO::new_with_memory();
52///
53/// // Create FileIO with local filesystem storage
54/// let file_io = FileIO::new_with_fs();
55///
56/// // Create FileIO with custom factory
57/// let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory))
58///     .with_prop("key", "value")
59///     .build();
60/// ```
61#[derive(Clone, Debug)]
62pub struct FileIO {
63    /// Storage configuration containing properties
64    config: StorageConfig,
65    /// Factory for creating storage instances
66    factory: Arc<dyn StorageFactory>,
67    /// Cached storage instance (lazily initialized)
68    storage: Arc<OnceLock<Arc<dyn Storage>>>,
69}
70
71impl FileIO {
72    /// Create a new FileIO backed by in-memory storage.
73    ///
74    /// This is useful for testing scenarios where persistent storage is not needed.
75    pub fn new_with_memory() -> Self {
76        Self {
77            config: StorageConfig::new(),
78            factory: Arc::new(MemoryStorageFactory),
79            storage: Arc::new(OnceLock::new()),
80        }
81    }
82
83    /// Create a new FileIO backed by local filesystem storage.
84    ///
85    /// This is useful for local development and testing with real files.
86    pub fn new_with_fs() -> Self {
87        Self {
88            config: StorageConfig::new(),
89            factory: Arc::new(LocalFsStorageFactory),
90            storage: Arc::new(OnceLock::new()),
91        }
92    }
93
94    /// Get the storage configuration.
95    pub fn config(&self) -> &StorageConfig {
96        &self.config
97    }
98
99    /// Get or create the storage instance.
100    ///
101    /// The factory is invoked on first access and the result is cached
102    /// for all subsequent operations.
103    fn get_storage(&self) -> Result<Arc<dyn Storage>> {
104        // Check if already initialized
105        if let Some(storage) = self.storage.get() {
106            return Ok(storage.clone());
107        }
108
109        // Build the storage
110        let storage = self.factory.build(&self.config)?;
111
112        // Try to set it (another thread might have set it first)
113        let _ = self.storage.set(storage.clone());
114
115        // Return whatever is in the cell (either ours or another thread's)
116        Ok(self.storage.get().unwrap().clone())
117    }
118
119    /// Deletes file.
120    ///
121    /// # Arguments
122    ///
123    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
124    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
125        self.get_storage()?.delete(path.as_ref()).await
126    }
127
128    /// Remove the path and all nested dirs and files recursively.
129    ///
130    /// # Arguments
131    ///
132    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
133    ///
134    /// # Behavior
135    ///
136    /// - If the path is a file or not exist, this function will be no-op.
137    /// - If the path is a empty directory, this function will remove the directory itself.
138    /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
139    pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
140        self.get_storage()?.delete_prefix(path.as_ref()).await
141    }
142
143    /// Check file exists.
144    ///
145    /// # Arguments
146    ///
147    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
148    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
149        self.get_storage()?.exists(path.as_ref()).await
150    }
151
152    /// Creates input file.
153    ///
154    /// # Arguments
155    ///
156    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
157    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
158        self.get_storage()?.new_input(path.as_ref())
159    }
160
161    /// Creates output file.
162    ///
163    /// # Arguments
164    ///
165    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
166    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
167        self.get_storage()?.new_output(path.as_ref())
168    }
169}
170
171/// Builder for [`FileIO`].
172///
173/// The builder accepts an explicit `StorageFactory` and configuration properties.
174/// Storage is lazily initialized on first use.
175#[derive(Clone, Debug)]
176pub struct FileIOBuilder {
177    /// Factory for creating storage instances
178    factory: Arc<dyn StorageFactory>,
179    /// Storage configuration
180    config: StorageConfig,
181}
182
183impl FileIOBuilder {
184    /// Creates a new builder with the given storage factory.
185    pub fn new(factory: Arc<dyn StorageFactory>) -> Self {
186        Self {
187            factory,
188            config: StorageConfig::new(),
189        }
190    }
191
192    /// Add a configuration property.
193    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
194        self.config = self.config.with_prop(key.to_string(), value.to_string());
195        self
196    }
197
198    /// Add multiple configuration properties.
199    pub fn with_props(
200        mut self,
201        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
202    ) -> Self {
203        self.config = self
204            .config
205            .with_props(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
206        self
207    }
208
209    /// Get the storage configuration.
210    pub fn config(&self) -> &StorageConfig {
211        &self.config
212    }
213
214    /// Builds [`FileIO`].
215    pub fn build(self) -> FileIO {
216        FileIO {
217            config: self.config,
218            factory: self.factory,
219            storage: Arc::new(OnceLock::new()),
220        }
221    }
222}
223
/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
// `Copy` is intentionally not derived: future fields (e.g. content type) may not be `Copy`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file.
    pub size: u64,
}
231
232/// Trait for reading file.
233///
234/// # TODO
235/// It's possible for us to remove the async_trait, but we need to figure
236/// out how to handle the object safety.
237#[async_trait::async_trait]
238pub trait FileRead: Send + Sync + Unpin + 'static {
239    /// Read file content with given range.
240    ///
241    /// TODO: we can support reading non-contiguous bytes in the future.
242    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
243}
244
245/// Input file is used for reading from files.
246#[derive(Debug)]
247pub struct InputFile {
248    storage: Arc<dyn Storage>,
249    // Absolute path of file.
250    path: String,
251}
252
253impl InputFile {
254    /// Creates a new input file.
255    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
256        Self { storage, path }
257    }
258
259    /// Absolute path to root uri.
260    pub fn location(&self) -> &str {
261        &self.path
262    }
263
264    /// Check if file exists.
265    pub async fn exists(&self) -> crate::Result<bool> {
266        self.storage.exists(&self.path).await
267    }
268
269    /// Fetch and returns metadata of file.
270    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
271        self.storage.metadata(&self.path).await
272    }
273
274    /// Read and returns whole content of file.
275    ///
276    /// For continuous reading, use [`Self::reader`] instead.
277    pub async fn read(&self) -> crate::Result<Bytes> {
278        self.storage.read(&self.path).await
279    }
280
281    /// Creates [`FileRead`] for continuous reading.
282    ///
283    /// For one-time reading, use [`Self::read`] instead.
284    pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
285        self.storage.reader(&self.path).await
286    }
287}
288
289/// Trait for writing file.
290///
291/// # TODO
292///
293/// It's possible for us to remove the async_trait, but we need to figure
294/// out how to handle the object safety.
295#[async_trait::async_trait]
296pub trait FileWrite: Send + Unpin + 'static {
297    /// Write bytes to file.
298    ///
299    /// TODO: we can support writing non-contiguous bytes in the future.
300    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
301
302    /// Close file.
303    ///
304    /// Calling close on closed file will generate an error.
305    async fn close(&mut self) -> crate::Result<()>;
306}
307
308/// Output file is used for writing to files..
309#[derive(Debug)]
310pub struct OutputFile {
311    storage: Arc<dyn Storage>,
312    // Absolute path of file.
313    path: String,
314}
315
316impl OutputFile {
317    /// Creates a new output file.
318    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
319        Self { storage, path }
320    }
321
322    /// Relative path to root uri.
323    pub fn location(&self) -> &str {
324        &self.path
325    }
326
327    /// Checks if file exists.
328    pub async fn exists(&self) -> Result<bool> {
329        self.storage.exists(&self.path).await
330    }
331
332    /// Deletes file.
333    ///
334    /// If the file does not exist, it will not return error.
335    pub async fn delete(&self) -> Result<()> {
336        self.storage.delete(&self.path).await
337    }
338
339    /// Converts into [`InputFile`].
340    pub fn to_input_file(self) -> InputFile {
341        InputFile {
342            storage: self.storage,
343            path: self.path,
344        }
345    }
346
347    /// Create a new output file with given bytes.
348    ///
349    /// # Notes
350    ///
351    /// Calling `write` will overwrite the file if it exists.
352    /// For continuous writing, use [`Self::writer`].
353    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
354        self.storage.write(&self.path, bs).await
355    }
356
357    /// Creates output file for continuous writing.
358    ///
359    /// # Notes
360    ///
361    /// For one-time writing, use [`Self::write`] instead.
362    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
363        self.storage.writer(&self.path).await
364    }
365}
366
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};
    use crate::io::{LocalFsStorageFactory, MemoryStorageFactory};

    /// Returns a `FileIO` backed by the local filesystem.
    fn create_local_file_io() -> FileIO {
        FileIO::new_with_fs()
    }

    /// Writes `s` to `path`, creating parent directories as needed.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    /// Reads the whole file at `path` into a `String`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(&full_path, input_file.location());
        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();
        let root = tmp_dir.path().to_str().unwrap();

        let a_path = format!("{root}/a.txt");
        let sub_dir_path = format!("{root}/sub");
        let b_path = format!("{sub_dir_path}/b.txt");
        let c_path = format!("{sub_dir_path}/c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a file should be no-op.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Remove a not exist dir should be no-op. The path lives inside the temp
        // dir so the test cannot touch anything outside it.
        let missing_dir = format!("{root}/not_exists/");
        file_io.delete_prefix(&missing_dir).await.unwrap();

        // Remove a dir should remove all files in it.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();

        let file_name = "a.txt";
        let content = "Iceberg loves rust.";

        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(content.into()).await.unwrap();

        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path).await;

        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIO::new_with_memory();

        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();

        assert!(io.exists(&path).await.unwrap());
        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }

    #[test]
    fn test_file_io_builder_with_props() {
        let factory = Arc::new(MemoryStorageFactory);
        let file_io = FileIOBuilder::new(factory)
            .with_prop("key1", "value1")
            .with_prop("key2", "value2")
            .build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }

    #[test]
    fn test_file_io_builder_with_multiple_props() {
        let factory = Arc::new(LocalFsStorageFactory);
        let props = vec![("key1", "value1"), ("key2", "value2")];
        let file_io = FileIOBuilder::new(factory).with_props(props).build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }
}