use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::BooleanBuilder;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, make_array};
use arrow_buffer::NullBuffer;
use arrow_ord::partition::partition;
use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
use arrow_schema::{DataType, Field, FieldRef, Fields};
use arrow_select::filter::filter_record_batch;
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::schema_to_arrow_schema;
use crate::spec::{DataFile, PartitionKey};
use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

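/// Projects a set of columns, identified by Iceberg field id, out of record batches.
/// Columns nested inside struct arrays are reachable through a path of child indices.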
#[derive(Debug)]
pub(crate) struct BatchProjector {
    field_indices: Vec<Vec<usize>>,
    projected_schema: arrow_schema::SchemaRef,
}

impl BatchProjector {
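    /// Builds a projector for `field_ids` against `original_schema`.
    ///
    /// `field_id_fetch_fn` extracts the Iceberg field id from an Arrow field (returning
    /// `Ok(None)` when the field has none), and `nested_field_fetch` decides whether a
    /// struct field should be searched recursively.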
    pub fn new<F1, F2>(
        original_schema: &arrow_schema::Schema,
        field_ids: &[i32],
        field_id_fetch_fn: F1,
        nested_field_fetch: F2,
    ) -> Result<Self>
    where
        F1: Fn(&Field) -> Result<Option<i32>>,
        F2: Fn(&Field) -> bool,
    {
        let mut field_indices = Vec::with_capacity(field_ids.len());
        let mut projected_fields = Vec::with_capacity(field_ids.len());

        for &field_id in field_ids {
            if let Some((field, indices)) = Self::fetch_field_index(
                original_schema.fields(),
                field_id,
                &field_id_fetch_fn,
                &nested_field_fetch,
            )? {
                field_indices.push(indices);
                projected_fields.push(field);
            } else {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "Field ID {} not found in schema {:?}",
                        field_id, original_schema
                    ),
                ));
            }
        }

        if field_indices.is_empty() {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "No valid fields found for the provided field IDs",
            ));
        }

        let projected_schema = arrow_schema::Schema::new(projected_fields);
        Ok(Self {
            field_indices,
            projected_schema: Arc::new(projected_schema),
        })
    }

    fn projected_schema_ref(&self) -> arrow_schema::SchemaRef {
        self.projected_schema.clone()
    }

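    /// Recursively searches `fields` for `target_field_id` and returns the matching field
    /// together with the index path leading to it (outermost index first).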
    fn fetch_field_index<F1, F2>(
        fields: &Fields,
        target_field_id: i32,
        field_id_fetch_fn: &F1,
        nested_field_fetch: &F2,
    ) -> Result<Option<(FieldRef, Vec<usize>)>>
    where
        F1: Fn(&Field) -> Result<Option<i32>>,
        F2: Fn(&Field) -> bool,
    {
        for (pos, field) in fields.iter().enumerate() {
            let id = field_id_fetch_fn(field)?;
            if let Some(field_id) = id {
                if field_id == target_field_id {
                    return Ok(Some((field.clone(), vec![pos])));
                }
            }
            if let DataType::Struct(inner_struct) = field.data_type() {
                if nested_field_fetch(field) {
                    if let Some((field, mut sub_indices)) = Self::fetch_field_index(
                        inner_struct,
                        target_field_id,
                        field_id_fetch_fn,
                        nested_field_fetch,
                    )? {
                        sub_indices.insert(0, pos);
                        return Ok(Some((field, sub_indices)));
                    }
                }
            }
        }
        Ok(None)
    }

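    /// Projects a full `RecordBatch` down to the configured columns.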
    fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        let columns = self.project_columns(batch.columns())?;
        RecordBatch::try_new(self.projected_schema.clone(), columns)
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
    }

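    /// Projects the configured columns out of a slice of arrays, resolving nested paths.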
    pub fn project_columns(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
        self.field_indices
            .iter()
            .map(|indices| Self::get_col_by_id(batch, indices))
            .collect()
    }

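    /// Walks `field_index` into nested struct arrays, unioning the null buffers along the
    /// way so that a null parent struct makes the extracted child value null as well.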
    fn get_col_by_id(batch: &[ArrayRef], field_index: &[usize]) -> Result<ArrayRef> {
        if field_index.is_empty() {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Field index cannot be empty",
            ));
        }

        let mut iter = field_index.iter();
        let first_index = *iter.next().unwrap();
        let mut array = batch[first_index].clone();
        let mut null_buffer = array.logical_nulls();

        for &i in iter {
            let struct_array = array
                .as_any()
                .downcast_ref::<arrow_array::StructArray>()
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::Unexpected,
                        "Expected struct array when traversing nested fields",
                    )
                })?;

            array = struct_array.column(i).clone();
            null_buffer = NullBuffer::union(null_buffer.as_ref(), array.logical_nulls().as_ref());
        }

        Ok(make_array(
            array.to_data().into_builder().nulls(null_buffer).build()?,
        ))
    }
}

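/// Builder for [`DeltaWriter`]: wires together a data file writer, a position delete
/// writer, and an equality delete writer, plus the field ids that uniquely identify a row.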
#[derive(Clone, Debug)]
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    data_writer_builder: DWB,
    pos_delete_writer_builder: PDWB,
    eq_delete_writer_builder: EDWB,
    unique_cols: Vec<i32>,
}

impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
    pub fn new(
        data_writer_builder: DWB,
        pos_delete_writer_builder: PDWB,
        eq_delete_writer_builder: EDWB,
        unique_cols: Vec<i32>,
    ) -> Self {
        Self {
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            unique_cols,
        }
    }
}

#[async_trait::async_trait]
impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
where
    DWB: IcebergWriterBuilder,
    PDWB: IcebergWriterBuilder,
    EDWB: IcebergWriterBuilder,
    DWB::R: CurrentFileStatus,
{
    type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;
    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
        let data_writer = self
            .data_writer_builder
            .build(partition_key.clone())
            .await?;
        let pos_delete_writer = self
            .pos_delete_writer_builder
            .build(partition_key.clone())
            .await?;
        let eq_delete_writer = self.eq_delete_writer_builder.build(partition_key).await?;
        DeltaWriter::try_new(
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            self.unique_cols.clone(),
        )
    }
}

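/// Location of a previously inserted row: the data file it was written to and its
/// row index within that file.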
pub struct Position {
    row_index: i64,
    file_path: String,
}

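/// Writer that turns a stream of insert/delete operations into Iceberg data files,
/// position delete files, and equality delete files.
///
/// Rows inserted through this writer are remembered in `seen_rows`; deleting such a row
/// later produces a position delete, while deleting an unseen row produces an equality
/// delete on the configured unique columns.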
pub struct DeltaWriter<DW, PDW, EDW> {
    /// Writer that receives all inserted rows.
    pub data_writer: DW,
    /// Writer for position deletes of rows inserted earlier by this writer.
    pub pos_delete_writer: PDW,
    /// Writer for equality deletes of rows that this writer did not insert itself.
    pub eq_delete_writer: EDW,
    /// Iceberg field ids of the columns that uniquely identify a row.
    pub unique_cols: Vec<i32>,
    /// Maps the encoded unique-column values of each inserted row to its position.
    pub seen_rows: HashMap<OwnedRow, Position>,
    /// Projects the unique columns out of incoming batches.
    pub(crate) projector: BatchProjector,
    /// Converts the projected unique columns into comparable rows.
    pub(crate) row_convertor: RowConverter,
}

impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    fn try_new(
        data_writer: DW,
        pos_delete_writer: PDW,
        eq_delete_writer: EDW,
        unique_cols: Vec<i32>,
    ) -> Result<Self> {
        let projector = BatchProjector::new(
            &schema_to_arrow_schema(&data_writer.current_schema())?,
            &unique_cols,
            |field| {
                if field.data_type().is_nested() {
                    return Ok(None);
                }
                field
                    .metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .map(|id_str| {
                        id_str.parse::<i32>().map_err(|e| {
                            Error::new(
                                ErrorKind::Unexpected,
                                format!("Failed to parse field ID {}: {}", id_str, e),
                            )
                        })
                    })
                    .transpose()
            },
            |_| false,
        )?;

        let row_convertor = RowConverter::new(
            projector
                .projected_schema_ref()
                .fields()
                .iter()
                .map(|f| SortField::new(f.data_type().clone()))
                .collect(),
        )?;

        Ok(Self {
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            unique_cols,
            seen_rows: HashMap::new(),
            projector,
            row_convertor,
        })
    }

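    /// Writes `batch` to the data file writer and records, for every row, which file and
    /// row index it ended up at so that a later delete can target it by position.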
    async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let batch_num_rows = batch.num_rows();

        self.data_writer.write(batch.clone()).await?;

        // The rows of this batch are the last `batch_num_rows` rows of the current data file.
        let file_path = self.data_writer.current_file_path();
        let end_row_num = self.data_writer.current_row_num();
        let start_row_index = end_row_num - batch_num_rows;

        for (i, row) in rows.iter().enumerate() {
            self.seen_rows.insert(row.owned(), Position {
                row_index: start_row_index as i64 + i as i64,
                file_path: file_path.clone(),
            });
        }

        Ok(())
    }

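    /// Deletes the rows of `batch`: rows previously written by this writer become
    /// position deletes, all remaining rows become equality deletes.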
    async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let mut file_array = vec![];
        let mut row_index_array = vec![];
        let mut needs_equality_delete = BooleanBuilder::new();

        // Rows written by this writer are deleted by position; everything else falls back
        // to an equality delete.
        for row in rows.iter() {
            if let Some(pos) = self.seen_rows.remove(&row.owned()) {
                row_index_array.push(pos.row_index);
                file_array.push(pos.file_path.clone());
                needs_equality_delete.append_value(false);
            } else {
                needs_equality_delete.append_value(true);
            }
        }

        let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
        let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array));

        let position_batch =
            RecordBatch::try_new(PositionDeleteWriterConfig::arrow_schema(), vec![
                file_array,
                row_index_array,
            ])?;

        if position_batch.num_rows() > 0 {
            self.pos_delete_writer
                .write(position_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;

        if eq_batch.num_rows() > 0 {
            self.eq_delete_writer
                .write(eq_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        Ok(())
    }

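    /// Projects the unique columns of `batch` and encodes them as comparable rows.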
    fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
        self.row_convertor
            .convert_columns(&self.projector.project_columns(batch.columns())?)
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
    }
}

#[async_trait::async_trait]
impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
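    /// Writes a changelog batch. The last column must be an `Int32` op column where `1`
    /// marks an insert and `-1` a delete; the remaining columns form the actual row data.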
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // The last column of the batch is the op column.
        let ops = batch
            .column(batch.num_columns() - 1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or(Error::new(
                ErrorKind::Unexpected,
                "Failed to downcast ops column",
            ))?;

        // Split the batch into runs of consecutive rows that share the same op value.
        let partition =
            partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to partition batch: {e}"),
                )
            })?;

        for range in partition.ranges() {
            // Drop the op column before handing the rows to the inner writers.
            let batch = batch
                .project(&(0..batch.num_columns() - 1).collect_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!("Failed to project batch columns: {e}"),
                    )
                })?
                .slice(range.start, range.end - range.start);
            match ops.value(range.start) {
                1 => self.insert(batch).await?,
                -1 => self.delete(batch).await?,
                op => {
                    return Err(Error::new(
                        ErrorKind::Unexpected,
                        format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
                    ));
                }
            }
        }

        Ok(())
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        let data_files = self.data_writer.close().await?;
        let pos_delete_files = self.pos_delete_writer.close().await?;
        let eq_delete_files = self.eq_delete_writer.close().await?;

        Ok(data_files
            .into_iter()
            .chain(pos_delete_files)
            .chain(eq_delete_files)
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Array, Int32Array, StringArray, StructArray};
    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    fn test_field_id_fetch(field: &Field) -> Result<Option<i32>> {
        match field.name().as_str() {
            "id" => Ok(Some(1)),
            "name" => Ok(Some(2)),
            "address" => Ok(Some(3)),
            "street" => Ok(Some(4)),
            "city" => Ok(Some(5)),
            "age" => Ok(Some(6)),
            _ => Ok(None),
        }
    }

    fn test_nested_field_fetch(field: &Field) -> bool {
        matches!(field.data_type(), DataType::Struct(_))
    }

    fn create_test_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new(
                "address",
                DataType::Struct(
                    vec![
                        Field::new("street", DataType::Utf8, true),
                        Field::new("city", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("age", DataType::Int32, true),
        ])
    }

    fn create_test_batch() -> RecordBatch {
        let schema = Arc::new(create_test_schema());

        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let name_array = Arc::new(StringArray::from(vec![Some("John"), Some("Jane"), None]));

        let street_array = Arc::new(StringArray::from(vec![
            Some("123 Main St"),
            None,
            Some("789 Oak Ave"),
        ]));
        let city_array = Arc::new(StringArray::from(vec![Some("NYC"), Some("LA"), None]));

        let address_array = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("street", DataType::Utf8, true)),
                street_array as ArrayRef,
            ),
            (
                Arc::new(Field::new("city", DataType::Utf8, true)),
                city_array as ArrayRef,
            ),
        ]));

        let age_array = Arc::new(Int32Array::from(vec![Some(25), Some(30), None]));

        RecordBatch::try_new(schema, vec![
            id_array as ArrayRef,
            name_array as ArrayRef,
            address_array as ArrayRef,
            age_array as ArrayRef,
        ])
        .unwrap()
    }

    #[test]
    fn test_projector_simple_fields() {
        let schema = create_test_schema();
        let batch = create_test_batch();

        let projector = BatchProjector::new(
            &schema,
            &[1, 2],
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected = projector.project_columns(batch.columns()).unwrap();

        assert_eq!(projected.len(), 2);

        let id_array = projected[0].as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(id_array.values(), &[1, 2, 3]);

        let name_array = projected[1].as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(name_array.value(0), "John");
        assert_eq!(name_array.value(1), "Jane");
        assert!(name_array.is_null(2));
    }

    #[test]
    fn test_projector_nested_fields() {
        let schema = create_test_schema();
        let batch = create_test_batch();

        let projector = BatchProjector::new(
            &schema,
            &[4],
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected = projector.project_columns(batch.columns()).unwrap();

        assert_eq!(projected.len(), 1);

        let street_array = projected[0].as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(street_array.value(0), "123 Main St");
        assert!(street_array.is_null(1));
        assert_eq!(street_array.value(2), "789 Oak Ave");
    }

    #[test]
    fn test_projector_mixed_fields() {
        let schema = create_test_schema();
        let batch = create_test_batch();

        let projector = BatchProjector::new(
            &schema,
            &[1, 4, 6],
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected = projector.project_columns(batch.columns()).unwrap();

        assert_eq!(projected.len(), 3);

        assert_eq!(projector.projected_schema.fields().len(), 3);
        assert_eq!(projector.projected_schema.field(0).name(), "id");
        assert_eq!(projector.projected_schema.field(1).name(), "street");
        assert_eq!(projector.projected_schema.field(2).name(), "age");
    }

    #[test]
    fn test_projector_field_not_found() {
        let schema = create_test_schema();

        let result = BatchProjector::new(
            &schema,
            &[999],
            test_field_id_fetch,
            test_nested_field_fetch,
        );

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Field ID 999 not found")
        );
    }

    #[test]
    fn test_projector_empty_field_ids() {
        let schema = create_test_schema();

        let result = BatchProjector::new(
            &schema,
            &[],
            test_field_id_fetch,
            test_nested_field_fetch,
        );

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("No valid fields found")
        );
    }

    #[test]
    fn test_get_col_by_id_empty_index() {
        let batch = create_test_batch();
        let result = BatchProjector::get_col_by_id(batch.columns(), &[]);

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Field index cannot be empty")
        );
    }

    #[test]
    fn test_projector_null_propagation() {
        let schema = Arc::new(create_test_schema());

        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let name_array = Arc::new(StringArray::from(vec![
            Some("John"),
            Some("Jane"),
            Some("Bob"),
        ]));

        let street_array = Arc::new(StringArray::from(vec![
            Some("123 Main St"),
            Some("456 Elm St"),
            Some("789 Oak Ave"),
        ]));
        let city_array = Arc::new(StringArray::from(vec![
            Some("NYC"),
            Some("LA"),
            Some("Chicago"),
        ]));

        let address_fields = vec![
            (
                Arc::new(Field::new("street", DataType::Utf8, true)),
                street_array as ArrayRef,
            ),
            (
                Arc::new(Field::new("city", DataType::Utf8, true)),
                city_array as ArrayRef,
            ),
        ];

        // Mark the second address struct as null at the parent level while its child
        // arrays stay non-null.
        let null_buffer = NullBuffer::from(vec![true, false, true]);
        let address_data = StructArray::from(address_fields).into_data();
        let address_array = Arc::new(StructArray::from(
            address_data
                .into_builder()
                .nulls(Some(null_buffer))
                .build()
                .unwrap(),
        ));

        let age_array = Arc::new(Int32Array::from(vec![Some(25), Some(30), Some(35)]));

        let batch = RecordBatch::try_new(schema, vec![
            id_array as ArrayRef,
            name_array as ArrayRef,
            address_array as ArrayRef,
            age_array as ArrayRef,
        ])
        .unwrap();

        let projector = BatchProjector::new(
            &batch.schema(),
            &[4],
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected = projector.project_columns(batch.columns()).unwrap();
        let street_array = projected[0].as_any().downcast_ref::<StringArray>().unwrap();

        // The parent struct's null in row 1 must propagate to the projected child column.
        assert!(!street_array.is_null(0));
        assert!(street_array.is_null(1));
        assert!(!street_array.is_null(2));
    }

    #[test]
    fn test_project_batch_method() {
        let schema = create_test_schema();
        let batch = create_test_batch();

        let projector = BatchProjector::new(
            &schema,
            &[1, 2],
            test_field_id_fetch,
            test_nested_field_fetch,
        )
        .unwrap();

        let projected_batch = projector.project_batch(&batch).unwrap();

        assert_eq!(projected_batch.num_columns(), 2);
        assert_eq!(projected_batch.num_rows(), 3);
        assert_eq!(projected_batch.schema().field(0).name(), "id");
        assert_eq!(projected_batch.schema().field(1).name(), "name");
    }

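    /// End-to-end tests that run the `DeltaWriter` against real Parquet files in a
    /// temporary directory.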
    mod delta_writer_tests {
        use std::collections::HashMap;

        use arrow_array::{Int32Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use parquet::file::properties::WriterProperties;
        use tempfile::TempDir;

        use super::*;
        use crate::arrow::arrow_schema_to_schema;
        use crate::io::FileIOBuilder;
        use crate::spec::{
            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
        };
        use crate::writer::IcebergWriterBuilder;
        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use crate::writer::base_writer::equality_delete_writer::{
            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
        };
        use crate::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder;
        use crate::writer::file_writer::ParquetWriterBuilder;
        use crate::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

        fn create_iceberg_schema() -> Arc<IcebergSchema> {
            Arc::new(
                IcebergSchema::builder()
                    .with_schema_id(0)
                    .with_fields(vec![
                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
                            .into(),
                    ])
                    .build()
                    .unwrap(),
            )
        }

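        /// Builds an `id`/`name` batch with a trailing `op` column (1 = insert, -1 = delete).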
        fn create_test_batch_with_ops(
            ids: Vec<i32>,
            names: Vec<Option<&str>>,
            ops: Vec<i32>,
        ) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2".to_string(),
                )])),
                Field::new("op", DataType::Int32, false),
            ]));

            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
            let name_array: ArrayRef = Arc::new(StringArray::from(names));
            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));

            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
        }

        #[tokio::test]
        async fn test_delta_writer_insert_only() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Data file writer.
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            // Position delete writer.
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Equality delete writer keyed on field id 1 ("id").
            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Insert three rows.
            let batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );

            delta_writer.write(batch).await?;
            let data_files = delta_writer.close().await?;

            // Only a data file should be produced.
            assert_eq!(data_files.len(), 1);
            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
            assert_eq!(data_files[0].record_count, 3);

            // Read the written file back and verify its contents.
            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 3);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Insert three rows.
            let insert_batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );
            delta_writer.write(insert_batch).await?;

            // Delete two of the rows that were just inserted; they should become position deletes.
            let delete_batch =
                create_test_batch_with_ops(vec![1, 2], vec![Some("Alice"), Some("Bob")], vec![
                    -1, -1,
                ]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // Expect one data file and one position delete file.
            assert_eq!(data_files.len(), 2);

            let data_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::Data)
                .unwrap();
            let pos_delete_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
                .unwrap();

            assert_eq!(data_file.record_count, 3);
            assert_eq!(pos_delete_file.record_count, 2);

            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches[0].num_rows(), 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_equality_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Delete rows this writer never wrote; they should become equality deletes.
            let delete_batch =
                create_test_batch_with_ops(vec![99, 100], vec![Some("X"), Some("Y")], vec![-1, -1]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            assert_eq!(data_files.len(), 1);
            assert_eq!(
                data_files[0].content,
                crate::spec::DataContentType::EqualityDeletes
            );
            assert_eq!(data_files[0].record_count, 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_invalid_op() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);

            let result = delta_writer.write(batch).await;
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("Ops column must be 1 (insert) or -1 (delete)")
            );

            Ok(())
        }
    }
}