iceberg/writer/combined_writer/delta_writer.rs

//! Delta writers handle row-level changes by combining data file and delete file writers.
//! The delta writer has three sub-writers:
//! - A data file writer for new and updated rows.
//! - A position delete file writer for deletions of rows that were written by this writer.
//! - An equality delete file writer for deletions, by equality condition, of rows that may exist in other data files.
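//!
//! # Example
//!
//! Input batches carry a trailing `Int32` op column: `1` marks an insert and
//! `-1` marks a delete. A minimal sketch of driving the writer (builder
//! construction is elided; see the tests below for a full setup):
//!
//! ```ignore
//! // `builder` is an already-configured `DeltaWriterBuilder` (assumed here).
//! let mut writer = builder.build(None).await?;
//! // `batch` ends with the op column, e.g. columns: id, name, op.
//! writer.write(batch).await?;
//! // Returns the data files plus any position/equality delete files.
//! let files = writer.close().await?;
//! ```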

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::BooleanBuilder;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_ord::partition::partition;
use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
use arrow_select::filter::filter_record_batch;
use itertools::Itertools;

use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::spec::{DataFile, PartitionKey};
use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// A builder for `DeltaWriter`.
///
/// `DWB`, `PDWB`, and `EDWB` are the builders for the data file writer, the
/// position delete file writer, and the equality delete file writer.
#[derive(Clone, Debug)]
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    data_writer_builder: DWB,
    pos_delete_writer_builder: PDWB,
    eq_delete_writer_builder: EDWB,
    unique_cols: Vec<i32>,
}

impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
    /// Creates a new `DeltaWriterBuilder`.
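    ///
    /// `unique_cols` holds the Iceberg field ids of the columns that uniquely
    /// identify a row; the writer uses them to track inserted rows and to route
    /// deletes to position or equality delete files.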
    pub fn new(
        data_writer_builder: DWB,
        pos_delete_writer_builder: PDWB,
        eq_delete_writer_builder: EDWB,
        unique_cols: Vec<i32>,
    ) -> Self {
        Self {
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            unique_cols,
        }
    }
}

#[async_trait::async_trait]
impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
where
    DWB: IcebergWriterBuilder,
    PDWB: IcebergWriterBuilder,
    EDWB: IcebergWriterBuilder,
    DWB::R: CurrentFileStatus,
{
    type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;

    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
        let data_writer = self
            .data_writer_builder
            .build(partition_key.clone())
            .await?;
        let pos_delete_writer = self
            .pos_delete_writer_builder
            .build(partition_key.clone())
            .await?;
        let eq_delete_writer = self.eq_delete_writer_builder.build(partition_key).await?;
        DeltaWriter::try_new(
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            self.unique_cols.clone(),
        )
    }
}

/// Position information of a row in a data file.
pub struct Position {
    /// Zero-based index of the row within its data file.
    row_index: i64,
    /// Path of the data file that contains the row.
    file_path: String,
}

/// A writer that handles row-level changes by combining data file and delete file writers.
pub struct DeltaWriter<DW, PDW, EDW> {
    /// The data file writer for new and updated rows.
    pub data_writer: DW,
    /// The position delete file writer for deletions of rows that were written by this writer.
    pub pos_delete_writer: PDW,
    /// The equality delete file writer for deletions, by equality condition, of rows that may
    /// exist in other data files.
    pub eq_delete_writer: EDW,
    /// The field ids of the unique columns used for equality deletes.
    pub unique_cols: Vec<i32>,
    /// Maps each inserted row (projected to the unique columns) to its position information.
    pub seen_rows: HashMap<OwnedRow, Position>,
    /// Projects a record batch down to the unique columns.
    pub(crate) projector: RecordBatchProjector,
    /// Converts the projected columns to rows for cheap comparison.
    pub(crate) row_converter: RowConverter,
}

impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    /// Builds the unique-column projector and row converter, then assembles the writer.
    fn try_new(
        data_writer: DW,
        pos_delete_writer: PDW,
        eq_delete_writer: EDW,
        unique_cols: Vec<i32>,
    ) -> Result<Self> {
        let projector = RecordBatchProjector::from_iceberg_schema(
            data_writer.current_schema(),
            &unique_cols,
        )?;

        let row_converter = RowConverter::new(
            projector
                .projected_schema_ref()
                .fields()
                .iter()
                .map(|f| SortField::new(f.data_type().clone()))
                .collect(),
        )?;

        Ok(Self {
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            unique_cols,
            seen_rows: HashMap::new(),
            projector,
            row_converter,
        })
    }
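
    /// Writes `batch` as new data and records each row's position so that a later
    /// delete of the same row can be expressed as a position delete. A sketch of
    /// the bookkeeping (values are illustrative):
    ///
    /// ```ignore
    /// // Writing ids [7, 8] as the first rows of data file F records:
    /// //   seen_rows[7] -> Position { file_path: F, row_index: 0 }
    /// //   seen_rows[8] -> Position { file_path: F, row_index: 1 }
    /// ```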
    async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let batch_num_rows = batch.num_rows();

        // Write first to ensure the data is persisted before updating our tracking state.
        // This prevents inconsistent state if the write fails.
        // Note: We must write before calling current_file_path() because the underlying
        // writer may not have created the file yet (lazy initialization).
        self.data_writer.write(batch.clone()).await?;

        // Get the file path and calculate start_row_index after the successful write.
        let file_path = self.data_writer.current_file_path();
        let end_row_num = self.data_writer.current_row_num();
        let start_row_index = end_row_num - batch_num_rows;

        // Record the position of each row in this batch.
        for (i, row) in rows.iter().enumerate() {
            self.seen_rows.insert(row.owned(), Position {
                row_index: start_row_index as i64 + i as i64,
                file_path: file_path.clone(),
            });
        }

        Ok(())
    }
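
    /// Deletes the rows in `batch`. Rows previously written by this writer become
    /// position deletes; all other rows fall back to equality deletes. A sketch
    /// (with `id` as the only unique column):
    ///
    /// ```ignore
    /// // After inserting ids [1, 2, 3], deleting ids [2, 9] produces:
    /// //   - a position delete for id 2 (written by this writer)
    /// //   - an equality delete for id 9 (never seen by this writer)
    /// ```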
    async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let mut file_array = vec![];
        let mut row_index_array = vec![];
        // Build a boolean array to track which rows need equality deletes:
        // true = row not seen before, needs an equality delete;
        // false = row was seen, already handled via a position delete.
        let mut needs_equality_delete = BooleanBuilder::new();

        for row in rows.iter() {
            if let Some(pos) = self.seen_rows.remove(&row.owned()) {
                // Row was previously inserted, use a position delete.
                row_index_array.push(pos.row_index);
                file_array.push(pos.file_path.clone());
                needs_equality_delete.append_value(false);
            } else {
                // Row not seen before, use an equality delete.
                needs_equality_delete.append_value(true);
            }
        }

        // Write position deletes for rows that were previously inserted.
        let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
        let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array));

        let position_batch =
            RecordBatch::try_new(PositionDeleteWriterConfig::arrow_schema(), vec![
                file_array,
                row_index_array,
            ])?;

        if position_batch.num_rows() > 0 {
            self.pos_delete_writer
                .write(position_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        // Write equality deletes for rows that were not previously inserted.
        let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;

        if eq_batch.num_rows() > 0 {
            self.eq_delete_writer
                .write(eq_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        Ok(())
    }

    /// Projects `batch` to the unique columns and converts the result to rows.
    fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
        self.row_converter
            .convert_columns(&self.projector.project_column(batch.columns())?)
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
    }
}

#[async_trait::async_trait]
impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
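    /// Splits `batch` into insert and delete runs by the trailing op column and
    /// routes each run to `insert` or `delete`. A sketch of the expected layout
    /// (column names are illustrative):
    ///
    /// ```ignore
    /// // id | name    | op
    /// //  1 | "Alice" |  1   <- insert
    /// //  2 | "Bob"   | -1   <- delete
    /// ```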
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // Treat the last column as an op indicator: 1 for insert, -1 for delete.
        let ops = batch
            .column(batch.num_columns() - 1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or(Error::new(
                ErrorKind::Unexpected,
                "Failed to downcast ops column",
            ))?;

        let partition =
            partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to partition batch: {e}"),
                )
            })?;

        for range in partition.ranges() {
            let batch = batch
                .project(&(0..batch.num_columns() - 1).collect_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!("Failed to project batch columns: {e}"),
                    )
                })?
                .slice(range.start, range.end - range.start);
            match ops.value(range.start) {
                1 => self.insert(batch).await?,
                -1 => self.delete(batch).await?,
                op => {
                    return Err(Error::new(
                        ErrorKind::Unexpected,
                        format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
                    ));
                }
            }
        }

        Ok(())
    }

    /// Closes all three sub-writers and returns their data and delete files.
    async fn close(&mut self) -> Result<Vec<DataFile>> {
        let data_files = self.data_writer.close().await?;
        let pos_delete_files = self.pos_delete_writer.close().await?;
        let eq_delete_files = self.eq_delete_writer.close().await?;

        Ok(data_files
            .into_iter()
            .chain(pos_delete_files)
            .chain(eq_delete_files)
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    mod delta_writer_tests {
        use std::collections::HashMap;

        use arrow_array::{Int32Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use parquet::file::properties::WriterProperties;
        use tempfile::TempDir;

        use super::*;
        use crate::arrow::arrow_schema_to_schema;
        use crate::io::FileIOBuilder;
        use crate::spec::{
            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
        };
        use crate::writer::IcebergWriterBuilder;
        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use crate::writer::base_writer::equality_delete_writer::{
            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
        };
        use crate::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder;
        use crate::writer::file_writer::ParquetWriterBuilder;
        use crate::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

        /// Builds a two-column Iceberg schema: required int `id`, optional string `name`.
        fn create_iceberg_schema() -> Arc<IcebergSchema> {
            Arc::new(
                IcebergSchema::builder()
                    .with_schema_id(0)
                    .with_fields(vec![
                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
                            .into(),
                    ])
                    .build()
                    .unwrap(),
            )
        }

        /// Builds a record batch with a trailing `op` column (1 = insert, -1 = delete).
        fn create_test_batch_with_ops(
            ids: Vec<i32>,
            names: Vec<Option<&str>>,
            ops: Vec<i32>,
        ) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2".to_string(),
                )])),
                Field::new("op", DataType::Int32, false),
            ]));

            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
            let name_array: ArrayRef = Arc::new(StringArray::from(names));
            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));

            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
        }

        #[tokio::test]
        async fn test_delta_writer_insert_only() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create data writer
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            // Create position delete writer
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Create equality delete writer
            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            // Create delta writer
            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1], // unique on id column
            )?;

            // Write batch with only inserts
            let batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1], // all inserts
            );

            delta_writer.write(batch).await?;
            let data_files = delta_writer.close().await?;

            // Should have 1 data file, 0 delete files
            assert_eq!(data_files.len(), 1);
            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
            assert_eq!(data_files[0].record_count, 3);

            // Read back and verify
            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 3);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create writers (same setup as above)
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // First, insert some rows
            let insert_batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );
            delta_writer.write(insert_batch).await?;

            // Now delete rows that were just inserted (should create position deletes)
            let delete_batch =
                create_test_batch_with_ops(vec![1, 2], vec![Some("Alice"), Some("Bob")], vec![
                    -1, -1,
                ]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // Should have 1 data file + 1 position delete file
            assert_eq!(data_files.len(), 2);

            let data_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::Data)
                .unwrap();
            let pos_delete_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
                .unwrap();

            assert_eq!(data_file.record_count, 3);
            assert_eq!(pos_delete_file.record_count, 2);

            // Verify position delete file content
            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches[0].num_rows(), 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_equality_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create writers
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Delete rows that were never inserted (should create equality deletes)
            let delete_batch =
                create_test_batch_with_ops(vec![99, 100], vec![Some("X"), Some("Y")], vec![-1, -1]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // Should have only 1 equality delete file
            assert_eq!(data_files.len(), 1);
            assert_eq!(
                data_files[0].content,
                crate::spec::DataContentType::EqualityDeletes
            );
            assert_eq!(data_files[0].record_count, 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_invalid_op() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create writers
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Invalid operation code
            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);

            let result = delta_writer.write(batch).await;
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("Ops column must be 1 (insert) or -1 (delete)")
            );

            Ok(())
        }
    }
}