1use std::ops::Range;
19use std::sync::{Arc, OnceLock};
20
21use bytes::Bytes;
22
23use super::storage::{
24 LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
25};
26use crate::Result;
27
28#[derive(Clone, Debug)]
62pub struct FileIO {
63 config: StorageConfig,
65 factory: Arc<dyn StorageFactory>,
67 storage: Arc<OnceLock<Arc<dyn Storage>>>,
69}
70
71impl FileIO {
72 pub fn new_with_memory() -> Self {
76 Self {
77 config: StorageConfig::new(),
78 factory: Arc::new(MemoryStorageFactory),
79 storage: Arc::new(OnceLock::new()),
80 }
81 }
82
83 pub fn new_with_fs() -> Self {
87 Self {
88 config: StorageConfig::new(),
89 factory: Arc::new(LocalFsStorageFactory),
90 storage: Arc::new(OnceLock::new()),
91 }
92 }
93
94 pub fn config(&self) -> &StorageConfig {
96 &self.config
97 }
98
99 fn get_storage(&self) -> Result<Arc<dyn Storage>> {
104 if let Some(storage) = self.storage.get() {
106 return Ok(storage.clone());
107 }
108
109 let storage = self.factory.build(&self.config)?;
111
112 let _ = self.storage.set(storage.clone());
114
115 Ok(self.storage.get().unwrap().clone())
117 }
118
119 pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
125 self.get_storage()?.delete(path.as_ref()).await
126 }
127
128 pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
140 self.get_storage()?.delete_prefix(path.as_ref()).await
141 }
142
143 pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
149 self.get_storage()?.exists(path.as_ref()).await
150 }
151
152 pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
158 self.get_storage()?.new_input(path.as_ref())
159 }
160
161 pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
167 self.get_storage()?.new_output(path.as_ref())
168 }
169}
170
171#[derive(Clone, Debug)]
176pub struct FileIOBuilder {
177 factory: Arc<dyn StorageFactory>,
179 config: StorageConfig,
181}
182
183impl FileIOBuilder {
184 pub fn new(factory: Arc<dyn StorageFactory>) -> Self {
186 Self {
187 factory,
188 config: StorageConfig::new(),
189 }
190 }
191
192 pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
194 self.config = self.config.with_prop(key.to_string(), value.to_string());
195 self
196 }
197
198 pub fn with_props(
200 mut self,
201 args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
202 ) -> Self {
203 self.config = self
204 .config
205 .with_props(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
206 self
207 }
208
209 pub fn config(&self) -> &StorageConfig {
211 &self.config
212 }
213
214 pub fn build(self) -> FileIO {
216 FileIO {
217 config: self.config,
218 factory: self.factory,
219 storage: Arc::new(OnceLock::new()),
220 }
221 }
222}
223
/// Metadata describing a file.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so that values can be logged,
/// compared in tests, and used with helpers such as `Result::unwrap_err`, which
/// require the success type to implement `Debug`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file.
    pub size: u64,
}
231
232#[async_trait::async_trait]
238pub trait FileRead: Send + Sync + Unpin + 'static {
239 async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
243}
244
245#[derive(Debug)]
247pub struct InputFile {
248 storage: Arc<dyn Storage>,
249 path: String,
251}
252
253impl InputFile {
254 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
256 Self { storage, path }
257 }
258
259 pub fn location(&self) -> &str {
261 &self.path
262 }
263
264 pub async fn exists(&self) -> crate::Result<bool> {
266 self.storage.exists(&self.path).await
267 }
268
269 pub async fn metadata(&self) -> crate::Result<FileMetadata> {
271 self.storage.metadata(&self.path).await
272 }
273
274 pub async fn read(&self) -> crate::Result<Bytes> {
278 self.storage.read(&self.path).await
279 }
280
281 pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
285 self.storage.reader(&self.path).await
286 }
287}
288
289#[async_trait::async_trait]
296pub trait FileWrite: Send + Unpin + 'static {
297 async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
301
302 async fn close(&mut self) -> crate::Result<()>;
306}
307
308#[derive(Debug)]
310pub struct OutputFile {
311 storage: Arc<dyn Storage>,
312 path: String,
314}
315
316impl OutputFile {
317 pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
319 Self { storage, path }
320 }
321
322 pub fn location(&self) -> &str {
324 &self.path
325 }
326
327 pub async fn exists(&self) -> Result<bool> {
329 self.storage.exists(&self.path).await
330 }
331
332 pub async fn delete(&self) -> Result<()> {
336 self.storage.delete(&self.path).await
337 }
338
339 pub fn to_input_file(self) -> InputFile {
341 InputFile {
342 storage: self.storage,
343 path: self.path,
344 }
345 }
346
347 pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
354 self.storage.write(&self.path, bs).await
355 }
356
357 pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
363 self.storage.writer(&self.path).await
364 }
365}
366
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};
    use crate::io::{LocalFsStorageFactory, MemoryStorageFactory};

    /// Payload written and read back by the local-filesystem tests.
    const SAMPLE: &str = "Iceberg loves rust.";

    /// Returns a `FileIO` created with `FileIO::new_with_fs()`.
    fn create_local_file_io() -> FileIO {
        FileIO::new_with_fs()
    }

    /// Builds the string path of `name` directly inside `dir`.
    fn path_in(dir: &TempDir, name: &str) -> String {
        format!("{}/{}", dir.path().to_str().unwrap(), name)
    }

    /// Writes `s` to `path` with `std::fs`, creating the parent directory first.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        let target = path.as_ref();
        create_dir_all(target.parent().unwrap()).unwrap();
        let mut file = File::create(target).unwrap();
        file.write_all(s.as_bytes()).unwrap();
    }

    /// Reads the whole file at `path` into a `String`, bypassing `FileIO`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut reader = AllowStdIo::new(File::open(path).unwrap());
        let mut text = String::new();
        reader.read_to_string(&mut text).await.unwrap();
        text
    }

    /// Asserts that `file_io`'s configuration carries `key1=value1` and `key2=value2`.
    fn assert_expected_props(file_io: &FileIO) {
        for (key, value) in [("key1", "value1"), ("key2", "value2")] {
            assert_eq!(file_io.config().get(key), Some(&value.to_string()));
        }
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "a.txt");
        write_to_file(SAMPLE, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();

        assert!(input_file.exists().await.unwrap());
        assert_eq!(full_path.as_str(), input_file.location());

        let read_content = read_from_file(&full_path).await;
        assert_eq!(SAMPLE, read_content.as_str());
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();
        let root = tmp_dir.path().to_str().unwrap();

        let a_path = format!("{root}/a.txt");
        let sub_dir_path = format!("{root}/sub");
        let b_path = format!("{sub_dir_path}/b.txt");
        let c_path = format!("{sub_dir_path}/c.txt");
        for file in [&a_path, &b_path, &c_path] {
            write_to_file(SAMPLE, file);
        }

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // `delete_prefix` on a regular file's path is expected to leave the file in place.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // A prefix that was never created must still succeed.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // Deleting the sub-directory removes both files below it but not the sibling file.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        for gone in [&b_path, &c_path] {
            assert!(!file_io.exists(gone).await.unwrap());
        }
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "a.txt");

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());

        // Both deletion flavours must succeed on a path that does not exist.
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "a.txt");

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();

        assert!(!output_file.exists().await.unwrap());
        output_file.write(Bytes::from(SAMPLE)).await.unwrap();

        assert_eq!(full_path.as_str(), output_file.location());

        let read_content = read_from_file(&full_path).await;
        assert_eq!(SAMPLE, read_content.as_str());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIO::new_with_memory();

        // Only the unique name is needed; the temporary directory itself is dropped
        // as soon as the path string has been built.
        let path = {
            let scratch = TempDir::new().unwrap();
            format!("{}/1.txt", scratch.path().to_str().unwrap())
        };

        let output_file = io.new_output(&path).unwrap();
        output_file.write(Bytes::from("test")).await.unwrap();
        assert!(io.exists(&path).await.unwrap());

        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_io_builder_with_props() {
        let factory = Arc::new(MemoryStorageFactory);
        let file_io = FileIOBuilder::new(factory)
            .with_prop("key1", "value1")
            .with_prop("key2", "value2")
            .build();

        assert_expected_props(&file_io);
    }

    #[tokio::test]
    async fn test_file_io_builder_with_multiple_props() {
        let factory = Arc::new(LocalFsStorageFactory);
        let props = vec![("key1", "value1"), ("key2", "value2")];
        let file_io = FileIOBuilder::new(factory).with_props(props).build();

        assert_expected_props(&file_io);
    }
}