use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use arrow_array::builder::BooleanBuilder;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_ord::partition::partition;
use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
use arrow_select::filter::filter_record_batch;
use itertools::Itertools;

use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::spec::{DataFile, PartitionKey};
use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

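/// Default maximum number of recently-inserted rows tracked for position
/// deletes. Beyond this cap the oldest entries are evicted, and later deletes
/// of those rows fall back to equality deletes.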
pub const DEFAULT_MAX_SEEN_ROWS: usize = 100_000;

#[derive(Clone, Debug)]
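/// Builder for [`DeltaWriter`], combining a data-file writer builder, a
/// position-delete writer builder, and an equality-delete writer builder.
///
/// A minimal usage sketch, assuming `data_builder`, `pos_delete_builder`, and
/// `eq_delete_builder` are already-configured [`IcebergWriterBuilder`]s and
/// field id 1 identifies a row:
///
/// ```ignore
/// let mut writer = DeltaWriterBuilder::new(
///     data_builder,
///     pos_delete_builder,
///     eq_delete_builder,
///     vec![1], // unique-key field ids
/// )
/// .with_max_seen_rows(10_000)
/// .build(None)
/// .await?;
/// writer.write(batch_with_op_column).await?;
/// let files = writer.close().await?;
/// ```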
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    data_writer_builder: DWB,
    pos_delete_writer_builder: PDWB,
    eq_delete_writer_builder: EDWB,
    unique_cols: Vec<i32>,
    max_seen_rows: usize,
}

impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
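    /// Creates a new `DeltaWriterBuilder` from the three underlying writer
    /// builders and the field ids of the columns that uniquely identify a row.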
    pub fn new(
        data_writer_builder: DWB,
        pos_delete_writer_builder: PDWB,
        eq_delete_writer_builder: EDWB,
        unique_cols: Vec<i32>,
    ) -> Self {
        Self {
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            unique_cols,
            max_seen_rows: DEFAULT_MAX_SEEN_ROWS,
        }
    }

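    /// Overrides the cap on how many recently-inserted rows are tracked for
    /// position deletes. Setting this to 0 disables tracking entirely, so
    /// every delete is written as an equality delete.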
    pub fn with_max_seen_rows(mut self, max_seen_rows: usize) -> Self {
        self.max_seen_rows = max_seen_rows;
        self
    }
}

#[async_trait::async_trait]
impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
where
    DWB: IcebergWriterBuilder,
    PDWB: IcebergWriterBuilder,
    EDWB: IcebergWriterBuilder,
    DWB::R: CurrentFileStatus,
{
    type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;

    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
        let data_writer = self
            .data_writer_builder
            .build(partition_key.clone())
            .await?;
        let pos_delete_writer = self
            .pos_delete_writer_builder
            .build(partition_key.clone())
            .await?;
        let eq_delete_writer = self.eq_delete_writer_builder.build(partition_key).await?;
        DeltaWriter::try_new(
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            self.unique_cols.clone(),
            self.max_seen_rows,
        )
    }
}

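/// Location of a previously inserted row: the data file it was written to and
/// its row index within that file.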
pub struct Position {
    /// Zero-based row index within the data file.
    row_index: i64,
    /// Path of the data file containing the row.
    file_path: String,
}

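/// A writer that turns a changelog-style batch, whose trailing op column tags
/// each row as an insert or a delete, into Iceberg data files plus
/// position-delete and equality-delete files. Deletes of rows whose insert
/// position is still tracked in `seen_rows` become position deletes; all
/// other deletes become equality deletes.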
pub struct DeltaWriter<DW, PDW, EDW> {
    /// Writer for inserted rows.
    pub data_writer: DW,
    /// Writer for position deletes (rows whose insert position is still tracked).
    pub pos_delete_writer: PDW,
    /// Writer for equality deletes (rows whose position is unknown or evicted).
    pub eq_delete_writer: EDW,
    /// Field ids of the columns that uniquely identify a row.
    pub unique_cols: Vec<i32>,
    /// Maps a row's unique key to the position where it was last inserted.
    pub seen_rows: HashMap<OwnedRow, Position>,
    /// Insertion order of keys in `seen_rows`, used for FIFO eviction.
    seen_rows_order: VecDeque<OwnedRow>,
    /// Maximum number of entries kept in `seen_rows`; 0 disables tracking.
    max_seen_rows: usize,
    pub(crate) projector: RecordBatchProjector,
    pub(crate) row_converter: RowConverter,
}

impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    fn try_new(
        data_writer: DW,
        pos_delete_writer: PDW,
        eq_delete_writer: EDW,
        unique_cols: Vec<i32>,
        max_seen_rows: usize,
    ) -> Result<Self> {
        let projector = RecordBatchProjector::from_iceberg_schema(
            data_writer.current_schema(),
            &unique_cols,
        )?;

        let row_converter = RowConverter::new(
            projector
                .projected_schema_ref()
                .fields()
                .iter()
                .map(|f| SortField::new(f.data_type().clone()))
                .collect(),
        )?;

        Ok(Self {
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            unique_cols,
            seen_rows: HashMap::new(),
            seen_rows_order: VecDeque::new(),
            max_seen_rows,
            projector,
            row_converter,
        })
    }

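    /// Writes `batch` to the data writer and records, for each row, the file
    /// path and row index it landed at, keyed by its unique columns.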
    async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
        let batch_num_rows = batch.num_rows();

        self.data_writer.write(batch.clone()).await?;

        if self.max_seen_rows == 0 {
            // Tracking disabled: every delete will be an equality delete.
            return Ok(());
        }

        let rows = self.extract_unique_column_rows(&batch)?;

        // Positions assume the whole batch landed in the file currently being
        // written; the batch occupies its last `batch_num_rows` rows.
        let file_path = self.data_writer.current_file_path();
        let end_row_num = self.data_writer.current_row_num();
        let start_row_index = end_row_num - batch_num_rows;

        for (i, row) in rows.iter().enumerate() {
            let owned_row = row.owned();
            self.seen_rows.insert(owned_row.clone(), Position {
                row_index: start_row_index as i64 + i as i64,
                file_path: file_path.clone(),
            });
            self.seen_rows_order.push_back(owned_row);
        }

        self.evict_oldest_seen_rows();

        Ok(())
    }

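    /// Drops the oldest tracked keys until `seen_rows` is back under
    /// `max_seen_rows`. Evicting a live entry is safe: a later delete of that
    /// row simply falls back to an equality delete.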
    fn evict_oldest_seen_rows(&mut self) {
        while self.seen_rows.len() > self.max_seen_rows {
            if let Some(old_row) = self.seen_rows_order.pop_front() {
                // The popped key may already have been removed by `delete`
                // (making this a no-op), or re-inserted since (dropping the
                // newer entry early); either way the map stays consistent.
                self.seen_rows.remove(&old_row);
            } else {
                break;
            }
        }
    }

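    /// Routes each row of `batch` to the position-delete writer when its
    /// insert position is still tracked, and to the equality-delete writer
    /// otherwise.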
    async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
        if self.max_seen_rows == 0 {
            self.eq_delete_writer
                .write(batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
            return Ok(());
        }

        let rows = self.extract_unique_column_rows(&batch)?;
        let mut file_array = vec![];
        let mut row_index_array = vec![];
        let mut needs_equality_delete = BooleanBuilder::new();

        // Rows whose insert position is still tracked become position deletes;
        // everything else falls back to an equality delete.
        for row in rows.iter() {
            if let Some(pos) = self.seen_rows.remove(&row.owned()) {
                row_index_array.push(pos.row_index);
                file_array.push(pos.file_path.clone());
                needs_equality_delete.append_value(false);
            } else {
                needs_equality_delete.append_value(true);
            }
        }

        let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
        let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array));

        let position_batch = RecordBatch::try_new(
            PositionDeleteWriterConfig::arrow_schema(),
            vec![file_array, row_index_array],
        )?;

        if position_batch.num_rows() > 0 {
            self.pos_delete_writer
                .write(position_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;

        if eq_batch.num_rows() > 0 {
            self.eq_delete_writer
                .write(eq_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        Ok(())
    }

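    /// Projects `batch` down to the unique-key columns and converts them into
    /// normalized rows that can be hashed and compared.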
    fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
        self.row_converter
            .convert_columns(&self.projector.project_column(batch.columns())?)
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
    }
}

#[async_trait::async_trait]
impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
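    /// Expects the last column of `batch` to be an `Int32` op column, where 1
    /// marks an insert and -1 a delete. Consecutive rows with the same op are
    /// handled as a single sub-batch. A sketch of the expected layout (column
    /// names are illustrative):
    ///
    /// ```text
    /// id | name    | op
    /// ---+---------+----
    ///  1 | "Alice" |  1   // insert
    ///  2 | "Bob"   | -1   // delete
    /// ```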
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        let ops = batch
            .column(batch.num_columns() - 1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or(Error::new(
                ErrorKind::Unexpected,
                "Failed to downcast ops column",
            ))?;

        // Split the batch into maximal runs of identical op values so each
        // run can be routed to the insert or delete path as one sub-batch.
        let partition =
            partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to partition batch: {e}"),
                )
            })?;

        for range in partition.ranges() {
            // Drop the trailing op column before handing the rows on.
            let batch = batch
                .project(&(0..batch.num_columns() - 1).collect_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!("Failed to project batch columns: {e}"),
                    )
                })?
                .slice(range.start, range.end - range.start);
            match ops.value(range.start) {
                1 => self.insert(batch).await?,
                -1 => self.delete(batch).await?,
                op => {
                    return Err(Error::new(
                        ErrorKind::Unexpected,
                        format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
                    ));
                }
            }
        }

        Ok(())
    }

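    /// Closes the data, position-delete, and equality-delete writers and
    /// returns all files they produced as one list.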
    async fn close(&mut self) -> Result<Vec<DataFile>> {
        let data_files = self.data_writer.close().await?;
        let pos_delete_files = self.pos_delete_writer.close().await?;
        let eq_delete_files = self.eq_delete_writer.close().await?;

        Ok(data_files
            .into_iter()
            .chain(pos_delete_files)
            .chain(eq_delete_files)
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    mod delta_writer_tests {
        use std::collections::HashMap;

        use arrow_array::{Int32Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use parquet::file::properties::WriterProperties;
        use tempfile::TempDir;

        use super::*;
        use crate::arrow::arrow_schema_to_schema;
        use crate::io::FileIOBuilder;
        use crate::spec::{
            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
        };
        use crate::writer::IcebergWriterBuilder;
        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use crate::writer::base_writer::equality_delete_writer::{
            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
        };
        use crate::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder;
        use crate::writer::file_writer::ParquetWriterBuilder;
        use crate::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

        fn create_iceberg_schema() -> Arc<IcebergSchema> {
            Arc::new(
                IcebergSchema::builder()
                    .with_schema_id(0)
                    .with_fields(vec![
                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
                            .into(),
                    ])
                    .build()
                    .unwrap(),
            )
        }

        /// Builds a test batch whose trailing `op` column marks each row as an
        /// insert (1) or a delete (-1).
        fn create_test_batch_with_ops(
            ids: Vec<i32>,
            names: Vec<Option<&str>>,
            ops: Vec<i32>,
        ) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2".to_string(),
                )])),
                Field::new("op", DataType::Int32, false),
            ]));

            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
            let name_array: ArrayRef = Arc::new(StringArray::from(names));
            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));

            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
        }

        #[tokio::test]
        async fn test_delta_writer_insert_only() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
                DEFAULT_MAX_SEEN_ROWS,
            )?;

            let batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );

            delta_writer.write(batch).await?;
            let data_files = delta_writer.close().await?;

            assert_eq!(data_files.len(), 1);
            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
            assert_eq!(data_files[0].record_count, 3);

            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 3);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
                DEFAULT_MAX_SEEN_ROWS,
            )?;

            let insert_batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );
            delta_writer.write(insert_batch).await?;

            let delete_batch = create_test_batch_with_ops(
                vec![1, 2],
                vec![Some("Alice"), Some("Bob")],
                vec![-1, -1],
            );
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            assert_eq!(data_files.len(), 2);

            let data_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::Data)
                .unwrap();
            let pos_delete_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
                .unwrap();

            assert_eq!(data_file.record_count, 3);
            assert_eq!(pos_delete_file.record_count, 2);

            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches[0].num_rows(), 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_equality_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
                DEFAULT_MAX_SEEN_ROWS,
            )?;

            let delete_batch = create_test_batch_with_ops(
                vec![99, 100],
                vec![Some("X"), Some("Y")],
                vec![-1, -1],
            );
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            assert_eq!(data_files.len(), 1);
            assert_eq!(
                data_files[0].content,
                crate::spec::DataContentType::EqualityDeletes
            );
            assert_eq!(data_files[0].record_count, 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_invalid_op() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
                DEFAULT_MAX_SEEN_ROWS,
            )?;

            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);

            let result = delta_writer.write(batch).await;
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("Ops column must be 1 (insert) or -1 (delete)")
            );

            Ok(())
        }
    }
}