// iceberg/writer/base_writer/equality_delete_writer.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides `EqualityDeleteWriter`.

20use std::sync::Arc;
21
22use arrow_array::RecordBatch;
23use arrow_schema::{DataType, Field, SchemaRef as ArrowSchemaRef};
24use itertools::Itertools;
25use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
26
27use crate::arrow::record_batch_projector::RecordBatchProjector;
28use crate::arrow::schema_to_arrow_schema;
29use crate::spec::{DataFile, PartitionKey, SchemaRef};
30use crate::writer::file_writer::FileWriterBuilder;
31use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
32use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
33use crate::writer::{IcebergWriter, IcebergWriterBuilder};
34use crate::{Error, ErrorKind, Result};
35
/// Builder for `EqualityDeleteWriter`.
#[derive(Debug)]
pub struct EqualityDeleteFileWriterBuilder<
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
> {
    /// Builder for the rolling file writer that physically writes the delete files.
    inner: RollingFileWriterBuilder<B, L, F>,
    /// Equality-delete configuration: the equality field ids and the projector
    /// that narrows incoming batches down to those fields.
    config: EqualityDeleteWriterConfig,
}
46
47impl<B, L, F> EqualityDeleteFileWriterBuilder<B, L, F>
48where
49    B: FileWriterBuilder,
50    L: LocationGenerator,
51    F: FileNameGenerator,
52{
53    /// Create a new `EqualityDeleteFileWriterBuilder` using a `RollingFileWriterBuilder`.
54    pub fn new(
55        inner: RollingFileWriterBuilder<B, L, F>,
56        config: EqualityDeleteWriterConfig,
57    ) -> Self {
58        Self { inner, config }
59    }
60}
61
/// Config for `EqualityDeleteWriter`.
#[derive(Clone, Debug)]
pub struct EqualityDeleteWriterConfig {
    /// Field ids used to determine row equality in equality delete files.
    equality_ids: Vec<i32>,
    /// Projector used to project the data chunk into specific fields.
    projector: RecordBatchProjector,
}
70
71impl EqualityDeleteWriterConfig {
72    /// Create a new `DataFileWriterConfig` with equality ids.
73    pub fn new(equality_ids: Vec<i32>, original_schema: SchemaRef) -> Result<Self> {
74        let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
75        let projector = RecordBatchProjector::new(
76            original_arrow_schema,
77            &equality_ids,
78            // The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids
79            // and https://iceberg.apache.org/spec/#equality-delete-files
80            // - The identifier field ids must be used for primitive types.
81            // - The identifier field ids must not be used for floating point types or nullable fields.
82            |field| {
83                // Only primitive type is allowed to be used for identifier field ids
84                if field.data_type().is_nested()
85                    || matches!(
86                        field.data_type(),
87                        DataType::Float16 | DataType::Float32 | DataType::Float64
88                    )
89                {
90                    return Ok(None);
91                }
92                Ok(Some(
93                    field
94                        .metadata()
95                        .get(PARQUET_FIELD_ID_META_KEY)
96                        .ok_or_else(|| {
97                            Error::new(ErrorKind::Unexpected, "Field metadata is missing.")
98                        })?
99                        .parse::<i64>()
100                        .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?,
101                ))
102            },
103            |_field: &Field| true,
104        )?;
105        Ok(Self {
106            equality_ids,
107            projector,
108        })
109    }
110
111    /// Return projected Schema
112    pub fn projected_arrow_schema_ref(&self) -> &ArrowSchemaRef {
113        self.projector.projected_schema_ref()
114    }
115}
116
117#[async_trait::async_trait]
118impl<B, L, F> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B, L, F>
119where
120    B: FileWriterBuilder,
121    L: LocationGenerator,
122    F: FileNameGenerator,
123{
124    type R = EqualityDeleteFileWriter<B, L, F>;
125
126    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
127        Ok(EqualityDeleteFileWriter {
128            inner: Some(self.inner.build()),
129            projector: self.config.projector.clone(),
130            equality_ids: self.config.equality_ids.clone(),
131            partition_key,
132        })
133    }
134}
135
/// Writer used to write equality delete files.
#[derive(Debug)]
pub struct EqualityDeleteFileWriter<
    B: FileWriterBuilder,
    L: LocationGenerator,
    F: FileNameGenerator,
