Skip to main content

iceberg/writer/combined_writer/
delta_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Delta writers handle row-level changes by combining data file and delete file writers.
19//! The delta writer has three sub-writers:
20//! - A data file writer for new and updated rows.
21//! - A position delete file writer for deletions of existing rows (that have been written within this writer)
22//! - An equality delete file writer for deletions of rows based on equality conditions (for rows that may exist in other data files).
23
24use std::collections::{HashMap, VecDeque};
25use std::sync::Arc;
26
27use arrow_array::builder::BooleanBuilder;
28use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
29use arrow_ord::partition::partition;
30use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
31use arrow_select::filter::filter_record_batch;
32use itertools::Itertools;
33
34use crate::arrow::record_batch_projector::RecordBatchProjector;
35use crate::spec::{DataFile, PartitionKey};
36use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
37use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
38use crate::{Error, ErrorKind, Result};
39
/// Default upper bound on how many inserted rows have their file position remembered, so
/// that a later delete of such a row can be written as a position delete.
///
/// Keeping this bounded caps memory usage for large streaming workloads.
pub const DEFAULT_MAX_SEEN_ROWS: usize = 100_000;

/// Builder for [`DeltaWriter`].
///
/// Holds one builder per sub-writer (data, position delete, equality delete), the field ids
/// of the columns that identify a row, and the row-tracking limit.
#[derive(Clone, Debug)]
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    /// Builds the writer that receives inserted rows.
    data_writer_builder: DWB,
    /// Builds the writer that receives position deletes.
    pos_delete_writer_builder: PDWB,
    /// Builds the writer that receives equality deletes.
    eq_delete_writer_builder: EDWB,
    /// Field ids of the columns whose values identify a row.
    unique_cols: Vec<i32>,
    /// Maximum number of tracked inserted rows; `0` disables tracking.
    max_seen_rows: usize,
}

impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
    /// Creates a builder from the three sub-writer builders and the unique column ids.
    ///
    /// Row tracking starts out limited to [`DEFAULT_MAX_SEEN_ROWS`] rows; use
    /// [`Self::with_max_seen_rows`] to change that.
    pub fn new(
        data_writer_builder: DWB,
        pos_delete_writer_builder: PDWB,
        eq_delete_writer_builder: EDWB,
        unique_cols: Vec<i32>,
    ) -> Self {
        let max_seen_rows = DEFAULT_MAX_SEEN_ROWS;
        Self {
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            unique_cols,
            max_seen_rows,
        }
    }

    /// Overrides how many inserted rows are tracked for position deletes.
    ///
    /// Once the limit is exceeded, the oldest tracked rows are forgotten first. A delete of a
    /// forgotten row is written as an equality delete instead of a position delete. The default
    /// is [`DEFAULT_MAX_SEEN_ROWS`].
    ///
    /// Passing `0` turns row tracking off completely: every delete becomes an equality delete.
    /// That removes the tracking memory cost, at the price of possibly slower reads.
    ///
    /// NOTE(review): per the Iceberg spec, an equality delete applies only to data files with a
    /// strictly smaller data sequence number. If this writer's data and delete files are
    /// committed together, deletes of forgotten rows may not take effect. Confirm.
    pub fn with_max_seen_rows(self, max_seen_rows: usize) -> Self {
        Self {
            max_seen_rows,
            ..self
        }
    }
}
85
86#[async_trait::async_trait]
87impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
88where
89    DWB: IcebergWriterBuilder,
90    PDWB: IcebergWriterBuilder,
91    EDWB: IcebergWriterBuilder,
92    DWB::R: CurrentFileStatus,
93{
94    type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;
95    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
96        let data_writer = self
97            .data_writer_builder
98            .build(partition_key.clone())
99            .await?;
100        let pos_delete_writer = self
101            .pos_delete_writer_builder
102            .build(partition_key.clone())
103            .await?;
104        let eq_delete_writer = self.eq_delete_writer_builder.build(partition_key).await?;
105        DeltaWriter::try_new(
106            data_writer,
107            pos_delete_writer,
108            eq_delete_writer,
109            self.unique_cols.clone(),
110            self.max_seen_rows,
111        )
112    }
113}
114
115/// Position information of a row in a data file.
116pub struct Position {
117    row_index: i64,
118    file_path: String,
119}
120
121/// A writer that handles row-level changes by combining data file and delete file writers.
122pub struct DeltaWriter<DW, PDW, EDW> {
123    /// The data file writer for new and updated rows.
124    pub data_writer: DW,
125    /// The position delete file writer for deletions of existing rows (that have been written within
126    /// this writer).
127    pub pos_delete_writer: PDW,
128    /// The equality delete file writer for deletions of rows based on equality conditions (for rows
129    /// that may exist in other data files).
130    pub eq_delete_writer: EDW,
131    /// The list of unique columns used for equality deletes.
132    pub unique_cols: Vec<i32>,
133    /// A map of rows (projected to unique columns) to their corresponding position information.
134    pub seen_rows: HashMap<OwnedRow, Position>,
135    /// Tracks insertion order for seen_rows to enable FIFO eviction.
136    seen_rows_order: VecDeque<OwnedRow>,
137    /// Maximum number of rows to track for position deletes.
138    max_seen_rows: usize,
139    /// A projector to project the record batch to the unique columns.
140    pub(crate) projector: RecordBatchProjector,
141    /// A converter to convert the projected columns to rows for easy comparison.
142    pub(crate) row_convertor: RowConverter,
143}
144
145impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
146where
147    DW: IcebergWriter + CurrentFileStatus,
148    PDW: IcebergWriter,
149    EDW: IcebergWriter,
150{
151    fn try_new(
152        data_writer: DW,
153        pos_delete_writer: PDW,
154        eq_delete_writer: EDW,
155        unique_cols: Vec<i32>,
156        max_seen_rows: usize,
157    ) -> Result<Self> {
158        let projector =
159            RecordBatchProjector::from_iceberg_schema(data_writer.current_schema(), &unique_cols)?;
160
161        let row_convertor = RowConverter::new(
162            projector
163                .projected_schema_ref()
164                .fields()
165                .iter()
166                .map(|f| SortField::new(f.data_type().clone()))
167                .collect(),
168        )?;
169
170        Ok(Self {
171            data_writer,
172            pos_delete_writer,
173            eq_delete_writer,
174            unique_cols,
175            seen_rows: HashMap::new(),
176            seen_rows_order: VecDeque::new(),
177            max_seen_rows,
178            projector,
179            row_convertor,
180        })
181    }
182
183    async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
184        let batch_num_rows = batch.num_rows();
185
186        // Write first to ensure the data is persisted before updating our tracking state.
187        // This prevents inconsistent state if the write fails.
188        // Note: We must write before calling current_file_path() because the underlying
189        // writer may not have created the file yet (lazy initialization).
190        self.data_writer.write(batch.clone()).await?;
191
192        // Skip row tracking if disabled (max_seen_rows == 0)
193        if self.max_seen_rows == 0 {
194            return Ok(());
195        }
196
197        let rows = self.extract_unique_column_rows(&batch)?;
198
199        // Get file path and calculate start_row_index after successful write
200        let file_path = self.data_writer.current_file_path();
201        let end_row_num = self.data_writer.current_row_num();
202        let start_row_index = end_row_num - batch_num_rows;
203
204        // Record positions for each row in this batch
205        for (i, row) in rows.iter().enumerate() {
206            let owned_row = row.owned();
207            self.seen_rows.insert(owned_row.clone(), Position {
208                row_index: start_row_index as i64 + i as i64,
209                file_path: file_path.clone(),
210            });
211            self.seen_rows_order.push_back(owned_row);
212        }
213
214        // Evict oldest entries if we exceed the limit
215        self.evict_oldest_seen_rows();
216
217        Ok(())
218    }
219
220    /// Evicts the oldest entries from seen_rows when the limit is exceeded.
221    /// Entries that were already deleted are skipped (stale entries in the order queue).
222    fn evict_oldest_seen_rows(&mut self) {
223        while self.seen_rows.len() > self.max_seen_rows {
224            if let Some(old_row) = self.seen_rows_order.pop_front() {
225                // Only count as eviction if the row still exists (wasn't already deleted)
226                self.seen_rows.remove(&old_row);
227            } else {
228                // Queue is empty but HashMap still has entries - this shouldn't happen
229                // in normal operation, but break to avoid infinite loop
230                break;
231            }
232        }
233    }
234
235    async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
236        // If row tracking is disabled, write all deletes as equality deletes
237        if self.max_seen_rows == 0 {
238            self.eq_delete_writer
239                .write(batch)
240                .await
241                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
242            return Ok(());
243        }
244
245        let rows = self.extract_unique_column_rows(&batch)?;
246        let mut file_array = vec![];
247        let mut row_index_array = vec![];
248        // Build a boolean array to track which rows need equality deletes.
249        // True = row not seen before, needs equality delete
250        // False = row was seen, already handled via position delete
251        let mut needs_equality_delete = BooleanBuilder::new();
252
253        for row in rows.iter() {
254            if let Some(pos) = self.seen_rows.remove(&row.owned()) {
255                // Row was previously inserted, use position delete
256                row_index_array.push(pos.row_index);
257                file_array.push(pos.file_path.clone());
258                needs_equality_delete.append_value(false);
259            } else {
260                // Row not seen before, use equality delete
261                needs_equality_delete.append_value(true);
262            }
263        }
264
265        // Write position deletes for rows that were previously inserted
266        let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
267        let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array));
268
269        let position_batch =
270            RecordBatch::try_new(PositionDeleteWriterConfig::arrow_schema(), vec![
271                file_array,
272                row_index_array,
273            ])?;
274
275        if position_batch.num_rows() > 0 {
276            self.pos_delete_writer
277                .write(position_batch)
278                .await
279                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
280        }
281
282        // Write equality deletes for rows that were not previously inserted
283        let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
284            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
285
286        if eq_batch.num_rows() > 0 {
287            self.eq_delete_writer
288                .write(eq_batch)
289                .await
290                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
291        }
292
293        Ok(())
294    }
295
296    fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
297        self.row_convertor
298            .convert_columns(&self.projector.project_column(batch.columns())?)
299            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
300    }
301}
302
303#[async_trait::async_trait]
304impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
305where
306    DW: IcebergWriter + CurrentFileStatus,
307    PDW: IcebergWriter,
308    EDW: IcebergWriter,
309{
310    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
311        // Treat the last row as an op indicator +1 for insert, -1 for delete
312        let ops = batch
313            .column(batch.num_columns() - 1)
314            .as_any()
315            .downcast_ref::<Int32Array>()
316            .ok_or(Error::new(
317                ErrorKind::Unexpected,
318                "Failed to downcast ops column",
319            ))?;
320
321        let partition =
322            partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
323                Error::new(
324                    ErrorKind::Unexpected,
325                    format!("Failed to partition batch: {e}"),
326                )
327            })?;
328
329        for range in partition.ranges() {
330            let batch = batch
331                .project(&(0..batch.num_columns() - 1).collect_vec())
332                .map_err(|e| {
333                    Error::new(
334                        ErrorKind::Unexpected,
335                        format!("Failed to project batch columns: {e}"),
336                    )
337                })?
338                .slice(range.start, range.end - range.start);
339            match ops.value(range.start) {
340                1 => self.insert(batch).await?,
341                -1 => self.delete(batch).await?,
342                op => {
343                    return Err(Error::new(
344                        ErrorKind::Unexpected,
345                        format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
346                    ));
347                }
348            }
349        }
350
351        Ok(())
352    }
353
354    async fn close(&mut self) -> Result<Vec<DataFile>> {
355        let data_files = self.data_writer.close().await?;
356        let pos_delete_files = self.pos_delete_writer.close().await?;
357        let eq_delete_files = self.eq_delete_writer.close().await?;
358
359        Ok(data_files
360            .into_iter()
361            .chain(pos_delete_files)
362            .chain(eq_delete_files)
363            .collect())
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370
371    mod delta_writer_tests {
372        use std::collections::HashMap;
373
374        use arrow_array::{Int32Array, RecordBatch, StringArray};
375        use arrow_schema::{DataType, Field, Schema};
376        use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
377        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
378        use parquet::file::properties::WriterProperties;
379        use tempfile::TempDir;
380
381        use super::*;
382        use crate::arrow::arrow_schema_to_schema;
383        use crate::io::FileIO;
384        use crate::spec::{
385            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
386        };
387        use crate::writer::IcebergWriterBuilder;
388        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
389        use crate::writer::base_writer::equality_delete_writer::{
390            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
391        };
392        use crate::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder;
393        use crate::writer::file_writer::ParquetWriterBuilder;
394        use crate::writer::file_writer::location_generator::{
395            DefaultFileNameGenerator, DefaultLocationGenerator,
396        };
397        use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
398
399        fn create_iceberg_schema() -> Arc<IcebergSchema> {
400            Arc::new(
401                IcebergSchema::builder()
402                    .with_schema_id(0)
403                    .with_fields(vec![
404                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
405                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
406                            .into(),
407                    ])
408                    .build()
409                    .unwrap(),
410            )
411        }
412
413        fn create_test_batch_with_ops(
414            ids: Vec<i32>,
415            names: Vec<Option<&str>>,
416            ops: Vec<i32>,
417        ) -> RecordBatch {
418            let schema = Arc::new(Schema::new(vec![
419                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
420                    PARQUET_FIELD_ID_META_KEY.to_string(),
421                    "1".to_string(),
422                )])),
423                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
424                    PARQUET_FIELD_ID_META_KEY.to_string(),
425                    "2".to_string(),
426                )])),
427                Field::new("op", DataType::Int32, false),
428            ]));
429
430            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
431            let name_array: ArrayRef = Arc::new(StringArray::from(names));
432            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));
433
434            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
435        }
436
437        #[tokio::test]
438        async fn test_delta_writer_insert_only() -> Result<()> {
439            let temp_dir = TempDir::new().unwrap();
440            let file_io = FileIO::new_with_fs();
441            let schema = create_iceberg_schema();
442
443            // Create data writer
444            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
445                "{}/data",
446                temp_dir.path().to_str().unwrap()
447            ));
448            let data_file_name_gen =
449                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
450            let data_parquet_writer =
451                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
452            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
453                data_parquet_writer,
454                schema.clone(),
455                file_io.clone(),
456                data_location_gen,
457                data_file_name_gen,
458            );
459            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);
460
461            // Create position delete writer
462            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
463                &PositionDeleteWriterConfig::arrow_schema(),
464            )?);
465            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
466                "{}/pos_delete",
467                temp_dir.path().to_str().unwrap()
468            ));
469            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
470                "pos_delete".to_string(),
471                None,
472                DataFileFormat::Parquet,
473            );
474            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
475                WriterProperties::builder().build(),
476                pos_delete_schema.clone(),
477            );
478            let pos_delete_rolling_writer_builder =
479                RollingFileWriterBuilder::new_with_default_file_size(
480                    pos_delete_parquet_writer,
481                    pos_delete_schema,
482                    file_io.clone(),
483                    pos_delete_location_gen,
484                    pos_delete_file_name_gen,
485                );
486            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
487                pos_delete_rolling_writer_builder,
488                PositionDeleteWriterConfig::new(None, 0, None),
489            );
490
491            // Create equality delete writer
492            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
493            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
494                eq_delete_config.projected_arrow_schema_ref(),
495            )?);
496            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
497                "{}/eq_delete",
498                temp_dir.path().to_str().unwrap()
499            ));
500            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
501                "eq_delete".to_string(),
502                None,
503                DataFileFormat::Parquet,
504            );
505            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
506                WriterProperties::builder().build(),
507                eq_delete_schema.clone(),
508            );
509            let eq_delete_rolling_writer_builder =
510                RollingFileWriterBuilder::new_with_default_file_size(
511                    eq_delete_parquet_writer,
512                    eq_delete_schema,
513                    file_io.clone(),
514                    eq_delete_location_gen,
515                    eq_delete_file_name_gen,
516                );
517            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
518                eq_delete_rolling_writer_builder,
519                eq_delete_config,
520            );
521
522            // Create delta writer
523            let data_writer_instance = data_writer.build(None).await?;
524            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
525            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
526            let mut delta_writer = DeltaWriter::try_new(
527                data_writer_instance,
528                pos_delete_writer_instance,
529                eq_delete_writer_instance,
530                vec![1], // unique on id column
531                DEFAULT_MAX_SEEN_ROWS,
532            )?;
533
534            // Write batch with only inserts
535            let batch = create_test_batch_with_ops(
536                vec![1, 2, 3],
537                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
538                vec![1, 1, 1], // all inserts
539            );
540
541            delta_writer.write(batch).await?;
542            let data_files = delta_writer.close().await?;
543
544            // Should have 1 data file, 0 delete files
545            assert_eq!(data_files.len(), 1);
546            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
547            assert_eq!(data_files[0].record_count, 3);
548
549            // Read back and verify
550            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
551            let content = input_file.read().await?;
552            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
553            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
554            assert_eq!(batches.len(), 1);
555            assert_eq!(batches[0].num_rows(), 3);
556
557            Ok(())
558        }
559
560        #[tokio::test]
561        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
562            let temp_dir = TempDir::new().unwrap();
563            let file_io = FileIO::new_with_fs();
564            let schema = create_iceberg_schema();
565
566            // Create writers (same setup as above)
567            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
568                "{}/data",
569                temp_dir.path().to_str().unwrap()
570            ));
571            let data_file_name_gen =
572                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
573            let data_parquet_writer =
574                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
575            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
576                data_parquet_writer,
577                schema.clone(),
578                file_io.clone(),
579                data_location_gen,
580                data_file_name_gen,
581            );
582            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);
583
584            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
585                &PositionDeleteWriterConfig::arrow_schema(),
586            )?);
587            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
588                "{}/pos_delete",
589                temp_dir.path().to_str().unwrap()
590            ));
591            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
592                "pos_delete".to_string(),
593                None,
594                DataFileFormat::Parquet,
595            );
596            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
597                WriterProperties::builder().build(),
598                pos_delete_schema.clone(),
599            );
600            let pos_delete_rolling_writer_builder =
601                RollingFileWriterBuilder::new_with_default_file_size(
602                    pos_delete_parquet_writer,
603                    pos_delete_schema,
604                    file_io.clone(),
605                    pos_delete_location_gen,
606                    pos_delete_file_name_gen,
607                );
608            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
609                pos_delete_rolling_writer_builder,
610                PositionDeleteWriterConfig::new(None, 0, None),
611            );
612
613            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
614            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
615                eq_delete_config.projected_arrow_schema_ref(),
616            )?);
617            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
618                "{}/eq_delete",
619                temp_dir.path().to_str().unwrap()
620            ));
621            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
622                "eq_delete".to_string(),
623                None,
624                DataFileFormat::Parquet,
625            );
626            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
627                WriterProperties::builder().build(),
628                eq_delete_schema.clone(),
629            );
630            let eq_delete_rolling_writer_builder =
631                RollingFileWriterBuilder::new_with_default_file_size(
632                    eq_delete_parquet_writer,
633                    eq_delete_schema,
634                    file_io.clone(),
635                    eq_delete_location_gen,
636                    eq_delete_file_name_gen,
637                );
638            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
639                eq_delete_rolling_writer_builder,
640                eq_delete_config,
641            );
642
643            let data_writer_instance = data_writer.build(None).await?;
644            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
645            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
646            let mut delta_writer = DeltaWriter::try_new(
647                data_writer_instance,
648                pos_delete_writer_instance,
649                eq_delete_writer_instance,
650                vec![1],
651                DEFAULT_MAX_SEEN_ROWS,
652            )?;
653
654            // First, insert some rows
655            let insert_batch = create_test_batch_with_ops(
656                vec![1, 2, 3],
657                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
658                vec![1, 1, 1],
659            );
660            delta_writer.write(insert_batch).await?;
661
662            // Now delete rows that were just inserted (should create position deletes)
663            let delete_batch =
664                create_test_batch_with_ops(vec![1, 2], vec![Some("Alice"), Some("Bob")], vec![
665                    -1, -1,
666                ]);
667            delta_writer.write(delete_batch).await?;
668
669            let data_files = delta_writer.close().await?;
670
671            // Should have 1 data file + 1 position delete file
672            assert_eq!(data_files.len(), 2);
673
674            let data_file = data_files
675                .iter()
676                .find(|f| f.content == crate::spec::DataContentType::Data)
677                .unwrap();
678            let pos_delete_file = data_files
679                .iter()
680                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
681                .unwrap();
682
683            assert_eq!(data_file.record_count, 3);
684            assert_eq!(pos_delete_file.record_count, 2);
685
686            // Verify position delete file content
687            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
688            let content = input_file.read().await?;
689            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
690            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
691            assert_eq!(batches[0].num_rows(), 2);
692
693            Ok(())
694        }
695
696        #[tokio::test]
697        async fn test_delta_writer_equality_delete() -> Result<()> {
698            let temp_dir = TempDir::new().unwrap();
699            let file_io = FileIO::new_with_fs();
700            let schema = create_iceberg_schema();
701
702            // Create writers
703            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
704                "{}/data",
705                temp_dir.path().to_str().unwrap()
706            ));
707            let data_file_name_gen =
708                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
709            let data_parquet_writer =
710                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
711            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
712                data_parquet_writer,
713                schema.clone(),
714                file_io.clone(),
715                data_location_gen,
716                data_file_name_gen,
717            );
718            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);
719
720            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
721                &PositionDeleteWriterConfig::arrow_schema(),
722            )?);
723            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
724                "{}/pos_delete",
725                temp_dir.path().to_str().unwrap()
726            ));
727            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
728                "pos_delete".to_string(),
729                None,
730                DataFileFormat::Parquet,
731            );
732            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
733                WriterProperties::builder().build(),
734                pos_delete_schema.clone(),
735            );
736            let pos_delete_rolling_writer_builder =
737                RollingFileWriterBuilder::new_with_default_file_size(
738                    pos_delete_parquet_writer,
739                    pos_delete_schema,
740                    file_io.clone(),
741                    pos_delete_location_gen,
742                    pos_delete_file_name_gen,
743                );
744            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
745                pos_delete_rolling_writer_builder,
746                PositionDeleteWriterConfig::new(None, 0, None),
747            );
748
749            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
750            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
751                eq_delete_config.projected_arrow_schema_ref(),
752            )?);
753            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
754                "{}/eq_delete",
755                temp_dir.path().to_str().unwrap()
756            ));
757            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
758                "eq_delete".to_string(),
759                None,
760                DataFileFormat::Parquet,
761            );
762            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
763                WriterProperties::builder().build(),
764                eq_delete_schema.clone(),
765            );
766            let eq_delete_rolling_writer_builder =
767                RollingFileWriterBuilder::new_with_default_file_size(
768                    eq_delete_parquet_writer,
769                    eq_delete_schema,
770                    file_io.clone(),
771                    eq_delete_location_gen,
772                    eq_delete_file_name_gen,
773                );
774            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
775                eq_delete_rolling_writer_builder,
776                eq_delete_config,
777            );
778
779            let data_writer_instance = data_writer.build(None).await?;
780            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
781            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
782            let mut delta_writer = DeltaWriter::try_new(
783                data_writer_instance,
784                pos_delete_writer_instance,
785                eq_delete_writer_instance,
786                vec![1],
787                DEFAULT_MAX_SEEN_ROWS,
788            )?;
789
790            // Delete rows that were never inserted (should create equality deletes)
791            let delete_batch =
792                create_test_batch_with_ops(vec![99, 100], vec![Some("X"), Some("Y")], vec![-1, -1]);
793            delta_writer.write(delete_batch).await?;
794
795            let data_files = delta_writer.close().await?;
796
797            // Should have only 1 equality delete file
798            assert_eq!(data_files.len(), 1);
799            assert_eq!(
800                data_files[0].content,
801                crate::spec::DataContentType::EqualityDeletes
802            );
803            assert_eq!(data_files[0].record_count, 2);
804
805            Ok(())
806        }
807
808        #[tokio::test]
809        async fn test_delta_writer_invalid_op() -> Result<()> {
810            let temp_dir = TempDir::new().unwrap();
811            let file_io = FileIO::new_with_fs();
812            let schema = create_iceberg_schema();
813
814            // Create writers
815            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
816                "{}/data",
817                temp_dir.path().to_str().unwrap()
818            ));
819            let data_file_name_gen =
820                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
821            let data_parquet_writer =
822                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
823            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
824                data_parquet_writer,
825                schema.clone(),
826                file_io.clone(),
827                data_location_gen,
828                data_file_name_gen,
829            );
830            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);
831
832            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
833                &PositionDeleteWriterConfig::arrow_schema(),
834            )?);
835            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
836                "{}/pos_delete",
837                temp_dir.path().to_str().unwrap()
838            ));
839            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
840                "pos_delete".to_string(),
841                None,
842                DataFileFormat::Parquet,
843            );
844            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
845                WriterProperties::builder().build(),
846                pos_delete_schema.clone(),
847            );
848            let pos_delete_rolling_writer_builder =
849                RollingFileWriterBuilder::new_with_default_file_size(
850                    pos_delete_parquet_writer,
851                    pos_delete_schema,
852                    file_io.clone(),
853                    pos_delete_location_gen,
854                    pos_delete_file_name_gen,
855                );
856            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
857                pos_delete_rolling_writer_builder,
858                PositionDeleteWriterConfig::new(None, 0, None),
859            );
860
861            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
862            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
863                eq_delete_config.projected_arrow_schema_ref(),
864            )?);
865            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
866                "{}/eq_delete",
867                temp_dir.path().to_str().unwrap()
868            ));
869            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
870                "eq_delete".to_string(),
871                None,
872                DataFileFormat::Parquet,
873            );
874            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
875                WriterProperties::builder().build(),
876                eq_delete_schema.clone(),
877            );
878            let eq_delete_rolling_writer_builder =
879                RollingFileWriterBuilder::new_with_default_file_size(
880                    eq_delete_parquet_writer,
881                    eq_delete_schema,
882                    file_io.clone(),
883                    eq_delete_location_gen,
884                    eq_delete_file_name_gen,
885                );
886            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
887                eq_delete_rolling_writer_builder,
888                eq_delete_config,
889            );
890
891            let data_writer_instance = data_writer.build(None).await?;
892            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
893            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
894            let mut delta_writer = DeltaWriter::try_new(
895                data_writer_instance,
896                pos_delete_writer_instance,
897                eq_delete_writer_instance,
898                vec![1],
899                DEFAULT_MAX_SEEN_ROWS,
900            )?;
901
902            // Invalid operation code
903            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);
904
905            let result = delta_writer.write(batch).await;
906            assert!(result.is_err());
907            assert!(
908                result
909                    .unwrap_err()
910                    .to_string()
911                    .contains("Ops column must be 1 (insert) or -1 (delete)")
912            );
913
914            Ok(())
915        }
916    }
917}