iceberg/writer/base_writer/equality_delete_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides `EqualityDeleteWriter`.
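//!
//! A minimal usage sketch, mirroring the tests at the bottom of this file
//! (names such as `table_schema`, `record_batch`, `file_io`, `location_gen`,
//! and `file_name_gen` are placeholders):
//!
//! ```ignore
//! // Rows whose values match the projected equality columns are marked deleted.
//! let config = EqualityDeleteWriterConfig::new(equality_ids, table_schema, None, 0)?;
//! let delete_schema = arrow_schema_to_schema(config.projected_arrow_schema_ref())?;
//! let parquet_builder = ParquetWriterBuilder::new(
//!     WriterProperties::builder().build(),
//!     Arc::new(delete_schema),
//!     None,
//!     file_io,
//!     location_gen,
//!     file_name_gen,
//! );
//! let mut writer = EqualityDeleteFileWriterBuilder::new(parquet_builder, config)
//!     .build()
//!     .await?;
//! writer.write(record_batch).await?;
//! let delete_files = writer.close().await?;
//! ```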

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, SchemaRef as ArrowSchemaRef};
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::arrow::schema_to_arrow_schema;
use crate::spec::{DataFile, SchemaRef, Struct};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for `EqualityDeleteWriter`.
#[derive(Clone, Debug)]
pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
    inner: B,
    config: EqualityDeleteWriterConfig,
}

impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
    /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`.
    pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
        Self { inner, config }
    }
}

/// Config for `EqualityDeleteWriter`.
#[derive(Clone, Debug)]
pub struct EqualityDeleteWriterConfig {
    // Field ids used to determine row equality in equality delete files.
    equality_ids: Vec<i32>,
    // Projector used to project the data chunk into specific fields.
    projector: RecordBatchProjector,
    partition_value: Struct,
    partition_spec_id: i32,
}

impl EqualityDeleteWriterConfig {
    /// Create a new `EqualityDeleteWriterConfig` with the given equality ids.
    pub fn new(
        equality_ids: Vec<i32>,
        original_schema: SchemaRef,
        partition_value: Option<Struct>,
        partition_spec_id: i32,
    ) -> Result<Self> {
        let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
        let projector = RecordBatchProjector::new(
            original_arrow_schema,
            &equality_ids,
            // These rules come from https://iceberg.apache.org/spec/#identifier-field-ids
            // and https://iceberg.apache.org/spec/#equality-delete-files:
            // - Identifier fields must be primitive types.
            // - Identifier fields must not be floating point types or nullable fields.
            // Only the primitive and floating-point rules are enforced here.
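            // For example, given a hypothetical schema
            // `{ 1: id int, 2: name string, 3: score float, 4: point struct<...> }`,
            // equality ids such as [1] or [1, 2] are accepted, while [3] (floating
            // point) and [4] (nested) are rejected by the extractor below.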
            |field| {
                // Only primitive types are allowed to be used for identifier field ids
                if field.data_type().is_nested()
                    || matches!(
                        field.data_type(),
                        DataType::Float16 | DataType::Float32 | DataType::Float64
                    )
                {
                    return Ok(None);
                }
                Ok(Some(
                    field
                        .metadata()
                        .get(PARQUET_FIELD_ID_META_KEY)
                        .ok_or_else(|| {
                            Error::new(ErrorKind::Unexpected, "Field metadata is missing.")
                        })?
                        .parse::<i64>()
                        .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?,
                ))
            },
            |_field: &Field| true,
        )?;
        Ok(Self {
            equality_ids,
            projector,
            partition_value: partition_value.unwrap_or(Struct::empty()),
            partition_spec_id,
        })
    }

    /// Returns the projected Arrow schema used for the equality delete file.
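    ///
    /// A sketch of how the projected schema is typically consumed, based on the
    /// tests in this file: the delete file is written with the projected schema,
    /// not the full table schema (builder arguments elided).
    ///
    /// ```ignore
    /// let delete_schema = arrow_schema_to_schema(config.projected_arrow_schema_ref())?;
    /// let parquet_builder = ParquetWriterBuilder::new(props, Arc::new(delete_schema), /* … */);
    /// ```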
    pub fn projected_arrow_schema_ref(&self) -> &ArrowSchemaRef {
        self.projector.projected_schema_ref()
    }
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
    type R = EqualityDeleteFileWriter<B>;

    async fn build(self) -> Result<Self::R> {
        Ok(EqualityDeleteFileWriter {
            inner_writer: Some(self.inner.clone().build().await?),
            projector: self.config.projector,
            equality_ids: self.config.equality_ids,
            partition_value: self.config.partition_value,
            partition_spec_id: self.config.partition_spec_id,
        })
    }
}

/// Writer used to write equality delete files.
#[derive(Debug)]
pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
    inner_writer: Option<B::R>,
    projector: RecordBatchProjector,
    equality_ids: Vec<i32>,
    partition_value: Struct,
    partition_spec_id: i32,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // Project the incoming batch down to the equality id columns before writing.
        let batch = self.projector.project_batch(batch)?;
        if let Some(writer) = self.inner_writer.as_mut() {
            writer.write(&batch).await
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Equality delete inner writer has been closed.",
            ))
        }
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        if let Some(writer) = self.inner_writer.take() {
            Ok(writer
                .close()
                .await?
                .into_iter()
                .map(|mut res| {
                    // Mark each closed file as equality-delete content and attach
                    // the equality ids and partition information.
                    res.content(crate::spec::DataContentType::EqualityDeletes);
                    res.equality_ids(Some(self.equality_ids.iter().copied().collect_vec()));
                    res.partition(self.partition_value.clone());
                    res.partition_spec_id(self.partition_spec_id);
                    res.build().expect("failed to build the data file")
                })
                .collect_vec())
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Equality delete inner writer has been closed.",
            ))
        }
    }
}

#[cfg(test)]
mod test {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray};
    use arrow_buffer::NullBuffer;
    use arrow_schema::{DataType, Field, Fields};
    use arrow_select::concat::concat_batches;
    use itertools::Itertools;
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
    use crate::io::{FileIO, FileIOBuilder};
    use crate::spec::{
        DataFile, DataFileFormat, ListType, MapType, NestedField, PrimitiveType, Schema,
        StructType, Type,
    };
    use crate::writer::base_writer::equality_delete_writer::{
        EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
    };
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::{IcebergWriter, IcebergWriterBuilder};

    async fn check_parquet_data_file_with_equality_delete_write(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        // read the written file back
        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        // check data
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &batches).unwrap();
        assert_eq!(*batch, res);

        // check metadata
        let expect_column_num = batch.num_columns();

        assert_eq!(
            data_file.record_count,
            metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64
        );

        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        assert_eq!(data_file.column_sizes.len(), expect_column_num);

        for (index, id) in data_file.column_sizes().keys().sorted().enumerate() {
            metadata
                .row_groups()
                .iter()
                .map(|group| group.columns())
                .for_each(|column| {
                    assert_eq!(
                        *data_file.column_sizes.get(id).unwrap() as i64,
                        column.get(index).unwrap().compressed_size()
                    );
                });
        }

        assert_eq!(data_file.value_counts.len(), expect_column_num);
        data_file.value_counts.iter().for_each(|(_, &v)| {
            let expect = metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64;
            assert_eq!(v, expect);
        });

        for (index, id) in data_file.null_value_counts().keys().enumerate() {
            let expect = batch.column(index).null_count() as u64;
            assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect);
        }

        assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups());
        data_file
            .split_offsets
            .iter()
            .enumerate()
            .for_each(|(i, &v)| {
                let expect = metadata.row_groups()[i].file_offset().unwrap();
                assert_eq!(v, expect);
            });
    }

    #[tokio::test]
    async fn test_equality_delete_writer() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // prepare data
        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(
                    1,
                    "col1",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "sub_col", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(
                    3,
                    "col3",
                    Type::List(ListType::new(
                        NestedField::required(6, "element", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    )),
                )
                .into(),
                NestedField::required(
                    4,
                    "col4",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(
                            7,
                            "sub_col",
                            Type::Struct(StructType::new(vec![
                                NestedField::required(
                                    8,
                                    "sub_sub_col",
                                    Type::Primitive(PrimitiveType::Int),
                                )
                                .into(),
                            ])),
                        )
                        .into(),
                    ])),
                )
                .into(),
            ])
            .build()
            .unwrap();
        let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
        let col0 = Arc::new(Int32Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        let col1 = Arc::new(StructArray::new(
            if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
                fields.clone()
            } else {
                unreachable!()
            },
            vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
            None,
        ));
        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
            "test";
            1024
        ])) as ArrayRef;
        let col3 = Arc::new({
            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int32Type, _, _>(
                vec![Some(vec![Some(1)]); 1024],
            )
            .into_parts();
            arrow_array::ListArray::new(
                if let DataType::List(field) = arrow_schema.fields.get(3).unwrap().data_type() {
                    field.clone()
                } else {
                    unreachable!()
                },
                list_parts.1,
                list_parts.2,
                list_parts.3,
            )
        }) as ArrayRef;
        let col4 = Arc::new(StructArray::new(
            if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
                fields.clone()
            } else {
                unreachable!()
            },
            vec![Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
                    if let DataType::Struct(fields) = fields.first().unwrap().data_type() {
                        fields.clone()
                    } else {
                        unreachable!()
                    }
                } else {
                    unreachable!()
                },
                vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
                None,
            ))],
            None,
        ));
        let columns = vec![col0, col1, col2, col3, col4];
        let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();

        let equality_ids = vec![0_i32, 8];
        let equality_config =
            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None, 0).unwrap();
        let delete_schema =
            arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
        let projector = equality_config.projector.clone();

        // prepare writer
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(delete_schema),
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, equality_config)
            .build()
            .await?;

        // write
        equality_delete_writer.write(to_write.clone()).await?;
        let res = equality_delete_writer.close().await?;
        assert_eq!(res.len(), 1);
        let data_file = res.into_iter().next().unwrap();

        // check
        let to_write_projected = projector.project_batch(to_write)?;
        check_parquet_data_file_with_equality_delete_write(
            &file_io,
            &data_file,
            &to_write_projected,
        )
        .await;
        Ok(())
    }

    #[tokio::test]
    async fn test_equality_delete_unreachable_column() -> Result<(), anyhow::Error> {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Float)).into(),
                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Double)).into(),
                    NestedField::optional(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(
                        3,
                        "col3",
                        Type::Struct(StructType::new(vec![
                            NestedField::required(
                                4,
                                "sub_col",
                                Type::Primitive(PrimitiveType::Int),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                    NestedField::optional(
                        5,
                        "col4",
                        Type::Struct(StructType::new(vec![
                            NestedField::required(
                                6,
                                "sub_col2",
                                Type::Primitive(PrimitiveType::Int),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                    NestedField::required(
                        7,
                        "col5",
                        Type::Map(MapType::new(
                            Arc::new(NestedField::required(
                                8,
                                "key",
                                Type::Primitive(PrimitiveType::String),
                            )),
                            Arc::new(NestedField::required(
                                9,
                                "value",
                                Type::Primitive(PrimitiveType::Int),
                            )),
                        )),
                    )
                    .into(),
                    NestedField::required(
                        10,
                        "col6",
                        Type::List(ListType::new(Arc::new(NestedField::required(
                            11,
                            "element",
                            Type::Primitive(PrimitiveType::Int),
                        )))),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );
        // Float and Double are not allowed to be used for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None, 0).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0).is_err());
        // A struct is not allowed to be used for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None, 0).is_err());
        // A primitive field nested inside a struct is allowed for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None, 0).is_ok());
        // A map and its nested fields are not allowed to be used for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None, 0).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None, 0).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None, 0).is_err());
        // A list and its nested fields are not allowed to be used for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None, 0).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None, 0).is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_equality_delete_with_primitive_type() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Boolean))
                        .into(),
                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Long)).into(),
                    NestedField::required(
                        3,
                        "col3",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 38,
                            scale: 5,
                        }),
                    )
                    .into(),
                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Date)).into(),
                    NestedField::required(5, "col5", Type::Primitive(PrimitiveType::Time)).into(),
                    NestedField::required(6, "col6", Type::Primitive(PrimitiveType::Timestamp))
                        .into(),
                    NestedField::required(7, "col7", Type::Primitive(PrimitiveType::Timestamptz))
                        .into(),
                    NestedField::required(8, "col8", Type::Primitive(PrimitiveType::TimestampNs))
                        .into(),
                    NestedField::required(9, "col9", Type::Primitive(PrimitiveType::TimestamptzNs))
                        .into(),
                    NestedField::required(10, "col10", Type::Primitive(PrimitiveType::String))
                        .into(),
                    NestedField::required(11, "col11", Type::Primitive(PrimitiveType::Uuid)).into(),
                    NestedField::required(12, "col12", Type::Primitive(PrimitiveType::Fixed(10)))
                        .into(),
                    NestedField::required(13, "col13", Type::Primitive(PrimitiveType::Binary))
                        .into(),
                ])
                .build()
                .unwrap(),
        );
        let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
        let config =
            EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None, 0).unwrap();
        let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
        let delete_schema = arrow_schema_to_schema(&delete_arrow_schema).unwrap();

        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(delete_schema),
            None,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, config)
            .build()
            .await?;

        // prepare data
        let col0 = Arc::new(BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(true),
        ])) as ArrayRef;
        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
        let col3 = Arc::new(
            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), Some(4)])
                .with_precision_and_scale(38, 5)
                .unwrap(),
        ) as ArrayRef;
        let col4 = Arc::new(arrow_array::Date32Array::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col5 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col6 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col7 = Arc::new(
            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), Some(3)])
                .with_timezone_utc(),
        ) as ArrayRef;
        let col8 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col9 = Arc::new(
            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), Some(3)])
                .with_timezone_utc(),
        ) as ArrayRef;
        let col10 = Arc::new(arrow_array::StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("d"),
        ])) as ArrayRef;
        let col11 = Arc::new(
            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![
                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
                ]
                .into_iter(),
                16,
            )
            .unwrap(),
        ) as ArrayRef;
        let col12 = Arc::new(
            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![
                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
                ]
                .into_iter(),
                10,
            )
            .unwrap(),
        ) as ArrayRef;
        let col13 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
            Some(b"one"),
            Some(b""),
            Some(b"zzzz"),
        ])) as ArrayRef;
        let to_write = RecordBatch::try_new(delete_arrow_schema.clone(), vec![
            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
        ])
        .unwrap();
        equality_delete_writer.write(to_write.clone()).await?;
        let res = equality_delete_writer.close().await?;
        assert_eq!(res.len(), 1);
        check_parquet_data_file_with_equality_delete_write(
            &file_io,
            &res.into_iter().next().unwrap(),
            &to_write,
        )
        .await;

        Ok(())
    }

    #[tokio::test]
    async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> {
        // prepare data
        // Int, Struct(Int), Struct(Struct(Int))
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::optional(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(
                    1,
                    "col1",
                    Type::Struct(StructType::new(vec![
                        NestedField::optional(2, "sub_col", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::optional(
                    3,
                    "col2",
                    Type::Struct(StructType::new(vec![
                        NestedField::optional(
                            4,
                            "sub_struct_col",
                            Type::Struct(StructType::new(vec![
                                NestedField::optional(
                                    5,
                                    "sub_sub_col",
                                    Type::Primitive(PrimitiveType::Int),
                                )
                                .into(),
                            ])),
                        )
                        .into(),
                    ])),
                )
                .into(),
            ])
            .build()
            .unwrap();
        let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
        // Row layout (col0, col1, col2):
        //   null  1             null (whole struct)
        //   2     null (struct) null (sub_struct_col)
        //   3     null (field)  null (sub_sub_col)
        let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef;
        let col1 = {
            let nulls = NullBuffer::from(vec![true, false, true]);
            Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
                    fields.clone()
                } else {
                    unreachable!()
                },
                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
                Some(nulls),
            ))
        };
        let col2 = {
            let inner_col = {
                let nulls = NullBuffer::from(vec![true, false, true]);
                Arc::new(StructArray::new(
                    Fields::from(vec![
                        Field::new("sub_sub_col", DataType::Int32, true).with_metadata(
                            HashMap::from([(
                                PARQUET_FIELD_ID_META_KEY.to_string(),
                                "5".to_string(),
                            )]),
                        ),
                    ]),
                    vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
                    Some(nulls),
                ))
            };
            let nulls = NullBuffer::from(vec![false, true, true]);
            Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(2).unwrap().data_type() {
                    fields.clone()
                } else {
                    unreachable!()
                },
                vec![inner_col],
                Some(nulls),
            ))
        };
        let columns = vec![col0, col1, col2];

        let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
        let equality_ids = vec![0_i32, 2, 5];
        let equality_config =
            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None, 0).unwrap();
        let projector = equality_config.projector.clone();

        // check
        let to_write_projected = projector.project_batch(to_write)?;
        let expect_batch =
            RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![
                Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef,
                Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef,
            ])
            .unwrap();
        assert_eq!(to_write_projected, expect_batch);
        Ok(())
    }
}