> {
    /// Underlying rolling writer; set to `None` once `close` has been called.
    inner: Option<RollingFileWriter<B, L, F>>,
    /// Projects incoming batches onto the equality-id columns.
    projector: RecordBatchProjector,
    /// Field ids recorded in the metadata of the produced delete files.
    equality_ids: Vec<i32>,
    /// Partition the produced delete files belong to, if any.
    partition_key: Option<PartitionKey>,
}
148
149#[async_trait::async_trait]
150impl<B, L, F> IcebergWriter for EqualityDeleteFileWriter<B, L, F>
151where
152    B: FileWriterBuilder,
153    L: LocationGenerator,
154    F: FileNameGenerator,
155{
156    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
157        let batch = self.projector.project_batch(batch)?;
158        if let Some(writer) = self.inner.as_mut() {
159            writer.write(&self.partition_key, &batch).await
160        } else {
161            Err(Error::new(
162                ErrorKind::Unexpected,
163                "Equality delete inner writer has been closed.",
164            ))
165        }
166    }
167
168    async fn close(&mut self) -> Result<Vec<DataFile>> {
169        if let Some(writer) = self.inner.take() {
170            writer
171                .close()
172                .await?
173                .into_iter()
174                .map(|mut res| {
175                    res.content(crate::spec::DataContentType::EqualityDeletes);
176                    res.equality_ids(Some(self.equality_ids.iter().copied().collect_vec()));
177                    if let Some(pk) = self.partition_key.as_ref() {
178                        res.partition(pk.data().clone());
179                        res.partition_spec_id(pk.spec().spec_id());
180                    }
181                    res.build().map_err(|e| {
182                        Error::new(
183                            ErrorKind::DataInvalid,
184                            format!("Failed to build data file: {e}"),
185                        )
186                    })
187                })
188                .collect()
189        } else {
190            Err(Error::new(
191                ErrorKind::Unexpected,
192                "Equality delete inner writer has been closed.",
193            ))
194        }
195    }
196}
197
#[cfg(test)]
mod test {
    //! Tests for the equality delete writer: projection of delete columns,
    //! rejection of illegal identifier fields, primitive-type round-trips and
    //! null propagation through nested struct projection.

    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray};
    use arrow_buffer::NullBuffer;
    use arrow_schema::{DataType, Field, Fields};
    use arrow_select::concat::concat_batches;
    use itertools::Itertools;
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
    use crate::io::{FileIO, FileIOBuilder};
    use crate::spec::{
        DataFile, DataFileFormat, ListType, MapType, NestedField, PrimitiveType, Schema,
        StructType, Type,
    };
    use crate::writer::base_writer::equality_delete_writer::{
        EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
    };
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
    use crate::writer::{IcebergWriter, IcebergWriterBuilder};

    /// Read back the parquet file described by `data_file` and verify that its
    /// contents equal `batch` and that the `DataFile` metadata (record count,
    /// file size, column sizes, value/null counts, split offsets) matches the
    /// parquet footer.
    async fn check_parquet_data_file_with_equality_delete_write(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        // read the written file back into memory
        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        // check data: all batches concatenated must equal the expected batch
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &batches).unwrap();
        assert_eq!(*batch, res);

        // check metadata
        let expect_column_num = batch.num_columns();

        // record count must equal the sum of rows over all row groups
        assert_eq!(
            data_file.record_count,
            metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64
        );

        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        assert_eq!(data_file.column_sizes.len(), expect_column_num);

        // per-column sizes must match the compressed column-chunk sizes
        for (index, id) in data_file.column_sizes().keys().sorted().enumerate() {
            metadata
                .row_groups()
                .iter()
                .map(|group| group.columns())
                .for_each(|column| {
                    assert_eq!(
                        *data_file.column_sizes.get(id).unwrap() as i64,
                        column.get(index).unwrap().compressed_size()
                    );
                });
        }

        // every column's value count equals the total row count
        assert_eq!(data_file.value_counts.len(), expect_column_num);
        data_file.value_counts.iter().for_each(|(_, &v)| {
            let expect = metadata
                .row_groups()
                .iter()
                .map(|group| group.num_rows())
                .sum::<i64>() as u64;
            assert_eq!(v, expect);
        });

        // null counts must match the written arrays
        for (index, id) in data_file.null_value_counts().keys().enumerate() {
            let expect = batch.column(index).null_count() as u64;
            assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect);
        }

        // split offsets must line up with the parquet row-group offsets
        let split_offsets = data_file
            .split_offsets
            .as_ref()
            .expect("split_offsets should be set");
        assert_eq!(split_offsets.len(), metadata.num_row_groups());
        split_offsets.iter().enumerate().for_each(|(i, &v)| {
            let expect = metadata.row_groups()[i].file_offset().unwrap();
            assert_eq!(v, expect);
        });
    }

    /// End-to-end: write a batch with nested columns through the equality
    /// delete writer and verify the produced file contains exactly the
    /// projection onto the equality-id columns (ids 0 and 8).
    #[tokio::test]
    async fn test_equality_delete_writer() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // prepare data
        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(
                    1,
                    "col1",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "sub_col", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(
                    3,
                    "col3",
                    Type::List(ListType::new(
                        NestedField::required(6, "element", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    )),
                )
                .into(),
                NestedField::required(
                    4,
                    "col4",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(
                            7,
                            "sub_col",
                            Type::Struct(StructType::new(vec![
                                NestedField::required(
                                    8,
                                    "sub_sub_col",
                                    Type::Primitive(PrimitiveType::Int),
                                )
                                .into(),
                            ])),
                        )
                        .into(),
                    ])),
                )
                .into(),
            ])
            .build()
            .unwrap();
        let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
        let col0 = Arc::new(Int32Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        // col1: single-int struct, reusing the (field-id annotated) fields
        // from the converted arrow schema
        let col1 = Arc::new(StructArray::new(
            if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
                fields.clone()
            } else {
                unreachable!()
            },
            vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
            None,
        ));
        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
            "test";
            1024
        ])) as ArrayRef;
        let col3 = Arc::new({
            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
              Some(
                  vec![Some(1),]
              );
              1024
          ])
            .into_parts();
            // rebuild the list with the element field (and its metadata)
            // taken from the schema so the field ids line up
            arrow_array::ListArray::new(
                if let DataType::List(field) = arrow_schema.fields.get(3).unwrap().data_type() {
                    field.clone()
                } else {
                    unreachable!()
                },
                list_parts.1,
                list_parts.2,
                list_parts.3,
            )
        }) as ArrayRef;
        // col4: struct-of-struct wrapping a single int leaf (field id 8)
        let col4 = Arc::new(StructArray::new(
            if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
                fields.clone()
            } else {
                unreachable!()
            },
            vec![Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
                    if let DataType::Struct(fields) = fields.first().unwrap().data_type() {
                        fields.clone()
                    } else {
                        unreachable!()
                    }
                } else {
                    unreachable!()
                },
                vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
                None,
            ))],
            None,
        ));
        let columns = vec![col0, col1, col2, col3, col4];
        let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();

        // delete on col0 (id 0) and the deeply nested sub_sub_col (id 8)
        let equality_ids = vec![0_i32, 8];
        let equality_config =
            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
        let delete_schema =
            Arc::new(arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap());
        let projector = equality_config.projector.clone();

        // prepare writer
        let pb =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), delete_schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            delete_schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut equality_delete_writer =
            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, equality_config)
                .build(None)
                .await?;

        // write
        equality_delete_writer.write(to_write.clone()).await?;
        let res = equality_delete_writer.close().await?;
        assert_eq!(res.len(), 1);
        let data_file = res.into_iter().next().unwrap();

        // check: the written file must equal the projected input batch
        let to_write_projected = projector.project_batch(to_write)?;
        check_parquet_data_file_with_equality_delete_write(
            &file_io,
            &data_file,
            &to_write_projected,
        )
        .await;
        Ok(())
    }

    /// Illegal identifier fields (floats, structs, maps, lists and their
    /// nested children) must be rejected by `EqualityDeleteWriterConfig::new`,
    /// while a primitive child of a struct is accepted.
    #[tokio::test]
    async fn test_equality_delete_unreachable_column() -> Result<(), anyhow::Error> {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Float)).into(),
                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Double)).into(),
                    NestedField::optional(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(
                        3,
                        "col3",
                        Type::Struct(StructType::new(vec![
                            NestedField::required(
                                4,
                                "sub_col",
                                Type::Primitive(PrimitiveType::Int),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                    NestedField::optional(
                        5,
                        "col4",
                        Type::Struct(StructType::new(vec![
                            NestedField::required(
                                6,
                                "sub_col2",
                                Type::Primitive(PrimitiveType::Int),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                    NestedField::required(
                        7,
                        "col5",
                        Type::Map(MapType::new(
                            Arc::new(NestedField::required(
                                8,
                                "key",
                                Type::Primitive(PrimitiveType::String),
                            )),
                            Arc::new(NestedField::required(
                                9,
                                "value",
                                Type::Primitive(PrimitiveType::Int),
                            )),
                        )),
                    )
                    .into(),
                    NestedField::required(
                        10,
                        "col6",
                        Type::List(ListType::new(Arc::new(NestedField::required(
                            11,
                            "element",
                            Type::Primitive(PrimitiveType::Int),
                        )))),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );
        // Float and Double are not allowed to be used for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone()).is_err());
        // Struct is not allowed to be used for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone()).is_err());
        // Nested field of struct is allowed to be used for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone()).is_ok());
        // Nested field of map is not allowed to be used for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone()).is_err());
        // Nested field of list is not allowed to be used for equality delete
        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone()).is_err());
        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone()).is_err());

        Ok(())
    }

    /// Round-trip a batch covering every primitive type that is a legal
    /// equality-delete identifier (all 14 columns participate).
    #[tokio::test]
    async fn test_equality_delete_with_primitive_type() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Boolean))
                        .into(),
                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Long)).into(),
                    NestedField::required(
                        3,
                        "col3",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 38,
                            scale: 5,
                        }),
                    )
                    .into(),
                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Date)).into(),
                    NestedField::required(5, "col5", Type::Primitive(PrimitiveType::Time)).into(),
                    NestedField::required(6, "col6", Type::Primitive(PrimitiveType::Timestamp))
                        .into(),
                    NestedField::required(7, "col7", Type::Primitive(PrimitiveType::Timestamptz))
                        .into(),
                    NestedField::required(8, "col8", Type::Primitive(PrimitiveType::TimestampNs))
                        .into(),
                    NestedField::required(9, "col9", Type::Primitive(PrimitiveType::TimestamptzNs))
                        .into(),
                    NestedField::required(10, "col10", Type::Primitive(PrimitiveType::String))
                        .into(),
                    NestedField::required(11, "col11", Type::Primitive(PrimitiveType::Uuid)).into(),
                    NestedField::required(12, "col12", Type::Primitive(PrimitiveType::Fixed(10)))
                        .into(),
                    NestedField::required(13, "col13", Type::Primitive(PrimitiveType::Binary))
                        .into(),
                ])
                .build()
                .unwrap(),
        );
        // every column participates in the equality delete
        let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
        let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone()).unwrap();
        let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
        let delete_schema = Arc::new(arrow_schema_to_schema(&delete_arrow_schema).unwrap());

        let pb =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), delete_schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            pb,
            delete_schema,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut equality_delete_writer =
            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
                .build(None)
                .await?;

        // prepare data: one arrow array per schema column, 3 rows each
        let col0 = Arc::new(BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(true),
        ])) as ArrayRef;
        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
        let col3 = Arc::new(
            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), Some(4)])
                .with_precision_and_scale(38, 5)
                .unwrap(),
        ) as ArrayRef;
        let col4 = Arc::new(arrow_array::Date32Array::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col5 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col6 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col7 = Arc::new(
            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), Some(3)])
                .with_timezone_utc(),
        ) as ArrayRef;
        let col8 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
            Some(0),
            Some(1),
            Some(3),
        ])) as ArrayRef;
        let col9 = Arc::new(
            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), Some(3)])
                .with_timezone_utc(),
        ) as ArrayRef;
        let col10 = Arc::new(arrow_array::StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("d"),
        ])) as ArrayRef;
        // Uuid maps to 16-byte fixed-size binary
        let col11 = Arc::new(
            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![
                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
                ]
                .into_iter(),
                16,
            )
            .unwrap(),
        ) as ArrayRef;
        // Fixed(10) maps to 10-byte fixed-size binary
        let col12 = Arc::new(
            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![
                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
                ]
                .into_iter(),
                10,
            )
            .unwrap(),
        ) as ArrayRef;
        let col13 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
            Some(b"one"),
            Some(b""),
            Some(b"zzzz"),
        ])) as ArrayRef;
        let to_write = RecordBatch::try_new(delete_arrow_schema.clone(), vec![
            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
        ])
        .unwrap();
        equality_delete_writer.write(to_write.clone()).await?;
        let res = equality_delete_writer.close().await?;
        assert_eq!(res.len(), 1);
        check_parquet_data_file_with_equality_delete_write(
            &file_io,
            &res.into_iter().next().unwrap(),
            &to_write,
        )
        .await;

        Ok(())
    }

    /// Verify null propagation: when a struct is null (or an ancestor is
    /// null), the projected leaf columns must be null too.
    #[tokio::test]
    async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> {
        // prepare data
        // Int, Struct(Int), Struct(Struct(Int))
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::optional(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(
                    1,
                    "col1",
                    Type::Struct(StructType::new(vec![
                        NestedField::optional(2, "sub_col", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::optional(
                    3,
                    "col2",
                    Type::Struct(StructType::new(vec![
                        NestedField::optional(
                            4,
                            "sub_struct_col",
                            Type::Struct(StructType::new(vec![
                                NestedField::optional(
                                    5,
                                    "sub_sub_col",
                                    Type::Primitive(PrimitiveType::Int),
                                )
                                .into(),
                            ])),
                        )
                        .into(),
                    ])),
                )
                .into(),
            ])
            .build()
            .unwrap();
        let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
        // Row layout (per column):
        // null 1            null(struct)
        // 2    null(struct) null(sub_struct_col)
        // 3    null(field)  null(sub_sub_col)
        let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef;
        let col1 = {
            let nulls = NullBuffer::from(vec![true, false, true]);
            Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
                    fields.clone()
                } else {
                    unreachable!()
                },
                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
                Some(nulls),
            ))
        };
        let col2 = {
            let inner_col = {
                let nulls = NullBuffer::from(vec![true, false, true]);
                Arc::new(StructArray::new(
                    Fields::from(vec![
                        Field::new("sub_sub_col", DataType::Int32, true).with_metadata(
                            HashMap::from([(
                                PARQUET_FIELD_ID_META_KEY.to_string(),
                                "5".to_string(),
                            )]),
                        ),
                    ]),
                    vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
                    Some(nulls),
                ))
            };
            let nulls = NullBuffer::from(vec![false, true, true]);
            Arc::new(StructArray::new(
                if let DataType::Struct(fields) = arrow_schema.fields.get(2).unwrap().data_type() {
                    fields.clone()
                } else {
                    unreachable!()
                },
                vec![inner_col],
                Some(nulls),
            ))
        };
        let columns = vec![col0, col1, col2];

        let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
        let equality_ids = vec![0_i32, 2, 5];
        let equality_config =
            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
        let projector = equality_config.projector.clone();

        // check: projected leaves carry nulls from their null ancestors
        let to_write_projected = projector.project_batch(to_write)?;
        let expect_batch =
            RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![
                Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef,
                Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef,
                Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef,
            ])
            .unwrap();
        assert_eq!(to_write_projected, expect_batch);
        Ok(())
    }
}
809}