1use std::sync::Arc;
21
22use arrow_array::RecordBatch;
23use arrow_schema::{DataType, Field, SchemaRef as ArrowSchemaRef};
24use itertools::Itertools;
25use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
26
27use crate::arrow::record_batch_projector::RecordBatchProjector;
28use crate::arrow::schema_to_arrow_schema;
29use crate::spec::{DataFile, PartitionKey, SchemaRef};
30use crate::writer::file_writer::FileWriterBuilder;
31use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
32use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
33use crate::writer::{IcebergWriter, IcebergWriterBuilder};
34use crate::{Error, ErrorKind, Result};
35
/// Builder for [`EqualityDeleteFileWriter`].
#[derive(Debug)]
pub struct EqualityDeleteFileWriterBuilder<
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
> {
    // Rolling writer builder that produces the physical delete files.
    inner: RollingFileWriterBuilder<B, L, F>,
    // Equality-delete configuration: equality ids plus the batch projector.
    config: EqualityDeleteWriterConfig,
}
46
impl<B, L, F> EqualityDeleteFileWriterBuilder<B, L, F>
where
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
{
    /// Creates a new `EqualityDeleteFileWriterBuilder` using the provided
    /// rolling file writer builder and equality-delete configuration.
    pub fn new(
        inner: RollingFileWriterBuilder<B, L, F>,
        config: EqualityDeleteWriterConfig,
    ) -> Self {
        Self { inner, config }
    }
}
61
/// Configuration for equality delete writers.
#[derive(Clone, Debug)]
pub struct EqualityDeleteWriterConfig {
    // Field ids used to determine row equality in equality delete files.
    equality_ids: Vec<i32>,
    // Projector that narrows incoming record batches to the columns
    // referenced by `equality_ids`.
    projector: RecordBatchProjector,
}
70
71impl EqualityDeleteWriterConfig {
72 pub fn new(equality_ids: Vec<i32>, original_schema: SchemaRef) -> Result<Self> {
74 let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
75 let projector = RecordBatchProjector::new(
76 original_arrow_schema,
77 &equality_ids,
78 |field| {
83 if field.data_type().is_nested()
85 || matches!(
86 field.data_type(),
87 DataType::Float16 | DataType::Float32 | DataType::Float64
88 )
89 {
90 return Ok(None);
91 }
92 Ok(Some(
93 field
94 .metadata()
95 .get(PARQUET_FIELD_ID_META_KEY)
96 .ok_or_else(|| {
97 Error::new(ErrorKind::Unexpected, "Field metadata is missing.")
98 })?
99 .parse::<i64>()
100 .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?,
101 ))
102 },
103 |_field: &Field| true,
104 )?;
105 Ok(Self {
106 equality_ids,
107 projector,
108 })
109 }
110
111 pub fn projected_arrow_schema_ref(&self) -> &ArrowSchemaRef {
113 self.projector.projected_schema_ref()
114 }
115}
116
#[async_trait::async_trait]
impl<B, L, F> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B, L, F>
where
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
{
    type R = EqualityDeleteFileWriter<B, L, F>;

    /// Builds an [`EqualityDeleteFileWriter`] bound to the optional
    /// `partition_key`; the projector and equality ids are cloned from the
    /// shared configuration.
    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
        Ok(EqualityDeleteFileWriter {
            inner: Some(self.inner.build()),
            projector: self.config.projector.clone(),
            equality_ids: self.config.equality_ids.clone(),
            partition_key,
        })
    }
}
135
/// Writer that produces equality delete files.
#[derive(Debug)]
pub struct EqualityDeleteFileWriter<
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
> {
    // `None` once the writer has been closed; guards against double use.
    inner: Option<RollingFileWriter<B, L, F>>,
    // Projects incoming batches down to the equality-id columns.
    projector: RecordBatchProjector,
    // Field ids recorded in each produced `DataFile`.
    equality_ids: Vec<i32>,
    // Partition this writer's files belong to, if the table is partitioned.
    partition_key: Option<PartitionKey>,
}
148
149#[async_trait::async_trait]
150impl<B, L, F> IcebergWriter for EqualityDeleteFileWriter<B, L, F>
151where
152 B: FileWriterBuilder,
153 L: LocationGenerator,
154 F: FileNameGenerator,
155{
156 async fn write(&mut self, batch: RecordBatch) -> Result<()> {
157 let batch = self.projector.project_batch(batch)?;
158 if let Some(writer) = self.inner.as_mut() {
159 writer.write(&self.partition_key, &batch).await
160 } else {
161 Err(Error::new(
162 ErrorKind::Unexpected,
163 "Equality delete inner writer has been closed.",
164 ))
165 }
166 }
167
168 async fn close(&mut self) -> Result<Vec<DataFile>> {
169 if let Some(writer) = self.inner.take() {
170 writer
171 .close()
172 .await?
173 .into_iter()
174 .map(|mut res| {
175 res.content(crate::spec::DataContentType::EqualityDeletes);
176 res.equality_ids(Some(self.equality_ids.iter().copied().collect_vec()));
177 if let Some(pk) = self.partition_key.as_ref() {
178 res.partition(pk.data().clone());
179 res.partition_spec_id(pk.spec().spec_id());
180 }
181 res.build().map_err(|e| {
182 Error::new(
183 ErrorKind::DataInvalid,
184 format!("Failed to build data file: {e}"),
185 )
186 })
187 })
188 .collect()
189 } else {
190 Err(Error::new(
191 ErrorKind::Unexpected,
192 "Equality delete inner writer has been closed.",
193 ))
194 }
195 }
196}
197
198#[cfg(test)]
199mod test {
200 use std::collections::HashMap;
201 use std::sync::Arc;
202
203 use arrow_array::types::Int32Type;
204 use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray};
205 use arrow_buffer::NullBuffer;
206 use arrow_schema::{DataType, Field, Fields};
207 use arrow_select::concat::concat_batches;
208 use itertools::Itertools;
209 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
210 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
211 use parquet::file::properties::WriterProperties;
212 use tempfile::TempDir;
213 use uuid::Uuid;
214
215 use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
216 use crate::io::{FileIO, FileIOBuilder};
217 use crate::spec::{
218 DataFile, DataFileFormat, ListType, MapType, NestedField, PrimitiveType, Schema,
219 StructType, Type,
220 };
221 use crate::writer::base_writer::equality_delete_writer::{
222 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
223 };
224 use crate::writer::file_writer::ParquetWriterBuilder;
225 use crate::writer::file_writer::location_generator::{
226 DefaultFileNameGenerator, DefaultLocationGenerator,
227 };
228 use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
229 use crate::writer::{IcebergWriter, IcebergWriterBuilder};
230
    /// Reads back a parquet equality-delete file and asserts that both its
    /// decoded content and the `DataFile` statistics (record count, file
    /// size, per-column sizes, value/null counts, split offsets) match the
    /// batch that was written.
    async fn check_parquet_data_file_with_equality_delete_write(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        // Read the whole file back and compare the decoded rows to the input.
        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &batches).unwrap();
        assert_eq!(*batch, res);

        let expect_column_num = batch.num_columns();

        // Record count must equal the sum of rows across all row groups.
        assert_eq!(
            data_file.record_count,
            metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64
        );

        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        assert_eq!(data_file.column_sizes.len(), expect_column_num);

        // Each column's recorded size must match the compressed size stored
        // in the parquet row-group metadata.
        for (index, id) in data_file.column_sizes().keys().sorted().enumerate() {
            metadata
                .row_groups()
                .iter()
                .map(|group| group.columns())
                .for_each(|column| {
                    assert_eq!(
                        *data_file.column_sizes.get(id).unwrap() as i64,
                        column.get(index).unwrap().compressed_size()
                    );
                });
        }

        assert_eq!(data_file.value_counts.len(), expect_column_num);
        data_file.value_counts.iter().for_each(|(_, &v)| {
            let expect = metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64;
            assert_eq!(v, expect);
        });

        // Null counts must agree with the in-memory batch.
        for (index, id) in data_file.null_value_counts().keys().enumerate() {
            let expect = batch.column(index).null_count() as u64;
            assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect);
        }

        // Split offsets must line up with the row-group file offsets.
        let split_offsets = data_file
            .split_offsets
            .as_ref()
            .expect("split_offsets should be set");
        assert_eq!(split_offsets.len(), metadata.num_row_groups());
        split_offsets.iter().enumerate().for_each(|(i, &v)| {
            let expect = metadata.row_groups()[i].file_offset().unwrap();
            assert_eq!(v, expect);
        });
    }
