1use std::sync::Arc;
21
22use arrow_array::RecordBatch;
23use arrow_schema::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
24use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
25
26use crate::spec::{DataFile, PartitionKey, Struct};
27use crate::writer::file_writer::FileWriterBuilder;
28use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
29use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
30use crate::writer::{IcebergWriter, IcebergWriterBuilder};
31use crate::{Error, ErrorKind, Result};
32
33#[derive(Debug)]
35pub struct PositionDeleteFileWriterBuilder<
36 B: FileWriterBuilder,
37 L: LocationGenerator,
38 F: FileNameGenerator,
39> {
40 inner: RollingFileWriterBuilder<B, L, F>,
41 config: PositionDeleteWriterConfig,
42}
43
44impl<B, L, F> PositionDeleteFileWriterBuilder<B, L, F>
45where
46 B: FileWriterBuilder,
47 L: LocationGenerator,
48 F: FileNameGenerator,
49{
50 pub fn new(
52 inner: RollingFileWriterBuilder<B, L, F>,
53 config: PositionDeleteWriterConfig,
54 ) -> Self {
55 Self { inner, config }
56 }
57}
58
59#[derive(Clone, Debug)]
61pub struct PositionDeleteWriterConfig {
62 partition_value: Struct,
63 partition_spec_id: i32,
64 referenced_data_file: Option<String>,
65}
66
67impl PositionDeleteWriterConfig {
68 pub fn new(
75 partition_value: Option<Struct>,
76 partition_spec_id: i32,
77 referenced_data_file: Option<String>,
78 ) -> Self {
79 Self {
80 partition_value: partition_value.unwrap_or(Struct::empty()),
81 partition_spec_id,
82 referenced_data_file,
83 }
84 }
85
86 pub fn arrow_schema() -> ArrowSchemaRef {
92 Arc::new(ArrowSchema::new(vec![
93 Field::new("file_path", DataType::Utf8, false).with_metadata(
94 [(
95 PARQUET_FIELD_ID_META_KEY.to_string(),
96 "2147483546".to_string(),
97 )]
98 .into_iter()
99 .collect(),
100 ),
101 Field::new("pos", DataType::Int64, false).with_metadata(
102 [(
103 PARQUET_FIELD_ID_META_KEY.to_string(),
104 "2147483545".to_string(),
105 )]
106 .into_iter()
107 .collect(),
108 ),
109 ]))
110 }
111}
112
113#[async_trait::async_trait]
114impl<B, L, F> IcebergWriterBuilder for PositionDeleteFileWriterBuilder<B, L, F>
115where
116 B: FileWriterBuilder,
117 L: LocationGenerator,
118 F: FileNameGenerator,
119{
120 type R = PositionDeleteFileWriter<B, L, F>;
121
122 async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
123 Ok(PositionDeleteFileWriter {
124 inner: Some(self.inner.build()),
125 partition_value: self.config.partition_value.clone(),
126 partition_spec_id: self.config.partition_spec_id,
127 referenced_data_file: self.config.referenced_data_file.clone(),
128 partition_key,
129 })
130 }
131}
132
133#[derive(Debug)]
140pub struct PositionDeleteFileWriter<
141 B: FileWriterBuilder,
142 L: LocationGenerator,
143 F: FileNameGenerator,
144> {
145 inner: Option<RollingFileWriter<B, L, F>>,
146 partition_value: Struct,
147 partition_spec_id: i32,
148 referenced_data_file: Option<String>,
149 partition_key: Option<PartitionKey>,
150}
151
152#[async_trait::async_trait]
153impl<B, L, F> IcebergWriter for PositionDeleteFileWriter<B, L, F>
154where
155 B: FileWriterBuilder,
156 L: LocationGenerator,
157 F: FileNameGenerator,
158{
159 async fn write(&mut self, batch: RecordBatch) -> Result<()> {
160 let expected_schema = PositionDeleteWriterConfig::arrow_schema();
162 if batch.schema().fields() != expected_schema.fields() {
163 return Err(Error::new(
164 ErrorKind::DataInvalid,
165 format!(
166 "Position delete batch has invalid schema. Expected: {:?}, Got: {:?}",
167 expected_schema.fields(),
168 batch.schema().fields()
169 ),
170 ));
171 }
172
173 if let Some(writer) = self.inner.as_mut() {
174 writer.write(&self.partition_key, &batch).await
175 } else {
176 Err(Error::new(
177 ErrorKind::Unexpected,
178 "Position delete inner writer has been closed.",
179 ))
180 }
181 }
182
183 async fn close(&mut self) -> Result<Vec<DataFile>> {
184 if let Some(writer) = self.inner.take() {
185 writer
186 .close()
187 .await?
188 .into_iter()
189 .map(|mut res| {
190 res.content(crate::spec::DataContentType::PositionDeletes);
191 res.partition(self.partition_value.clone());
192 res.partition_spec_id(self.partition_spec_id);
193 if let Some(ref data_file) = self.referenced_data_file {
194 res.referenced_data_file(Some(data_file.clone()));
195 }
196 res.build().map_err(|e| {
198 Error::new(
199 ErrorKind::DataInvalid,
200 format!("Failed to build position delete data file: {e}"),
201 )
202 })
203 })
204 .collect()
205 } else {
206 Err(Error::new(
207 ErrorKind::Unexpected,
208 "Position delete inner writer has been closed.",
209 ))
210 }
211 }
212}
213
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::arrow_schema_to_schema;
    use crate::io::FileIO;
    use crate::spec::DataFileFormat;
    use crate::writer::IcebergWriterBuilder;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

    /// Builds a position delete writer that writes Parquet files into a fresh temp dir.
    ///
    /// Returns the temp dir (bind it to a named variable so it stays alive until the
    /// written files have been read back), the `FileIO` used for writing, and the writer.
    async fn new_writer(
        config: PositionDeleteWriterConfig,
    ) -> Result<(TempDir, FileIO, impl IcebergWriter)> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIO::new_with_fs();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let schema = Arc::new(arrow_schema_to_schema(
            &PositionDeleteWriterConfig::arrow_schema(),
        )?);
        let parquet_builder =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_builder,
            schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
            .build(None)
            .await?;
        Ok((temp_dir, file_io, writer))
    }

    /// Builds a batch in the position delete schema from parallel path / position columns.
    fn position_delete_batch(paths: Vec<&str>, positions: Vec<i64>) -> Result<RecordBatch> {
        Ok(RecordBatch::try_new(
            PositionDeleteWriterConfig::arrow_schema(),
            vec![
                Arc::new(StringArray::from(paths)),
                Arc::new(Int64Array::from(positions)),
            ],
        )?)
    }

    /// Reads `data_file` back and checks its metadata and contents against `expected_batch`.
    async fn check_parquet_position_delete_file(
        file_io: &FileIO,
        data_file: &DataFile,
        expected_batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
        assert_eq!(
            data_file.content,
            crate::spec::DataContentType::PositionDeletes
        );

        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let actual = concat_batches(&expected_batch.schema(), &batches).unwrap();
        assert_eq!(*expected_batch, actual);

        // The recorded row count must match the rows physically present in the file.
        assert_eq!(
            data_file.record_count,
            metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64
        );
        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        assert!(data_file.sort_order_id.is_none());
    }

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let (_temp_dir, file_io, mut writer) = new_writer(config).await?;

        let batch = position_delete_batch(
            vec![
                "s3://bucket/data/file1.parquet",
                "s3://bucket/data/file1.parquet",
                "s3://bucket/data/file1.parquet",
            ],
            vec![5, 10, 15],
        )?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        check_parquet_position_delete_file(&file_io, &data_files[0], &batch).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_with_referenced_file() -> Result<()> {
        let referenced_file = "s3://bucket/data/file1.parquet".to_string();
        let config = PositionDeleteWriterConfig::new(None, 0, Some(referenced_file.clone()));
        let (_temp_dir, file_io, mut writer) = new_writer(config).await?;

        let batch = position_delete_batch(vec![referenced_file.as_str()], vec![42])?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        let data_file = &data_files[0];
        assert_eq!(data_file.referenced_data_file, Some(referenced_file));
        check_parquet_position_delete_file(&file_io, data_file, &batch).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_invalid_schema() -> Result<()> {
        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let (_temp_dir, _file_io, mut writer) = new_writer(config).await?;

        let wrong_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "wrong_field",
            DataType::Int32,
            false,
        )]));
        let wrong_batch =
            RecordBatch::try_new(wrong_schema, vec![Arc::new(Int32Array::from(vec![1]))])?;

        let result = writer.write(wrong_batch).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("invalid schema"));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_use_after_close() -> Result<()> {
        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let (_temp_dir, _file_io, mut writer) = new_writer(config).await?;

        let batch = position_delete_batch(vec!["s3://bucket/data/file1.parquet"], vec![1])?;
        writer.write(batch.clone()).await?;
        writer.close().await?;

        // Both writing and closing again must report the closed writer instead of panicking.
        let write_err = writer.write(batch).await.unwrap_err();
        assert!(write_err.to_string().contains("has been closed"));
        let close_err = writer.close().await.unwrap_err();
        assert!(close_err.to_string().contains("has been closed"));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_multiple_files() -> Result<()> {
        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let (_temp_dir, file_io, mut writer) = new_writer(config).await?;

        let batch = position_delete_batch(
            vec![
                "s3://bucket/data/file1.parquet",
                "s3://bucket/data/file1.parquet",
                "s3://bucket/data/file2.parquet",
                "s3://bucket/data/file2.parquet",
                "s3://bucket/data/file3.parquet",
            ],
            vec![0, 10, 5, 15, 100],
        )?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        check_parquet_position_delete_file(&file_io, &data_files[0], &batch).await;

        Ok(())
    }
}