Skip to main content

iceberg/writer/combined_writer/
delta_writer.rs

1//! Delta writers handle row-level changes by combining data file and delete file writers.
2//! The delta writer has three sub-writers:
3//! - A data file writer for new and updated rows.
4//! - A position delete file writer for deletions of existing rows (that have been written within this writer)
5//! - An equality delete file writer for deletions of rows based on equality conditions (for rows that may exist in other data files).
6
7use std::collections::{HashMap, VecDeque};
8use std::sync::Arc;
9
10use arrow_array::builder::BooleanBuilder;
11use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
12use arrow_ord::partition::partition;
13use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
14use arrow_select::filter::filter_record_batch;
15use itertools::Itertools;
16
17use crate::arrow::record_batch_projector::RecordBatchProjector;
18use crate::spec::{DataFile, PartitionKey};
19use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
20use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
21use crate::{Error, ErrorKind, Result};
22
/// Default cap on the number of rows tracked for position deletes.
///
/// Bounding the tracked set limits memory usage for large streaming workloads.
pub const DEFAULT_MAX_SEEN_ROWS: usize = 100_000;

/// Builder for `DeltaWriter`, assembled from one builder per sub-writer.
#[derive(Clone, Debug)]
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    /// Builds the writer that receives inserted rows.
    data_writer_builder: DWB,
    /// Builds the writer that receives position deletes.
    pos_delete_writer_builder: PDWB,
    /// Builds the writer that receives equality deletes.
    eq_delete_writer_builder: EDWB,
    /// Columns that identify a row; handed to the `DeltaWriter` unchanged.
    unique_cols: Vec<i32>,
    /// Upper bound on tracked rows; `0` turns row tracking off.
    max_seen_rows: usize,
}

impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
    /// Creates a new `DeltaWriterBuilder`.
    ///
    /// The row-tracking limit starts at [`DEFAULT_MAX_SEEN_ROWS`]; adjust it with
    /// [`Self::with_max_seen_rows`].
    pub fn new(
        data_writer_builder: DWB,
        pos_delete_writer_builder: PDWB,
        eq_delete_writer_builder: EDWB,
        unique_cols: Vec<i32>,
    ) -> Self {
        let max_seen_rows = DEFAULT_MAX_SEEN_ROWS;
        Self {
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            unique_cols,
            max_seen_rows,
        }
    }

    /// Overrides the maximum number of rows tracked for position deletes.
    ///
    /// Once the limit is reached the oldest tracked rows are evicted, and deletes
    /// of evicted rows are written as equality deletes rather than position
    /// deletes. The default is [`DEFAULT_MAX_SEEN_ROWS`].
    ///
    /// Passing `0` disables row tracking entirely, so every delete becomes an
    /// equality delete. That removes the memory overhead but may reduce read
    /// performance.
    pub fn with_max_seen_rows(self, max_seen_rows: usize) -> Self {
        Self {
            max_seen_rows,
            ..self
        }
    }
}
68
69#[async_trait::async_trait]
70impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
71where
72    DWB: IcebergWriterBuilder,
73    PDWB: IcebergWriterBuilder,
74    EDWB: IcebergWriterBuilder,
75    DWB::R: CurrentFileStatus,
76{
77    type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;
78    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
79        let data_writer = self
80            .data_writer_builder
81            .build(partition_key.clone())
82            .await?;
83        let pos_delete_writer = self
84            .pos_delete_writer_builder
85            .build(partition_key.clone())
86            .await?;
87        let eq_delete_writer = self.eq_delete_writer_builder.build(partition_key).await?;
88        DeltaWriter::try_new(
89            data_writer,
90            pos_delete_writer,
91            eq_delete_writer,
92            self.unique_cols.clone(),
93            self.max_seen_rows,
94        )
95    }
96}
97
98/// Position information of a row in a data file.
99pub struct Position {
100    row_index: i64,
101    file_path: String,
102}
103
104/// A writer that handles row-level changes by combining data file and delete file writers.
105pub struct DeltaWriter<DW, PDW, EDW> {
106    /// The data file writer for new and updated rows.
107    pub data_writer: DW,
108    /// The position delete file writer for deletions of existing rows (that have been written within
109    /// this writer).
110    pub pos_delete_writer: PDW,
111    /// The equality delete file writer for deletions of rows based on equality conditions (for rows
112    /// that may exist in other data files).
113    pub eq_delete_writer: EDW,
114    /// The list of unique columns used for equality deletes.
115    pub unique_cols: Vec<i32>,
116    /// A map of rows (projected to unique columns) to their corresponding position information.
117    pub seen_rows: HashMap<OwnedRow, Position>,
118    /// Tracks insertion order for seen_rows to enable FIFO eviction.
119    seen_rows_order: VecDeque<OwnedRow>,
120    /// Maximum number of rows to track for position deletes.
121    max_seen_rows: usize,
122    /// A projector to project the record batch to the unique columns.
123    pub(crate) projector: RecordBatchProjector,
124    /// A converter to convert the projected columns to rows for easy comparison.
125    pub(crate) row_convertor: RowConverter,
126}
127
128impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
129where
130    DW: IcebergWriter + CurrentFileStatus,
131    PDW: IcebergWriter,
132    EDW: IcebergWriter,
133{
134    fn try_new(
135        data_writer: DW,
136        pos_delete_writer: PDW,
137        eq_delete_writer: EDW,
138        unique_cols: Vec<i32>,
139        max_seen_rows: usize,
140    ) -> Result<Self> {
141        let projector = RecordBatchProjector::from_iceberg_schema(
142            data_writer.current_schema(),
143            &unique_cols,
144        )?;
145
146        let row_convertor = RowConverter::new(
147            projector
148                .projected_schema_ref()
149                .fields()
150                .iter()
151                .map(|f| SortField::new(f.data_type().clone()))
152                .collect(),
153        )?;
154
155        Ok(Self {
156            data_writer,
157            pos_delete_writer,
158            eq_delete_writer,
159            unique_cols,
160            seen_rows: HashMap::new(),
161            seen_rows_order: VecDeque::new(),
162            max_seen_rows,
163            projector,
164            row_convertor,
165        })
166    }
167
168    async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
169        let batch_num_rows = batch.num_rows();
170
171        // Write first to ensure the data is persisted before updating our tracking state.
172        // This prevents inconsistent state if the write fails.
173        // Note: We must write before calling current_file_path() because the underlying
174        // writer may not have created the file yet (lazy initialization).
175        self.data_writer.write(batch.clone()).await?;
176
177        // Skip row tracking if disabled (max_seen_rows == 0)
178        if self.max_seen_rows == 0 {
179            return Ok(());
180        }
181
182        let rows = self.extract_unique_column_rows(&batch)?;
183
184        // Get file path and calculate start_row_index after successful write
185        let file_path = self.data_writer.current_file_path();
186        let end_row_num = self.data_writer.current_row_num();
187        let start_row_index = end_row_num - batch_num_rows;
188
189        // Record positions for each row in this batch
190        for (i, row) in rows.iter().enumerate() {
191            let owned_row = row.owned();
192            self.seen_rows.insert(owned_row.clone(), Position {
193                row_index: start_row_index as i64 + i as i64,
194                file_path: file_path.clone(),
195            });
196            self.seen_rows_order.push_back(owned_row);
197        }
198
199        // Evict oldest entries if we exceed the limit
200        self.evict_oldest_seen_rows();
201
202        Ok(())
203    }
204
205    /// Evicts the oldest entries from seen_rows when the limit is exceeded.
206    /// Entries that were already deleted are skipped (stale entries in the order queue).
207    fn evict_oldest_seen_rows(&mut self) {
208        while self.seen_rows.len() > self.max_seen_rows {
209            if let Some(old_row) = self.seen_rows_order.pop_front() {
210                // Only count as eviction if the row still exists (wasn't already deleted)
211                self.seen_rows.remove(&old_row);
212            } else {
213                // Queue is empty but HashMap still has entries - this shouldn't happen
214                // in normal operation, but break to avoid infinite loop
215                break;
216            }
217        }
218    }
219
220    async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
221        // If row tracking is disabled, write all deletes as equality deletes
222        if self.max_seen_rows == 0 {
223            self.eq_delete_writer
224                .write(batch)
225                .await
226                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
227            return Ok(());
228        }
229
230        let rows = self.extract_unique_column_rows(&batch)?;
231        let mut file_array = vec![];
232        let mut row_index_array = vec![];
233        // Build a boolean array to track which rows need equality deletes.
234        // True = row not seen before, needs equality delete
235        // False = row was seen, already handled via position delete
236        let mut needs_equality_delete = BooleanBuilder::new();
237
238        for row in rows.iter() {
239            if let Some(pos) = self.seen_rows.remove(&row.owned()) {
240                // Row was previously inserted, use position delete
241                row_index_array.push(pos.row_index);
242                file_array.push(pos.file_path.clone());
243                needs_equality_delete.append_value(false);
244            } else {
245                // Row not seen before, use equality delete
246                needs_equality_delete.append_value(true);
247            }
248        }
249
250        // Write position deletes for rows that were previously inserted
251        let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
252        let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array));
253
254        let position_batch =
255            RecordBatch::try_new(PositionDeleteWriterConfig::arrow_schema(), vec![
256                file_array,
257                row_index_array,
258            ])?;
259
260        if position_batch.num_rows() > 0 {
261            self.pos_delete_writer
262                .write(position_batch)
263                .await
264                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
265        }
266
267        // Write equality deletes for rows that were not previously inserted
268        let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
269            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
270
271        if eq_batch.num_rows() > 0 {
272            self.eq_delete_writer
273                .write(eq_batch)
274                .await
275                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
276        }
277
278        Ok(())
279    }
280
281    fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
282        self.row_convertor
283            .convert_columns(&self.projector.project_column(batch.columns())?)
284            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
285    }
286}
287
288#[async_trait::async_trait]
289impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
290where
291    DW: IcebergWriter + CurrentFileStatus,
292    PDW: IcebergWriter,
293    EDW: IcebergWriter,
294{
295    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
296        // Treat the last row as an op indicator +1 for insert, -1 for delete
297        let ops = batch
298            .column(batch.num_columns() - 1)
299            .as_any()
300            .downcast_ref::<Int32Array>()
301            .ok_or(Error::new(
302                ErrorKind::Unexpected,
303                "Failed to downcast ops column",
304            ))?;
305
306        let partition =
307            partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
308                Error::new(
309                    ErrorKind::Unexpected,
310                    format!("Failed to partition batch: {e}"),
311                )
312            })?;
313
314        for range in partition.ranges() {
315            let batch = batch
316                .project(&(0..batch.num_columns() - 1).collect_vec())
317                .map_err(|e| {
318                    Error::new(
319                        ErrorKind::Unexpected,
320                        format!("Failed to project batch columns: {e}"),
321                    )
322                })?
323                .slice(range.start, range.end - range.start);
324            match ops.value(range.start) {
325                1 => self.insert(batch).await?,
326                -1 => self.delete(batch).await?,
327                op => {
328                    return Err(Error::new(
329                        ErrorKind::Unexpected,
330                        format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
331                    ));
332                }
333            }
334        }
335
336        Ok(())
337    }
338
339    async fn close(&mut self) -> Result<Vec<DataFile>> {
340        let data_files = self.data_writer.close().await?;
341        let pos_delete_files = self.pos_delete_writer.close().await?;
342        let eq_delete_files = self.eq_delete_writer.close().await?;
343
344        Ok(data_files
345            .into_iter()
346            .chain(pos_delete_files)
347            .chain(eq_delete_files)
348            .collect())
349    }
350}
351
352#[cfg(test)]
353mod tests {
354    use super::*;
355
356    mod delta_writer_tests {
357        use std::collections::HashMap;
358
359        use arrow_array::{Int32Array, RecordBatch, StringArray};
360        use arrow_schema::{DataType, Field, Schema};
361        use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
362        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
363        use parquet::file::properties::WriterProperties;
364        use tempfile::TempDir;
365
366        use super::*;
367        use crate::arrow::arrow_schema_to_schema;
368        use crate::io::FileIOBuilder;
369        use crate::spec::{
370            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
371        };
372        use crate::writer::IcebergWriterBuilder;
373        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
374        use crate::writer::base_writer::equality_delete_writer::{
375            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
376        };
377        use crate::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder;
378        use crate::writer::file_writer::ParquetWriterBuilder;
379        use crate::writer::file_writer::location_generator::{
380            DefaultFileNameGenerator, DefaultLocationGenerator,
381        };
382        use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
383
384        fn create_iceberg_schema() -> Arc<IcebergSchema> {
385            Arc::new(
386                IcebergSchema::builder()
387                    .with_schema_id(0)
388                    .with_fields(vec![
389                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
390                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
391                            .into(),
392                    ])
393                    .build()
394                    .unwrap(),
395            )
396        }
397
398        fn create_test_batch_with_ops(
399            ids: Vec<i32>,
400            names: Vec<Option<&str>>,
401            ops: Vec<i32>,
402        ) -> RecordBatch {
403            let schema = Arc::new(Schema::new(vec![
404                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
405                    PARQUET_FIELD_ID_META_KEY.to_string(),
406                    "1".to_string(),
407                )])),
408                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
409                    PARQUET_FIELD_ID_META_KEY.to_string(),
410                    "2".to_string(),
411                )])),
412                Field::new("op", DataType::Int32, false),
413            ]));
414
415            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
416            let name_array: ArrayRef = Arc::new(StringArray::from(names));
417            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));
418
419            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
420        }
421
422        #[tokio::test]
423        async fn test_delta_writer_insert_only() -> Result<()> {
424            let temp_dir = TempDir::new().unwrap();
425            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
426            let schema = create_iceberg_schema();
427
428            // Create data writer
429            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
430                "{}/data",
431                temp_dir.path().to_str().unwrap()
432            ));
433            let data_file_name_gen =
434                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
435            let data_parquet_writer =
436                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
437            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
438                data_parquet_writer,
439                schema.clone(),
440                file_io.clone(),
441                data_location_gen,
442                data_file_name_gen,
443            );
444            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);
445
446            // Create position delete writer
447            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
448                &PositionDeleteWriterConfig::arrow_schema(),
449            )?);
450            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
451                "{}/pos_delete",
452                temp_dir.path().to_str().unwrap()
453            ));
454            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
455                "pos_delete".to_string(),
456                None,
457                DataFileFormat::Parquet,
458            );
459            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
460                WriterProperties::builder().build(),
461                pos_delete_schema.clone(),
462            );
463            let pos_delete_rolling_writer_builder =
464                RollingFileWriterBuilder::new_with_default_file_size(
465                    pos_delete_parquet_writer,
466                    pos_delete_schema,
467                    file_io.clone(),
468                    pos_delete_location_gen,
469                    pos_delete_file_name_gen,
470                );
471            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
472                pos_delete_rolling_writer_builder,
473                PositionDeleteWriterConfig::new(None, 0, None),
474            );
475
476            // Create equality delete writer
477            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
478            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
479                eq_delete_config.projected_arrow_schema_ref(),
480            )?);
481            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
482                "{}/eq_delete",
483                temp_dir.path().to_str().unwrap()
484            ));
485            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
486                "eq_delete".to_string(),
487                None,
488                DataFileFormat::Parquet,
489            );
490            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
491                WriterProperties::builder().build(),
492                eq_delete_schema.clone(),
493            );
494            let eq_delete_rolling_writer_builder =
495                RollingFileWriterBuilder::new_with_default_file_size(
496                    eq_delete_parquet_writer,
497                    eq_delete_schema,
498                    file_io.clone(),
499                    eq_delete_location_gen,
500                    eq_delete_file_name_gen,
501                );
502            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
503                eq_delete_rolling_writer_builder,
504                eq_delete_config,
505            );
506
507            // Create delta writer
508            let data_writer_instance = data_writer.build(None).await?;
509            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
510            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
511            let mut delta_writer = DeltaWriter::try_new(
512                data_writer_instance,
513                pos_delete_writer_instance,
514                eq_delete_writer_instance,
515                vec![1], // unique on id column
516                DEFAULT_MAX_SEEN_ROWS,
517            )?;
518
519            // Write batch with only inserts
520            let batch = create_test_batch_with_ops(
521                vec![1, 2, 3],
522                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
523                vec![1, 1, 1], // all inserts
524            );
525
526            delta_writer.write(batch).await?;
527            let data_files = delta_writer.close().await?;
528
529            // Should have 1 data file, 0 delete files
530            assert_eq!(data_files.len(), 1);
531            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
532            assert_eq!(data_files[0].record_count, 3);
533
534            // Read back and verify
535            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
536            let content = input_file.read().await?;
537            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
538            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
539            assert_eq!(batches.len(), 1);
540            assert_eq!(batches[0].num_rows(), 3);
541
542            Ok(())
543        }
544
545        #[tokio::test]
546        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
547            let temp_dir = TempDir::new().unwrap();
548            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
549            let schema = create_iceberg_schema();
550
551            // Create writers (same setup as above)
552            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
553                "{}/data",
554                temp_dir.path().to_str().unwrap()
555            ));
556            let data_file_name_gen =
557                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
558            let data_parquet_writer =
559                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
560            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
561                data_parquet_writer,
562                schema.clone(),
563                file_io.clone(),
564                data_location_gen,
565                data_file_name_gen,
566            );
567            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);
568
569            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
570                &PositionDeleteWriterConfig::arrow_schema(),
571            )?);
572            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
573                "{}/pos_delete",
574                temp_dir.path().to_str().unwrap()
575            ));
576            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
577                "pos_delete".to_string(),
578                None,
579                DataFileFormat::Parquet,
580            );
581            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
582                WriterProperties::builder().build(),
583                pos_delete_schema.clone(),
584            );
585            let pos_delete_rolling_writer_builder =
586                RollingFileWriterBuilder::new_with_default_file_size(
587                    pos_delete_parquet_writer,
588                    pos_delete_schema,
589                    file_io.clone(),
590                    pos_delete_location_gen,
591                    pos_delete_file_name_gen,
592                );
593            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
594                pos_delete_rolling_writer_builder,
595                PositionDeleteWriterConfig::new(None, 0, None),
596            );
597
598            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
599            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
600                eq_delete_config.projected_arrow_schema_ref(),
601            )?);
602            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
603                "{}/eq_delete",
604                temp_dir.path().to_str().unwrap()
605            ));
606            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
607                "eq_delete".to_string(),
608                None,
609                DataFileFormat::Parquet,
610            );
611            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
612                WriterProperties::builder().build(),
613                eq_delete_schema.clone(),
614            );
615            let eq_delete_rolling_writer_builder =
616                RollingFileWriterBuilder::new_with_default_file_size(
617                    eq_delete_parquet_writer,
618                    eq_delete_schema,
619                    file_io.clone(),
620                    eq_delete_location_gen,
621                    eq_delete_file_name_gen,
622                );
623            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
624                eq_delete_rolling_writer_builder,
625                eq_delete_config,
626            );
627
628            let data_writer_instance = data_writer.build(None).await?;
629            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
630            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
631            let mut delta_writer = DeltaWriter::try_new(
632                data_writer_instance,
633                pos_delete_writer_instance,
634                eq_delete_writer_instance,
635                vec![1],
636                DEFAULT_MAX_SEEN_ROWS,
637            )?;
638
639            // First, insert some rows
640            let insert_batch = create_test_batch_with_ops(
641                vec![1, 2, 3],
642                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
643                vec![1, 1, 1],
644            );
645            delta_writer.write(insert_batch).await?;
646
647            // Now delete rows that were just inserted (should create position deletes)
648            let delete_batch =
649                create_test_batch_with_ops(vec![1, 2], vec![Some("Alice"), Some("Bob")], vec![
650                    -1, -1,
651                ]);
652            delta_writer.write(delete_batch).await?;
653
654            let data_files = delta_writer.close().await?;
655
656            // Should have 1 data file + 1 position delete file
657            assert_eq!(data_files.len(), 2);
658
659            let data_file = data_files
660                .iter()
661                .find(|f| f.content == crate::spec::DataContentType::Data)
662                .unwrap();
663            let pos_delete_file = data_files
664                .iter()
665                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
666                .unwrap();
667
668            assert_eq!(data_file.record_count, 3);
669            assert_eq!(pos_delete_file.record_count, 2);
670
671            // Verify position delete file content
672            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
673            let content = input_file.read().await?;
674            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
675            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
676            assert_eq!(batches[0].num_rows(), 2);
677
678            Ok(())
679        }
680
681        #[tokio::test]
682        async fn test_delta_writer_equality_delete() -> Result<()> {
683            let temp_dir = TempDir::new().unwrap();
684            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
685            let schema = create_iceberg_schema();
686
687            // Create writers
688            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
689                "{}/data",
690                temp_dir.path().to_str().unwrap()
691            ));
692            let data_file_name_gen =
693                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
694            let data_parquet_writer =
695                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
696            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
697                data_parquet_writer,
698                schema.clone(),
699                file_io.clone(),
700                data_location_gen,
701                data_file_name_gen,
702            );
703            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);
704
705            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
706                &PositionDeleteWriterConfig::arrow_schema(),
707            )?);
708            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
709                "{}/pos_delete",
710                temp_dir.path().to_str().unwrap()
711            ));
712            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
713                "pos_delete".to_string(),
714                None,
715                DataFileFormat::Parquet,
716            );
717            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
718                WriterProperties::builder().build(),
719                pos_delete_schema.clone(),
720            );
721            let pos_delete_rolling_writer_builder =
722                RollingFileWriterBuilder::new_with_default_file_size(
723                    pos_delete_parquet_writer,
724                    pos_delete_schema,
725                    file_io.clone(),
726                    pos_delete_location_gen,
727                    pos_delete_file_name_gen,
728                );
729            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
730                pos_delete_rolling_writer_builder,
731                PositionDeleteWriterConfig::new(None, 0, None),
732            );
733
734            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
735            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
736                eq_delete_config.projected_arrow_schema_ref(),
737            )?);
738            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
739                "{}/eq_delete",
740                temp_dir.path().to_str().unwrap()
741            ));
742            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
743                "eq_delete".to_string(),
744                None,
745                DataFileFormat::Parquet,
746            );
747            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
748                WriterProperties::builder().build(),
749                eq_delete_schema.clone(),
750            );
751            let eq_delete_rolling_writer_builder =
752                RollingFileWriterBuilder::new_with_default_file_size(
753                    eq_delete_parquet_writer,
754                    eq_delete_schema,
755                    file_io.clone(),
756                    eq_delete_location_gen,
757                    eq_delete_file_name_gen,
758                );
759            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
760                eq_delete_rolling_writer_builder,
761                eq_delete_config,
762            );
763
764            let data_writer_instance = data_writer.build(None).await?;
765            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
766            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
767            let mut delta_writer = DeltaWriter::try_new(
768                data_writer_instance,
769                pos_delete_writer_instance,
770                eq_delete_writer_instance,
771                vec![1],
772                DEFAULT_MAX_SEEN_ROWS,
773            )?;
774
775            // Delete rows that were never inserted (should create equality deletes)
776            let delete_batch =
777                create_test_batch_with_ops(vec![99, 100], vec![Some("X"), Some("Y")], vec![-1, -1]);
778            delta_writer.write(delete_batch).await?;
779
780            let data_files = delta_writer.close().await?;
781
782            // Should have only 1 equality delete file
783            assert_eq!(data_files.len(), 1);
784            assert_eq!(
785                data_files[0].content,
786                crate::spec::DataContentType::EqualityDeletes
787            );
788            assert_eq!(data_files[0].record_count, 2);
789
790            Ok(())
791        }
792
793        #[tokio::test]
794        async fn test_delta_writer_invalid_op() -> Result<()> {
795            let temp_dir = TempDir::new().unwrap();
796            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
797            let schema = create_iceberg_schema();
798
799            // Create writers
800            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
801                "{}/data",
802                temp_dir.path().to_str().unwrap()
803            ));
804            let data_file_name_gen =
805                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
806            let data_parquet_writer =
807                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
808            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
809                data_parquet_writer,
810                schema.clone(),
811                file_io.clone(),
812                data_location_gen,
813                data_file_name_gen,
814            );
815            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);
816
817            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
818                &PositionDeleteWriterConfig::arrow_schema(),
819            )?);
820            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
821                "{}/pos_delete",
822                temp_dir.path().to_str().unwrap()
823            ));
824            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
825                "pos_delete".to_string(),
826                None,
827                DataFileFormat::Parquet,
828            );
829            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
830                WriterProperties::builder().build(),
831                pos_delete_schema.clone(),
832            );
833            let pos_delete_rolling_writer_builder =
834                RollingFileWriterBuilder::new_with_default_file_size(
835                    pos_delete_parquet_writer,
836                    pos_delete_schema,
837                    file_io.clone(),
838                    pos_delete_location_gen,
839                    pos_delete_file_name_gen,
840                );
841            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
842                pos_delete_rolling_writer_builder,
843                PositionDeleteWriterConfig::new(None, 0, None),
844            );
845
846            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
847            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
848                eq_delete_config.projected_arrow_schema_ref(),
849            )?);
850            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
851                "{}/eq_delete",
852                temp_dir.path().to_str().unwrap()
853            ));
854            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
855                "eq_delete".to_string(),
856                None,
857                DataFileFormat::Parquet,
858            );
859            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
860                WriterProperties::builder().build(),
861                eq_delete_schema.clone(),
862            );
863            let eq_delete_rolling_writer_builder =
864                RollingFileWriterBuilder::new_with_default_file_size(
865                    eq_delete_parquet_writer,
866                    eq_delete_schema,
867                    file_io.clone(),
868                    eq_delete_location_gen,
869                    eq_delete_file_name_gen,
870                );
871            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
872                eq_delete_rolling_writer_builder,
873                eq_delete_config,
874            );
875
876            let data_writer_instance = data_writer.build(None).await?;
877            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
878            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
879            let mut delta_writer = DeltaWriter::try_new(
880                data_writer_instance,
881                pos_delete_writer_instance,
882                eq_delete_writer_instance,
883                vec![1],
884                DEFAULT_MAX_SEEN_ROWS,
885            )?;
886
887            // Invalid operation code
888            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);
889
890            let result = delta_writer.write(batch).await;
891            assert!(result.is_err());
892            assert!(
893                result
894                    .unwrap_err()
895                    .to_string()
896                    .contains("Ops column must be 1 (insert) or -1 (delete)")
897            );
898
899            Ok(())
900        }
901    }
902}