iceberg/writer/base_writer/position_delete_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides `PositionDeleteFileWriter` for writing Iceberg position delete files.

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::spec::{DataFile, PartitionKey, Struct};
use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
use crate::writer::file_writer::FileWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for `PositionDeleteFileWriter`.
#[derive(Debug)]
pub struct PositionDeleteFileWriterBuilder<
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
> {
    inner: RollingFileWriterBuilder<B, L, F>,
    config: PositionDeleteWriterConfig,
}

impl<B, L, F> PositionDeleteFileWriterBuilder<B, L, F>
where
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
{
    /// Create a new `PositionDeleteFileWriterBuilder` using a `RollingFileWriterBuilder`.
    pub fn new(
        inner: RollingFileWriterBuilder<B, L, F>,
        config: PositionDeleteWriterConfig,
    ) -> Self {
        Self { inner, config }
    }
}

/// Configuration for `PositionDeleteFileWriter`.
#[derive(Clone, Debug)]
pub struct PositionDeleteWriterConfig {
    partition_value: Struct,
    partition_spec_id: i32,
    referenced_data_file: Option<String>,
}

impl PositionDeleteWriterConfig {
    /// Create a new `PositionDeleteWriterConfig`.
    ///
    /// # Arguments
    /// * `partition_value` - The partition value for the delete file; `None` falls back to an empty struct (unpartitioned)
    /// * `partition_spec_id` - The partition spec ID
    /// * `referenced_data_file` - Optional path to the data file being deleted from
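    ///
    /// For illustration only (values mirror this module's tests), an unpartitioned
    /// config that targets a single data file might be built as:
    ///
    /// ```ignore
    /// // `None` partition value -> empty struct; spec id 0; one referenced data file.
    /// let config = PositionDeleteWriterConfig::new(
    ///     None,
    ///     0,
    ///     Some("s3://bucket/data/file1.parquet".to_string()),
    /// );
    /// ```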
    pub fn new(
        partition_value: Option<Struct>,
        partition_spec_id: i32,
        referenced_data_file: Option<String>,
    ) -> Self {
        Self {
            partition_value: partition_value.unwrap_or(Struct::empty()),
            partition_spec_id,
            referenced_data_file,
        }
    }

    /// Returns the Arrow schema for position delete files.
    ///
    /// Position delete files have a fixed schema:
    /// - file_path: string (field id 2147483546)
    /// - pos: long (field id 2147483545)
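    ///
    /// As a sketch (array construction mirrors this module's tests, using the
    /// Arrow crates already imported above), a batch that deletes row 5 of a
    /// single data file could be assembled like this:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use arrow_array::{Int64Array, RecordBatch, StringArray};
    ///
    /// let schema = PositionDeleteWriterConfig::arrow_schema();
    /// let batch = RecordBatch::try_new(schema, vec![
    ///     Arc::new(StringArray::from(vec!["s3://bucket/data/file1.parquet"])),
    ///     Arc::new(Int64Array::from(vec![5_i64])),
    /// ])
    /// .unwrap();
    /// ```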
    pub fn arrow_schema() -> ArrowSchemaRef {
        Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(
                [(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2147483546".to_string(),
                )]
                .into_iter()
                .collect(),
            ),
            Field::new("pos", DataType::Int64, false).with_metadata(
                [(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2147483545".to_string(),
                )]
                .into_iter()
                .collect(),
            ),
        ]))
    }
}

#[async_trait::async_trait]
impl<B, L, F> IcebergWriterBuilder for PositionDeleteFileWriterBuilder<B, L, F>
where
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
{
    type R = PositionDeleteFileWriter<B, L, F>;

    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
        Ok(PositionDeleteFileWriter {
            inner: Some(self.inner.build()),
            partition_value: self.config.partition_value.clone(),
            partition_spec_id: self.config.partition_spec_id,
            referenced_data_file: self.config.referenced_data_file.clone(),
            partition_key,
        })
    }
}

/// Writer used to write position delete files.
///
/// Position delete files mark specific rows in data files as deleted
/// by their position (row number). The schema is fixed with two columns:
/// - file_path: The path to the data file
/// - pos: The row position (0-indexed) in that file
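///
/// A minimal end-to-end sketch (builder wiring elided; the tests in this module
/// show a complete Parquet-backed setup, and the variable names here are
/// illustrative only):
///
/// ```ignore
/// // `builder` is a PositionDeleteFileWriterBuilder wrapping a RollingFileWriterBuilder.
/// let mut writer = builder.build(None).await?;
/// // `batch` follows PositionDeleteWriterConfig::arrow_schema() (file_path, pos).
/// writer.write(batch).await?;
/// // Closing yields DataFiles whose content type is PositionDeletes.
/// let delete_files = writer.close().await?;
/// ```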
#[derive(Debug)]
pub struct PositionDeleteFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator>
{
    inner: Option<RollingFileWriter<B, L, F>>,
    partition_value: Struct,
    partition_spec_id: i32,
    referenced_data_file: Option<String>,
    partition_key: Option<PartitionKey>,
}

#[async_trait::async_trait]
impl<B, L, F> IcebergWriter for PositionDeleteFileWriter<B, L, F>
where
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
{
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // Validate the batch has the correct schema
        let expected_schema = PositionDeleteWriterConfig::arrow_schema();
        if batch.schema().fields() != expected_schema.fields() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Position delete batch has invalid schema. Expected: {:?}, Got: {:?}",
                    expected_schema.fields(),
                    batch.schema().fields()
                ),
            ));
        }

        if let Some(writer) = self.inner.as_mut() {
            writer.write(&self.partition_key, &batch).await
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Position delete inner writer has been closed.",
            ))
        }
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        if let Some(writer) = self.inner.take() {
            writer
                .close()
                .await?
                .into_iter()
                .map(|mut res| {
                    res.content(crate::spec::DataContentType::PositionDeletes);
                    res.partition(self.partition_value.clone());
                    res.partition_spec_id(self.partition_spec_id);
                    if let Some(ref data_file) = self.referenced_data_file {
                        res.referenced_data_file(Some(data_file.clone()));
                    }
                    // Position deletes must have null sort_order_id (default is None)
                    res.build().map_err(|e| {
                        Error::new(
                            ErrorKind::DataInvalid,
                            format!("Failed to build position delete data file: {e}"),
                        )
                    })
                })
                .collect()
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Position delete inner writer has been closed.",
            ))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::arrow_schema_to_schema;
    use crate::io::{FileIO, FileIOBuilder};
    use crate::spec::DataFileFormat;
    use crate::writer::IcebergWriterBuilder;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

    async fn check_parquet_position_delete_file(
        file_io: &FileIO,
        data_file: &DataFile,
        expected_batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
        assert_eq!(
            data_file.content,
            crate::spec::DataContentType::PositionDeletes
        );

        // Read the written file
        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        // Check data
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let actual = concat_batches(&expected_batch.schema(), &batches).unwrap();
        assert_eq!(*expected_batch, actual);

        // Check metadata
        assert_eq!(
            data_file.record_count,
            metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64
        );
        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        // Position deletes must have null sort_order_id
        assert!(data_file.sort_order_id.is_none());
    }

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // Get the position delete schema
        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        // Create writer
        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
            .build(None)
            .await?;

        // Create test data - delete rows 5, 10, 15 from a file
        let file_paths = StringArray::from(vec![
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
        ]);
        let positions = Int64Array::from(vec![5, 10, 15]);
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
            Arc::new(file_paths),
            Arc::new(positions),
        ])?;

        // Write
        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        let data_file = &data_files[0];

        // Verify
        check_parquet_position_delete_file(&file_io, data_file, &batch).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_with_referenced_file() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        // Create writer with referenced data file
        let referenced_file = "s3://bucket/data/file1.parquet".to_string();
        let config = PositionDeleteWriterConfig::new(None, 0, Some(referenced_file.clone()));
        let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
            .build(None)
            .await?;

        // Create test data
        let file_paths = StringArray::from(vec![referenced_file.as_str()]);
        let positions = Int64Array::from(vec![42]);
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
            Arc::new(file_paths),
            Arc::new(positions),
        ])?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        let data_file = &data_files[0];

        // Verify referenced_data_file is set
        assert_eq!(data_file.referenced_data_file, Some(referenced_file));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_invalid_schema() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
            .build(None)
            .await?;

        // Try to write a batch with the wrong schema (missing the file_path/pos fields)
        let wrong_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "wrong_field",
            DataType::Int32,
            false,
        )]));
        let wrong_batch =
            RecordBatch::try_new(wrong_schema, vec![Arc::new(Int32Array::from(vec![1]))])?;

        let result = writer.write(wrong_batch).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("invalid schema"));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_multiple_files() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config)
            .build(None)
            .await?;

        // Delete rows from multiple data files
        let file_paths = StringArray::from(vec![
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file2.parquet",
            "s3://bucket/data/file2.parquet",
            "s3://bucket/data/file3.parquet",
        ]);
        let positions = Int64Array::from(vec![0, 10, 5, 15, 100]);
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
            Arc::new(file_paths),
            Arc::new(positions),
        ])?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        check_parquet_position_delete_file(&file_io, &data_files[0], &batch).await;

        Ok(())
    }
}