Skip to main content

iceberg/writer/base_writer/
position_delete_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11//   Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides `PositionDeleteWriter`.
19
use std::sync::Arc;

use arrow_array::{Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::spec::{DataFile, PartitionKey, Struct};
use crate::writer::file_writer::FileWriterBuilder;
use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};
32
33/// Builder for `PositionDeleteWriter`.
34#[derive(Debug)]
35pub struct PositionDeleteFileWriterBuilder<
36    B: FileWriterBuilder,
37    L: LocationGenerator,
38    F: FileNameGenerator,
39> {
40    inner: RollingFileWriterBuilder<B, L, F>,
41    config: PositionDeleteWriterConfig,
42}
43
44impl<B, L, F> PositionDeleteFileWriterBuilder<B, L, F>
45where
46    B: FileWriterBuilder,
47    L: LocationGenerator,
48    F: FileNameGenerator,
49{
50    /// Create a new `PositionDeleteFileWriterBuilder` using a `RollingFileWriterBuilder`.
51    pub fn new(
52        inner: RollingFileWriterBuilder<B, L, F>,
53        config: PositionDeleteWriterConfig,
54    ) -> Self {
55        Self { inner, config }
56    }
57}
58
59/// Config for `PositionDeleteWriter`.
60#[derive(Clone, Debug)]
61pub struct PositionDeleteWriterConfig {
62    partition_value: Struct,
63    partition_spec_id: i32,
64    referenced_data_file: Option<String>,
65}
66
67impl PositionDeleteWriterConfig {
68    /// Create a new `PositionDeleteWriterConfig`.
69    ///
70    /// # Arguments
71    /// * `partition_value` - The partition value for the delete file
72    /// * `partition_spec_id` - The partition spec ID
73    /// * `referenced_data_file` - Optional path to the data file being deleted from
74    pub fn new(
75        partition_value: Option<Struct>,
76        partition_spec_id: i32,
77        referenced_data_file: Option<String>,
78    ) -> Self {
79        Self {
80            partition_value: partition_value.unwrap_or(Struct::empty()),
81            partition_spec_id,
82            referenced_data_file,
83        }
84    }
85
86    /// Returns the Arrow schema for position delete files.
87    ///
88    /// Position delete files have a fixed schema:
89    /// - file_path: string (field id 2147483546)
90    /// - pos: long (field id 2147483545)
91    pub fn arrow_schema() -> ArrowSchemaRef {
92        Arc::new(ArrowSchema::new(vec![
93            Field::new("file_path", DataType::Utf8, false).with_metadata(
94                [(
95                    PARQUET_FIELD_ID_META_KEY.to_string(),
96                    "2147483546".to_string(),
97                )]
98                .into_iter()
99                .collect(),
100            ),
101            Field::new("pos", DataType::Int64, false).with_metadata(
102                [(
103                    PARQUET_FIELD_ID_META_KEY.to_string(),
104                    "2147483545".to_string(),
105                )]
106                .into_iter()
107                .collect(),
108            ),
109        ]))
110    }
111}
112
113#[async_trait::async_trait]
114impl<B, L, F> IcebergWriterBuilder for PositionDeleteFileWriterBuilder<B, L, F>
115where
116    B: FileWriterBuilder,
117    L: LocationGenerator,
118    F: FileNameGenerator,
119{
120    type R = PositionDeleteFileWriter<B, L, F>;
121
122    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
123        Ok(PositionDeleteFileWriter {
124            inner: Some(self.inner.build()),
125            partition_value: self.config.partition_value.clone(),
126            partition_spec_id: self.config.partition_spec_id,
127            referenced_data_file: self.config.referenced_data_file.clone(),
128            partition_key,
129        })
130    }
131}
132
133/// Writer used to write position delete files.
134///
135/// Position delete files mark specific rows in data files as deleted
136/// by their position (row number). The schema is fixed with two columns:
137/// - file_path: The path to the data file
138/// - pos: The row position (0-indexed) in that file
139#[derive(Debug)]
140pub struct PositionDeleteFileWriter<
141    B: FileWriterBuilder,
142    L: LocationGenerator,
143    F: FileNameGenerator,
144> {
145    inner: Option<RollingFileWriter<B, L, F>>,
146    partition_value: Struct,
147    partition_spec_id: i32,
148    referenced_data_file: Option<String>,
149    partition_key: Option<PartitionKey>,
150}
151
152#[async_trait::async_trait]
153impl<B, L, F> IcebergWriter for PositionDeleteFileWriter<B, L, F>
154where
155    B: FileWriterBuilder,
156    L: LocationGenerator,
157    F: FileNameGenerator,
158{
159    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
160        // Validate the batch has the correct schema
161        let expected_schema = PositionDeleteWriterConfig::arrow_schema();
162        if batch.schema().fields() != expected_schema.fields() {
163            return Err(Error::new(
164                ErrorKind::DataInvalid,
165                format!(
166                    "Position delete batch has invalid schema. Expected: {:?}, Got: {:?}",
167                    expected_schema.fields(),
168                    batch.schema().fields()
169                ),
170            ));
171        }
172
173        if let Some(writer) = self.inner.as_mut() {
174            writer.write(&self.partition_key, &batch).await
175        } else {
176            Err(Error::new(
177                ErrorKind::Unexpected,
178                "Position delete inner writer has been closed.",
179            ))
180        }
181    }
182
183    async fn close(&mut self) -> Result<Vec<DataFile>> {
184        if let Some(writer) = self.inner.take() {
185            writer
186                .close()
187                .await?
188                .into_iter()
189                .map(|mut res| {
190                    res.content(crate::spec::DataContentType::PositionDeletes);
191                    res.partition(self.partition_value.clone());
192                    res.partition_spec_id(self.partition_spec_id);
193                    if let Some(ref data_file) = self.referenced_data_file {
194                        res.referenced_data_file(Some(data_file.clone()));
195                    }
196                    // Position deletes must have null sort_order_id (default is None)
197                    res.build().map_err(|e| {
198                        Error::new(
199                            ErrorKind::DataInvalid,
200                            format!("Failed to build position delete data file: {e}"),
201                        )
202                    })
203                })
204                .collect()
205        } else {
206            Err(Error::new(
207                ErrorKind::Unexpected,
208                "Position delete inner writer has been closed.",
209            ))
210        }
211    }
212}
213
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::arrow_schema_to_schema;
    use crate::io::FileIO;
    use crate::spec::DataFileFormat;
    use crate::writer::IcebergWriterBuilder;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

    /// Builds a Parquet-backed position delete writer rooted in a fresh temp directory.
    ///
    /// The returned `TempDir` must stay alive while the written files are read,
    /// because dropping it removes the directory.
    async fn new_position_delete_writer(
        config: PositionDeleteWriterConfig,
    ) -> Result<(TempDir, FileIO, impl IcebergWriter)> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIO::new_with_fs();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let iceberg_schema = Arc::new(arrow_schema_to_schema(
            &PositionDeleteWriterConfig::arrow_schema(),
        )?);
        let parquet_builder =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), iceberg_schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_builder,
            iceberg_schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let writer = PositionDeleteFileWriterBuilder::new(rolling_builder, config)
            .build(None)
            .await?;

        Ok((temp_dir, file_io, writer))
    }

    /// Builds a batch in the position delete schema from parallel path/position vectors.
    fn position_delete_batch(paths: Vec<&str>, positions: Vec<i64>) -> Result<RecordBatch> {
        Ok(RecordBatch::try_new(
            PositionDeleteWriterConfig::arrow_schema(),
            vec![
                Arc::new(StringArray::from(paths)),
                Arc::new(Int64Array::from(positions)),
            ],
        )?)
    }

    /// Reads `data_file` back and checks its contents and metadata against `expected_batch`.
    async fn check_parquet_position_delete_file(
        file_io: &FileIO,
        data_file: &DataFile,
        expected_batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
        assert_eq!(
            data_file.content,
            crate::spec::DataContentType::PositionDeletes
        );

        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let bytes = input_file.read().await.unwrap();
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
        let parquet_metadata = reader_builder.metadata().clone();

        // Round-tripped rows must equal what was written.
        let read_batches: Vec<RecordBatch> = reader_builder
            .build()
            .unwrap()
            .map(|batch| batch.unwrap())
            .collect();
        let actual = concat_batches(&expected_batch.schema(), &read_batches).unwrap();
        assert_eq!(*expected_batch, actual);

        // Recorded metadata must match the physical file.
        let rows_in_file: i64 = parquet_metadata
            .row_groups()
            .iter()
            .map(|row_group| row_group.num_rows())
            .sum();
        assert_eq!(data_file.record_count, rows_in_file as u64);
        assert_eq!(data_file.file_size_in_bytes, bytes.len() as u64);

        // Position deletes must have null sort_order_id
        assert!(data_file.sort_order_id.is_none());
    }

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let (_temp_dir, file_io, mut writer) = new_position_delete_writer(config).await?;

        // Delete rows 5, 10 and 15 of a single data file.
        let data_path = "s3://bucket/data/file1.parquet";
        let batch = position_delete_batch(vec![data_path; 3], vec![5, 10, 15])?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        check_parquet_position_delete_file(&file_io, &data_files[0], &batch).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_with_referenced_file() -> Result<()> {
        let referenced_file = "s3://bucket/data/file1.parquet".to_string();
        let config = PositionDeleteWriterConfig::new(None, 0, Some(referenced_file.clone()));
        let (_temp_dir, _file_io, mut writer) = new_position_delete_writer(config).await?;

        let batch = position_delete_batch(vec![referenced_file.as_str()], vec![42])?;
        writer.write(batch).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        // The configured referenced data file is propagated to the delete file.
        assert_eq!(data_files[0].referenced_data_file, Some(referenced_file));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_invalid_schema() -> Result<()> {
        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let (_temp_dir, _file_io, mut writer) = new_position_delete_writer(config).await?;

        // A lone Int32 column: neither `file_path` nor `pos` is present.
        let wrong_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "wrong_field",
            DataType::Int32,
            false,
        )]));
        let wrong_batch =
            RecordBatch::try_new(wrong_schema, vec![Arc::new(Int32Array::from(vec![1]))])?;

        let err = writer
            .write(wrong_batch)
            .await
            .expect_err("a batch with the wrong schema must be rejected");
        assert!(err.to_string().contains("invalid schema"));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_multiple_files() -> Result<()> {
        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let (_temp_dir, file_io, mut writer) = new_position_delete_writer(config).await?;

        // One batch carrying deletes for three different data files.
        let batch = position_delete_batch(
            vec![
                "s3://bucket/data/file1.parquet",
                "s3://bucket/data/file1.parquet",
                "s3://bucket/data/file2.parquet",
                "s3://bucket/data/file2.parquet",
                "s3://bucket/data/file3.parquet",
            ],
            vec![0, 10, 5, 15, 100],
        )?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        check_parquet_position_delete_file(&file_io, &data_files[0], &batch).await;

        Ok(())
    }
}