306
    /// End-to-end test: writes an equality delete for a schema containing
    /// nested structs and a list, using equality ids {0, 8} (a top-level int
    /// and a doubly-nested int), then verifies the produced parquet file
    /// against the projected batch.
    #[tokio::test]
    async fn test_equality_delete_writer() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // Schema with primitive, struct, list and nested-struct fields.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(
                    1,
                    "col1",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "sub_col", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(
                    3,
                    "col3",
                    Type::List(ListType::new(
                        NestedField::required(6, "element", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    )),
                )
                .into(),
                NestedField::required(
                    4,
                    "col4",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(
                            7,
                            "sub_col",
                            Type::Struct(StructType::new(vec![
                                NestedField::required(
                                    8,
                                    "sub_sub_col",
                                    Type::Primitive(PrimitiveType::Int),
                                )
                                .into(),
                            ])),
                        )
                        .into(),
                    ])),
                )
                .into(),
            ])
            .build()
            .unwrap();
        let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
        // Build 1024 identical rows for each column.
        let col0 = Arc::new(Int32Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        let col1 = Arc::new(StructArray::new(
            if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
                fields.clone()
            } else {
                unreachable!()
            },
            vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
            None,
        ));
        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
            "test";
            1024
        ])) as ArrayRef;
        let col3 = Arc::new({
            // Rebuild the list array with the schema's field (carrying the
            // parquet field-id metadata) instead of the default one.
            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(
                    vec![Some(1),]
                );
                1024
            ])
            .into_parts();
            arrow_array::ListArray::new(
                if let DataType::List(field) = arrow_schema.fields.get(3).unwrap().data_type() {
                    field.clone()
                } else {
                    unreachable!()
                },
                list_parts.1,
                list_parts.2,
                list_parts.3,
            )
        }) as ArrayRef;
        let col4 = Arc::new(StructArray::new(
            if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
                fields.clone()
            } else {
                unreachable!()
            },
            vec![Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
                    if let DataType::Struct(fields) = fields.first().unwrap().data_type() {
                        fields.clone()
                    } else {
                        unreachable!()
                    }
                } else {
                    unreachable!()
                },
                vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
                None,
            ))],
            None,
        ));
        let columns = vec![col0, col1, col2, col3, col4];
        let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();

        // Delete on col0 (id 0) and the nested sub_sub_col (id 8).
        let equality_ids = vec![0_i32, 8];
        let equality_config =
            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
        let delete_schema =
            Arc::new(arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap());
        let projector = equality_config.projector.clone();

        let pb =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), delete_schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            delete_schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut equality_delete_writer =
            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, equality_config)
                .build(None)
                .await?;

        // Write, close, and verify the single produced file.
        equality_delete_writer.write(to_write.clone()).await?;
        let res = equality_delete_writer.close().await?;
        assert_eq!(res.len(), 1);
        let data_file = res.into_iter().next().unwrap();

        let to_write_projected = projector.project_batch(to_write)?;
        check_parquet_data_file_with_equality_delete_write(
            &file_io,
            &data_file,
            &to_write_projected,
        )
        .await;
        Ok(())
    }
