iceberg/writer/combined_writer/delta_writer.rs

//! Delta writers handle row-level changes by combining data file and delete file writers.
//! A delta writer has three sub-writers:
//! - A data file writer for new and updated rows.
//! - A position delete file writer for deleting rows that were previously written by this writer.
//! - An equality delete file writer for deleting rows by equality condition (rows that may exist in other data files).
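//!
//! A rough usage sketch (the three sub-writer builders below are placeholders for
//! any `IcebergWriterBuilder` implementations; the exact setup depends on the table
//! schema and file IO configuration):
//!
//! ```ignore
//! // `unique_cols` lists the Iceberg field IDs that uniquely identify a row; they
//! // drive both the position-delete bookkeeping and the equality deletes.
//! let builder = DeltaWriterBuilder::new(
//!     data_writer_builder,       // hypothetical data file writer builder
//!     pos_delete_writer_builder, // hypothetical position delete writer builder
//!     eq_delete_writer_builder,  // hypothetical equality delete writer builder
//!     vec![1],
//! );
//! let mut writer = builder.build(None).await?;
//!
//! // Each batch carries the table columns plus a trailing Int32 op column:
//! // +1 marks an insert, -1 marks a delete.
//! writer.write(batch).await?;
//!
//! // Closing flushes all three sub-writers and returns their data/delete files.
//! let files = writer.close().await?;
//! ```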

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::BooleanBuilder;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, make_array};
use arrow_buffer::NullBuffer;
use arrow_ord::partition::partition;
use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
use arrow_schema::{DataType, Field, FieldRef, Fields};
use arrow_select::filter::filter_record_batch;
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::schema_to_arrow_schema;
use crate::spec::{DataFile, PartitionKey};
use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// A projector that projects an Arrow RecordBatch to a subset of its columns based on field IDs.
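///
/// A minimal sketch of how the projector is typically driven (mirroring how
/// `DeltaWriter::try_new` wires it up; the schema, field IDs, and batch are
/// illustrative only):
///
/// ```ignore
/// // Select columns by Iceberg field ID, reading IDs from the Parquet field-id
/// // metadata and never descending into nested structs.
/// let projector = BatchProjector::new(
///     &arrow_schema,
///     &[1, 2],
///     |field| {
///         field
///             .metadata()
///             .get(PARQUET_FIELD_ID_META_KEY)
///             .map(|id| {
///                 id.parse::<i32>()
///                     .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))
///             })
///             .transpose()
///     },
///     |_| false,
/// )?;
/// let projected = projector.project_columns(batch.columns())?;
/// ```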
#[derive(Debug)]
pub(crate) struct BatchProjector {
    // Arrow arrays can be nested, so we need a Vec<Vec<usize>> to represent the indices of the columns to project.
    field_indices: Vec<Vec<usize>>,
    projected_schema: arrow_schema::SchemaRef,
}

// `BatchProjector` is heavily inspired by RisingWave's implementation. Thanks to them!
impl BatchProjector {
    pub fn new<F1, F2>(
        original_schema: &arrow_schema::Schema,
        field_ids: &[i32],
        field_id_fetch_fn: F1,
        nested_field_fetch: F2,
    ) -> Result<Self>
    where
        F1: Fn(&Field) -> Result<Option<i32>>,
        F2: Fn(&Field) -> bool,
    {
        let mut field_indices = Vec::with_capacity(field_ids.len());
        let mut projected_fields = Vec::with_capacity(field_ids.len());

        for &field_id in field_ids {
            if let Some((field, indices)) = Self::fetch_field_index(
                original_schema.fields(),
                field_id,
                &field_id_fetch_fn,
                &nested_field_fetch,
            )? {
                field_indices.push(indices);
                projected_fields.push(field);
            } else {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "Field ID {} not found in schema {:?}",
                        field_id, original_schema
                    ),
                ));
            }
        }

        if field_indices.is_empty() {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "No valid fields found for the provided field IDs",
            ));
        }

        let projected_schema = arrow_schema::Schema::new(projected_fields);
        Ok(Self {
            field_indices,
            projected_schema: Arc::new(projected_schema),
        })
    }

    fn projected_schema_ref(&self) -> arrow_schema::SchemaRef {
        self.projected_schema.clone()
    }

    fn fetch_field_index<F1, F2>(
        fields: &Fields,
        target_field_id: i32,
        field_id_fetch_fn: &F1,
        nested_field_fetch: &F2,
    ) -> Result<Option<(FieldRef, Vec<usize>)>>
    where
        F1: Fn(&Field) -> Result<Option<i32>>,
        F2: Fn(&Field) -> bool,
    {
        for (pos, field) in fields.iter().enumerate() {
            let id = field_id_fetch_fn(field)?;
            if let Some(field_id) = id {
                if field_id == target_field_id {
                    return Ok(Some((field.clone(), vec![pos])));
                }
            }
            if let DataType::Struct(inner_struct) = field.data_type() {
                if nested_field_fetch(field) {
                    if let Some((field, mut sub_indices)) = Self::fetch_field_index(
                        &inner_struct,
                        target_field_id,
                        field_id_fetch_fn,
                        nested_field_fetch,
                    )? {
                        sub_indices.insert(0, pos);
                        return Ok(Some((field, sub_indices)));
                    }
                }
            }
        }
        Ok(None)
    }

    fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        let columns = self.project_columns(batch.columns())?;
        RecordBatch::try_new(self.projected_schema.clone(), columns)
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
    }

    pub fn project_columns(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
        self.field_indices
            .iter()
            .map(|indices| Self::get_col_by_id(batch, indices))
            .collect()
    }

    fn get_col_by_id(batch: &[ArrayRef], field_index: &[usize]) -> Result<ArrayRef> {
        if field_index.is_empty() {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Field index cannot be empty",
            ));
        }

        let mut iter = field_index.iter();
        let first_index = *iter.next().unwrap();
        let mut array = batch[first_index].clone();
        let mut null_buffer = array.logical_nulls();

        for &i in iter {
            let struct_array = array
                .as_any()
                .downcast_ref::<arrow_array::StructArray>()
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::Unexpected,
                        "Expected struct array when traversing nested fields",
                    )
                })?;

            array = struct_array.column(i).clone();
            null_buffer = NullBuffer::union(null_buffer.as_ref(), array.logical_nulls().as_ref());
        }

        Ok(make_array(
            array.to_data().into_builder().nulls(null_buffer).build()?,
        ))
    }
}

/// A builder for `DeltaWriter`.
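///
/// `build` constructs one `DeltaWriter` whose three sub-writers all share the same
/// (optional) partition key. A minimal sketch, assuming three sub-writer builders
/// are already configured:
///
/// ```ignore
/// let delta_builder = DeltaWriterBuilder::new(
///     data_writer_builder,
///     pos_delete_writer_builder,
///     eq_delete_writer_builder,
///     vec![1], // unique on field ID 1
/// );
/// let writer = delta_builder.build(None).await?;
/// ```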
#[derive(Clone, Debug)]
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    data_writer_builder: DWB,
    pos_delete_writer_builder: PDWB,
    eq_delete_writer_builder: EDWB,
    unique_cols: Vec<i32>,
}

impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
    /// Creates a new `DeltaWriterBuilder`.
    pub fn new(
        data_writer_builder: DWB,
        pos_delete_writer_builder: PDWB,
        eq_delete_writer_builder: EDWB,
        unique_cols: Vec<i32>,
    ) -> Self {
        Self {
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            unique_cols,
        }
    }
}

#[async_trait::async_trait]
impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
where
    DWB: IcebergWriterBuilder,
    PDWB: IcebergWriterBuilder,
    EDWB: IcebergWriterBuilder,
    DWB::R: CurrentFileStatus,
{
    type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;
    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
        let data_writer = self
            .data_writer_builder
            .build(partition_key.clone())
            .await?;
        let pos_delete_writer = self
            .pos_delete_writer_builder
            .build(partition_key.clone())
            .await?;
        let eq_delete_writer = self.eq_delete_writer_builder.build(partition_key).await?;
        DeltaWriter::try_new(
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            self.unique_cols.clone(),
        )
    }
}

/// Position information of a row in a data file.
pub struct Position {
    row_index: i64,
    file_path: String,
}

/// A writer that handles row-level changes by combining data file and delete file writers.
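///
/// Each incoming `RecordBatch` must carry the table columns, in table-schema order,
/// followed by a trailing `Int32` op column: `+1` marks an insert, `-1` marks a delete.
/// A sketch of such a batch (field names and IDs are illustrative; the tests below
/// also attach `PARQUET_FIELD_ID_META_KEY` metadata so the written batch matches the
/// table's Arrow schema):
///
/// ```ignore
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("id", DataType::Int32, false),
///     Field::new("name", DataType::Utf8, true),
///     Field::new("op", DataType::Int32, false),
/// ]));
/// let batch = RecordBatch::try_new(schema, vec![
///     Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
///     Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])) as ArrayRef,
///     Arc::new(Int32Array::from(vec![1, -1])) as ArrayRef, // insert id 1, delete id 2
/// ])?;
/// ```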
pub struct DeltaWriter<DW, PDW, EDW> {
    /// The data file writer for new and updated rows.
    pub data_writer: DW,
    /// The position delete file writer for deletions of existing rows (that have been written within
    /// this writer).
    pub pos_delete_writer: PDW,
    /// The equality delete file writer for deletions of rows based on equality conditions (for rows
    /// that may exist in other data files).
    pub eq_delete_writer: EDW,
    /// The list of unique columns used for equality deletes.
    pub unique_cols: Vec<i32>,
    /// A map of rows (projected to unique columns) to their corresponding position information.
    pub seen_rows: HashMap<OwnedRow, Position>,
    /// A projector to project the record batch to the unique columns.
    pub(crate) projector: BatchProjector,
    /// A converter to convert the projected columns to rows for easy comparison.
    pub(crate) row_convertor: RowConverter,
}

impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    fn try_new(
        data_writer: DW,
        pos_delete_writer: PDW,
        eq_delete_writer: EDW,
        unique_cols: Vec<i32>,
    ) -> Result<Self> {
        let projector = BatchProjector::new(
            &schema_to_arrow_schema(&data_writer.current_schema())?,
            &unique_cols,
            |field| {
                if field.data_type().is_nested() {
                    return Ok(None);
                }
                field
                    .metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .map(|id_str| {
                        id_str.parse::<i32>().map_err(|e| {
                            Error::new(
                                ErrorKind::Unexpected,
                                format!("Failed to parse field ID {}: {}", id_str, e),
                            )
                        })
                    })
                    .transpose()
            },
            |_| false,
        )?;

        let row_convertor = RowConverter::new(
            projector
                .projected_schema_ref()
                .fields()
                .iter()
                .map(|f| SortField::new(f.data_type().clone()))
                .collect(),
        )?;

        Ok(Self {
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            unique_cols,
            seen_rows: HashMap::new(),
            projector,
            row_convertor,
        })
    }

    async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let batch_num_rows = batch.num_rows();

        // Write first to ensure the data is persisted before updating our tracking state.
        // This prevents inconsistent state if the write fails.
        // Note: We must write before calling current_file_path() because the underlying
        // writer may not have created the file yet (lazy initialization).
        self.data_writer.write(batch.clone()).await?;

        // Get file path and calculate start_row_index after successful write
        let file_path = self.data_writer.current_file_path();
        let end_row_num = self.data_writer.current_row_num();
        let start_row_index = end_row_num - batch_num_rows;

        // Record positions for each row in this batch
        for (i, row) in rows.iter().enumerate() {
            self.seen_rows.insert(row.owned(), Position {
                row_index: start_row_index as i64 + i as i64,
                file_path: file_path.clone(),
            });
        }

        Ok(())
    }

    async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let mut file_array = vec![];
        let mut row_index_array = vec![];
        // Build a boolean array to track which rows need equality deletes.
        // True = row not seen before, needs equality delete
        // False = row was seen, already handled via position delete
        let mut needs_equality_delete = BooleanBuilder::new();

        for row in rows.iter() {
            if let Some(pos) = self.seen_rows.remove(&row.owned()) {
                // Row was previously inserted, use position delete
                row_index_array.push(pos.row_index);
                file_array.push(pos.file_path.clone());
                needs_equality_delete.append_value(false);
            } else {
                // Row not seen before, use equality delete
                needs_equality_delete.append_value(true);
            }
        }

        // Write position deletes for rows that were previously inserted
        let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
        let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array));

        let position_batch =
            RecordBatch::try_new(PositionDeleteWriterConfig::arrow_schema(), vec![
                file_array,
                row_index_array,
            ])?;

        if position_batch.num_rows() > 0 {
            self.pos_delete_writer
                .write(position_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        // Write equality deletes for rows that were not previously inserted
        let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;

        if eq_batch.num_rows() > 0 {
            self.eq_delete_writer
                .write(eq_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        Ok(())
    }

    fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
        self.row_convertor
            .convert_columns(&self.projector.project_columns(batch.columns())?)
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
    }
}

#[async_trait::async_trait]
impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // Treat the last column as an op indicator: +1 for insert, -1 for delete
        let ops = batch
            .column(batch.num_columns() - 1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or(Error::new(
                ErrorKind::Unexpected,
                "Failed to downcast ops column",
            ))?;

        let partition =
            partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to partition batch: {e}"),
                )
            })?;

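        // `partition` groups consecutive rows that share the same op value into
        // contiguous ranges, so every range below is a pure insert run or a pure
        // delete run. For example, an op column of [1, 1, -1, -1, 1] yields the
        // ranges 0..2, 2..4, and 4..5.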
        for range in partition.ranges() {
            let batch = batch
                .project(&(0..batch.num_columns() - 1).collect_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!("Failed to project batch columns: {e}"),
                    )
                })?
                .slice(range.start, range.end - range.start);
            match ops.value(range.start) {
                1 => self.insert(batch).await?,
                -1 => self.delete(batch).await?,
                op => {
                    return Err(Error::new(
                        ErrorKind::Unexpected,
                        format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
                    ));
                }
            }
        }

        Ok(())
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        let data_files = self.data_writer.close().await?;
        let pos_delete_files = self.pos_delete_writer.close().await?;
        let eq_delete_files = self.eq_delete_writer.close().await?;

        Ok(data_files
            .into_iter()
            .chain(pos_delete_files)
            .chain(eq_delete_files)
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Array, Int32Array, StringArray, StructArray};
    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    fn test_field_id_fetch(field: &Field) -> Result<Option<i32>> {
        // Mock field ID extraction: map known field names to fixed IDs for testing
        match field.name().as_str() {
            "id" => Ok(Some(1)),
            "name" => Ok(Some(2)),
            "address" => Ok(Some(3)),
            "street" => Ok(Some(4)),
            "city" => Ok(Some(5)),
            "age" => Ok(Some(6)),
            _ => Ok(None),
        }
    }

    fn test_nested_field_fetch(field: &Field) -> bool {
        // Allow traversing into struct fields
        matches!(field.data_type(), DataType::Struct(_))
    }

    fn create_test_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new(
                "address",
                DataType::Struct(
                    vec![
                        Field::new("street", DataType::Utf8, true),
                        Field::new("city", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("age", DataType::Int32, true),
        ])
    }

    fn create_test_batch() -> RecordBatch {
        let schema = Arc::new(create_test_schema());

        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let name_array = Arc::new(StringArray::from(vec![Some("John"), Some("Jane"), None]));

        let street_array = Arc::new(StringArray::from(vec![
            Some("123 Main St"),
            None,
            Some("789 Oak Ave"),
        ]));
        let city_array = Arc::new(StringArray::from(vec![Some("NYC"), Some("LA"), None]));

        let address_array = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("street", DataType::Utf8, true)),
                street_array as ArrayRef,
            ),
            (
                Arc::new(Field::new("city", DataType::Utf8, true)),
                city_array as ArrayRef,
            ),
        ]));

        let age_array = Arc::new(Int32Array::from(vec![Some(25), Some(30), None]));

        RecordBatch::try_new(schema, vec![
            id_array as ArrayRef,
            name_array as ArrayRef,
            address_array as ArrayRef,
            age_array as ArrayRef,
        ])
        .unwrap()
    }

    #[test]
    fn test_projector_simple_fields() {
        let schema = create_test_schema();
        let batch = create_test_batch();

        // Project id and name fields
        let projector = BatchProjector::new(
            &schema,
            &[1, 2], // id, name
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected = projector.project_columns(batch.columns()).unwrap();

        assert_eq!(projected.len(), 2);

        // Check id column
        let id_array = projected[0].as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(id_array.values(), &[1, 2, 3]);

        // Check name column
        let name_array = projected[1].as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(name_array.value(0), "John");
        assert_eq!(name_array.value(1), "Jane");
        assert!(name_array.is_null(2));
    }

    #[test]
    fn test_projector_nested_fields() {
        let schema = create_test_schema();
        let batch = create_test_batch();

        // Project nested street field
        let projector = BatchProjector::new(
            &schema,
            &[4], // street
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected = projector.project_columns(batch.columns()).unwrap();

        assert_eq!(projected.len(), 1);

        let street_array = projected[0].as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(street_array.value(0), "123 Main St");
        assert!(street_array.is_null(1));
        assert_eq!(street_array.value(2), "789 Oak Ave");
    }

    #[test]
    fn test_projector_mixed_fields() {
        let schema = create_test_schema();
        let batch = create_test_batch();

        // Project id, street, and age
        let projector = BatchProjector::new(
            &schema,
            &[1, 4, 6], // id, street, age
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected = projector.project_columns(batch.columns()).unwrap();

        assert_eq!(projected.len(), 3);

        // Check projected schema
        assert_eq!(projector.projected_schema.fields().len(), 3);
        assert_eq!(projector.projected_schema.field(0).name(), "id");
        assert_eq!(projector.projected_schema.field(1).name(), "street");
        assert_eq!(projector.projected_schema.field(2).name(), "age");
    }

    #[test]
    fn test_projector_field_not_found() {
        let schema = create_test_schema();

        let result = BatchProjector::new(
            &schema,
            &[999], // non-existent field ID
            test_field_id_fetch,
            test_nested_field_fetch,
        );

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Field ID 999 not found")
        );
    }

    #[test]
    fn test_projector_empty_field_ids() {
        let schema = create_test_schema();

        let result = BatchProjector::new(
            &schema,
            &[], // empty field IDs
            test_field_id_fetch,
            test_nested_field_fetch,
        );

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("No valid fields found")
        );
    }

    #[test]
    fn test_get_col_by_id_empty_index() {
        let batch = create_test_batch();
        let result = BatchProjector::get_col_by_id(batch.columns(), &[]);

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Field index cannot be empty")
        );
    }

    #[test]
    fn test_projector_null_propagation() {
        // Create a batch where the struct itself has nulls
        let schema = Arc::new(create_test_schema());

        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let name_array = Arc::new(StringArray::from(vec![
            Some("John"),
            Some("Jane"),
            Some("Bob"),
        ]));

        let street_array = Arc::new(StringArray::from(vec![
            Some("123 Main St"),
            Some("456 Elm St"),
            Some("789 Oak Ave"),
        ]));
        let city_array = Arc::new(StringArray::from(vec![
            Some("NYC"),
            Some("LA"),
            Some("Chicago"),
        ]));

        // Create address array with one null struct
        let address_fields = vec![
            (
                Arc::new(Field::new("street", DataType::Utf8, true)),
                street_array as ArrayRef,
            ),
            (
                Arc::new(Field::new("city", DataType::Utf8, true)),
                city_array as ArrayRef,
            ),
        ];

        // Make the second address struct null
        let null_buffer = NullBuffer::from(vec![true, false, true]);
        let address_data = StructArray::from(address_fields).into_data();
        let address_array = Arc::new(StructArray::from(
            address_data
                .into_builder()
                .nulls(Some(null_buffer))
                .build()
                .unwrap(),
        ));

        let age_array = Arc::new(Int32Array::from(vec![Some(25), Some(30), Some(35)]));

        let batch = RecordBatch::try_new(schema, vec![
            id_array as ArrayRef,
            name_array as ArrayRef,
            address_array as ArrayRef,
            age_array as ArrayRef,
        ])
        .unwrap();

        let projector = BatchProjector::new(
            &batch.schema(),
            &[4], // street
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected = projector.project_columns(batch.columns()).unwrap();
        let street_array = projected[0].as_any().downcast_ref::<StringArray>().unwrap();

        // The street should be null when the parent address struct is null
        assert!(!street_array.is_null(0)); // address not null, street has value
        assert!(street_array.is_null(1)); // address is null, so street should be null
        assert!(!street_array.is_null(2)); // address not null, street has value
    }

    #[test]
    fn test_project_batch_method() {
        let schema = create_test_schema();
        let batch = create_test_batch();

        let projector = BatchProjector::new(
            &schema,
            &[1, 2], // id, name
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected_batch = projector.project_batch(&batch).unwrap();

        assert_eq!(projected_batch.num_columns(), 2);
        assert_eq!(projected_batch.num_rows(), 3);
        assert_eq!(projected_batch.schema().field(0).name(), "id");
        assert_eq!(projected_batch.schema().field(1).name(), "name");
    }

    // Tests for DeltaWriter
    mod delta_writer_tests {
        use std::collections::HashMap;

        use arrow_array::{Int32Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use parquet::file::properties::WriterProperties;
        use tempfile::TempDir;

        use super::*;
        use crate::arrow::arrow_schema_to_schema;
        use crate::io::FileIOBuilder;
        use crate::spec::{
            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
        };
        use crate::writer::IcebergWriterBuilder;
        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use crate::writer::base_writer::equality_delete_writer::{
            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
        };
        use crate::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder;
        use crate::writer::file_writer::ParquetWriterBuilder;
        use crate::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

        fn create_iceberg_schema() -> Arc<IcebergSchema> {
            Arc::new(
                IcebergSchema::builder()
                    .with_schema_id(0)
                    .with_fields(vec![
                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
                            .into(),
                    ])
                    .build()
                    .unwrap(),
            )
        }

        fn create_test_batch_with_ops(
            ids: Vec<i32>,
            names: Vec<Option<&str>>,
            ops: Vec<i32>,
        ) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2".to_string(),
                )])),
                Field::new("op", DataType::Int32, false),
            ]));

            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
            let name_array: ArrayRef = Arc::new(StringArray::from(names));
            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));

            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
        }

        #[tokio::test]
        async fn test_delta_writer_insert_only() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create data writer
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            // Create position delete writer
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), pos_delete_schema.clone());
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Create equality delete writer
            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), eq_delete_schema.clone());
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            // Create delta writer
            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1], // unique on id column
            )?;

            // Write batch with only inserts
            let batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1], // all inserts
            );

            delta_writer.write(batch).await?;
            let data_files = delta_writer.close().await?;

            // Should have 1 data file, 0 delete files
            assert_eq!(data_files.len(), 1);
            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
            assert_eq!(data_files[0].record_count, 3);

            // Read back and verify
            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 3);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create writers (same setup as above)
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), pos_delete_schema.clone());
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), eq_delete_schema.clone());
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // First, insert some rows
            let insert_batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );
            delta_writer.write(insert_batch).await?;

            // Now delete rows that were just inserted (should create position deletes)
            let delete_batch =
                create_test_batch_with_ops(vec![1, 2], vec![Some("Alice"), Some("Bob")], vec![
                    -1, -1,
                ]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // Should have 1 data file + 1 position delete file
            assert_eq!(data_files.len(), 2);

            let data_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::Data)
                .unwrap();
            let pos_delete_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
                .unwrap();

            assert_eq!(data_file.record_count, 3);
            assert_eq!(pos_delete_file.record_count, 2);

            // Verify position delete file content
            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches[0].num_rows(), 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_equality_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create writers
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), pos_delete_schema.clone());
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), eq_delete_schema.clone());
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Delete rows that were never inserted (should create equality deletes)
            let delete_batch =
                create_test_batch_with_ops(vec![99, 100], vec![Some("X"), Some("Y")], vec![-1, -1]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // Should have only 1 equality delete file
            assert_eq!(data_files.len(), 1);
            assert_eq!(
                data_files[0].content,
                crate::spec::DataContentType::EqualityDeletes
            );
            assert_eq!(data_files[0].record_count, 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_invalid_op() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Create writers
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), pos_delete_schema.clone());
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), eq_delete_schema.clone());
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Invalid operation code
            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);

            let result = delta_writer.write(batch).await;
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("Ops column must be 1 (insert) or -1 (delete)")
            );

            Ok(())
        }
    }
}