use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};

use crate::arrow::ArrowReader;
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{Schema, SchemaRef};
use crate::{Error, ErrorKind, Result};
/// Loads the contents of a delete file as a stream of Arrow record batches.
#[allow(unused)]
#[async_trait::async_trait]
pub trait DeleteFileLoader {
    /// Reads the delete file described by `task` and returns its rows as a
    /// stream of record batches projected onto `schema`.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or decoded.
    async fn read_delete_file(
        &self,
        task: &FileScanTaskDeleteFile,
        schema: SchemaRef,
    ) -> Result<ArrowRecordBatchStream>;
}
42
/// Minimal [`DeleteFileLoader`] implementation that reads Parquet delete
/// files through a [`FileIO`] handle.
#[derive(Clone, Debug)]
pub(crate) struct BasicDeleteFileLoader {
    // Handle used to open delete files for reading.
    file_io: FileIO,
}
47
48#[allow(unused_variables)]
49impl BasicDeleteFileLoader {
50 pub fn new(file_io: FileIO) -> Self {
51 BasicDeleteFileLoader { file_io }
52 }
53 pub(crate) async fn parquet_to_batch_stream(
55 &self,
56 data_file_path: &str,
57 ) -> Result<ArrowRecordBatchStream> {
58 let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
63 data_file_path,
64 self.file_io.clone(),
65 false,
66 )
67 .await?
68 .build()?
69 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{}", e)));
70
71 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
72 }
73
74 pub(crate) async fn evolve_schema(
76 record_batch_stream: ArrowRecordBatchStream,
77 target_schema: Arc<Schema>,
78 ) -> Result<ArrowRecordBatchStream> {
79 let eq_ids = target_schema
80 .as_ref()
81 .field_id_to_name_map()
82 .keys()
83 .cloned()
84 .collect::<Vec<_>>();
85
86 let mut record_batch_transformer =
87 RecordBatchTransformer::build(target_schema.clone(), &eq_ids);
88
89 let record_batch_stream = record_batch_stream.map(move |record_batch| {
90 record_batch.and_then(|record_batch| {
91 record_batch_transformer.process_record_batch(record_batch)
92 })
93 });
94
95 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
96 }
97}
98
99#[async_trait::async_trait]
100impl DeleteFileLoader for BasicDeleteFileLoader {
101 async fn read_delete_file(
102 &self,
103 task: &FileScanTaskDeleteFile,
104 schema: SchemaRef,
105 ) -> Result<ArrowRecordBatchStream> {
106 let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?;
107
108 Self::evolve_schema(raw_batch_stream, schema).await
109 }
110}
111
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::delete_filter::tests::setup;

    /// End-to-end: loading the first delete file of the shared fixture
    /// yields exactly one record batch.
    #[tokio::test]
    async fn test_basic_delete_file_loader_read_delete_file() {
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path();
        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
            .unwrap()
            .build()
            .unwrap();

        let loader = BasicDeleteFileLoader::new(file_io);
        let tasks = setup(table_location);

        let batch_stream = loader
            .read_delete_file(&tasks[0].deletes[0], tasks[0].schema_ref())
            .await
            .unwrap();

        let batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(batches.len(), 1);
    }
}
144}