461
    /// Verifies which field ids are rejected as equality-delete columns:
    /// floats/doubles, struct roots, map keys/values, lists and list elements
    /// must error, while a primitive field nested inside a required struct
    /// (id 4) is accepted.
    #[tokio::test]
    async fn test_equality_delete_unreachable_column() -> Result<(), anyhow::Error> {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Float)).into(),
                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Double)).into(),
                    NestedField::optional(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(
                        3,
                        "col3",
                        Type::Struct(StructType::new(vec![
                            NestedField::required(
                                4,
                                "sub_col",
                                Type::Primitive(PrimitiveType::Int),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                    NestedField::optional(
                        5,
                        "col4",
                        Type::Struct(StructType::new(vec![
                            NestedField::required(
                                6,
                                "sub_col2",
                                Type::Primitive(PrimitiveType::Int),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                    NestedField::required(
                        7,
                        "col5",
                        Type::Map(MapType::new(
                            Arc::new(NestedField::required(
                                8,
                                "key",
                                Type::Primitive(PrimitiveType::String),
                            )),
                            Arc::new(NestedField::required(
                                9,
                                "value",
                                Type::Primitive(PrimitiveType::Int),
                            )),
                        )),
                    )
                    .into(),
                    NestedField::required(
                        10,
                        "col6",
                        Type::List(ListType::new(Arc::new(NestedField::required(
                            11,
                            "element",
                            Type::Primitive(PrimitiveType::Int),
                        )))),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );
        // Float / double columns are not valid equality-delete columns.
        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone()).is_err());
        // Struct root is rejected, but its primitive child (id 4) is allowed.
        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone()).is_ok());
        // Map, map key/value, list and list element are all rejected.
        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone()).is_err());

        Ok(())
    }
545
    /// Round-trips an equality delete across every supported primitive type
    /// (bool, int, long, decimal, date, time, timestamps in µs/ns with and
    /// without timezone, string, uuid, fixed, binary) and verifies the
    /// written parquet file.
    #[tokio::test]
    async fn test_equality_delete_with_primitive_type() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Boolean))
                        .into(),
                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Long)).into(),
                    NestedField::required(
                        3,
                        "col3",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 38,
                            scale: 5,
                        }),
                    )
                    .into(),
                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Date)).into(),
                    NestedField::required(5, "col5", Type::Primitive(PrimitiveType::Time)).into(),
                    NestedField::required(6, "col6", Type::Primitive(PrimitiveType::Timestamp))
                        .into(),
                    NestedField::required(7, "col7", Type::Primitive(PrimitiveType::Timestamptz))
                        .into(),
                    NestedField::required(8, "col8", Type::Primitive(PrimitiveType::TimestampNs))
                        .into(),
                    NestedField::required(9, "col9", Type::Primitive(PrimitiveType::TimestamptzNs))
                        .into(),
                    NestedField::required(10, "col10", Type::Primitive(PrimitiveType::String))
                        .into(),
                    NestedField::required(11, "col11", Type::Primitive(PrimitiveType::Uuid)).into(),
                    NestedField::required(12, "col12", Type::Primitive(PrimitiveType::Fixed(10)))
                        .into(),
                    NestedField::required(13, "col13", Type::Primitive(PrimitiveType::Binary))
                        .into(),
                ])
                .build()
                .unwrap(),
        );
        // Every column participates in the equality delete.
        let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
        let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone()).unwrap();
        let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
        let delete_schema = Arc::new(arrow_schema_to_schema(&delete_arrow_schema).unwrap());

        let pb =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), delete_schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            delete_schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut equality_delete_writer =
            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
                .build(None)
                .await?;

        // Three rows per column, one Arrow array per Iceberg primitive type.
        let col0 = Arc::new(BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(true),
        ])) as ArrayRef;
        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
        let col3 = Arc::new(
            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), Some(4)])
                .with_precision_and_scale(38, 5)
                .unwrap(),
        ) as ArrayRef;
        let col4 = Arc::new(arrow_array::Date32Array::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col5 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col6 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col7 = Arc::new(
            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), Some(3)])
                .with_timezone_utc(),
        ) as ArrayRef;
        let col8 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col9 = Arc::new(
            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), Some(3)])
                .with_timezone_utc(),
        ) as ArrayRef;
        let col10 = Arc::new(arrow_array::StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("d"),
        ])) as ArrayRef;
        // Uuid maps to FixedSizeBinary(16).
        let col11 = Arc::new(
            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![
                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
                ]
                .into_iter(),
                16,
            )
            .unwrap(),
        ) as ArrayRef;
        let col12 = Arc::new(
            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![
                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
                ]
                .into_iter(),
                10,
            )
            .unwrap(),
        ) as ArrayRef;
        let col13 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
            Some(b"one"),
            Some(b""),
            Some(b"zzzz"),
        ])) as ArrayRef;
        let to_write = RecordBatch::try_new(delete_arrow_schema.clone(), vec![
            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
        ])
        .unwrap();
        equality_delete_writer.write(to_write.clone()).await?;
        let res = equality_delete_writer.close().await?;
        assert_eq!(res.len(), 1);
        check_parquet_data_file_with_equality_delete_write(
            &file_io,
            &res.into_iter().next().unwrap(),
            &to_write,
        )
        .await;

        Ok(())
    }
