iceberg/writer/combined_writer/delta_writer.rs

//! Delta writers handle row-level changes by combining a data file writer with two
//! delete file writers. A delta writer has three sub-writers:
//! - A data file writer for new and updated rows.
//! - A position delete file writer for deletions of rows that were previously written
//!   by this same writer.
//! - An equality delete file writer for deletions, by equality condition, of rows that
//!   may exist in other data files.
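//!
//! Each input batch is expected to carry the data columns followed by a trailing
//! `Int32` op column, where `1` marks an insert and `-1` a delete. A minimal sketch
//! (column names are illustrative):
//!
//! ```text
//! id | name    | op
//! ---+---------+----
//!  1 | "Alice" |  1    insert
//!  2 | "Bob"   |  1    insert
//!  1 | "Alice" | -1    seen before -> position delete
//!  9 | "Eve"   | -1    never seen  -> equality delete
//! ```
//!
//! Assuming the three sub-writer builders are already configured (see the tests below
//! for a full setup), usage looks roughly like:
//!
//! ```ignore
//! let mut writer = DeltaWriterBuilder::new(data_wb, pos_delete_wb, eq_delete_wb, vec![1])
//!     .build()
//!     .await?;
//! writer.write(batch_with_trailing_op_column).await?;
//! // Returns the data files plus any position/equality delete files.
//! let files = writer.close().await?;
//! ```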

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::BooleanBuilder;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_ord::partition::partition;
use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
use arrow_select::filter::filter_record_batch;
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::arrow::schema_to_arrow_schema;
use crate::spec::DataFile;
use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// A builder for `DeltaWriter`.
#[derive(Clone, Debug)]
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    data_writer_builder: DWB,
    pos_delete_writer_builder: PDWB,
    eq_delete_writer_builder: EDWB,
    unique_cols: Vec<i32>,
}

impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
    /// Creates a new `DeltaWriterBuilder`.
    pub fn new(
        data_writer_builder: DWB,
        pos_delete_writer_builder: PDWB,
        eq_delete_writer_builder: EDWB,
        unique_cols: Vec<i32>,
    ) -> Self {
        Self {
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            unique_cols,
        }
    }
}

#[async_trait::async_trait]
impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
where
    DWB: IcebergWriterBuilder,
    PDWB: IcebergWriterBuilder,
    EDWB: IcebergWriterBuilder,
    DWB::R: CurrentFileStatus,
{
    type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;
    async fn build(self) -> Result<Self::R> {
        let data_writer = self.data_writer_builder.build().await?;
        let pos_delete_writer = self.pos_delete_writer_builder.build().await?;
        let eq_delete_writer = self.eq_delete_writer_builder.build().await?;
        DeltaWriter::try_new(
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            self.unique_cols,
        )
    }
}

/// Position information of a row in a data file.
pub struct Position {
    row_index: i64,
    file_path: String,
}

/// A writer that handles row-level changes by combining data file and delete file writers.
pub struct DeltaWriter<DW, PDW, EDW> {
    /// The data file writer for new and updated rows.
    pub data_writer: DW,
    /// The position delete file writer for deletions of rows that were previously written
    /// by this same writer.
    pub pos_delete_writer: PDW,
    /// The equality delete file writer for deletions, by equality condition, of rows that
    /// may exist in other data files.
    pub eq_delete_writer: EDW,
    /// The field IDs of the unique (key) columns used for equality deletes.
    pub unique_cols: Vec<i32>,
    /// Maps each row key (projected to the unique columns) to its position information.
    pub seen_rows: HashMap<OwnedRow, Position>,
    /// Projects a record batch down to the unique columns.
    pub(crate) projector: RecordBatchProjector,
    /// Converts the projected columns to rows for cheap comparison and hashing.
    pub(crate) row_convertor: RowConverter,
}

impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    fn try_new(
        data_writer: DW,
        pos_delete_writer: PDW,
        eq_delete_writer: EDW,
        unique_cols: Vec<i32>,
    ) -> Result<Self> {
        let arrow_schema = Arc::new(schema_to_arrow_schema(&data_writer.current_schema())?);
        let projector = RecordBatchProjector::new(
            arrow_schema,
            &unique_cols,
            |field| {
                if field.data_type().is_nested() {
                    return Ok(None);
                }
                field
                    .metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .map(|id_str| {
                        id_str.parse::<i64>().map_err(|e| {
                            Error::new(
                                ErrorKind::Unexpected,
                                format!("Failed to parse field ID {}: {}", id_str, e),
                            )
                        })
                    })
                    .transpose()
            },
            |_| false,
        )?;

        let row_convertor = RowConverter::new(
            projector
                .projected_schema_ref()
                .fields()
                .iter()
                .map(|f| SortField::new(f.data_type().clone()))
                .collect(),
        )?;

        Ok(Self {
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            unique_cols,
            seen_rows: HashMap::new(),
            projector,
            row_convertor,
        })
    }

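    /// Writes `batch` to the data writer and records, for each row's unique-column key,
    /// the data file path and row index, so that a later delete of the same key can be
    /// expressed as a position delete.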
    async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        // Capture the file path and starting row index before writing so each row's
        // position can be recorded.
        let file_path = self.data_writer.current_file_path();
        let start_row_index = self.data_writer.current_row_num();

        self.data_writer.write(batch.clone()).await?;

        for (i, row) in rows.iter().enumerate() {
            self.seen_rows.insert(row.owned(), Position {
                row_index: start_row_index as i64 + i as i64,
                file_path: file_path.clone(),
            });
        }

        Ok(())
    }

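    /// Deletes the rows in `batch`. Rows whose unique-column key was previously written
    /// by this writer become position deletes; all other rows become equality deletes.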
    async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let mut file_array = vec![];
        let mut row_index_array = vec![];
        // Build a boolean array to track which rows need equality deletes:
        // true = row not seen before, needs an equality delete;
        // false = row was seen, handled via a position delete instead.
        let mut needs_equality_delete = BooleanBuilder::new();

        for row in rows.iter() {
            if let Some(pos) = self.seen_rows.remove(&row.owned()) {
                // Row was previously inserted, use a position delete.
                row_index_array.push(pos.row_index);
                file_array.push(pos.file_path.clone());
                needs_equality_delete.append_value(false);
            } else {
                // Row not seen before, use an equality delete.
                needs_equality_delete.append_value(true);
            }
        }

        // Write position deletes for rows that were previously inserted. The schema is
        // (file_path: string, pos: long), matching the Iceberg position delete layout.
        let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
        let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array));

        let position_batch =
            RecordBatch::try_new(PositionDeleteWriterConfig::arrow_schema(), vec![
                file_array,
                row_index_array,
            ])?;

        if position_batch.num_rows() > 0 {
            self.pos_delete_writer
                .write(position_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        // Write equality deletes for rows that were not previously inserted.
        let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;

        if eq_batch.num_rows() > 0 {
            self.eq_delete_writer
                .write(eq_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        Ok(())
    }

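    /// Projects `batch` down to the unique columns and converts the result to `Rows`,
    /// so that per-row keys can be hashed and compared cheaply.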
    fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
        self.row_convertor
            .convert_columns(&self.projector.project_column(batch.columns())?)
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
    }
}

#[async_trait::async_trait]
impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // Treat the last column as an op indicator: 1 for insert, -1 for delete.
        let ops = batch
            .column(batch.num_columns() - 1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or(Error::new(
                ErrorKind::Unexpected,
                "Failed to downcast ops column",
            ))?;

        // Partition the batch into runs of consecutive rows sharing the same op value.
        let partition =
            partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to partition batch: {e}"),
                )
            })?;
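
        // Illustrative example: an ops column of [1, 1, -1, -1, 1] partitions into the
        // ranges [0..2], [2..4], and [4..5]; each range is routed below as one insert
        // or delete sub-batch.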
        for range in partition.ranges() {
            // Drop the trailing op column, then slice out this run of rows.
            let batch = batch
                .project(&(0..batch.num_columns() - 1).collect_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!("Failed to project batch columns: {e}"),
                    )
                })?
                .slice(range.start, range.end - range.start);
            match ops.value(range.start) {
                1 => self.insert(batch).await?,
                -1 => self.delete(batch).await?,
                op => {
                    return Err(Error::new(
                        ErrorKind::Unexpected,
                        format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
                    ));
                }
            }
        }

        Ok(())
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        let data_files = self.data_writer.close().await?;
        let pos_delete_files = self.pos_delete_writer.close().await?;
        let eq_delete_files = self.eq_delete_writer.close().await?;

        Ok(data_files
            .into_iter()
            .chain(pos_delete_files)
            .chain(eq_delete_files)
            .collect())
    }
}
294
#[cfg(test)]
mod tests {
    mod delta_writer_tests {
        use std::collections::HashMap;
        use std::sync::Arc;

        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use parquet::file::properties::WriterProperties;
        use tempfile::TempDir;

        use crate::Result;
        use crate::arrow::arrow_schema_to_schema;
        use crate::io::FileIOBuilder;
        use crate::spec::{
            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
        };
        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use crate::writer::base_writer::equality_delete_writer::{
            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
        };
        use crate::writer::base_writer::position_delete_writer::{
            PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
        };
        use crate::writer::combined_writer::delta_writer::DeltaWriter;
        use crate::writer::file_writer::ParquetWriterBuilder;
        use crate::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use crate::writer::{IcebergWriter, IcebergWriterBuilder};

        fn create_iceberg_schema() -> Arc<IcebergSchema> {
            Arc::new(
                IcebergSchema::builder()
                    .with_schema_id(0)
                    .with_fields(vec![
                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
                            .into(),
                    ])
                    .build()
                    .unwrap(),
            )
        }

        fn create_test_batch_with_ops(
            ids: Vec<i32>,
            names: Vec<Option<&str>>,
            ops: Vec<i32>,
        ) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2".to_string(),
                )])),
                // The trailing `op` column intentionally carries no Iceberg field ID:
                // `DeltaWriter::write` projects it away before rows reach the sub-writers.
                Field::new("op", DataType::Int32, false),
            ]));

            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
            let name_array: ArrayRef = Arc::new(StringArray::from(names));
            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));

            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
        }

        #[tokio::test]
        async fn test_delta_writer_insert_only() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create data writer
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                schema.clone(),
                None,
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_parquet_writer, None, 0);

            // Create position delete writer
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema,
                None,
                file_io.clone(),
                pos_delete_location_gen,
                pos_delete_file_name_gen,
            );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_parquet_writer,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Create equality delete writer
            let eq_delete_config =
                EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0)?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema,
                None,
                file_io.clone(),
                eq_delete_location_gen,
                eq_delete_file_name_gen,
            );
            let eq_delete_writer =
                EqualityDeleteFileWriterBuilder::new(eq_delete_parquet_writer, eq_delete_config);

            // Create delta writer
            let data_writer_instance = data_writer.build().await?;
            let pos_delete_writer_instance = pos_delete_writer.build().await?;
            let eq_delete_writer_instance = eq_delete_writer.build().await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1], // unique on id column
            )?;

            // Write batch with only inserts
            let batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1], // all inserts
            );

            delta_writer.write(batch).await?;
            let data_files = delta_writer.close().await?;

            // Should have 1 data file, 0 delete files
            assert_eq!(data_files.len(), 1);
            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
            assert_eq!(data_files[0].record_count, 3);

            // Read back and verify
            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 3);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create writers (same setup as above)
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                schema.clone(),
                None,
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_parquet_writer, None, 0);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema,
                None,
                file_io.clone(),
                pos_delete_location_gen,
                pos_delete_file_name_gen,
            );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_parquet_writer,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config =
                EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0)?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema,
                None,
                file_io.clone(),
                eq_delete_location_gen,
                eq_delete_file_name_gen,
            );
            let eq_delete_writer =
                EqualityDeleteFileWriterBuilder::new(eq_delete_parquet_writer, eq_delete_config);

            let data_writer_instance = data_writer.build().await?;
            let pos_delete_writer_instance = pos_delete_writer.build().await?;
            let eq_delete_writer_instance = eq_delete_writer.build().await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // First, insert some rows
            let insert_batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );
            delta_writer.write(insert_batch).await?;

            // Now delete rows that were just inserted (should create position deletes)
            let delete_batch =
                create_test_batch_with_ops(vec![1, 2], vec![Some("Alice"), Some("Bob")], vec![
                    -1, -1,
                ]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // Should have 1 data file + 1 position delete file
            assert_eq!(data_files.len(), 2);

            let data_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::Data)
                .unwrap();
            let pos_delete_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
                .unwrap();

            assert_eq!(data_file.record_count, 3);
            assert_eq!(pos_delete_file.record_count, 2);

            // Verify position delete file content
            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches[0].num_rows(), 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_equality_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create writers
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                schema.clone(),
                None,
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_parquet_writer, None, 0);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema,
                None,
                file_io.clone(),
                pos_delete_location_gen,
                pos_delete_file_name_gen,
            );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_parquet_writer,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config =
                EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0)?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema,
                None,
                file_io.clone(),
                eq_delete_location_gen,
                eq_delete_file_name_gen,
            );
            let eq_delete_writer =
                EqualityDeleteFileWriterBuilder::new(eq_delete_parquet_writer, eq_delete_config);

            let data_writer_instance = data_writer.build().await?;
            let pos_delete_writer_instance = pos_delete_writer.build().await?;
            let eq_delete_writer_instance = eq_delete_writer.build().await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Delete rows that were never inserted (should create equality deletes)
            let delete_batch =
                create_test_batch_with_ops(vec![99, 100], vec![Some("X"), Some("Y")], vec![-1, -1]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // Should have only 1 equality delete file
            assert_eq!(data_files.len(), 1);
            assert_eq!(
                data_files[0].content,
                crate::spec::DataContentType::EqualityDeletes
            );
            assert_eq!(data_files[0].record_count, 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_invalid_op() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create writers
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                schema.clone(),
                None,
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_parquet_writer, None, 0);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema,
                None,
                file_io.clone(),
                pos_delete_location_gen,
                pos_delete_file_name_gen,
            );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_parquet_writer,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config =
                EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0)?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema,
                None,
                file_io.clone(),
                eq_delete_location_gen,
                eq_delete_file_name_gen,
            );
            let eq_delete_writer =
                EqualityDeleteFileWriterBuilder::new(eq_delete_parquet_writer, eq_delete_config);

            let data_writer_instance = data_writer.build().await?;
            let pos_delete_writer_instance = pos_delete_writer.build().await?;
            let eq_delete_writer_instance = eq_delete_writer.build().await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Invalid operation code
            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);

            let result = delta_writer.write(batch).await;
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("Ops column must be 1 (insert) or -1 (delete)")
            );

            Ok(())
        }
    }
}