// iceberg/writer/base_writer/position_delete_writer.rs
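//! Position delete file writer.
//!
//! Writes Iceberg position delete files, which mark rows as deleted by recording the data
//! file path (`file_path`) and the 0-based row position (`pos`) of each deleted row.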
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::spec::{DataFile, Struct};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

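/// Builder for [`PositionDeleteFileWriter`].
///
/// Wraps an inner [`FileWriterBuilder`] (e.g. a Parquet writer builder) together with a
/// [`PositionDeleteWriterConfig`] describing the partition the delete file belongs to and an
/// optional referenced data file.
///
/// A minimal usage sketch, assuming `parquet_builder` is an already configured
/// [`FileWriterBuilder`] and `batch` matches [`PositionDeleteWriterConfig::arrow_schema`]:
///
/// ```ignore
/// let config = PositionDeleteWriterConfig::new(None, 0, None);
/// let mut writer = PositionDeleteFileWriterBuilder::new(parquet_builder, config)
///     .build()
///     .await?;
/// writer.write(batch).await?;
/// let delete_files = writer.close().await?;
/// ```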
#[derive(Clone, Debug)]
pub struct PositionDeleteFileWriterBuilder<B: FileWriterBuilder> {
    inner: B,
    config: PositionDeleteWriterConfig,
}

impl<B: FileWriterBuilder> PositionDeleteFileWriterBuilder<B> {
    pub fn new(inner: B, config: PositionDeleteWriterConfig) -> Self {
        Self { inner, config }
    }
}

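/// Configuration for a position delete writer: the partition value and partition spec id of
/// the delete file and, optionally, the single data file it references.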
#[derive(Clone, Debug)]
pub struct PositionDeleteWriterConfig {
    partition_value: Struct,
    partition_spec_id: i32,
    referenced_data_file: Option<String>,
}

impl PositionDeleteWriterConfig {
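    /// Create a new `PositionDeleteWriterConfig`. Passing `None` for `partition_value` falls
    /// back to the empty (unpartitioned) struct.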
    pub fn new(
        partition_value: Option<Struct>,
        partition_spec_id: i32,
        referenced_data_file: Option<String>,
    ) -> Self {
        Self {
            partition_value: partition_value.unwrap_or(Struct::empty()),
            partition_spec_id,
            referenced_data_file,
        }
    }

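    /// Arrow schema of a position delete file: `file_path` (non-null string) and `pos`
    /// (non-null long), carrying the reserved Iceberg field ids 2147483546 and 2147483545.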
    pub fn arrow_schema() -> ArrowSchemaRef {
        Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(
                [(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2147483546".to_string(),
                )]
                .into_iter()
                .collect(),
            ),
            Field::new("pos", DataType::Int64, false).with_metadata(
                [(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2147483545".to_string(),
                )]
                .into_iter()
                .collect(),
            ),
        ]))
    }
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for PositionDeleteFileWriterBuilder<B> {
    type R = PositionDeleteFileWriter<B>;

    async fn build(self) -> Result<Self::R> {
        Ok(PositionDeleteFileWriter {
            inner_writer: Some(self.inner.clone().build().await?),
            partition_value: self.config.partition_value,
            partition_spec_id: self.config.partition_spec_id,
            referenced_data_file: self.config.referenced_data_file,
        })
    }
}

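/// Writer for position delete files.
///
/// Delegates batches to the inner file writer and, on close, marks every produced [`DataFile`]
/// as position deletes with the configured partition value, partition spec id and referenced
/// data file.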
#[derive(Debug)]
pub struct PositionDeleteFileWriter<B: FileWriterBuilder> {
    inner_writer: Option<B::R>,
    partition_value: Struct,
    partition_spec_id: i32,
    referenced_data_file: Option<String>,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for PositionDeleteFileWriter<B> {
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // Position delete batches must use the fixed (file_path, pos) schema.
        let expected_schema = PositionDeleteWriterConfig::arrow_schema();
        if batch.schema().fields() != expected_schema.fields() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Position delete batch has invalid schema. Expected: {:?}, Got: {:?}",
                    expected_schema.fields(),
                    batch.schema().fields()
                ),
            ));
        }

        if let Some(writer) = self.inner_writer.as_mut() {
            writer.write(&batch).await
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Position delete inner writer has been closed.",
            ))
        }
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        if let Some(writer) = self.inner_writer.take() {
            // Stamp every file produced by the inner writer as a position delete file carrying
            // the configured partition and referenced data file.
            Ok(writer
                .close()
                .await?
                .into_iter()
                .map(|mut res| {
                    res.content(crate::spec::DataContentType::PositionDeletes);
                    res.partition(self.partition_value.clone());
                    res.partition_spec_id(self.partition_spec_id);
                    if let Some(ref data_file) = self.referenced_data_file {
                        res.referenced_data_file(Some(data_file.clone()));
                    }
                    res.build()
                        .expect("Failed to build position delete data file")
                })
                .collect_vec())
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Position delete inner writer has been closed.",
            ))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::arrow_schema_to_schema;
    use crate::io::{FileIO, FileIOBuilder};
    use crate::spec::DataFileFormat;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::IcebergWriterBuilder;

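    /// Read back the written position delete file and check its format, content and footer
    /// metadata against the expected batch.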
    async fn check_parquet_position_delete_file(
        file_io: &FileIO,
        data_file: &DataFile,
        expected_batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
        assert_eq!(
            data_file.content,
            crate::spec::DataContentType::PositionDeletes
        );

        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let actual = concat_batches(&expected_batch.schema(), &batches).unwrap();
        assert_eq!(*expected_batch, actual);

        assert_eq!(
            data_file.record_count,
            metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64
        );
        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        assert!(data_file.sort_order_id.is_none());
    }

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            schema,
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        let file_paths = StringArray::from(vec![
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
        ]);
        let positions = Int64Array::from(vec![5, 10, 15]);
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(file_paths), Arc::new(positions)],
        )?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        let data_file = &data_files[0];

        check_parquet_position_delete_file(&file_io, data_file, &batch).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_with_referenced_file() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let referenced_file = "s3://bucket/data/file1.parquet".to_string();
        let config = PositionDeleteWriterConfig::new(None, 0, Some(referenced_file.clone()));
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            schema,
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        let file_paths = StringArray::from(vec![referenced_file.as_str()]);
        let positions = Int64Array::from(vec![42]);
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(file_paths), Arc::new(positions)],
        )?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        let data_file = &data_files[0];

        assert_eq!(data_file.referenced_data_file, Some(referenced_file));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_invalid_schema() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            schema,
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        let wrong_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "wrong_field",
            DataType::Int32,
            false,
        )]));
        let wrong_batch =
            RecordBatch::try_new(wrong_schema, vec![Arc::new(Int32Array::from(vec![1]))])?;

        let result = writer.write(wrong_batch).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("invalid schema"));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_multiple_files() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            schema,
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        let file_paths = StringArray::from(vec![
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file2.parquet",
            "s3://bucket/data/file2.parquet",
            "s3://bucket/data/file3.parquet",
        ]);
        let positions = Int64Array::from(vec![0, 10, 5, 15, 100]);
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(file_paths), Arc::new(positions)],
        )?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        check_parquet_position_delete_file(&file_io, &data_files[0], &batch).await;

        Ok(())
    }
}