704
    /// Checks null propagation through projection: when a parent struct is
    /// null, the projected leaf column must also be null, even if the child
    /// array holds a value at that slot.
    #[tokio::test]
    async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> {
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::optional(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(
                    1,
                    "col1",
                    Type::Struct(StructType::new(vec![
                        NestedField::optional(2, "sub_col", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::optional(
                    3,
                    "col2",
                    Type::Struct(StructType::new(vec![
                        NestedField::optional(
                            4,
                            "sub_struct_col",
                            Type::Struct(StructType::new(vec![
                                NestedField::optional(
                                    5,
                                    "sub_sub_col",
                                    Type::Primitive(PrimitiveType::Int),
                                )
                                .into(),
                            ])),
                        )
                        .into(),
                    ])),
                )
                .into(),
            ])
            .build()
            .unwrap();
        let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
        let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef;
        // col1: struct null at row 1; child holds [1, 2, null].
        let col1 = {
            let nulls = NullBuffer::from(vec![true, false, true]);
            Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
                    fields.clone()
                } else {
                    unreachable!()
                },
                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
                Some(nulls),
            ))
        };
        // col2: outer struct null at row 0, inner struct null at row 1.
        let col2 = {
            let inner_col = {
                let nulls = NullBuffer::from(vec![true, false, true]);
                Arc::new(StructArray::new(
                    Fields::from(vec![
                        Field::new("sub_sub_col", DataType::Int32, true).with_metadata(
                            HashMap::from([(
                                PARQUET_FIELD_ID_META_KEY.to_string(),
                                "5".to_string(),
                            )]),
                        ),
                    ]),
                    vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
                    Some(nulls),
                ))
            };
            let nulls = NullBuffer::from(vec![false, true, true]);
            Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(2).unwrap().data_type() {
                    fields.clone()
                } else {
                    unreachable!()
                },
                vec![inner_col],
                Some(nulls),
            ))
        };
        let columns = vec![col0, col1, col2];

        let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
        let equality_ids = vec![0_i32, 2, 5];
        let equality_config =
            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
        let projector = equality_config.projector.clone();

        // Expected: leaves are null wherever any ancestor struct was null.
        let to_write_projected = projector.project_batch(to_write)?;
        let expect_batch =
            RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![
                Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef,
                Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef,
            ])
            .unwrap();
        assert_eq!(to_write_projected, expect_batch);
        Ok(())
    }
809}