use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::BooleanBuilder;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_ord::partition::partition;
use arrow_row::{OwnedRow, RowConverter, Rows, SortField};
use arrow_select::filter::filter_record_batch;
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::arrow::schema_to_arrow_schema;
use crate::spec::DataFile;
use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig;
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

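/// Builder for [`DeltaWriter`]. It bundles a data-file writer builder, a
/// position-delete writer builder, and an equality-delete writer builder,
/// together with the field ids of the columns that uniquely identify a row.
///
/// A minimal wiring sketch (the three inner builders are assumed to be set up
/// already; the tests at the bottom of this file show the full setup):
///
/// ```ignore
/// let mut delta_writer = DeltaWriterBuilder::new(
///     data_writer_builder,       // receives rows with op = 1
///     pos_delete_writer_builder, // receives deletes of rows this writer wrote
///     eq_delete_writer_builder,  // receives all other deletes
///     vec![1],                   // unique key: the field with id 1
/// )
/// .build()
/// .await?;
/// ```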
#[derive(Clone, Debug)]
pub struct DeltaWriterBuilder<DWB, PDWB, EDWB> {
    data_writer_builder: DWB,
    pos_delete_writer_builder: PDWB,
    eq_delete_writer_builder: EDWB,
    unique_cols: Vec<i32>,
}

impl<DWB, PDWB, EDWB> DeltaWriterBuilder<DWB, PDWB, EDWB> {
    /// Creates a new `DeltaWriterBuilder` from the three inner writer builders
    /// and the field ids of the unique (key) columns.
    pub fn new(
        data_writer_builder: DWB,
        pos_delete_writer_builder: PDWB,
        eq_delete_writer_builder: EDWB,
        unique_cols: Vec<i32>,
    ) -> Self {
        Self {
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            unique_cols,
        }
    }
}

#[async_trait::async_trait]
impl<DWB, PDWB, EDWB> IcebergWriterBuilder for DeltaWriterBuilder<DWB, PDWB, EDWB>
where
    DWB: IcebergWriterBuilder,
    PDWB: IcebergWriterBuilder,
    EDWB: IcebergWriterBuilder,
    DWB::R: CurrentFileStatus,
{
    type R = DeltaWriter<DWB::R, PDWB::R, EDWB::R>;

    async fn build(self) -> Result<Self::R> {
        let data_writer = self.data_writer_builder.build().await?;
        let pos_delete_writer = self.pos_delete_writer_builder.build().await?;
        let eq_delete_writer = self.eq_delete_writer_builder.build().await?;
        DeltaWriter::try_new(
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            self.unique_cols,
        )
    }
}

/// Where a previously written row lives: the data file path and the row's
/// index within that file.
pub struct Position {
    row_index: i64,
    file_path: String,
}

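/// Writes a changelog-style stream of inserts and deletes as Iceberg data
/// files, position-delete files, and equality-delete files.
///
/// Input batches must carry one extra trailing `Int32` op column: `1` marks an
/// insert and `-1` a delete. A delete whose unique-column values match a row
/// this writer instance wrote earlier becomes a position delete; any other
/// delete becomes an equality delete on the unique columns.
///
/// A sketch of the expected input (schema construction omitted; the tests
/// below show it in full):
///
/// ```ignore
/// // Columns: (id, name, op); the op column must come last.
/// let batch = RecordBatch::try_new(schema, vec![ids, names, ops])?;
/// delta_writer.write(batch).await?;
/// ```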
pub struct DeltaWriter<DW, PDW, EDW> {
    /// Writer for inserted rows.
    pub data_writer: DW,
    /// Writer for deletes of rows that this writer instance wrote.
    pub pos_delete_writer: PDW,
    /// Writer for deletes of rows that this writer instance did not write.
    pub eq_delete_writer: EDW,
    /// Field ids of the columns that uniquely identify a row.
    pub unique_cols: Vec<i32>,
    /// Maps each written row's unique-column values to its file and row index.
    pub seen_rows: HashMap<OwnedRow, Position>,
    pub(crate) projector: RecordBatchProjector,
    pub(crate) row_convertor: RowConverter,
}

impl<DW, PDW, EDW> DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    fn try_new(
        data_writer: DW,
        pos_delete_writer: PDW,
        eq_delete_writer: EDW,
        unique_cols: Vec<i32>,
    ) -> Result<Self> {
        let arrow_schema = Arc::new(schema_to_arrow_schema(&data_writer.current_schema())?);
        // Project only the unique columns, resolving each by its Parquet
        // field id. Nested fields are skipped.
        let projector = RecordBatchProjector::new(
            arrow_schema,
            &unique_cols,
            |field| {
                if field.data_type().is_nested() {
                    return Ok(None);
                }
                field
                    .metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .map(|id_str| {
                        id_str.parse::<i64>().map_err(|e| {
                            Error::new(
                                ErrorKind::Unexpected,
                                format!("Failed to parse field ID {}: {}", id_str, e),
                            )
                        })
                    })
                    .transpose()
            },
            |_| false,
        )?;

        // Convert projected key columns into normalized rows that can be
        // hashed and compared.
        let row_convertor = RowConverter::new(
            projector
                .projected_schema_ref()
                .fields()
                .iter()
                .map(|f| SortField::new(f.data_type().clone()))
                .collect(),
        )?;

        Ok(Self {
            data_writer,
            pos_delete_writer,
            eq_delete_writer,
            unique_cols,
            seen_rows: HashMap::new(),
            projector,
            row_convertor,
        })
    }

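    /// Writes `batch` to the data writer and records where each row landed so
    /// that a later delete of the same key can become a position delete.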
    async fn insert(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let file_path = self.data_writer.current_file_path();
        let start_row_index = self.data_writer.current_row_num();

        self.data_writer.write(batch.clone()).await?;

        for (i, row) in rows.iter().enumerate() {
            self.seen_rows.insert(row.owned(), Position {
                row_index: start_row_index as i64 + i as i64,
                file_path: file_path.clone(),
            });
        }

        Ok(())
    }

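    /// Routes each deleted row either to the position-delete writer (when the
    /// row's position is known because this writer wrote it) or to the
    /// equality-delete writer (when it is not).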
    async fn delete(&mut self, batch: RecordBatch) -> Result<()> {
        let rows = self.extract_unique_column_rows(&batch)?;
        let mut file_array = vec![];
        let mut row_index_array = vec![];
        let mut needs_equality_delete = BooleanBuilder::new();

        // Rows this writer wrote can be deleted by position; anything else
        // falls back to an equality delete on the unique columns.
        for row in rows.iter() {
            if let Some(pos) = self.seen_rows.remove(&row.owned()) {
                row_index_array.push(pos.row_index);
                file_array.push(pos.file_path);
                needs_equality_delete.append_value(false);
            } else {
                needs_equality_delete.append_value(true);
            }
        }

        let file_array: ArrayRef = Arc::new(StringArray::from(file_array));
        let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array));

        let position_batch =
            RecordBatch::try_new(PositionDeleteWriterConfig::arrow_schema(), vec![
                file_array,
                row_index_array,
            ])?;

        if position_batch.num_rows() > 0 {
            self.pos_delete_writer
                .write(position_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        // Keep only the rows flagged for equality deletion.
        let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish())
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;

        if eq_batch.num_rows() > 0 {
            self.eq_delete_writer
                .write(eq_batch)
                .await
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?;
        }

        Ok(())
    }

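    /// Projects `batch` down to the unique columns and converts the result to
    /// normalized, hashable rows.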
    fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result<Rows> {
        self.row_convertor
            .convert_columns(&self.projector.project_column(batch.columns())?)
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))
    }
}

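// `write` dispatches changelog batches: it partitions on the trailing op
// column and routes each run of rows to `insert` or `delete`.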
#[async_trait::async_trait]
impl<DW, PDW, EDW> IcebergWriter for DeltaWriter<DW, PDW, EDW>
where
    DW: IcebergWriter + CurrentFileStatus,
    PDW: IcebergWriter,
    EDW: IcebergWriter,
{
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // The op column is always the last column of the incoming batch.
        let ops = batch
            .column(batch.num_columns() - 1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or(Error::new(
                ErrorKind::Unexpected,
                "Failed to downcast ops column",
            ))?;

        // Split the batch into consecutive runs that share the same op value.
        let partition =
            partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to partition batch: {e}"),
                )
            })?;

        for range in partition.ranges() {
            // Strip the op column before handing the run to the inner writers.
            let batch = batch
                .project(&(0..batch.num_columns() - 1).collect_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!("Failed to project batch columns: {e}"),
                    )
                })?
                .slice(range.start, range.end - range.start);
            match ops.value(range.start) {
                1 => self.insert(batch).await?,
                -1 => self.delete(batch).await?,
                op => {
                    return Err(Error::new(
                        ErrorKind::Unexpected,
                        format!("Ops column must be 1 (insert) or -1 (delete), not {op}"),
                    ));
                }
            }
        }

        Ok(())
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        let data_files = self.data_writer.close().await?;
        let pos_delete_files = self.pos_delete_writer.close().await?;
        let eq_delete_files = self.eq_delete_writer.close().await?;

        // Return data files and delete files together; callers can tell them
        // apart by `DataFile::content`.
        Ok(data_files
            .into_iter()
            .chain(pos_delete_files)
            .chain(eq_delete_files)
            .collect())
    }
}

#[cfg(test)]
mod tests {
    mod delta_writer_tests {
        use std::collections::HashMap;
        use std::sync::Arc;

        use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use parquet::file::properties::WriterProperties;
        use tempfile::TempDir;

        use crate::Result;
        use crate::arrow::arrow_schema_to_schema;
        use crate::io::FileIOBuilder;
        use crate::spec::{
            DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type,
        };
        use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use crate::writer::base_writer::equality_delete_writer::{
            EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
        };
        use crate::writer::base_writer::position_delete_writer::{
            PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
        };
        use crate::writer::combined_writer::delta_writer::DeltaWriter;
        use crate::writer::file_writer::ParquetWriterBuilder;
        use crate::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use crate::writer::{IcebergWriter, IcebergWriterBuilder};

        fn create_iceberg_schema() -> Arc<IcebergSchema> {
            Arc::new(
                IcebergSchema::builder()
                    .with_schema_id(0)
                    .with_fields(vec![
                        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))
                            .into(),
                    ])
                    .build()
                    .unwrap(),
            )
        }

        /// Builds an `(id, name, op)` batch; the trailing op column drives
        /// the delta writer (1 = insert, -1 = delete).
        fn create_test_batch_with_ops(
            ids: Vec<i32>,
            names: Vec<Option<&str>>,
            ops: Vec<i32>,
        ) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "2".to_string(),
                )])),
                Field::new("op", DataType::Int32, false),
            ]));

            let id_array: ArrayRef = Arc::new(Int32Array::from(ids));
            let name_array: ArrayRef = Arc::new(StringArray::from(names));
            let op_array: ArrayRef = Arc::new(Int32Array::from(ops));

            RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap()
        }

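        // Scenario: inserts only. Three inserted rows should produce exactly
        // one data file containing all three rows.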
        #[tokio::test]
        async fn test_delta_writer_insert_only() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Data file writer.
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                schema.clone(),
                None,
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_parquet_writer, None, 0);

            // Position delete writer.
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema,
                None,
                file_io.clone(),
                pos_delete_location_gen,
                pos_delete_file_name_gen,
            );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_parquet_writer,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Equality delete writer keyed on field id 1 ("id").
            let eq_delete_config =
                EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0)?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema,
                None,
                file_io.clone(),
                eq_delete_location_gen,
                eq_delete_file_name_gen,
            );
            let eq_delete_writer =
                EqualityDeleteFileWriterBuilder::new(eq_delete_parquet_writer, eq_delete_config);

            let data_writer_instance = data_writer.build().await?;
            let pos_delete_writer_instance = pos_delete_writer.build().await?;
            let eq_delete_writer_instance = eq_delete_writer.build().await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            let batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );

            delta_writer.write(batch).await?;
            let data_files = delta_writer.close().await?;

            // Only a data file should be produced; no deletes were written.
            assert_eq!(data_files.len(), 1);
            assert_eq!(data_files[0].content, crate::spec::DataContentType::Data);
            assert_eq!(data_files[0].record_count, 3);

            // Read the file back and verify the row count.
            let input_file = file_io.new_input(data_files[0].file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 3);

            Ok(())
        }

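        // Scenario: insert then delete the same keys. Because this writer
        // wrote the rows, the deletes become position deletes.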
        #[tokio::test]
        async fn test_delta_writer_insert_then_position_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Data file writer.
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                schema.clone(),
                None,
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_parquet_writer, None, 0);

            // Position delete writer.
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema,
                None,
                file_io.clone(),
                pos_delete_location_gen,
                pos_delete_file_name_gen,
            );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_parquet_writer,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Equality delete writer keyed on field id 1 ("id").
            let eq_delete_config =
                EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0)?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema,
                None,
                file_io.clone(),
                eq_delete_location_gen,
                eq_delete_file_name_gen,
            );
            let eq_delete_writer =
                EqualityDeleteFileWriterBuilder::new(eq_delete_parquet_writer, eq_delete_config);

            let data_writer_instance = data_writer.build().await?;
            let pos_delete_writer_instance = pos_delete_writer.build().await?;
            let eq_delete_writer_instance = eq_delete_writer.build().await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // Insert three rows, then delete two of them by the same keys.
            let insert_batch = create_test_batch_with_ops(
                vec![1, 2, 3],
                vec![Some("Alice"), Some("Bob"), Some("Charlie")],
                vec![1, 1, 1],
            );
            delta_writer.write(insert_batch).await?;

            let delete_batch = create_test_batch_with_ops(
                vec![1, 2],
                vec![Some("Alice"), Some("Bob")],
                vec![-1, -1],
            );
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // One data file plus one position delete file; the deleted rows
            // were written by this writer, so no equality deletes are needed.
            assert_eq!(data_files.len(), 2);

            let data_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::Data)
                .unwrap();
            let pos_delete_file = data_files
                .iter()
                .find(|f| f.content == crate::spec::DataContentType::PositionDeletes)
                .unwrap();

            assert_eq!(data_file.record_count, 3);
            assert_eq!(pos_delete_file.record_count, 2);

            // Read the position delete file back and verify the row count.
            let input_file = file_io.new_input(pos_delete_file.file_path.clone())?;
            let content = input_file.read().await?;
            let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?;
            let batches: Vec<_> = reader.map(|b| b.unwrap()).collect();
            assert_eq!(batches[0].num_rows(), 2);

            Ok(())
        }

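        // Scenario: delete keys this writer never saw. The deletes must be
        // emitted as equality deletes.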
        #[tokio::test]
        async fn test_delta_writer_equality_delete() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Data file writer.
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                schema.clone(),
                None,
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_parquet_writer, None, 0);

            // Position delete writer.
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema,
                None,
                file_io.clone(),
                pos_delete_location_gen,
                pos_delete_file_name_gen,
            );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_parquet_writer,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Equality delete writer keyed on field id 1 ("id").
            let eq_delete_config =
                EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0)?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema,
                None,
                file_io.clone(),
                eq_delete_location_gen,
                eq_delete_file_name_gen,
            );
            let eq_delete_writer =
                EqualityDeleteFileWriterBuilder::new(eq_delete_parquet_writer, eq_delete_config);

            let data_writer_instance = data_writer.build().await?;
            let pos_delete_writer_instance = pos_delete_writer.build().await?;
            let eq_delete_writer_instance = eq_delete_writer.build().await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // These keys were never inserted, so their positions are unknown.
            let delete_batch =
                create_test_batch_with_ops(vec![99, 100], vec![Some("X"), Some("Y")], vec![-1, -1]);
            delta_writer.write(delete_batch).await?;

            let data_files = delta_writer.close().await?;

            // Only an equality delete file should be produced.
            assert_eq!(data_files.len(), 1);
            assert_eq!(
                data_files[0].content,
                crate::spec::DataContentType::EqualityDeletes
            );
            assert_eq!(data_files[0].record_count, 2);

            Ok(())
        }

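        // Scenario: an op value other than 1 or -1 should surface as an error.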
        #[tokio::test]
        async fn test_delta_writer_invalid_op() -> Result<()> {
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let schema = create_iceberg_schema();

            // Data file writer.
            let data_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/data",
                temp_dir.path().to_str().unwrap()
            ));
            let data_file_name_gen =
                DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet);
            let data_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                schema.clone(),
                None,
                file_io.clone(),
                data_location_gen,
                data_file_name_gen,
            );
            let data_writer = DataFileWriterBuilder::new(data_parquet_writer, None, 0);

            // Position delete writer.
            let pos_delete_schema = Arc::new(arrow_schema_to_schema(
                &PositionDeleteWriterConfig::arrow_schema(),
            )?);
            let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/pos_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let pos_delete_file_name_gen = DefaultFileNameGenerator::new(
                "pos_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let pos_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                pos_delete_schema,
                None,
                file_io.clone(),
                pos_delete_location_gen,
                pos_delete_file_name_gen,
            );
            let pos_delete_writer = PositionDeleteFileWriterBuilder::new(
                pos_delete_parquet_writer,
                PositionDeleteWriterConfig::new(None, 0, None),
            );

            // Equality delete writer keyed on field id 1 ("id").
            let eq_delete_config =
                EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0)?;
            let eq_delete_schema = Arc::new(arrow_schema_to_schema(
                eq_delete_config.projected_arrow_schema_ref(),
            )?);
            let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!(
                "{}/eq_delete",
                temp_dir.path().to_str().unwrap()
            ));
            let eq_delete_file_name_gen = DefaultFileNameGenerator::new(
                "eq_delete".to_string(),
                None,
                DataFileFormat::Parquet,
            );
            let eq_delete_parquet_writer = ParquetWriterBuilder::new(
                WriterProperties::builder().build(),
                eq_delete_schema,
                None,
                file_io.clone(),
                eq_delete_location_gen,
                eq_delete_file_name_gen,
            );
            let eq_delete_writer =
                EqualityDeleteFileWriterBuilder::new(eq_delete_parquet_writer, eq_delete_config);

            let data_writer_instance = data_writer.build().await?;
            let pos_delete_writer_instance = pos_delete_writer.build().await?;
            let eq_delete_writer_instance = eq_delete_writer.build().await?;
            let mut delta_writer = DeltaWriter::try_new(
                data_writer_instance,
                pos_delete_writer_instance,
                eq_delete_writer_instance,
                vec![1],
            )?;

            // op = 99 is neither an insert nor a delete.
            let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]);

            let result = delta_writer.write(batch).await;
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("Ops column must be 1 (insert) or -1 (delete)")
            );

            Ok(())
        }
    }
}