iceberg/writer/base_writer/position_delete_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides `PositionDeleteFileWriter`.

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::spec::{DataFile, Struct};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for `PositionDeleteFileWriter`.
#[derive(Clone, Debug)]
pub struct PositionDeleteFileWriterBuilder<B: FileWriterBuilder> {
    inner: B,
    config: PositionDeleteWriterConfig,
}

impl<B: FileWriterBuilder> PositionDeleteFileWriterBuilder<B> {
    /// Create a new `PositionDeleteFileWriterBuilder` using a `FileWriterBuilder`.
    pub fn new(inner: B, config: PositionDeleteWriterConfig) -> Self {
        Self { inner, config }
    }
}

/// Config for `PositionDeleteFileWriter`.
#[derive(Clone, Debug)]
pub struct PositionDeleteWriterConfig {
    partition_value: Struct,
    partition_spec_id: i32,
    referenced_data_file: Option<String>,
}

impl PositionDeleteWriterConfig {
    /// Create a new `PositionDeleteWriterConfig`.
    ///
    /// # Arguments
    /// * `partition_value` - The partition value for the delete file; `None` means an
    ///   unpartitioned table and defaults to an empty struct
    /// * `partition_spec_id` - The ID of the partition spec that `partition_value` belongs to
    /// * `referenced_data_file` - Optional path to the single data file all deletes in this
    ///   file reference; leave `None` if the deletes may span multiple data files
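    ///
    /// # Example
    ///
    /// A minimal sketch for an unpartitioned table (no partition value, spec id 0, no
    /// single referenced data file):
    ///
    /// ```
    /// use iceberg::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
    ///
    /// // Unpartitioned table: the partition value defaults to an empty struct.
    /// let config = PositionDeleteWriterConfig::new(None, 0, None);
    /// ```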
    pub fn new(
        partition_value: Option<Struct>,
        partition_spec_id: i32,
        referenced_data_file: Option<String>,
    ) -> Self {
        Self {
            partition_value: partition_value.unwrap_or(Struct::empty()),
            partition_spec_id,
            referenced_data_file,
        }
    }

    /// Returns the Arrow schema for position delete files.
    ///
    /// Position delete files have a fixed schema:
    /// - file_path: string (field id 2147483546)
    /// - pos: long (field id 2147483545)
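    ///
    /// # Example
    ///
    /// A minimal sketch of building a delete batch against this schema (the file path and
    /// row position below are placeholders):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{Int64Array, RecordBatch, StringArray};
    /// use iceberg::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
    ///
    /// // One delete record: row 3 of the named data file.
    /// let batch = RecordBatch::try_new(
    ///     PositionDeleteWriterConfig::arrow_schema(),
    ///     vec![
    ///         Arc::new(StringArray::from(vec!["s3://bucket/data/file1.parquet"])),
    ///         Arc::new(Int64Array::from(vec![3])),
    ///     ],
    /// )
    /// .unwrap();
    /// assert_eq!(batch.num_rows(), 1);
    /// ```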
    pub fn arrow_schema() -> ArrowSchemaRef {
        Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(
                [(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2147483546".to_string(),
                )]
                .into_iter()
                .collect(),
            ),
            Field::new("pos", DataType::Int64, false).with_metadata(
                [(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2147483545".to_string(),
                )]
                .into_iter()
                .collect(),
            ),
        ]))
    }
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for PositionDeleteFileWriterBuilder<B> {
    type R = PositionDeleteFileWriter<B>;

    async fn build(self) -> Result<Self::R> {
        Ok(PositionDeleteFileWriter {
            inner_writer: Some(self.inner.clone().build().await?),
            partition_value: self.config.partition_value,
            partition_spec_id: self.config.partition_spec_id,
            referenced_data_file: self.config.referenced_data_file,
        })
    }
}

/// Writer used to write position delete files.
///
/// Position delete files mark specific rows in data files as deleted
/// by their position (row number). The schema is fixed with two columns:
/// - file_path: The path to the data file
/// - pos: The row position (0-indexed) in that file
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
/// use arrow_array::{RecordBatch, StringArray, Int64Array};
/// use iceberg::spec::DataFileFormat;
/// use iceberg::writer::base_writer::position_delete_writer::{
///     PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig
/// };
/// use iceberg::writer::file_writer::ParquetWriterBuilder;
/// use iceberg::writer::IcebergWriterBuilder;
/// # use iceberg::Result;
/// # async fn example() -> Result<()> {
/// # let file_io = iceberg::io::FileIOBuilder::new_fs_io().build()?;
/// # let location_gen = iceberg::writer::file_writer::location_generator::DefaultLocationGenerator::with_data_location("/tmp".to_string());
/// # let file_name_gen = iceberg::writer::file_writer::location_generator::DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
/// # let schema = Arc::new(iceberg::spec::Schema::builder().with_schema_id(0).build()?);
/// # let parquet_builder = ParquetWriterBuilder::new(Default::default(), schema, None, file_io, location_gen, file_name_gen);
///
/// // Create position delete writer
/// let config = PositionDeleteWriterConfig::new(None, 0, None);
/// let mut writer = PositionDeleteFileWriterBuilder::new(parquet_builder, config)
///     .build()
///     .await?;
///
/// // Write delete records
/// let file_paths = StringArray::from(vec!["s3://bucket/data/file1.parquet"]);
/// let positions = Int64Array::from(vec![42]); // Delete row 42
/// let batch = RecordBatch::try_new(
///     PositionDeleteWriterConfig::arrow_schema(),
///     vec![Arc::new(file_paths), Arc::new(positions)],
/// )?;
///
/// writer.write(batch).await?;
/// let data_files = writer.close().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct PositionDeleteFileWriter<B: FileWriterBuilder> {
    inner_writer: Option<B::R>,
    partition_value: Struct,
    partition_spec_id: i32,
    referenced_data_file: Option<String>,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for PositionDeleteFileWriter<B> {
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // Validate the batch has the correct schema
        let expected_schema = PositionDeleteWriterConfig::arrow_schema();
        if batch.schema().fields() != expected_schema.fields() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Position delete batch has invalid schema. Expected: {:?}, Got: {:?}",
                    expected_schema.fields(),
                    batch.schema().fields()
                ),
            ));
        }

        if let Some(writer) = self.inner_writer.as_mut() {
            writer.write(&batch).await
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Position delete inner writer has been closed.",
            ))
        }
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        if let Some(writer) = self.inner_writer.take() {
            Ok(writer
                .close()
                .await?
                .into_iter()
                .map(|mut res| {
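                    // Each `res` is a partially populated data-file builder returned by the
                    // inner file writer; stamp it with position-delete metadata before building.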
                    res.content(crate::spec::DataContentType::PositionDeletes);
                    res.partition(self.partition_value.clone());
                    res.partition_spec_id(self.partition_spec_id);
                    if let Some(ref data_file) = self.referenced_data_file {
                        res.referenced_data_file(Some(data_file.clone()));
                    }
                    // Position deletes must have null sort_order_id (default is None)
                    res.build()
                        .expect("Failed to build position delete data file")
                })
                .collect_vec())
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Position delete inner writer has been closed.",
            ))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::arrow_schema_to_schema;
    use crate::io::{FileIO, FileIOBuilder};
    use crate::spec::DataFileFormat;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::IcebergWriterBuilder;

    async fn check_parquet_position_delete_file(
        file_io: &FileIO,
        data_file: &DataFile,
        expected_batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
        assert_eq!(
            data_file.content,
            crate::spec::DataContentType::PositionDeletes
        );

        // Read the written file
        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        // Check data
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let actual = concat_batches(&expected_batch.schema(), &batches).unwrap();
        assert_eq!(*expected_batch, actual);

        // Check metadata
        assert_eq!(
            data_file.record_count,
            metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64
        );
        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        // Position deletes must have null sort_order_id
        assert!(data_file.sort_order_id.is_none());
    }

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // Get the position delete schema
        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        // Create writer
        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            schema,
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        // Create test data - delete rows 5, 10, 15 from a file
        let file_paths = StringArray::from(vec![
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
        ]);
        let positions = Int64Array::from(vec![5, 10, 15]);
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(file_paths), Arc::new(positions)],
        )?;

        // Write
        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        let data_file = &data_files[0];

        // Verify
        check_parquet_position_delete_file(&file_io, data_file, &batch).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_with_referenced_file() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        // Create writer with referenced data file
        let referenced_file = "s3://bucket/data/file1.parquet".to_string();
        let config = PositionDeleteWriterConfig::new(None, 0, Some(referenced_file.clone()));
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            schema,
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        // Create test data
        let file_paths = StringArray::from(vec![referenced_file.as_str()]);
        let positions = Int64Array::from(vec![42]);
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(file_paths), Arc::new(positions)],
        )?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        let data_file = &data_files[0];

        // Verify referenced_data_file is set
        assert_eq!(data_file.referenced_data_file, Some(referenced_file));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_writer_invalid_schema() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            schema,
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        // Try to write a batch with the wrong schema (missing both file_path and pos fields)
        let wrong_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "wrong_field",
            DataType::Int32,
            false,
        )]));
        let wrong_batch =
            RecordBatch::try_new(wrong_schema, vec![Arc::new(Int32Array::from(vec![1]))])?;

        let result = writer.write(wrong_batch).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("invalid schema"));

        Ok(())
    }

    #[tokio::test]
    async fn test_position_delete_multiple_files() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?);

        let config = PositionDeleteWriterConfig::new(None, 0, None);
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            schema,
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = PositionDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        // Delete rows from multiple data files
        let file_paths = StringArray::from(vec![
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file2.parquet",
            "s3://bucket/data/file2.parquet",
            "s3://bucket/data/file3.parquet",
        ]);
        let positions = Int64Array::from(vec![0, 10, 5, 15, 100]);
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(file_paths), Arc::new(positions)],
        )?;

        writer.write(batch.clone()).await?;
        let data_files = writer.close().await?;

        assert_eq!(data_files.len(), 1);
        check_parquet_position_delete_file(&file_io, &data_files[0], &batch).await;

        Ok(())
    }
}
473}