1use std::collections::{HashMap, VecDeque};
25use std::sync::Arc;
26
27use arrow_array::builder::BooleanBuilder;
28use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
29use arrow_ord::partition::partition;
30use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
31use arrow_select::filter::filter_record_batch;
32use itertools::Itertools;
33
34use crate::arrow::record_batch_projector::RecordBatchProjector;
35use crate::spec::{DataFile, PartitionKey};
36use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
37use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
38use crate::{Error, ErrorKind, Result};
39
/// Default upper bound on the number of inserted rows a [`DeltaWriter`] remembers.
///
/// [`DeltaWriterBuilder::new`] uses this value; it can be overridden with
/// [`DeltaWriterBuilder::with_max_seen_rows`].
pub const DEFAULT_MAX_SEEN_ROWS: usize = 100_000;
43
/// Builder for a [`DeltaWriter`].
///
/// Bundles the three underlying writer builders together with the settings the
/// resulting writer is created with.
#[derive(Debug, Clone)]
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    /// Builder of the writer that receives inserted rows.
    data_writer_builder: DWB,
    /// Builder of the writer that receives position deletes.
    pos_delete_writer_builder: PDWB,
    /// Builder of the writer that receives equality deletes.
    eq_delete_writer_builder: EDWB,
    /// Field ids of the columns that identify a row.
    unique_cols: Vec<i32>,
    /// Maximum number of inserted rows the writer keeps track of.
    max_seen_rows: usize,
}
53
54impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
55 pub fn new(
57 data_writer_builder: DWB,
58 pos_delete_writer_builder: PDWB,
59 eq_delete_writer_builder: EDWB,
60 unique_cols: Vec<i32>,
61 ) -> Self {
62 Self {
63 data_writer_builder,
64 pos_delete_writer_builder,
65 eq_delete_writer_builder,
66 unique_cols,
67 max_seen_rows: DEFAULT_MAX_SEEN_ROWS,
68 }
69 }
70
71 pub fn with_max_seen_rows(mut self, max_seen_rows: usize) -> Self {
81 self.max_seen_rows = max_seen_rows;
82 self
83 }
84}
85
86#[async_trait::async_trait]
87impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
88where
89 DWB: IcebergWriterBuilder,
90 PDWB: IcebergWriterBuilder,
91 EDWB: IcebergWriterBuilder,
92 DWB::R: CurrentFileStatus,
93{
94 type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;
95 async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
96 let data_writer = self
97 .data_writer_builder
98 .build(partition_key.clone())
99 .await?;
100 let pos_delete_writer = self
101 .pos_delete_writer_builder
102 .build(partition_key.clone())
103 .await?;
104 let eq_delete_writer = self.eq_delete_writer_builder.build(partition_key).await?;
105 DeltaWriter::try_new(
106 data_writer,
107 pos_delete_writer,
108 eq_delete_writer,
109 self.unique_cols.clone(),
110 self.max_seen_rows,
111 )
112 }
113}
114
/// Location of a row that was written by the data writer.
///
/// Used to turn a later delete of the same row into a position delete.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Position {
    /// Zero-based index of the row inside the data file.
    row_index: i64,
    /// Path of the data file that contains the row.
    file_path: String,
}
120
121pub struct DeltaWriter<DW, PDW, EDW> {
123 pub data_writer: DW,
125 pub pos_delete_writer: PDW,
128 pub eq_delete_writer: EDW,
131 pub unique_cols: Vec<i32>,
133 pub seen_rows: HashMap<OwnedRow, Position>,
135 seen_rows_order: VecDeque<OwnedRow>,
137 max_seen_rows: usize,
139 pub(crate) projector: RecordBatchProjector,
141 pub(crate) row_convertor: RowConverter,
143}
144
145impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
146where
147 DW: IcebergWriter + CurrentFileStatus,
148 PDW: IcebergWriter,
149 EDW: IcebergWriter,
150{
151 fn try_new(
152 data_writer: DW,
153 pos_delete_writer: PDW,
154 eq_delete_writer: EDW,
155 unique_cols: Vec<i32>,
156 max_seen_rows: usize,
157 ) -> Result<Self> {
158 let projector =
159 RecordBatchProjector::from_iceberg_schema(data_writer.current_schema(), &unique_cols)?;
160
161 let row_convertor = RowConverter::new(
162 projector
163 .projected_schema_ref()
164 .fields()
165 .iter()
166 .map(|f| SortField::new(f.data_type().clone()))
167 .collect(),
168 )?;
169
170 Ok(Self {
171 data_writer,
172 pos_delete_writer,
173 eq_delete_writer,
174 unique_cols,
175 seen_rows: HashMap::new(),
176 seen_rows_order: VecDeque::new(),
177 max_seen_rows,
178 projector,
179 row_convertor,
180 })
181 }
182
183 async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
184 let batch_num_rows = batch.num_rows();
185
186 self.data_writer.write(batch.clone()).await?;
191
192 if self.max_seen_rows == 0 {
194 return Ok(());
195 }
196
197 let rows = self.extract_unique_column_rows(&batch)?;
198
199 let file_path = self.data_writer.current_file_path();
201 let end_row_num = self.data_writer.current_row_num();
202 let start_row_index = end_row_num - batch_num_rows;
203
204 for (i, row) in rows.iter().enumerate() {
206 let owned_row = row.owned();
207 self.seen_rows.insert(owned_row.clone(), Position {
208 row_index: start_row_index as i64 + i as i64,
209 file_path: file_path.clone(),
210 });
211 self.seen_rows_order.push_back(owned_row);
212 }
213
214 self.evict_oldest_seen_rows();
216
217 Ok(())
218 }
219
220 fn evict_oldest_seen_rows(&mut self) {
223 while self.seen_rows.len() > self.max_seen_rows {
224 if let Some(old_row) = self.seen_rows_order.pop_front() {
225 self.seen_rows.remove(&old_row);
227 } else {
228 break;
231 }
232 }
233 }
234
235 async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
236 if self.max_seen_rows == 0 {
238 self.eq_delete_writer
239 .write(batch)
240 .await
241 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
242 return Ok(());
243 }
244
245 let rows = self.extract_unique_column_rows(&batch)?;
246 let mut file_array = vec![];
247 let mut row_index_array = vec![];
248 let mut needs_equality_delete = BooleanBuilder::new();
252
253 for row in rows.iter() {
254 if let Some(pos) = self.seen_rows.remove(&row.owned()) {
255 row_index_array.push(pos.row_index);
257 file_array.push(pos.file_path.clone());
258 needs_equality_delete.append_value(false);
259 } else {
260 needs_equality_delete.append_value(true);
262 }
263 }
264
265 let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
267 let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array));
268
269 let position_batch =
270 RecordBatch::try_new(PositionDeleteWriterConfig::arrow_schema(), vec![
271 file_array,
272 row_index_array,
273 ])?;
274
275 if position_batch.num_rows() > 0 {
276 self.pos_delete_writer
277 .write(position_batch)
278 .await
279 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
280 }
281
282 let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
284 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
285
286 if eq_batch.num_rows() > 0 {
287 self.eq_delete_writer
288 .write(eq_batch)
289 .await
290 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
291 }
292
293 Ok(())
294 }
295
296 fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
297 self.row_convertor
298 .convert_columns(&self.projector.project_column(batch.columns())?)
299 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
300 }
301}
302
303#[async_trait::async_trait]
304impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
305where
306 DW: IcebergWriter + CurrentFileStatus,
307 PDW: IcebergWriter,
308 EDW: IcebergWriter,
309{
310 async fn write(&mut self, batch: RecordBatch) -> Result<()> {
311 let ops = batch
313 .column(batch.num_columns() - 1)
314 .as_any()
315 .downcast_ref::<Int32Array>()
316 .ok_or(Error::new(
317 ErrorKind::Unexpected,
318 "Failed to downcast ops column",
319 ))?;
320
321 let partition =
322 partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
323 Error::new(
324 ErrorKind::Unexpected,
325 format!("Failed to partition batch: {e}"),
326 )
327 })?;
328
329 for range in partition.ranges() {
330 let batch = batch
331 .project(&(0..batch.num_columns() - 1).collect_vec())
332 .map_err(|e| {
333 Error::new(
334 ErrorKind::Unexpected,
335 format!("Failed to project batch columns: {e}"),
336 )
337 })?
338 .slice(range.start, range.end - range.start);
339 match ops.value(range.start) {
340 1 => self.insert(batch).await?,
341 -1 => self.delete(batch).await?,
342 op => {
343 return Err(Error::new(
344 ErrorKind::Unexpected,
345 format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
346 ));
347 }
348 }
349 }
350
351 Ok(())
352 }
353
354 async fn close(&mut self) -> Result<Vec<DataFile>> {
355 let data_files = self.data_writer.close().await?;
356 let pos_delete_files = self.pos_delete_writer.close().await?;
357 let eq_delete_files = self.eq_delete_writer.close().await?;
358
359 Ok(data_files
360 .into_iter()
361 .chain(pos_delete_files)
362 .chain(eq_delete_files)
363 .collect())
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    mod delta_writer_tests {
        use std::collections::HashMap;

        use arrow_array::{Int32Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use parquet::file::properties::WriterProperties;
        use tempfile::TempDir;

        use super::*;
        use crate::arrow::arrow_schema_to_schema;
        use crate::io::FileIO;
        use crate::spec::{
            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
        };
        use crate::writer::IcebergWriterBuilder;
        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use crate::writer::base_writer::equality_delete_writer::{
            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
        };
        use crate::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder;
        use crate::writer::file_writer::ParquetWriterBuilder;
        use crate::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

        /// Table schema shared by all tests: a required `id` and an optional `name`.
        fn create_iceberg_schema() -> Arc<IcebergSchema> {
            Arc::new(
                IcebergSchema::builder()
                    .with_schema_id(0)
                    .with_fields(vec![
                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
                            .into(),
                    ])
                    .build()
                    .unwrap(),
            )
        }

        /// Builds an input batch with the `id` and `name` payload columns followed by
        /// the trailing `op` column.
        fn create_test_batch_with_ops(
            ids: Vec<i32>,
            names: Vec<Option<&str>>,
            ops: Vec<i32>,
        ) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2".to_string(),
                )])),
                Field::new("op", DataType::Int32, false),
            ]));

            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
            let name_array: ArrayRef = Arc::new(StringArray::from(names));
            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));

            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
        }

        /// Creates a delta writer keyed on `id` whose data, position delete and
        /// equality delete files are written below `temp_dir` in the `data`,
        /// `pos_delete` and `eq_delete` directories.
        async fn create_delta_writer(
            temp_dir: &TempDir,
            file_io: &FileIO,
        ) -> Result<impl IcebergWriter> {
            let schema = create_iceberg_schema();
            let root = temp_dir.path().to_str().unwrap();

            // `prefix` names both the sub-directory and the generated file names.
            let rolling_writer_builder = |prefix: &str, schema: Arc<IcebergSchema>| {
                RollingFileWriterBuilder::new_with_default_file_size(
                    ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()),
                    schema,
                    file_io.clone(),
                    DefaultLocationGenerator::with_data_location(format!("{root}/{prefix}")),
                    DefaultFileNameGenerator::new(
                        prefix.to_string(),
                        None,
                        DataFileFormat::Parquet,
                    ),
                )
            };

            let data_writer =
                DataFileWriterBuilder::new(rolling_writer_builder("data", schema.clone()));

            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                rolling_writer_builder("pos_delete", pos_delete_schema),
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                rolling_writer_builder("eq_delete", eq_delete_schema),
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
                DEFAULT_MAX_SEEN_ROWS,
            )
        }

        #[tokio::test]
        async fn test_delta_writer_insert_only() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIO::new_with_fs();
            let mut delta_writer = create_delta_writer(&temp_dir, &file_io).await?;

            let batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );

            delta_writer.write(batch).await?;
            let data_files = delta_writer.close().await?;

            // Only a data file is produced.
            assert_eq!(data_files.len(), 1);
            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
            assert_eq!(data_files[0].record_count, 3);

            // The file on disk holds the three inserted rows.
            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 3);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIO::new_with_fs();
            let mut delta_writer = create_delta_writer(&temp_dir, &file_io).await?;

            let insert_batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );
            delta_writer.write(insert_batch).await?;

            // Both deleted rows were inserted by this writer.
            let delete_batch =
                create_test_batch_with_ops(vec![1, 2], vec![Some("Alice"), Some("Bob")], vec![
                    -1, -1,
                ]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // One data file and one position delete file.
            assert_eq!(data_files.len(), 2);

            let data_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::Data)
                .unwrap();
            let pos_delete_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
                .unwrap();

            assert_eq!(data_file.record_count, 3);
            assert_eq!(pos_delete_file.record_count, 2);

            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches[0].num_rows(), 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_equality_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIO::new_with_fs();
            let mut delta_writer = create_delta_writer(&temp_dir, &file_io).await?;

            // These rows were never inserted through this writer.
            let delete_batch =
                create_test_batch_with_ops(vec![99, 100], vec![Some("X"), Some("Y")], vec![-1, -1]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // Only an equality delete file is produced.
            assert_eq!(data_files.len(), 1);
            assert_eq!(
                data_files[0].content,
                crate::spec::DataContentType::EqualityDeletes
            );
            assert_eq!(data_files[0].record_count, 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_invalid_op() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIO::new_with_fs();
            let mut delta_writer = create_delta_writer(&temp_dir, &file_io).await?;

            // 99 is neither insert (1) nor delete (-1).
            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);

            let result = delta_writer.write(batch).await;
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("Ops column must be 1 (insert) or -1 (delete)")
            );

            Ok(())
        }
    }
}