use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::BooleanBuilder;
use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray};
use arrow_ord::partition::partition;
use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
use arrow_select::filter::filter_record_batch;
use itertools::Itertools;

use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::spec::{DataFile, PartitionKey};
use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

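/// Builder for [`DeltaWriter`].
///
/// Wraps a data file writer builder, a position delete writer builder, and an
/// equality delete writer builder, together with the field ids of the columns
/// that uniquely identify a row.
///
/// A minimal usage sketch; the three inner builders are assumed to be
/// constructed elsewhere (e.g. as in the tests at the bottom of this file):
///
/// ```ignore
/// let mut writer = DeltaWriterBuilder::new(
///     data_writer_builder,
///     pos_delete_writer_builder,
///     eq_delete_writer_builder,
///     vec![1], // field id of the unique column
/// )
/// .build(None) // no partition key
/// .await?;
/// writer.write(batch_with_trailing_op_column).await?;
/// let files = writer.close().await?;
/// ```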
#[derive(Clone, Debug)]
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    data_writer_builder: DWB,
    pos_delete_writer_builder: PDWB,
    eq_delete_writer_builder: EDWB,
    unique_cols: Vec<i32>,
}

impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
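    /// Creates a new `DeltaWriterBuilder` from the three inner writer
    /// builders and the field ids of the unique (identifier) columns.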
    pub fn new(
        data_writer_builder: DWB,
        pos_delete_writer_builder: PDWB,
        eq_delete_writer_builder: EDWB,
        unique_cols: Vec<i32>,
    ) -> Self {
        Self {
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            unique_cols,
        }
    }
}

#[async_trait::async_trait]
impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
where
    DWB: IcebergWriterBuilder,
    PDWB: IcebergWriterBuilder,
    EDWB: IcebergWriterBuilder,
    DWB::R: CurrentFileStatus,
{
    type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;
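
    /// Builds all three inner writers against the same partition key and
    /// combines them into a [`DeltaWriter`].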
    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
        let data_writer = self
            .data_writer_builder
            .build(partition_key.clone())
            .await?;
        let pos_delete_writer = self
            .pos_delete_writer_builder
            .build(partition_key.clone())
            .await?;
        let eq_delete_writer = self.eq_delete_writer_builder.build(partition_key).await?;
        DeltaWriter::try_new(
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            self.unique_cols.clone(),
        )
    }
}

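/// Position of a row inside a previously written data file, used to turn a
/// delete of a row this writer wrote into a position delete.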
pub struct Position {
    row_index: i64,
    file_path: String,
}

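/// A writer that splits a changelog-style [`RecordBatch`] into data files,
/// position delete files, and equality delete files.
///
/// The last column of every incoming batch must be an `Int32Array` of ops:
/// `1` marks an insert and `-1` a delete. Deleted rows that were previously
/// written by this writer become position deletes; all other deleted rows
/// become equality deletes on `unique_cols`.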
pub struct DeltaWriter<DW, PDW, EDW> {
    /// Writer for inserted rows.
    pub data_writer: DW,
    /// Writer for position deletes of rows written by `data_writer`.
    pub pos_delete_writer: PDW,
    /// Writer for equality deletes of rows this writer has not written itself.
    pub eq_delete_writer: EDW,
    /// Field ids of the columns that uniquely identify a row.
    pub unique_cols: Vec<i32>,
    /// Maps each inserted unique key to the position it was written at.
    pub seen_rows: HashMap<OwnedRow, Position>,
    /// Projects incoming batches down to the unique columns.
    pub(crate) projector: RecordBatchProjector,
    /// Converts the projected unique columns into comparable rows.
    pub(crate) row_converter: RowConverter,
}

impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    fn try_new(
        data_writer: DW,
        pos_delete_writer: PDW,
        eq_delete_writer: EDW,
        unique_cols: Vec<i32>,
    ) -> Result<Self> {
        let projector = RecordBatchProjector::from_iceberg_schema(
            data_writer.current_schema(),
            &unique_cols,
        )?;

        let row_converter = RowConverter::new(
            projector
                .projected_schema_ref()
                .fields()
                .iter()
                .map(|f| SortField::new(f.data_type().clone()))
                .collect(),
        )?;

        Ok(Self {
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            unique_cols,
            seen_rows: HashMap::new(),
            projector,
            row_converter,
        })
    }

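    /// Writes `batch` to the data writer and records, for each unique key,
    /// the file path and row index it was written at.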
    async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let batch_num_rows = batch.num_rows();

        self.data_writer.write(batch.clone()).await?;

        // The rows of this batch occupy the last `batch_num_rows` positions
        // of the file the data writer is currently writing.
        let file_path = self.data_writer.current_file_path();
        let end_row_num = self.data_writer.current_row_num();
        let start_row_index = end_row_num - batch_num_rows;

        for (i, row) in rows.iter().enumerate() {
            // Remember where each unique key landed so that a later delete of
            // the same key can be emitted as a position delete.
            self.seen_rows.insert(row.owned(), Position {
                row_index: start_row_index as i64 + i as i64,
                file_path: file_path.clone(),
            });
        }

        Ok(())
    }

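    /// Splits deleted rows into position deletes (keys previously inserted by
    /// this writer) and equality deletes (all remaining keys).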
    async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let mut file_array = vec![];
        let mut row_index_array = vec![];
        let mut needs_equality_delete = BooleanBuilder::new();

        for row in rows.iter() {
            if let Some(pos) = self.seen_rows.remove(&row.owned()) {
                // This writer wrote the row itself, so it can be deleted
                // precisely by file path and row index.
                row_index_array.push(pos.row_index);
                file_array.push(pos.file_path.clone());
                needs_equality_delete.append_value(false);
            } else {
                // Unknown key: fall back to an equality delete.
                needs_equality_delete.append_value(true);
            }
        }

        let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
        let row_index_array: ArrayRef = Arc::new(Int64Array::from(row_index_array));

        let position_batch = RecordBatch::try_new(
            PositionDeleteWriterConfig::arrow_schema(),
            vec![file_array, row_index_array],
        )?;

        if position_batch.num_rows() > 0 {
            self.pos_delete_writer
                .write(position_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        // Keep only the rows that still need an equality delete.
        let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;

        if eq_batch.num_rows() > 0 {
            self.eq_delete_writer
                .write(eq_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        Ok(())
    }

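    /// Projects `batch` down to the unique columns and converts them to
    /// [`Rows`], whose entries are comparable and can be owned as keys for
    /// `seen_rows`.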
    fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
        self.row_converter
            .convert_columns(&self.projector.project_column(batch.columns())?)
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
    }
}

#[async_trait::async_trait]
impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
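    /// Partitions `batch` into consecutive runs of identical op values and
    /// routes each run to `insert` (op = 1) or `delete` (op = -1).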
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // The trailing column of the batch carries the op for each row.
        let ops = batch
            .column(batch.num_columns() - 1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Failed to downcast ops column"))?;

        let partition =
            partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to partition batch: {e}"),
                )
            })?;

        // Strip the op column once; each range below selects one contiguous
        // run of rows with the same op value.
        let data_batch = batch
            .project(&(0..batch.num_columns() - 1).collect_vec())
            .map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to project batch columns: {e}"),
                )
            })?;

        for range in partition.ranges() {
            let batch = data_batch.slice(range.start, range.end - range.start);
            match ops.value(range.start) {
                1 => self.insert(batch).await?,
                -1 => self.delete(batch).await?,
                op => {
                    return Err(Error::new(
                        ErrorKind::Unexpected,
                        format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
                    ));
                }
            }
        }

        Ok(())
    }

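    /// Closes the data, position delete, and equality delete writers and
    /// returns all produced [`DataFile`]s.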
    async fn close(&mut self) -> Result<Vec<DataFile>> {
        let data_files = self.data_writer.close().await?;
        let pos_delete_files = self.pos_delete_writer.close().await?;
        let eq_delete_files = self.eq_delete_writer.close().await?;

        Ok(data_files
            .into_iter()
            .chain(pos_delete_files)
            .chain(eq_delete_files)
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    mod delta_writer_tests {
        use std::collections::HashMap;

        use arrow_array::{Int32Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use parquet::file::properties::WriterProperties;
        use tempfile::TempDir;

        use super::*;
        use crate::arrow::arrow_schema_to_schema;
        use crate::io::FileIOBuilder;
        use crate::spec::{
            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
        };
        use crate::writer::IcebergWriterBuilder;
        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use crate::writer::base_writer::equality_delete_writer::{
            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
        };
        use crate::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder;
        use crate::writer::file_writer::ParquetWriterBuilder;
        use crate::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

        fn create_iceberg_schema() -> Arc<IcebergSchema> {
            Arc::new(
                IcebergSchema::builder()
                    .with_schema_id(0)
                    .with_fields(vec![
                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
                            .into(),
                    ])
                    .build()
                    .unwrap(),
            )
        }

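        /// Builds a test batch whose trailing `op` column drives the
        /// [`DeltaWriter`]: `1` inserts the row and `-1` deletes it. The `id`
        /// and `name` fields carry Iceberg field ids 1 and 2 via Parquet
        /// metadata so the writer can project the unique `id` column.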
        fn create_test_batch_with_ops(
            ids: Vec<i32>,
            names: Vec<Option<&str>>,
            ops: Vec<i32>,
        ) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2".to_string(),
                )])),
                Field::new("op", DataType::Int32, false),
            ]));

            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
            let name_array: ArrayRef = Arc::new(StringArray::from(names));
            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));

            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
        }

        #[tokio::test]
        async fn test_delta_writer_insert_only() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Data file writer.
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            // Position delete writer.
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Equality delete writer on the `id` column (field id 1).
            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Insert three rows.
            let batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );

            delta_writer.write(batch).await?;
            let data_files = delta_writer.close().await?;

            // A single data file and no delete files.
            assert_eq!(data_files.len(), 1);
            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
            assert_eq!(data_files[0].record_count, 3);

            // Read the written file back and verify the rows.
            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 3);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Data file writer.
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            // Position delete writer.
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Equality delete writer on the `id` column (field id 1).
            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            let insert_batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );
            delta_writer.write(insert_batch).await?;

            // Both keys were written by this writer, so the deletes must be
            // emitted as position deletes.
            let delete_batch = create_test_batch_with_ops(
                vec![1, 2],
                vec![Some("Alice"), Some("Bob")],
                vec![-1, -1],
            );
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // One data file plus one position delete file.
            assert_eq!(data_files.len(), 2);

            let data_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::Data)
                .unwrap();
            let pos_delete_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
                .unwrap();

            assert_eq!(data_file.record_count, 3);
            assert_eq!(pos_delete_file.record_count, 2);

            // Read the position delete file back and verify the row count.
            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches[0].num_rows(), 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_equality_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Data file writer.
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            // Position delete writer.
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Equality delete writer on the `id` column (field id 1).
            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Neither key was written by this writer, so both deletes must
            // fall back to equality deletes.
            let delete_batch =
                create_test_batch_with_ops(vec![99, 100], vec![Some("X"), Some("Y")], vec![-1, -1]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            assert_eq!(data_files.len(), 1);
            assert_eq!(
                data_files[0].content,
                crate::spec::DataContentType::EqualityDeletes
            );
            assert_eq!(data_files[0].record_count, 2);

            Ok(())
        }

        #[tokio::test]
        async fn test_delta_writer_invalid_op() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Data file writer.
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer =
                ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
            let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                data_parquet_writer,
                schema.clone(),
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder);

            // Position delete writer.
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema.clone(),
            );
            let pos_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    pos_delete_parquet_writer,
                    pos_delete_schema,
                    file_io.clone(),
                    pos_delete_location_gen,
                    pos_delete_file_name_gen,
                );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_rolling_writer_builder,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Equality delete writer on the `id` column (field id 1).
            let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema.clone(),
            );
            let eq_delete_rolling_writer_builder =
                RollingFileWriterBuilder::new_with_default_file_size(
                    eq_delete_parquet_writer,
                    eq_delete_schema,
                    file_io.clone(),
                    eq_delete_location_gen,
                    eq_delete_file_name_gen,
                );
            let eq_delete_writer = EqualityDeleteFileWriterBuilder::new(
                eq_delete_rolling_writer_builder,
                eq_delete_config,
            );

            let data_writer_instance = data_writer.build(None).await?;
            let pos_delete_writer_instance = pos_delete_writer.build(None).await?;
            let eq_delete_writer_instance = eq_delete_writer.build(None).await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Any op value other than 1 or -1 must be rejected.
            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);

            let result = delta_writer.write(batch).await;
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("Ops column must be 1 (insert) or -1 (delete)")
            );

            Ok(())
        }
    }
}