use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::spec::{DataFile, PartitionKey, Struct};
use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
use crate::writer::file_writer::FileWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

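/// Builder for [`PositionDeleteFileWriter`].
///
/// Wraps a [`RollingFileWriterBuilder`] together with a
/// [`PositionDeleteWriterConfig`] describing the partition the delete file
/// belongs to and, optionally, the single data file the deletes refer to.
///
/// A rough usage sketch, mirroring the test setup at the bottom of this
/// module (the rolling writer builder and the input batch are assumed to be
/// constructed elsewhere):
///
/// ```ignore
/// let config = PositionDeleteWriterConfig::new(None, 0, None);
/// let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
///     .build(None)
///     .await?;
/// writer.write(position_delete_batch).await?;
/// let delete_files = writer.close().await?;
/// ```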
#[derive(Debug)]
pub struct PositionDeleteFileWriterBuilder<
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
> {
    inner: RollingFileWriterBuilder<B, L, F>,
    config: PositionDeleteWriterConfig,
}

impl<B, L, F> PositionDeleteFileWriterBuilder<B, L, F>
where
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
{
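    /// Creates a new `PositionDeleteFileWriterBuilder` from a rolling file
    /// writer builder and a position delete writer config.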
    pub fn new(
        inner: RollingFileWriterBuilder<B, L, F>,
        config: PositionDeleteWriterConfig,
    ) -> Self {
        Self { inner, config }
    }
}

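/// Configuration for [`PositionDeleteFileWriter`]: the partition the delete
/// file belongs to and, optionally, the single data file it references.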
#[derive(Clone, Debug)]
pub struct PositionDeleteWriterConfig {
    partition_value: Struct,
    partition_spec_id: i32,
    referenced_data_file: Option<String>,
}

impl PositionDeleteWriterConfig {
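    /// Creates a new `PositionDeleteWriterConfig`.
    ///
    /// * `partition_value` - partition value of the delete file; defaults to
    ///   an empty struct for unpartitioned tables.
    /// * `partition_spec_id` - id of the partition spec used for this file.
    /// * `referenced_data_file` - set when every position in the delete file
    ///   points at the same data file.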
    pub fn new(
        partition_value: Option<Struct>,
        partition_spec_id: i32,
        referenced_data_file: Option<String>,
    ) -> Self {
        Self {
            partition_value: partition_value.unwrap_or(Struct::empty()),
            partition_spec_id,
            referenced_data_file,
        }
    }

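    /// Returns the Arrow schema of position delete records: a non-nullable
    /// `file_path` string column and a non-nullable `pos` long column, each
    /// annotated with its reserved Iceberg field id.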
    pub fn arrow_schema() -> ArrowSchemaRef {
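        // Reserved field ids from the Iceberg spec: 2147483546 for
        // `file_path` and 2147483545 for `pos`.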
        Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(
                [(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2147483546".to_string(),
                )]
                .into_iter()
                .collect(),
            ),
            Field::new("pos", DataType::Int64, false).with_metadata(
                [(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2147483545".to_string(),
                )]
                .into_iter()
                .collect(),
            ),
        ]))
    }
}

#[async_trait::async_trait]
impl<B, L, F> IcebergWriterBuilder for PositionDeleteFileWriterBuilder<B, L, F>
where
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
{
    type R = PositionDeleteFileWriter<B, L, F>;

    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
        Ok(PositionDeleteFileWriter {
            inner: Some(self.inner.build()),
            partition_value: self.config.partition_value.clone(),
            partition_spec_id: self.config.partition_spec_id,
            referenced_data_file: self.config.referenced_data_file.clone(),
            partition_key,
        })
    }
}

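/// Writer that produces Iceberg position delete files.
///
/// Incoming batches must match [`PositionDeleteWriterConfig::arrow_schema`];
/// on close, the files written by the inner rolling writer are returned as
/// [`DataFile`]s with content type `PositionDeletes`.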
#[derive(Debug)]
pub struct PositionDeleteFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator>
{
    inner: Option<RollingFileWriter<B, L, F>>,
    partition_value: Struct,
    partition_spec_id: i32,
    referenced_data_file: Option<String>,
    partition_key: Option<PartitionKey>,
}

#[async_trait::async_trait]
impl<B, L, F> IcebergWriter for PositionDeleteFileWriter<B, L, F>
where
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
{
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
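        // Reject batches that do not match the fixed position delete schema
        // before handing them to the inner rolling writer.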
        let expected_schema = PositionDeleteWriterConfig::arrow_schema();
        if batch.schema().fields() != expected_schema.fields() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Position delete batch has invalid schema. Expected: {:?}, Got: {:?}",
                    expected_schema.fields(),
                    batch.schema().fields()
                ),
            ));
        }

        if let Some(writer) = self.inner.as_mut() {
            writer.write(&self.partition_key, &batch).await
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Position delete inner writer has been closed.",
            ))
        }
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
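        // Take the inner writer so a second close() reports an error instead
        // of closing twice; each rolled file is then finalized as a position
        // delete data file carrying this writer's partition metadata.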
        if let Some(writer) = self.inner.take() {
            writer
                .close()
                .await?
                .into_iter()
                .map(|mut res| {
                    res.content(crate::spec::DataContentType::PositionDeletes);
                    res.partition(self.partition_value.clone());
                    res.partition_spec_id(self.partition_spec_id);
                    if let Some(ref data_file) = self.referenced_data_file {
                        res.referenced_data_file(Some(data_file.clone()));
                    }
                    res.build().map_err(|e| {
                        Error::new(
                            ErrorKind::DataInvalid,
                            format!("Failed to build position delete data file: {e}"),
                        )
                    })
                })
                .collect()
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Position delete inner writer has been closed.",
            ))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::arrow_schema_to_schema;
    use crate::io::{FileIO, FileIOBuilder};
    use crate::spec::DataFileFormat;
    use crate::writer::IcebergWriterBuilder;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

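    // Reads the written parquet file back and checks content type, row data,
    // and footer-derived statistics against the expected batch.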
    async fn check_parquet_position_delete_file(
        file_io: &FileIO,
        data_file: &DataFile,
        expected_batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
        assert_eq!(
            data_file.content,
            crate::spec::DataContentType::PositionDeletes
        );

        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let actual = concat_batches(&expected_batch.schema(), &batches).unwrap();
        assert_eq!(*expected_batch, actual);

        assert_eq!(
            data_file.record_count,
            metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64
        );
        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        assert!(data_file.sort_order_id.is_none());
    }

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
            .build(None)
            .await?;

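        // Three position deletes pointing at the same data file.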
        let file_paths = StringArray::from(vec![
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
        ]);
        let positions = Int64Array::from(vec![5, 10, 15]);
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
            Arc::new(file_paths),
            Arc::new(positions),
        ])?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        let data_file = &data_files[0];

        check_parquet_position_delete_file(&file_io, data_file, &batch).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_with_referenced_file() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let referenced_file = "s3://bucket/data/file1.parquet".to_string();
        let config = PositionDeleteWriterConfig::new(None, 0, Some(referenced_file.clone()));
        let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
            .build(None)
            .await?;

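        // A single delete against the referenced data file.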
        let file_paths = StringArray::from(vec![referenced_file.as_str()]);
        let positions = Int64Array::from(vec![42]);
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
            Arc::new(file_paths),
            Arc::new(positions),
        ])?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        let data_file = &data_files[0];

        assert_eq!(data_file.referenced_data_file, Some(referenced_file));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_invalid_schema() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
            .build(None)
            .await?;

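        // A batch whose schema differs from the position delete schema must
        // be rejected by `write`.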
        let wrong_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "wrong_field",
            DataType::Int32,
            false,
        )]));
        let wrong_batch =
            RecordBatch::try_new(wrong_schema, vec![Arc::new(Int32Array::from(vec![1]))])?;

        let result = writer.write(wrong_batch).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("invalid schema"));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_multiple_files() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
            .build(None)
            .await?;

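        // Deletes spanning several data files can still be written into a
        // single position delete file.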
        let file_paths = StringArray::from(vec![
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file2.parquet",
            "s3://bucket/data/file2.parquet",
            "s3://bucket/data/file3.parquet",
        ]);
        let positions = Int64Array::from(vec![0, 10, 5, 15, 100]);
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
            Arc::new(file_paths),
            Arc::new(positions),
        ])?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        check_parquet_position_delete_file(&file_io, &data_files[0], &batch).await;

        Ok(())
    }
}