//! This module provides the equality delete file writer.

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, SchemaRef as ArrowSchemaRef};
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::arrow::schema_to_arrow_schema;
use crate::spec::{DataFile, SchemaRef, Struct};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

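/// Builder for `EqualityDeleteFileWriter`.
///
/// A minimal usage sketch (names are illustrative: `parquet_writer_builder` stands for
/// any configured `FileWriterBuilder`, and the equality ids must reference non-nested,
/// non-floating-point fields of `table_schema`):
///
/// ```ignore
/// let config = EqualityDeleteWriterConfig::new(vec![1, 2], table_schema, None, 0)?;
/// let mut writer = EqualityDeleteFileWriterBuilder::new(parquet_writer_builder, config)
///     .build()
///     .await?;
/// writer.write(record_batch).await?;
/// let delete_files = writer.close().await?;
/// ```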
#[derive(Clone, Debug)]
pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
    inner: B,
    config: EqualityDeleteWriterConfig,
}

impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
    /// Create a new `EqualityDeleteFileWriterBuilder` from a `FileWriterBuilder` and a config.
    pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
        Self { inner, config }
    }
}

/// Config for `EqualityDeleteFileWriter`.
#[derive(Clone, Debug)]
pub struct EqualityDeleteWriterConfig {
    // Field ids used to determine row equality in equality delete files.
    equality_ids: Vec<i32>,
    // Projector used to project a record batch down to the equality-id columns.
    projector: RecordBatchProjector,
    partition_value: Struct,
    partition_spec_id: i32,
}

impl EqualityDeleteWriterConfig {
    /// Create a new `EqualityDeleteWriterConfig` with the given equality ids.
    pub fn new(
        equality_ids: Vec<i32>,
        original_schema: SchemaRef,
        partition_value: Option<Struct>,
        partition_spec_id: i32,
    ) -> Result<Self> {
        let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
        let projector = RecordBatchProjector::new(
            original_arrow_schema,
            &equality_ids,
            |field| {
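                // Equality delete columns must be primitive and must not be floating
                // point. Returning `Ok(None)` marks the field as unusable, so projector
                // construction fails if an equality id references such a field.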
                if field.data_type().is_nested()
                    || matches!(
                        field.data_type(),
                        DataType::Float16 | DataType::Float32 | DataType::Float64
                    )
                {
                    return Ok(None);
                }
                Ok(Some(
                    field
                        .metadata()
                        .get(PARQUET_FIELD_ID_META_KEY)
                        .ok_or_else(|| {
                            Error::new(ErrorKind::Unexpected, "Field metadata is missing.")
                        })?
                        .parse::<i64>()
                        .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?,
                ))
            },
            // Every field is searchable, so equality columns nested in structs can be reached.
            |_field: &Field| true,
        )?;
        Ok(Self {
            equality_ids,
            projector,
            partition_value: partition_value.unwrap_or(Struct::empty()),
            partition_spec_id,
        })
    }

    /// Return the Arrow schema of the projected equality delete columns.
    pub fn projected_arrow_schema_ref(&self) -> &ArrowSchemaRef {
        self.projector.projected_schema_ref()
    }
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
    type R = EqualityDeleteFileWriter<B>;

    async fn build(self) -> Result<Self::R> {
        Ok(EqualityDeleteFileWriter {
            inner_writer: Some(self.inner.clone().build().await?),
            projector: self.config.projector,
            equality_ids: self.config.equality_ids,
            partition_value: self.config.partition_value,
            partition_spec_id: self.config.partition_spec_id,
        })
    }
}

/// Writer that writes equality delete files.
#[derive(Debug)]
pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
    inner_writer: Option<B::R>,
    projector: RecordBatchProjector,
    equality_ids: Vec<i32>,
    partition_value: Struct,
    partition_spec_id: i32,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // Project the batch down to the equality-id columns before writing.
        let batch = self.projector.project_batch(batch)?;
        if let Some(writer) = self.inner_writer.as_mut() {
            writer.write(&batch).await
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Equality delete inner writer has been closed.",
            ))
        }
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        if let Some(writer) = self.inner_writer.take() {
            Ok(writer
                .close()
                .await?
                .into_iter()
                .map(|mut res| {
                    res.content(crate::spec::DataContentType::EqualityDeletes);
                    res.equality_ids(Some(self.equality_ids.clone()));
                    res.partition(self.partition_value.clone());
                    res.partition_spec_id(self.partition_spec_id);
                    res.build()
                        .expect("equality delete data file should be valid on close")
                })
                .collect_vec())
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Equality delete inner writer has been closed.",
            ))
        }
    }
}

#[cfg(test)]
mod test {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray};
    use arrow_buffer::NullBuffer;
    use arrow_schema::{DataType, Field, Fields};
    use arrow_select::concat::concat_batches;
    use itertools::Itertools;
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
    use crate::io::{FileIO, FileIOBuilder};
    use crate::spec::{
        DataFile, DataFileFormat, ListType, MapType, NestedField, PrimitiveType, Schema,
        StructType, Type,
    };
    use crate::writer::base_writer::equality_delete_writer::{
        EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
    };
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::{IcebergWriter, IcebergWriterBuilder};

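    /// Check that the written Parquet file round-trips back to `batch` and that the
    /// data file metadata (record count, file size, column sizes, value/null counts,
    /// and split offsets) agrees with the Parquet footer.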
    async fn check_parquet_data_file_with_equality_delete_write(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        // Read the written Parquet file back.
        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        // The file content must match the batch that was written.
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &batches).unwrap();
        assert_eq!(*batch, res);

        let expect_column_num = batch.num_columns();

        assert_eq!(
            data_file.record_count,
            metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64
        );

        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        assert_eq!(data_file.column_sizes.len(), expect_column_num);

        for (index, id) in data_file.column_sizes().keys().sorted().enumerate() {
            metadata
                .row_groups()
                .iter()
                .map(|group| group.columns())
                .for_each(|column| {
                    assert_eq!(
                        *data_file.column_sizes.get(id).unwrap() as i64,
                        column.get(index).unwrap().compressed_size()
                    );
                });
        }

        assert_eq!(data_file.value_counts.len(), expect_column_num);
        data_file.value_counts.iter().for_each(|(_, &v)| {
            let expect = metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64;
            assert_eq!(v, expect);
        });

        // Sort the ids so the iteration order matches the projected column order.
        for (index, id) in data_file.null_value_counts().keys().sorted().enumerate() {
            let expect = batch.column(index).null_count() as u64;
            assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect);
        }

        assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups());
        data_file
            .split_offsets
            .iter()
            .enumerate()
            .for_each(|(i, &v)| {
                let expect = metadata.row_groups()[i].file_offset().unwrap();
                assert_eq!(v, expect);
            });
    }

    #[tokio::test]
    async fn test_equality_delete_writer() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // Build a schema with primitive, struct, list, and doubly nested struct columns.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(
                    1,
                    "col1",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "sub_col", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(
                    3,
                    "col3",
                    Type::List(ListType::new(
                        NestedField::required(6, "element", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    )),
                )
                .into(),
                NestedField::required(
                    4,
                    "col4",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(
                            7,
                            "sub_col",
                            Type::Struct(StructType::new(vec![
                                NestedField::required(
                                    8,
                                    "sub_sub_col",
                                    Type::Primitive(PrimitiveType::Int),
                                )
                                .into(),
                            ])),
                        )
                        .into(),
                    ])),
                )
                .into(),
            ])
            .build()
            .unwrap();
        let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
        let col0 = Arc::new(Int32Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        let col1 = Arc::new(StructArray::new(
            if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
                fields.clone()
            } else {
                unreachable!()
            },
            vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
            None,
        ));
        let col2 =
            Arc::new(arrow_array::StringArray::from_iter_values(vec!["test"; 1024])) as ArrayRef;
        let col3 = Arc::new({
            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1)]);
                1024
            ])
            .into_parts();
            arrow_array::ListArray::new(
                if let DataType::List(field) = arrow_schema.fields.get(3).unwrap().data_type() {
                    field.clone()
                } else {
                    unreachable!()
                },
                list_parts.1,
                list_parts.2,
                list_parts.3,
            )
        }) as ArrayRef;
        let col4 = Arc::new(StructArray::new(
            if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
                fields.clone()
            } else {
                unreachable!()
            },
            vec![Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
                    if let DataType::Struct(fields) = fields.first().unwrap().data_type() {
                        fields.clone()
                    } else {
                        unreachable!()
                    }
                } else {
                    unreachable!()
                },
                vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
                None,
            ))],
            None,
        ));
        let columns = vec![col0, col1, col2, col3, col4];
        let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();

        // Delete on col0 (id 0) and the doubly nested sub_sub_col (id 8).
        let equality_ids = vec![0_i32, 8];
        let equality_config =
            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None, 0).unwrap();
        let delete_schema =
            arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
        let projector = equality_config.projector.clone();

        // Write the equality delete file.
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(delete_schema),
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, equality_config)
            .build()
            .await?;

        equality_delete_writer.write(to_write.clone()).await?;
        let res = equality_delete_writer.close().await?;
        assert_eq!(res.len(), 1);
        let data_file = res.into_iter().next().unwrap();

        let to_write_projected = projector.project_batch(to_write)?;
        check_parquet_data_file_with_equality_delete_write(
            &file_io,
            &data_file,
            &to_write_projected,
        )
        .await;
        Ok(())
    }

    #[tokio::test]
    async fn test_equality_delete_unreachable_column() -> Result<(), anyhow::Error> {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Float)).into(),
                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Double)).into(),
                    NestedField::optional(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(
                        3,
                        "col3",
                        Type::Struct(StructType::new(vec![
                            NestedField::required(
                                4,
                                "sub_col",
                                Type::Primitive(PrimitiveType::Int),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                    NestedField::optional(
                        5,
                        "col4",
                        Type::Struct(StructType::new(vec![
                            NestedField::required(
                                6,
                                "sub_col2",
                                Type::Primitive(PrimitiveType::Int),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                    NestedField::required(
                        7,
                        "col5",
                        Type::Map(MapType::new(
                            Arc::new(NestedField::required(
                                8,
                                "key",
                                Type::Primitive(PrimitiveType::String),
                            )),
                            Arc::new(NestedField::required(
                                9,
                                "value",
                                Type::Primitive(PrimitiveType::Int),
                            )),
                        )),
                    )
                    .into(),
                    NestedField::required(
                        10,
                        "col6",
                        Type::List(ListType::new(Arc::new(NestedField::required(
                            11,
                            "element",
                            Type::Primitive(PrimitiveType::Int),
                        )))),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );
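        // Per the eligibility check in `EqualityDeleteWriterConfig::new`: float and
        // double columns, nested types (struct, map, list), and fields nested inside
        // a map or list are rejected, while a primitive field inside a struct works.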
        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None, 0).is_err()); // float
        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0).is_err()); // double
        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None, 0).is_err()); // struct
        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None, 0).is_ok()); // int inside struct
        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None, 0).is_err()); // map
        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None, 0).is_err()); // map key
        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None, 0).is_err()); // map value
        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None, 0).is_err()); // list
        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None, 0).is_err()); // list element

        Ok(())
    }

    #[tokio::test]
    async fn test_equality_delete_with_primitive_type() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Boolean))
                        .into(),
                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Long)).into(),
                    NestedField::required(
                        3,
                        "col3",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 38,
                            scale: 5,
                        }),
                    )
                    .into(),
                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Date)).into(),
                    NestedField::required(5, "col5", Type::Primitive(PrimitiveType::Time)).into(),
                    NestedField::required(6, "col6", Type::Primitive(PrimitiveType::Timestamp))
                        .into(),
                    NestedField::required(7, "col7", Type::Primitive(PrimitiveType::Timestamptz))
                        .into(),
                    NestedField::required(8, "col8", Type::Primitive(PrimitiveType::TimestampNs))
                        .into(),
                    NestedField::required(9, "col9", Type::Primitive(PrimitiveType::TimestamptzNs))
                        .into(),
                    NestedField::required(10, "col10", Type::Primitive(PrimitiveType::String))
                        .into(),
                    NestedField::required(11, "col11", Type::Primitive(PrimitiveType::Uuid)).into(),
                    NestedField::required(12, "col12", Type::Primitive(PrimitiveType::Fixed(10)))
                        .into(),
                    NestedField::required(13, "col13", Type::Primitive(PrimitiveType::Binary))
                        .into(),
                ])
                .build()
                .unwrap(),
        );
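        // Every field above is a non-float primitive, so all 14 field ids are
        // eligible as equality delete columns.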
        let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
        let config =
            EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None, 0).unwrap();
        let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
        let delete_schema = arrow_schema_to_schema(&delete_arrow_schema).unwrap();

        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(delete_schema),
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        let col0 = Arc::new(BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(true),
        ])) as ArrayRef;
        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
        let col3 = Arc::new(
            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), Some(4)])
                .with_precision_and_scale(38, 5)
                .unwrap(),
        ) as ArrayRef;
        let col4 = Arc::new(arrow_array::Date32Array::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col5 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col6 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col7 = Arc::new(
            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), Some(3)])
                .with_timezone_utc(),
        ) as ArrayRef;
        let col8 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col9 = Arc::new(
            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), Some(3)])
                .with_timezone_utc(),
        ) as ArrayRef;
        let col10 = Arc::new(arrow_array::StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("d"),
        ])) as ArrayRef;
        let col11 = Arc::new(
            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![
                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
                ]
                .into_iter(),
                16,
            )
            .unwrap(),
        ) as ArrayRef;
        let col12 = Arc::new(
            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![
                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
                ]
                .into_iter(),
                10,
            )
            .unwrap(),
        ) as ArrayRef;
        let col13 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
            Some(b"one"),
            Some(b""),
            Some(b"zzzz"),
        ])) as ArrayRef;
        let to_write = RecordBatch::try_new(delete_arrow_schema.clone(), vec![
            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
        ])
        .unwrap();
        equality_delete_writer.write(to_write.clone()).await?;
        let res = equality_delete_writer.close().await?;
        assert_eq!(res.len(), 1);
        check_parquet_data_file_with_equality_delete_write(
            &file_io,
            &res.into_iter().next().unwrap(),
            &to_write,
        )
        .await;

        Ok(())
    }

    #[tokio::test]
    async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> {
        // Build a schema where nulls can occur at several nesting levels.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::optional(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(
                    1,
                    "col1",
                    Type::Struct(StructType::new(vec![
                        NestedField::optional(2, "sub_col", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::optional(
                    3,
                    "col2",
                    Type::Struct(StructType::new(vec![
                        NestedField::optional(
                            4,
                            "sub_struct_col",
                            Type::Struct(StructType::new(vec![
                                NestedField::optional(
                                    5,
                                    "sub_sub_col",
                                    Type::Primitive(PrimitiveType::Int),
                                )
                                .into(),
                            ])),
                        )
                        .into(),
                    ])),
                )
                .into(),
            ])
            .build()
            .unwrap();
        let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
        // Three rows, with nulls placed at different nesting levels in each column.
        let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef;
        let col1 = {
            let nulls = NullBuffer::from(vec![true, false, true]);
            Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
                    fields.clone()
                } else {
                    unreachable!()
                },
                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
                Some(nulls),
            ))
        };
        let col2 = {
            let inner_col = {
                let nulls = NullBuffer::from(vec![true, false, true]);
                Arc::new(StructArray::new(
                    Fields::from(vec![
                        Field::new("sub_sub_col", DataType::Int32, true).with_metadata(
                            HashMap::from([(
                                PARQUET_FIELD_ID_META_KEY.to_string(),
                                "5".to_string(),
                            )]),
                        ),
                    ]),
                    vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
                    Some(nulls),
                ))
            };
            let nulls = NullBuffer::from(vec![false, true, true]);
            Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(2).unwrap().data_type() {
                    fields.clone()
                } else {
                    unreachable!()
                },
                vec![inner_col],
                Some(nulls),
            ))
        };
        let columns = vec![col0, col1, col2];

        let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
        let equality_ids = vec![0_i32, 2, 5];
        let equality_config =
            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None, 0).unwrap();
        let projector = equality_config.projector.clone();

        let to_write_projected = projector.project_batch(to_write)?;
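        // Nulls propagate from parent structs: when a struct is null in a row, each
        // of its children projects as null for that row, regardless of the values
        // stored in the child arrays.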
        let expect_batch =
            RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![
                Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef,
                Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef,
            ])
            .unwrap();
        assert_eq!(to_write_projected, expect_batch);
        Ok(())
    }
}