iceberg/writer/file_writer/parquet_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the file writer for the Parquet file format.
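//!
//! The entry point is [`ParquetWriterBuilder`], which produces a [`ParquetWriter`]
//! implementing the [`FileWriter`] trait; closing the writer yields the
//! [`DataFileBuilder`]s that describe the written Parquet files.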
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_schema::SchemaRef as ArrowSchemaRef;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use itertools::Itertools;
27use parquet::arrow::AsyncArrowWriter;
28use parquet::arrow::async_reader::AsyncFileReader;
29use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
30use parquet::file::metadata::ParquetMetaData;
31use parquet::file::properties::WriterProperties;
32use parquet::file::statistics::Statistics;
33
34use super::{FileWriter, FileWriterBuilder};
35use crate::arrow::{
36    ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
37    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
38};
39use crate::io::{FileIO, FileWrite, OutputFile};
40use crate::spec::{
41    DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
42    NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
43    StructType, TableMetadata, Type, visit_schema,
44};
45use crate::transform::create_transform_function;
46use crate::writer::{CurrentFileStatus, DataFile};
47use crate::{Error, ErrorKind, Result};
48
49/// `ParquetWriterBuilder` is used to build a [`ParquetWriter`].
50#[derive(Clone, Debug)]
51pub struct ParquetWriterBuilder {
52    props: WriterProperties,
53    schema: SchemaRef,
54    match_mode: FieldMatchMode,
55}
56
57impl ParquetWriterBuilder {
58    /// Creates a new `ParquetWriterBuilder`.
59    /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
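    ///
    /// A minimal usage sketch; the `props`, `schema`, `output_file`, and `record_batch`
    /// values are assumed to be created elsewhere (e.g. via `FileIO` and the table's
    /// current schema):
    ///
    /// ```ignore
    /// let builder = ParquetWriterBuilder::new(props, schema);
    /// let mut writer = builder.build(output_file).await?;
    /// writer.write(&record_batch).await?;
    /// let data_file_builders = writer.close().await?;
    /// ```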
60    pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
61        Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
62    }
63
64    /// Creates a new `ParquetWriterBuilder` with a custom match mode
65    pub fn new_with_match_mode(
66        props: WriterProperties,
67        schema: SchemaRef,
68        match_mode: FieldMatchMode,
69    ) -> Self {
70        Self {
71            props,
72            schema,
73            match_mode,
74        }
75    }
76}
77
78impl FileWriterBuilder for ParquetWriterBuilder {
79    type R = ParquetWriter;
80
81    async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
82        Ok(ParquetWriter {
83            schema: self.schema.clone(),
84            inner_writer: None,
85            writer_properties: self.props.clone(),
86            current_row_num: 0,
87            output_file,
88            nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
89        })
90    }
91}
92
93/// A mapping from Parquet column path names to internal field IDs.
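///
/// A sketch of how the index is built and queried (assuming `schema` is an Iceberg
/// [`Schema`] with a top-level primitive field named `col0`):
///
/// ```ignore
/// let mut index = IndexByParquetPathName::new();
/// visit_schema(&schema, &mut index)?;
/// let field_id = index.get("col0");
/// ```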
94struct IndexByParquetPathName {
95    name_to_id: HashMap<String, i32>,
96
97    field_names: Vec<String>,
98
99    field_id: i32,
100}
101
102impl IndexByParquetPathName {
103    /// Creates a new, empty `IndexByParquetPathName`
104    pub fn new() -> Self {
105        Self {
106            name_to_id: HashMap::new(),
107            field_names: Vec::new(),
108            field_id: 0,
109        }
110    }
111
112    /// Retrieves the internal field ID for the given Parquet column path name, if present.
113    pub fn get(&self, name: &str) -> Option<&i32> {
114        self.name_to_id.get(name)
115    }
116}
117
118impl Default for IndexByParquetPathName {
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124impl SchemaVisitor for IndexByParquetPathName {
125    type T = ();
126
127    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
128        self.field_names.push(field.name.to_string());
129        self.field_id = field.id;
130        Ok(())
131    }
132
133    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
134        self.field_names.pop();
135        Ok(())
136    }
137
138    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
139        self.field_names.push(format!("list.{}", field.name));
140        self.field_id = field.id;
141        Ok(())
142    }
143
144    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
145        self.field_names.pop();
146        Ok(())
147    }
148
149    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
150        self.field_names
151            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
152        self.field_id = field.id;
153        Ok(())
154    }
155
156    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
157        self.field_names.pop();
158        Ok(())
159    }
160
161    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
162        self.field_names
163            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
164        self.field_id = field.id;
165        Ok(())
166    }
167
168    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
169        self.field_names.pop();
170        Ok(())
171    }
172
173    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
174        Ok(())
175    }
176
177    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
178        Ok(())
179    }
180
181    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
182        Ok(())
183    }
184
185    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
186        Ok(())
187    }
188
189    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
190        Ok(())
191    }
192
193    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
194        let full_name = self.field_names.iter().map(String::as_str).join(".");
195        let field_id = self.field_id;
196        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
197            return Err(Error::new(
198                ErrorKind::DataInvalid,
199                format!(
200                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
201                ),
202            ));
203        } else {
204            self.name_to_id.insert(full_name, field_id);
205        }
206
207        Ok(())
208    }
209}
210
211/// `ParquetWriter` is used to write Arrow data into a Parquet file on storage.
212pub struct ParquetWriter {
213    schema: SchemaRef,
214    output_file: OutputFile,
215    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
216    writer_properties: WriterProperties,
217    current_row_num: usize,
218    nan_value_count_visitor: NanValueCountVisitor,
219}
220
221/// Used to aggregate the min and max values of each column.
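///
/// A sketch of the intended call pattern, as used in `parquet_to_data_file_builder`
/// (`field_id` and `column_chunk_statistics` come from the Parquet footer):
///
/// ```ignore
/// let mut agg = MinMaxColAggregator::new(schema);
/// agg.update(field_id, column_chunk_statistics)?;
/// let (lower_bounds, upper_bounds) = agg.produce();
/// ```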
222struct MinMaxColAggregator {
223    lower_bounds: HashMap<i32, Datum>,
224    upper_bounds: HashMap<i32, Datum>,
225    schema: SchemaRef,
226}
227
228impl MinMaxColAggregator {
229    /// Creates a new, empty `MinMaxColAggregator`
230    fn new(schema: SchemaRef) -> Self {
231        Self {
232            lower_bounds: HashMap::new(),
233            upper_bounds: HashMap::new(),
234            schema,
235        }
236    }
237
238    fn update_state_min(&mut self, field_id: i32, datum: Datum) {
239        self.lower_bounds
240            .entry(field_id)
241            .and_modify(|e| {
242                if *e > datum {
243                    *e = datum.clone()
244                }
245            })
246            .or_insert(datum);
247    }
248
249    fn update_state_max(&mut self, field_id: i32, datum: Datum) {
250        self.upper_bounds
251            .entry(field_id)
252            .and_modify(|e| {
253                if *e < datum {
254                    *e = datum.clone()
255                }
256            })
257            .or_insert(datum);
258    }
259
260    /// Updates the aggregated bounds with the column chunk statistics for `field_id`.
261    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
262        let Some(ty) = self
263            .schema
264            .field_by_id(field_id)
265            .map(|f| f.field_type.as_ref())
266        else {
267            // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
268            // Ignore the field if it is not in schema.
269            return Ok(());
270        };
271        let Type::Primitive(ty) = ty.clone() else {
272            return Err(Error::new(
273                ErrorKind::Unexpected,
274                format!("Composite type {ty} is not supported for min/max aggregation."),
275            ));
276        };
277
278        if value.min_is_exact() {
279            let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
280                return Err(Error::new(
281                    ErrorKind::Unexpected,
282                    format!("Statistics {value} do not match field type {ty}."),
283                ));
284            };
285
286            self.update_state_min(field_id, min_datum);
287        }
288
289        if value.max_is_exact() {
290            let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
291                return Err(Error::new(
292                    ErrorKind::Unexpected,
293                    format!("Statistics {value} do not match field type {ty}."),
294                ));
295            };
296
297            self.update_state_max(field_id, max_datum);
298        }
299
300        Ok(())
301    }
302
303    /// Returns the aggregated lower and upper bounds
304    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
305        (self.lower_bounds, self.upper_bounds)
306    }
307}
308
309impl ParquetWriter {
310    /// Converts existing Parquet files into Iceberg data files.
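    ///
    /// For each path, the Parquet footer is read through [`ArrowFileReader`] and turned
    /// into an unpartitioned data file via [`Self::parquet_to_data_file_builder`].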
311    #[allow(dead_code)]
312    pub(crate) async fn parquet_files_to_data_files(
313        file_io: &FileIO,
314        file_paths: Vec<String>,
315        table_metadata: &TableMetadata,
316    ) -> Result<Vec<DataFile>> {
317        // TODO: support adding to partitioned table
318        let mut data_files: Vec<DataFile> = Vec::new();
319
320        for file_path in file_paths {
321            let input_file = file_io.new_input(&file_path)?;
322            let file_metadata = input_file.metadata().await?;
323            let file_size_in_bytes = file_metadata.size as usize;
324            let reader = input_file.reader().await?;
325
326            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
327            let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
328                Error::new(
329                    ErrorKind::DataInvalid,
330                    format!("Error reading Parquet metadata: {err}"),
331                )
332            })?;
333            let mut builder = ParquetWriter::parquet_to_data_file_builder(
334                table_metadata.current_schema().clone(),
335                parquet_metadata,
336                file_size_in_bytes,
337                file_path,
338                // TODO: Implement nan_value_counts here
339                HashMap::new(),
340            )?;
341            builder.partition_spec_id(table_metadata.default_partition_spec_id());
342            let data_file = builder.build().unwrap();
343            data_files.push(data_file);
344        }
345
346        Ok(data_files)
347    }
348
349    /// Builds a [`DataFileBuilder`] from Parquet file metadata ([`ParquetMetaData`]).
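    ///
    /// Per-column compressed sizes, value counts, null counts, and min/max bounds are
    /// aggregated across all row groups in the footer and keyed by Iceberg field ID via
    /// [`IndexByParquetPathName`].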
350    pub(crate) fn parquet_to_data_file_builder(
351        schema: SchemaRef,
352        metadata: Arc<ParquetMetaData>,
353        written_size: usize,
354        file_path: String,
355        nan_value_counts: HashMap<i32, u64>,
356    ) -> Result<DataFileBuilder> {
357        let index_by_parquet_path = {
358            let mut visitor = IndexByParquetPathName::new();
359            visit_schema(&schema, &mut visitor)?;
360            visitor
361        };
362
363        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
364            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
365            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
366            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
367            let mut min_max_agg = MinMaxColAggregator::new(schema);
368
369            for row_group in metadata.row_groups() {
370                for column_chunk_metadata in row_group.columns() {
371                    let parquet_path = column_chunk_metadata.column_descr().path().string();
372
373                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
374                        continue;
375                    };
376
377                    *per_col_size.entry(field_id).or_insert(0) +=
378                        column_chunk_metadata.compressed_size() as u64;
379                    *per_col_val_num.entry(field_id).or_insert(0) +=
380                        column_chunk_metadata.num_values() as u64;
381
382                    if let Some(statistics) = column_chunk_metadata.statistics() {
383                        if let Some(null_count) = statistics.null_count_opt() {
384                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
385                        }
386
387                        min_max_agg.update(field_id, statistics.clone())?;
388                    }
389                }
390            }
391            (
392                per_col_size,
393                per_col_val_num,
394                per_col_null_val_num,
395                min_max_agg.produce(),
396            )
397        };
398
399        let mut builder = DataFileBuilder::default();
400        builder
401            .content(DataContentType::Data)
402            .file_path(file_path)
403            .file_format(DataFileFormat::Parquet)
404            .partition(Struct::empty())
405            .record_count(metadata.file_metadata().num_rows() as u64)
406            .file_size_in_bytes(written_size as u64)
407            .column_sizes(column_sizes)
408            .value_counts(value_counts)
409            .null_value_counts(null_value_counts)
410            .nan_value_counts(nan_value_counts)
411            // # NOTE:
412            // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd
413            .lower_bounds(lower_bounds)
414            .upper_bounds(upper_bounds)
415            .split_offsets(Some(
416                metadata
417                    .row_groups()
418                    .iter()
419                    .filter_map(|group| group.file_offset())
420                    .collect(),
421            ));
422
423        Ok(builder)
424    }
425
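    /// Infers the partition tuple of a data file from its per-column bounds: for every
    /// partition field with both bounds present, the transform must preserve order and
    /// the lower and upper bounds must be equal; the transformed bound then becomes the
    /// partition value. Fields without bounds get a `None` partition value.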
426    #[allow(dead_code)]
427    fn partition_value_from_bounds(
428        table_spec: Arc<PartitionSpec>,
429        lower_bounds: &HashMap<i32, Datum>,
430        upper_bounds: &HashMap<i32, Datum>,
431    ) -> Result<Struct> {
432        let mut partition_literals: Vec<Option<Literal>> = Vec::new();
433
434        for field in table_spec.fields() {
435            if let (Some(lower), Some(upper)) = (
436                lower_bounds.get(&field.source_id),
437                upper_bounds.get(&field.source_id),
438            ) {
439                if !field.transform.preserves_order() {
440                    return Err(Error::new(
441                        ErrorKind::DataInvalid,
442                        format!(
443                            "cannot infer partition value for non-linear partition field (transform must preserve order): {} with transform {}",
444                            field.name, field.transform
445                        ),
446                    ));
447                }
448
449                if lower != upper {
450                    return Err(Error::new(
451                        ErrorKind::DataInvalid,
452                        format!(
453                            "multiple partition values for field {}: lower: {:?}, upper: {:?}",
454                            field.name, lower, upper
455                        ),
456                    ));
457                }
458
459                let transform_fn = create_transform_function(&field.transform)?;
460                let transform_literal =
461                    Literal::from(transform_fn.transform_literal_result(lower)?);
462
463                partition_literals.push(Some(transform_literal));
464            } else {
465                partition_literals.push(None);
466            }
467        }
468
469        let partition_struct = Struct::from_iter(partition_literals);
470
471        Ok(partition_struct)
472    }
473}
474
475impl FileWriter for ParquetWriter {
476    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
477        // Skip empty batch
478        if batch.num_rows() == 0 {
479            return Ok(());
480        }
481
482        self.current_row_num += batch.num_rows();
483
484        let batch_c = batch.clone();
485        self.nan_value_count_visitor
486            .compute(self.schema.clone(), batch_c)?;
487
488        // Lazily initialize the writer on the first non-empty batch
489        let writer = if let Some(writer) = &mut self.inner_writer {
490            writer
491        } else {
492            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
493            let inner_writer = self.output_file.writer().await?;
494            let async_writer = AsyncFileWriter::new(inner_writer);
495            let writer = AsyncArrowWriter::try_new(
496                async_writer,
497                arrow_schema.clone(),
498                Some(self.writer_properties.clone()),
499            )
500            .map_err(|err| {
501                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
502                    .with_source(err)
503            })?;
504            self.inner_writer = Some(writer);
505            self.inner_writer.as_mut().unwrap()
506        };
507
508        writer.write(batch).await.map_err(|err| {
509            Error::new(
510                ErrorKind::Unexpected,
511                "Failed to write using parquet writer.",
512            )
513            .with_source(err)
514        })?;
515
516        Ok(())
517    }
518
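    // Finish the inner writer and build the data file metadata. If no rows were written,
    // the empty output file is deleted and no builders are returned.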
519    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
520        let mut writer = match self.inner_writer.take() {
521            Some(writer) => writer,
522            None => return Ok(vec![]),
523        };
524
525        let metadata = writer.finish().await.map_err(|err| {
526            Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
527        })?;
528
529        let written_size = writer.bytes_written();
530
531        if self.current_row_num == 0 {
532            self.output_file.delete().await.map_err(|err| {
533                Error::new(
534                    ErrorKind::Unexpected,
535                    "Failed to delete empty parquet file.",
536                )
537                .with_source(err)
538            })?;
539            Ok(vec![])
540        } else {
541            let parquet_metadata = Arc::new(metadata);
542
543            Ok(vec![Self::parquet_to_data_file_builder(
544                self.schema,
545                parquet_metadata,
546                written_size,
547                self.output_file.location().to_string(),
548                self.nan_value_count_visitor.nan_value_counts,
549            )?])
550        }
551    }
552}
553
554impl CurrentFileStatus for ParquetWriter {
555    fn current_file_path(&self) -> String {
556        self.output_file.location().to_string()
557    }
558
559    fn current_row_num(&self) -> usize {
560        self.current_row_num
561    }
562
563    fn current_written_size(&self) -> usize {
564        if let Some(inner) = self.inner_writer.as_ref() {
565            // The inner AsyncArrowWriter contains both a sync and an async writer:
566            // written size = bytes already flushed to the async writer + bytes still buffered in the sync writer
567            inner.bytes_written() + inner.in_progress_size()
568        } else {
569            // inner writer is not initialized yet
570            0
571        }
572    }
573
574    fn current_schema(&self) -> SchemaRef {
575        self.schema.clone()
576    }
577}
578
579/// `AsyncFileWriter` wraps a [`FileWrite`] so it can be used as Parquet's [`ArrowAsyncFileWriter`].
580///
581/// # NOTES
582///
583/// This wrapper is intended for internal use only.
584struct AsyncFileWriter<W: FileWrite>(W);
585
586impl<W: FileWrite> AsyncFileWriter<W> {
587    /// Create a new `AsyncFileWriter` with the given writer.
588    pub fn new(writer: W) -> Self {
589        Self(writer)
590    }
591}
592
593impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
594    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
595        Box::pin(async {
596            self.0
597                .write(bs)
598                .await
599                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
600        })
601    }
602
603    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
604        Box::pin(async {
605            self.0
606                .close()
607                .await
608                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
609        })
610    }
611}
612
613#[cfg(test)]
614mod tests {
615    use std::collections::HashMap;
616    use std::sync::Arc;
617
618    use anyhow::Result;
619    use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
620    use arrow_array::types::{Float32Type, Int64Type};
621    use arrow_array::{
622        Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
623        Int64Array, ListArray, MapArray, RecordBatch, StructArray,
624    };
625    use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
626    use arrow_select::concat::concat_batches;
627    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
628    use parquet::file::statistics::ValueStatistics;
629    use rust_decimal::Decimal;
630    use tempfile::TempDir;
631    use uuid::Uuid;
632
633    use super::*;
634    use crate::arrow::schema_to_arrow_schema;
635    use crate::io::FileIOBuilder;
636    use crate::spec::{PrimitiveLiteral, Struct, *};
637    use crate::writer::file_writer::location_generator::{
638        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
639    };
640    use crate::writer::tests::check_parquet_data_file;
641
642    fn schema_for_all_type() -> Schema {
643        Schema::builder()
644            .with_schema_id(1)
645            .with_fields(vec![
646                NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
647                NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
648                NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
649                NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
650                NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
651                NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
652                NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
653                NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
654                NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
655                NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
656                    .into(),
657                NestedField::optional(
658                    10,
659                    "timestamptz",
660                    Type::Primitive(PrimitiveType::Timestamptz),
661                )
662                .into(),
663                NestedField::optional(
664                    11,
665                    "timestamp_ns",
666                    Type::Primitive(PrimitiveType::TimestampNs),
667                )
668                .into(),
669                NestedField::optional(
670                    12,
671                    "timestamptz_ns",
672                    Type::Primitive(PrimitiveType::TimestamptzNs),
673                )
674                .into(),
675                NestedField::optional(
676                    13,
677                    "decimal",
678                    Type::Primitive(PrimitiveType::Decimal {
679                        precision: 10,
680                        scale: 5,
681                    }),
682                )
683                .into(),
684                NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
685                NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
686                    .into(),
687                // Parquet statistics use a different representation for Decimal with precision 38 and scale 5,
688                // so we add a separate field for it.
689                NestedField::optional(
690                    16,
691                    "decimal_38",
692                    Type::Primitive(PrimitiveType::Decimal {
693                        precision: 38,
694                        scale: 5,
695                    }),
696                )
697                .into(),
698            ])
699            .build()
700            .unwrap()
701    }
702
703    fn nested_schema_for_test() -> Schema {
704        // Int, Struct(Int,Int), String, List(Int), Struct(Struct(Int)), Map(String, List(Int))
705        Schema::builder()
706            .with_schema_id(1)
707            .with_fields(vec![
708                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
709                NestedField::required(
710                    1,
711                    "col1",
712                    Type::Struct(StructType::new(vec![
713                        NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
714                            .into(),
715                        NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
716                            .into(),
717                    ])),
718                )
719                .into(),
720                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
721                NestedField::required(
722                    3,
723                    "col3",
724                    Type::List(ListType::new(
725                        NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
726                            .into(),
727                    )),
728                )
729                .into(),
730                NestedField::required(
731                    4,
732                    "col4",
733                    Type::Struct(StructType::new(vec![
734                        NestedField::required(
735                            8,
736                            "col_4_8",
737                            Type::Struct(StructType::new(vec![
738                                NestedField::required(
739                                    9,
740                                    "col_4_8_9",
741                                    Type::Primitive(PrimitiveType::Long),
742                                )
743                                .into(),
744                            ])),
745                        )
746                        .into(),
747                    ])),
748                )
749                .into(),
750                NestedField::required(
751                    10,
752                    "col5",
753                    Type::Map(MapType::new(
754                        NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
755                            .into(),
756                        NestedField::required(
757                            12,
758                            "value",
759                            Type::List(ListType::new(
760                                NestedField::required(
761                                    13,
762                                    "item",
763                                    Type::Primitive(PrimitiveType::Long),
764                                )
765                                .into(),
766                            )),
767                        )
768                        .into(),
769                    )),
770                )
771                .into(),
772            ])
773            .build()
774            .unwrap()
775    }
776
777    #[tokio::test]
778    async fn test_index_by_parquet_path() {
779        let expect = HashMap::from([
780            ("col0".to_string(), 0),
781            ("col1.col_1_5".to_string(), 5),
782            ("col1.col_1_6".to_string(), 6),
783            ("col2".to_string(), 2),
784            ("col3.list.element".to_string(), 7),
785            ("col4.col_4_8.col_4_8_9".to_string(), 9),
786            ("col5.key_value.key".to_string(), 11),
787            ("col5.key_value.value.list.item".to_string(), 13),
788        ]);
789        let mut visitor = IndexByParquetPathName::new();
790        visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
791        assert_eq!(visitor.name_to_id, expect);
792    }
793
794    #[tokio::test]
795    async fn test_parquet_writer() -> Result<()> {
796        let temp_dir = TempDir::new().unwrap();
797        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
798        let location_gen = DefaultLocationGenerator::with_data_location(
799            temp_dir.path().to_str().unwrap().to_string(),
800        );
801        let file_name_gen =
802            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
803
804        // prepare data
805        let schema = {
806            let fields =
807                vec![
808                    Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
809                        PARQUET_FIELD_ID_META_KEY.to_string(),
810                        "0".to_string(),
811                    )])),
812                ];
813            Arc::new(arrow_schema::Schema::new(fields))
814        };
815        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
816        let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
817        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
818        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
819
820        let output_file = file_io.new_output(
821            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
822        )?;
823
824        // write data
825        let mut pw = ParquetWriterBuilder::new(
826            WriterProperties::builder()
827                .set_max_row_group_size(128)
828                .build(),
829            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
830        )
831        .build(output_file)
832        .await?;
833        pw.write(&to_write).await?;
834        pw.write(&to_write_null).await?;
835        let res = pw.close().await?;
836        assert_eq!(res.len(), 1);
837        let data_file = res
838            .into_iter()
839            .next()
840            .unwrap()
841            // Set dummy fields so the build succeeds.
842            .content(DataContentType::Data)
843            .partition(Struct::empty())
844            .partition_spec_id(0)
845            .build()
846            .unwrap();
847
848        // check data file
849        assert_eq!(data_file.record_count(), 2048);
850        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
851        assert_eq!(
852            *data_file.lower_bounds(),
853            HashMap::from([(0, Datum::long(0))])
854        );
855        assert_eq!(
856            *data_file.upper_bounds(),
857            HashMap::from([(0, Datum::long(1023))])
858        );
859        assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
860
861        // check the written file
862        let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
863        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
864
865        Ok(())
866    }
867
868    #[tokio::test]
869    async fn test_parquet_writer_with_complex_schema() -> Result<()> {
870        let temp_dir = TempDir::new().unwrap();
871        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
872        let location_gen = DefaultLocationGenerator::with_data_location(
873            temp_dir.path().to_str().unwrap().to_string(),
874        );
875        let file_name_gen =
876            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
877
878        // prepare data
879        let schema = nested_schema_for_test();
880        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
881        let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
882        let col1 = Arc::new(StructArray::new(
883            {
884                if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
885                    fields.clone()
886                } else {
887                    unreachable!()
888                }
889            },
890            vec![
891                Arc::new(Int64Array::from_iter_values(0..1024)),
892                Arc::new(Int64Array::from_iter_values(0..1024)),
893            ],
894            None,
895        ));
896        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
897            (0..1024).map(|n| n.to_string()),
898        )) as ArrayRef;
899        let col3 = Arc::new({
900            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
901                (0..1024).map(|n| Some(vec![Some(n)])),
902            )
903            .into_parts();
904            arrow_array::ListArray::new(
905                {
906                    if let DataType::List(field) = arrow_schema.field(3).data_type() {
907                        field.clone()
908                    } else {
909                        unreachable!()
910                    }
911                },
912                list_parts.1,
913                list_parts.2,
914                list_parts.3,
915            )
916        }) as ArrayRef;
917        let col4 = Arc::new(StructArray::new(
918            {
919                if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
920                    fields.clone()
921                } else {
922                    unreachable!()
923                }
924            },
925            vec![Arc::new(StructArray::new(
926                {
927                    if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
928                        if let DataType::Struct(fields) = fields[0].data_type() {
929                            fields.clone()
930                        } else {
931                            unreachable!()
932                        }
933                    } else {
934                        unreachable!()
935                    }
936                },
937                vec![Arc::new(Int64Array::from_iter_values(0..1024))],
938                None,
939            ))],
940            None,
941        ));
942        let col5 = Arc::new({
943            let mut map_array_builder = MapBuilder::new(
944                None,
945                arrow_array::builder::StringBuilder::new(),
946                arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
947                    Int64Type,
948                >::new()),
949            );
950            for i in 0..1024 {
951                map_array_builder.keys().append_value(i.to_string());
952                map_array_builder
953                    .values()
954                    .append_value(vec![Some(i as i64); i + 1]);
955                map_array_builder.append(true)?;
956            }
957            let (_, offset_buffer, struct_array, null_buffer, ordered) =
958                map_array_builder.finish().into_parts();
959            let struct_array = {
960                let (_, mut arrays, nulls) = struct_array.into_parts();
961                let list_array = {
962                    let list_array = arrays[1]
963                        .as_any()
964                        .downcast_ref::<ListArray>()
965                        .unwrap()
966                        .clone();
967                    let (_, offsets, array, nulls) = list_array.into_parts();
968                    let list_field = {
969                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
970                            if let DataType::Struct(fields) = map_field.data_type() {
971                                if let DataType::List(list_field) = fields[1].data_type() {
972                                    list_field.clone()
973                                } else {
974                                    unreachable!()
975                                }
976                            } else {
977                                unreachable!()
978                            }
979                        } else {
980                            unreachable!()
981                        }
982                    };
983                    ListArray::new(list_field, offsets, array, nulls)
984                };
985                arrays[1] = Arc::new(list_array) as ArrayRef;
986                StructArray::new(
987                    {
988                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
989                            if let DataType::Struct(fields) = map_field.data_type() {
990                                fields.clone()
991                            } else {
992                                unreachable!()
993                            }
994                        } else {
995                            unreachable!()
996                        }
997                    },
998                    arrays,
999                    nulls,
1000                )
1001            };
1002            arrow_array::MapArray::new(
1003                {
1004                    if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1005                        map_field.clone()
1006                    } else {
1007                        unreachable!()
1008                    }
1009                },
1010                offset_buffer,
1011                struct_array,
1012                null_buffer,
1013                ordered,
1014            )
1015        }) as ArrayRef;
1016        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1017            col0, col1, col2, col3, col4, col5,
1018        ])
1019        .unwrap();
1020        let output_file = file_io.new_output(
1021            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1022        )?;
1023
1024        // write data
1025        let mut pw =
1026            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1027                .build(output_file)
1028                .await?;
1029        pw.write(&to_write).await?;
1030        let res = pw.close().await?;
1031        assert_eq!(res.len(), 1);
1032        let data_file = res
1033            .into_iter()
1034            .next()
1035            .unwrap()
1036            // Set dummy fields so the build succeeds.
1037            .content(crate::spec::DataContentType::Data)
1038            .partition(Struct::empty())
1039            .partition_spec_id(0)
1040            .build()
1041            .unwrap();
1042
1043        // check data file
1044        assert_eq!(data_file.record_count(), 1024);
1045        assert_eq!(
1046            *data_file.value_counts(),
1047            HashMap::from([
1048                (0, 1024),
1049                (5, 1024),
1050                (6, 1024),
1051                (2, 1024),
1052                (7, 1024),
1053                (9, 1024),
1054                (11, 1024),
1055                (13, (1..1025).sum()),
1056            ])
1057        );
1058        assert_eq!(
1059            *data_file.lower_bounds(),
1060            HashMap::from([
1061                (0, Datum::long(0)),
1062                (5, Datum::long(0)),
1063                (6, Datum::long(0)),
1064                (2, Datum::string("0")),
1065                (7, Datum::long(0)),
1066                (9, Datum::long(0)),
1067                (11, Datum::string("0")),
1068                (13, Datum::long(0))
1069            ])
1070        );
1071        assert_eq!(
1072            *data_file.upper_bounds(),
1073            HashMap::from([
1074                (0, Datum::long(1023)),
1075                (5, Datum::long(1023)),
1076                (6, Datum::long(1023)),
1077                (2, Datum::string("999")),
1078                (7, Datum::long(1023)),
1079                (9, Datum::long(1023)),
1080                (11, Datum::string("999")),
1081                (13, Datum::long(1023))
1082            ])
1083        );
1084
1085        // check the written file
1086        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1087
1088        Ok(())
1089    }
1090
1091    #[tokio::test]
1092    async fn test_all_type_for_write() -> Result<()> {
1093        let temp_dir = TempDir::new().unwrap();
1094        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1095        let location_gen = DefaultLocationGenerator::with_data_location(
1096            temp_dir.path().to_str().unwrap().to_string(),
1097        );
1098        let file_name_gen =
1099            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1100
1101        // prepare data
1102        // generate iceberg schema for all type
1103        let schema = schema_for_all_type();
1104        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1105        let col0 = Arc::new(BooleanArray::from(vec![
1106            Some(true),
1107            Some(false),
1108            None,
1109            Some(true),
1110        ])) as ArrayRef;
1111        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1112        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1113        let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1114            Some(0.5),
1115            Some(2.0),
1116            None,
1117            Some(3.5),
1118        ])) as ArrayRef;
1119        let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1120            Some(0.5),
1121            Some(2.0),
1122            None,
1123            Some(3.5),
1124        ])) as ArrayRef;
1125        let col5 = Arc::new(arrow_array::StringArray::from(vec![
1126            Some("a"),
1127            Some("b"),
1128            None,
1129            Some("d"),
1130        ])) as ArrayRef;
1131        let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1132            Some(b"one"),
1133            None,
1134            Some(b""),
1135            Some(b"zzzz"),
1136        ])) as ArrayRef;
1137        let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1138            Some(0),
1139            Some(1),
1140            None,
1141            Some(3),
1142        ])) as ArrayRef;
1143        let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1144            Some(0),
1145            Some(1),
1146            None,
1147            Some(3),
1148        ])) as ArrayRef;
1149        let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1150            Some(0),
1151            Some(1),
1152            None,
1153            Some(3),
1154        ])) as ArrayRef;
1155        let col10 = Arc::new(
1156            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1157                .with_timezone_utc(),
1158        ) as ArrayRef;
1159        let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1160            Some(0),
1161            Some(1),
1162            None,
1163            Some(3),
1164        ])) as ArrayRef;
1165        let col12 = Arc::new(
1166            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1167                .with_timezone_utc(),
1168        ) as ArrayRef;
1169        let col13 = Arc::new(
1170            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1171                .with_precision_and_scale(10, 5)
1172                .unwrap(),
1173        ) as ArrayRef;
1174        let col14 = Arc::new(
1175            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1176                vec![
1177                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
1178                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
1179                    None,
1180                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
1181                ]
1182                .into_iter(),
1183                16,
1184            )
1185            .unwrap(),
1186        ) as ArrayRef;
1187        let col15 = Arc::new(
1188            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1189                vec![
1190                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1191                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1192                    None,
1193                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1194                ]
1195                .into_iter(),
1196                10,
1197            )
1198            .unwrap(),
1199        ) as ArrayRef;
1200        let col16 = Arc::new(
1201            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1202                .with_precision_and_scale(38, 5)
1203                .unwrap(),
1204        ) as ArrayRef;
1205        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1206            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1207            col14, col15, col16,
1208        ])
1209        .unwrap();
1210        let output_file = file_io.new_output(
1211            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1212        )?;
1213
1214        // write data
1215        let mut pw =
1216            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1217                .build(output_file)
1218                .await?;
1219        pw.write(&to_write).await?;
1220        let res = pw.close().await?;
1221        assert_eq!(res.len(), 1);
1222        let data_file = res
1223            .into_iter()
1224            .next()
1225            .unwrap()
1226            // Set dummy fields so the build succeeds.
1227            .content(crate::spec::DataContentType::Data)
1228            .partition(Struct::empty())
1229            .partition_spec_id(0)
1230            .build()
1231            .unwrap();
1232
1233        // check data file
1234        assert_eq!(data_file.record_count(), 4);
1235        assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1236        assert!(
1237            data_file
1238                .null_value_counts()
1239                .iter()
1240                .all(|(_, &v)| { v == 1 })
1241        );
1242        assert_eq!(
1243            *data_file.lower_bounds(),
1244            HashMap::from([
1245                (0, Datum::bool(false)),
1246                (1, Datum::int(1)),
1247                (2, Datum::long(1)),
1248                (3, Datum::float(0.5)),
1249                (4, Datum::double(0.5)),
1250                (5, Datum::string("a")),
1251                (6, Datum::binary(vec![])),
1252                (7, Datum::date(0)),
1253                (8, Datum::time_micros(0).unwrap()),
1254                (9, Datum::timestamp_micros(0)),
1255                (10, Datum::timestamptz_micros(0)),
1256                (11, Datum::timestamp_nanos(0)),
1257                (12, Datum::timestamptz_nanos(0)),
1258                (
1259                    13,
1260                    Datum::new(
1261                        PrimitiveType::Decimal {
1262                            precision: 10,
1263                            scale: 5
1264                        },
1265                        PrimitiveLiteral::Int128(1)
1266                    )
1267                ),
1268                (14, Datum::uuid(Uuid::from_u128(0))),
1269                (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1270                (
1271                    16,
1272                    Datum::new(
1273                        PrimitiveType::Decimal {
1274                            precision: 38,
1275                            scale: 5
1276                        },
1277                        PrimitiveLiteral::Int128(1)
1278                    )
1279                ),
1280            ])
1281        );
1282        assert_eq!(
1283            *data_file.upper_bounds(),
1284            HashMap::from([
1285                (0, Datum::bool(true)),
1286                (1, Datum::int(4)),
1287                (2, Datum::long(4)),
1288                (3, Datum::float(3.5)),
1289                (4, Datum::double(3.5)),
1290                (5, Datum::string("d")),
1291                (6, Datum::binary(vec![122, 122, 122, 122])),
1292                (7, Datum::date(3)),
1293                (8, Datum::time_micros(3).unwrap()),
1294                (9, Datum::timestamp_micros(3)),
1295                (10, Datum::timestamptz_micros(3)),
1296                (11, Datum::timestamp_nanos(3)),
1297                (12, Datum::timestamptz_nanos(3)),
1298                (
1299                    13,
1300                    Datum::new(
1301                        PrimitiveType::Decimal {
1302                            precision: 10,
1303                            scale: 5
1304                        },
1305                        PrimitiveLiteral::Int128(100)
1306                    )
1307                ),
1308                (14, Datum::uuid(Uuid::from_u128(3))),
1309                (
1310                    15,
1311                    Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1312                ),
1313                (
1314                    16,
1315                    Datum::new(
1316                        PrimitiveType::Decimal {
1317                            precision: 38,
1318                            scale: 5
1319                        },
1320                        PrimitiveLiteral::Int128(100)
1321                    )
1322                ),
1323            ])
1324        );
1325
1326        // check the written file
1327        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1328
1329        Ok(())
1330    }
1331
1332    #[tokio::test]
1333    async fn test_decimal_bound() -> Result<()> {
1334        let temp_dir = TempDir::new().unwrap();
1335        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1336        let location_gen = DefaultLocationGenerator::with_data_location(
1337            temp_dir.path().to_str().unwrap().to_string(),
1338        );
1339        let file_name_gen =
1340            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1341
1342        // test 1.1 and 2.2
1343        let schema = Arc::new(
1344            Schema::builder()
1345                .with_fields(vec![
1346                    NestedField::optional(
1347                        0,
1348                        "decimal",
1349                        Type::Primitive(PrimitiveType::Decimal {
1350                            precision: 28,
1351                            scale: 10,
1352                        }),
1353                    )
1354                    .into(),
1355                ])
1356                .build()
1357                .unwrap(),
1358        );
1359        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1360        let output_file = file_io.new_output(
1361            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1362        )?;
1363        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1364            .build(output_file)
1365            .await?;
1366        let col0 = Arc::new(
1367            Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1368                .with_data_type(DataType::Decimal128(28, 10)),
1369        ) as ArrayRef;
1370        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1371        pw.write(&to_write).await?;
1372        let res = pw.close().await?;
1373        assert_eq!(res.len(), 1);
1374        let data_file = res
1375            .into_iter()
1376            .next()
1377            .unwrap()
1378            .content(crate::spec::DataContentType::Data)
1379            .partition(Struct::empty())
1380            .partition_spec_id(0)
1381            .build()
1382            .unwrap();
1383        assert_eq!(
1384            data_file.upper_bounds().get(&0),
1385            Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1386                .as_ref()
1387        );
1388        assert_eq!(
1389            data_file.lower_bounds().get(&0),
1390            Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1391                .as_ref()
1392        );
1393
1394        // test -1.1 and -2.2
1395        let schema = Arc::new(
1396            Schema::builder()
1397                .with_fields(vec![
1398                    NestedField::optional(
1399                        0,
1400                        "decimal",
1401                        Type::Primitive(PrimitiveType::Decimal {
1402                            precision: 28,
1403                            scale: 10,
1404                        }),
1405                    )
1406                    .into(),
1407                ])
1408                .build()
1409                .unwrap(),
1410        );
1411        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1412        let output_file = file_io.new_output(
1413            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1414        )?;
1415        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1416            .build(output_file)
1417            .await?;
1418        let col0 = Arc::new(
1419            Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1420                .with_data_type(DataType::Decimal128(28, 10)),
1421        ) as ArrayRef;
1422        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1423        pw.write(&to_write).await?;
1424        let res = pw.close().await?;
1425        assert_eq!(res.len(), 1);
1426        let data_file = res
1427            .into_iter()
1428            .next()
1429            .unwrap()
1430            .content(crate::spec::DataContentType::Data)
1431            .partition(Struct::empty())
1432            .partition_spec_id(0)
1433            .build()
1434            .unwrap();
1435        assert_eq!(
1436            data_file.upper_bounds().get(&0),
1437            Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1438                .as_ref()
1439        );
1440        assert_eq!(
1441            data_file.lower_bounds().get(&0),
1442            Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1443                .as_ref()
1444        );
1445
1446        // test max and min of rust_decimal
1447        let decimal_max = Decimal::MAX;
1448        let decimal_min = Decimal::MIN;
1449        assert_eq!(decimal_max.scale(), decimal_min.scale());
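        // rust_decimal's MAX/MIN are ±(2^96 - 1) with scale 0 (29 digits), so they fit within an Iceberg decimal(38, 0).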
1450        let schema = Arc::new(
1451            Schema::builder()
1452                .with_fields(vec![
1453                    NestedField::optional(
1454                        0,
1455                        "decimal",
1456                        Type::Primitive(PrimitiveType::Decimal {
1457                            precision: 38,
1458                            scale: decimal_max.scale(),
1459                        }),
1460                    )
1461                    .into(),
1462                ])
1463                .build()
1464                .unwrap(),
1465        );
1466        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1467        let output_file = file_io.new_output(
1468            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1469        )?;
1470        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1471            .build(output_file)
1472            .await?;
1473        let col0 = Arc::new(
1474            Decimal128Array::from(vec![
1475                Some(decimal_max.mantissa()),
1476                Some(decimal_min.mantissa()),
1477            ])
1478            .with_data_type(DataType::Decimal128(38, 0)),
1479        ) as ArrayRef;
1480        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1481        pw.write(&to_write).await?;
1482        let res = pw.close().await?;
1483        assert_eq!(res.len(), 1);
1484        let data_file = res
1485            .into_iter()
1486            .next()
1487            .unwrap()
1488            .content(crate::spec::DataContentType::Data)
1489            .partition(Struct::empty())
1490            .partition_spec_id(0)
1491            .build()
1492            .unwrap();
1493        assert_eq!(
1494            data_file.upper_bounds().get(&0),
1495            Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1496        );
1497        assert_eq!(
1498            data_file.lower_bounds().get(&0),
1499            Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1500        );
1501
1502        // test max and min for precision 38
1503        // TODO
1504        // Re-add this case after resolving https://github.com/apache/iceberg-rust/issues/669
1505        // let schema = Arc::new(
1506        //     Schema::builder()
1507        //         .with_fields(vec![NestedField::optional(
1508        //             0,
1509        //             "decimal",
1510        //             Type::Primitive(PrimitiveType::Decimal {
1511        //                 precision: 38,
1512        //                 scale: 0,
1513        //             }),
1514        //         )
1515        //         .into()])
1516        //         .build()
1517        //         .unwrap(),
1518        // );
1519        // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1520        // let output_file = file_io.new_output(
1521        //     location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1522        // )?;
1523        // let mut pw = ParquetWriterBuilder::new(
1524        //     WriterProperties::builder().build(),
1525        //     schema,
1526        // )
1527        // .build(output_file)
1528        // .await?;
1529        // let col0 = Arc::new(
1530        //     Decimal128Array::from(vec![
1531        //         Some(99999999999999999999999999999999999999_i128),
1532        //         Some(-99999999999999999999999999999999999999_i128),
1533        //     ])
1534        //     .with_data_type(DataType::Decimal128(38, 0)),
1535        // ) as ArrayRef;
1536        // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1537        // pw.write(&to_write).await?;
1538        // let res = pw.close().await?;
1539        // assert_eq!(res.len(), 1);
1540        // let data_file = res
1541        //     .into_iter()
1542        //     .next()
1543        //     .unwrap()
1544        //     .content(crate::spec::DataContentType::Data)
1545        //     .partition(Struct::empty())
1546        //     .build()
1547        //     .unwrap();
1548        // assert_eq!(
1549        //     data_file.upper_bounds().get(&0),
1550        //     Some(Datum::new(
1551        //         PrimitiveType::Decimal {
1552        //             precision: 38,
1553        //             scale: 0
1554        //         },
1555        //         PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128)
1556        //     ))
1557        //     .as_ref()
1558        // );
1559        // assert_eq!(
1560        //     data_file.lower_bounds().get(&0),
1561        //     Some(Datum::new(
1562        //         PrimitiveType::Decimal {
1563        //             precision: 38,
1564        //             scale: 0
1565        //         },
1566        //         PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128)
1567        //     ))
1568        //     .as_ref()
1569        // );
1570
1571        Ok(())
1572    }
1573
1574    #[tokio::test]
1575    async fn test_empty_write() -> Result<()> {
1576        let temp_dir = TempDir::new().unwrap();
1577        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1578        let location_gen = DefaultLocationGenerator::with_data_location(
1579            temp_dir.path().to_str().unwrap().to_string(),
1580        );
1581        let file_name_gen =
1582            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1583
1584        // Test that the file is created when there is data to write
1585        let schema = {
1586            let fields = vec![
1587                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1588                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1589                ),
1590            ];
1591            Arc::new(arrow_schema::Schema::new(fields))
1592        };
1593        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1594        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1595        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1596        let output_file = file_io.new_output(&file_path)?;
1597        let mut pw = ParquetWriterBuilder::new(
1598            WriterProperties::builder().build(),
1599            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1600        )
1601        .build(output_file)
1602        .await?;
1603        pw.write(&to_write).await?;
1604        pw.close().await.unwrap();
1605        assert!(file_io.exists(&file_path).await.unwrap());
1606
1607        // Test that no file is created when there is no data to write
1608        let file_name_gen =
1609            DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1610        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1611        let output_file = file_io.new_output(&file_path)?;
1612        let pw = ParquetWriterBuilder::new(
1613            WriterProperties::builder().build(),
1614            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1615        )
1616        .build(output_file)
1617        .await?;
1618        pw.close().await.unwrap();
1619        assert!(!file_io.exists(&file_path).await.unwrap());
1620
1621        Ok(())
1622    }
1623
1624    #[tokio::test]
1625    async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1626        let temp_dir = TempDir::new().unwrap();
1627        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1628        let location_gen = DefaultLocationGenerator::with_data_location(
1629            temp_dir.path().to_str().unwrap().to_string(),
1630        );
1631        let file_name_gen =
1632            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1633
1634        // prepare data
1635        let arrow_schema = {
1636            let fields = vec![
1637                Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1638                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1639                ),
1640                Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1641                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1642                ),
1643            ];
1644            Arc::new(arrow_schema::Schema::new(fields))
1645        };
1646
1647        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1648            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1649            None,
1650        )) as ArrayRef;
1651
1652        let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1653            [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1654            None,
1655        )) as ArrayRef;
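        // Each column holds exactly one NaN, so nan_value_counts should report 1 for each field id.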
1656
1657        let to_write =
1658            RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1659        let output_file = file_io.new_output(
1660            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1661        )?;
1662
1663        // write data
1664        let mut pw = ParquetWriterBuilder::new(
1665            WriterProperties::builder().build(),
1666            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1667        )
1668        .build(output_file)
1669        .await?;
1670
1671        pw.write(&to_write).await?;
1672        let res = pw.close().await?;
1673        assert_eq!(res.len(), 1);
1674        let data_file = res
1675            .into_iter()
1676            .next()
1677            .unwrap()
1678            // Set dummy fields so the DataFileBuilder build succeeds.
1679            .content(crate::spec::DataContentType::Data)
1680            .partition(Struct::empty())
1681            .partition_spec_id(0)
1682            .build()
1683            .unwrap();
1684
1685        // check data file
1686        assert_eq!(data_file.record_count(), 4);
1687        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1688        assert_eq!(
1689            *data_file.lower_bounds(),
1690            HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1691        );
1692        assert_eq!(
1693            *data_file.upper_bounds(),
1694            HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1695        );
1696        assert_eq!(
1697            *data_file.null_value_counts(),
1698            HashMap::from([(0, 0), (1, 0)])
1699        );
1700        assert_eq!(
1701            *data_file.nan_value_counts(),
1702            HashMap::from([(0, 1), (1, 1)])
1703        );
1704
1705        // check the written file
1706        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1707        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1708
1709        Ok(())
1710    }
1711
1712    #[tokio::test]
1713    async fn test_nan_val_cnts_struct_type() -> Result<()> {
1714        let temp_dir = TempDir::new().unwrap();
1715        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1716        let location_gen = DefaultLocationGenerator::with_data_location(
1717            temp_dir.path().to_str().unwrap().to_string(),
1718        );
1719        let file_name_gen =
1720            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1721
1722        let schema_struct_float_fields = Fields::from(vec![
1723            Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1724                PARQUET_FIELD_ID_META_KEY.to_string(),
1725                "4".to_string(),
1726            )])),
1727        ]);
1728
1729        let schema_struct_nested_float_fields = Fields::from(vec![
1730            Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1731                PARQUET_FIELD_ID_META_KEY.to_string(),
1732                "7".to_string(),
1733            )])),
1734        ]);
1735
1736        let schema_struct_nested_fields = Fields::from(vec![
1737            Field::new(
1738                "col6",
1739                arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1740                false,
1741            )
1742            .with_metadata(HashMap::from([(
1743                PARQUET_FIELD_ID_META_KEY.to_string(),
1744                "6".to_string(),
1745            )])),
1746        ]);
1747
1748        // prepare data
1749        let arrow_schema = {
1750            let fields = vec![
1751                Field::new(
1752                    "col3",
1753                    arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1754                    false,
1755                )
1756                .with_metadata(HashMap::from([(
1757                    PARQUET_FIELD_ID_META_KEY.to_string(),
1758                    "3".to_string(),
1759                )])),
1760                Field::new(
1761                    "col5",
1762                    arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1763                    false,
1764                )
1765                .with_metadata(HashMap::from([(
1766                    PARQUET_FIELD_ID_META_KEY.to_string(),
1767                    "5".to_string(),
1768                )])),
1769            ];
1770            Arc::new(arrow_schema::Schema::new(fields))
1771        };
1772
1773        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1774            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1775            None,
1776        )) as ArrayRef;
1777
1778        let struct_float_field_col = Arc::new(StructArray::new(
1779            schema_struct_float_fields,
1780            vec![float_32_col.clone()],
1781            None,
1782        )) as ArrayRef;
1783
1784        let struct_nested_float_field_col = Arc::new(StructArray::new(
1785            schema_struct_nested_fields,
1786            vec![Arc::new(StructArray::new(
1787                schema_struct_nested_float_fields,
1788                vec![float_32_col.clone()],
1789                None,
1790            )) as ArrayRef],
1791            None,
1792        )) as ArrayRef;
1793
1794        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1795            struct_float_field_col,
1796            struct_nested_float_field_col,
1797        ])
1798        .unwrap();
1799        let output_file = file_io.new_output(
1800            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1801        )?;
1802
1803        // write data
1804        let mut pw = ParquetWriterBuilder::new(
1805            WriterProperties::builder().build(),
1806            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1807        )
1808        .build(output_file)
1809        .await?;
1810
1811        pw.write(&to_write).await?;
1812        let res = pw.close().await?;
1813        assert_eq!(res.len(), 1);
1814        let data_file = res
1815            .into_iter()
1816            .next()
1817            .unwrap()
1818            // Set dummy fields so the DataFileBuilder build succeeds.
1819            .content(crate::spec::DataContentType::Data)
1820            .partition(Struct::empty())
1821            .partition_spec_id(0)
1822            .build()
1823            .unwrap();
1824
1825        // check data file
1826        assert_eq!(data_file.record_count(), 4);
1827        assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1828        assert_eq!(
1829            *data_file.lower_bounds(),
1830            HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1831        );
1832        assert_eq!(
1833            *data_file.upper_bounds(),
1834            HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1835        );
1836        assert_eq!(
1837            *data_file.null_value_counts(),
1838            HashMap::from([(4, 0), (7, 0)])
1839        );
1840        assert_eq!(
1841            *data_file.nan_value_counts(),
1842            HashMap::from([(4, 1), (7, 1)])
1843        );
1844
1845        // check the written file
1846        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1847        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1848
1849        Ok(())
1850    }
1851
1852    #[tokio::test]
1853    async fn test_nan_val_cnts_list_type() -> Result<()> {
1854        let temp_dir = TempDir::new().unwrap();
1855        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1856        let location_gen = DefaultLocationGenerator::with_data_location(
1857            temp_dir.path().to_str().unwrap().to_string(),
1858        );
1859        let file_name_gen =
1860            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1861
1862        let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1863            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1864        );
1865
1866        let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1867            .with_metadata(HashMap::from([(
1868                PARQUET_FIELD_ID_META_KEY.to_string(),
1869                "4".to_string(),
1870            )]));
1871
1872        let schema_struct_list_field = Fields::from(vec![
1873            Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
1874                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
1875            ),
1876        ]);
1877
1878        let arrow_schema = {
1879            let fields = vec![
1880                Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
1881                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1882                ),
1883                Field::new_struct("col1", schema_struct_list_field.clone(), true)
1884                    .with_metadata(HashMap::from([(
1885                        PARQUET_FIELD_ID_META_KEY.to_string(),
1886                        "2".to_string(),
1887                    )]))
1888                    .clone(),
1889                // Field::new_large_list("col3", schema_large_list_float_field.clone(), true).with_metadata(
1890                //     HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
1891                // ).clone(),
1892            ];
1893            Arc::new(arrow_schema::Schema::new(fields))
1894        };
1895
1896        let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
1897            Some(1.0_f32),
1898            Some(f32::NAN),
1899            Some(2.0),
1900            Some(2.0),
1901        ])])
1902        .into_parts();
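        // Reuse the offsets, values, and nulls from the builder-produced list, but swap in the
        // element field from the Arrow schema so the Parquet field-id metadata is preserved.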
1903
1904        let list_float_field_col = Arc::new({
1905            let list_parts = list_parts.clone();
1906            ListArray::new(
1907                {
1908                    if let DataType::List(field) = arrow_schema.field(0).data_type() {
1909                        field.clone()
1910                    } else {
1911                        unreachable!()
1912                    }
1913                },
1914                list_parts.1,
1915                list_parts.2,
1916                list_parts.3,
1917            )
1918        }) as ArrayRef;
1919
1920        let struct_list_fields_schema =
1921            if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1922                fields.clone()
1923            } else {
1924                unreachable!()
1925            };
1926
1927        let struct_list_float_field_col = Arc::new({
1928            ListArray::new(
1929                {
1930                    if let DataType::List(field) = struct_list_fields_schema
1931                        .first()
1932                        .expect("could not find first list field")
1933                        .data_type()
1934                    {
1935                        field.clone()
1936                    } else {
1937                        unreachable!()
1938                    }
1939                },
1940                list_parts.1,
1941                list_parts.2,
1942                list_parts.3,
1943            )
1944        }) as ArrayRef;
1945
1946        let struct_list_float_field_col = Arc::new(StructArray::new(
1947            struct_list_fields_schema,
1948            vec![struct_list_float_field_col.clone()],
1949            None,
1950        )) as ArrayRef;
1951
1952        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1953            list_float_field_col,
1954            struct_list_float_field_col,
1955            // large_list_float_field_col,
1956        ])
1957        .expect("Could not form record batch");
1958        let output_file = file_io.new_output(
1959            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1960        )?;
1961
1962        // write data
1963        let mut pw = ParquetWriterBuilder::new(
1964            WriterProperties::builder().build(),
1965            Arc::new(
1966                to_write
1967                    .schema()
1968                    .as_ref()
1969                    .try_into()
1970                    .expect("Could not convert iceberg schema"),
1971            ),
1972        )
1973        .build(output_file)
1974        .await?;
1975
1976        pw.write(&to_write).await?;
1977        let res = pw.close().await?;
1978        assert_eq!(res.len(), 1);
1979        let data_file = res
1980            .into_iter()
1981            .next()
1982            .unwrap()
1983            .content(crate::spec::DataContentType::Data)
1984            .partition(Struct::empty())
1985            .partition_spec_id(0)
1986            .build()
1987            .unwrap();
1988
1989        // check data file
1990        assert_eq!(data_file.record_count(), 1);
1991        assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
1992        assert_eq!(
1993            *data_file.lower_bounds(),
1994            HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
1995        );
1996        assert_eq!(
1997            *data_file.upper_bounds(),
1998            HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
1999        );
2000        assert_eq!(
2001            *data_file.null_value_counts(),
2002            HashMap::from([(1, 0), (4, 0)])
2003        );
2004        assert_eq!(
2005            *data_file.nan_value_counts(),
2006            HashMap::from([(1, 1), (4, 1)])
2007        );
2008
2009        // check the written file
2010        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2011        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2012
2013        Ok(())
2014    }
2015
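    // Helper that builds a four-entry int -> float MapArray (one value is NaN) and rebuilds its
    // entries StructArray with the given key/value fields so their field-id metadata is kept.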
2016    macro_rules! construct_map_arr {
2017        ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
2018            let int_builder = Int32Builder::new();
2019            let float_builder = Float32Builder::with_capacity(4);
2020            let mut builder = MapBuilder::new(None, int_builder, float_builder);
2021            builder.keys().append_value(1);
2022            builder.values().append_value(1.0_f32);
2023            builder.append(true).unwrap();
2024            builder.keys().append_value(2);
2025            builder.values().append_value(f32::NAN);
2026            builder.append(true).unwrap();
2027            builder.keys().append_value(3);
2028            builder.values().append_value(2.0);
2029            builder.append(true).unwrap();
2030            builder.keys().append_value(4);
2031            builder.values().append_value(2.0);
2032            builder.append(true).unwrap();
2033            let array = builder.finish();
2034
2035            let (_field, offsets, entries, nulls, ordered) = array.into_parts();
2036            let new_struct_fields_schema =
2037                Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
2038
2039            let entries = {
2040                let (_, arrays, nulls) = entries.into_parts();
2041                StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
2042            };
2043
2044            let field = Arc::new(Field::new(
2045                DEFAULT_MAP_FIELD_NAME,
2046                DataType::Struct(new_struct_fields_schema),
2047                false,
2048            ));
2049
2050            Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
2051        }};
2052    }
2053
2054    #[tokio::test]
2055    async fn test_nan_val_cnts_map_type() -> Result<()> {
2056        let temp_dir = TempDir::new().unwrap();
2057        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2058        let location_gen = DefaultLocationGenerator::with_data_location(
2059            temp_dir.path().to_str().unwrap().to_string(),
2060        );
2061        let file_name_gen =
2062            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2063
2064        let map_key_field_schema =
2065            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2066                (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2067            ]));
2068
2069        let map_value_field_schema =
2070            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2071                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2072            ));
2073
2074        let struct_map_key_field_schema =
2075            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2076                (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2077            ]));
2078
2079        let struct_map_value_field_schema =
2080            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2081                [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2082            ));
2083
2084        let schema_struct_map_field = Fields::from(vec![
2085            Field::new_map(
2086                "col3",
2087                DEFAULT_MAP_FIELD_NAME,
2088                struct_map_key_field_schema.clone(),
2089                struct_map_value_field_schema.clone(),
2090                false,
2091                false,
2092            )
2093            .with_metadata(HashMap::from([(
2094                PARQUET_FIELD_ID_META_KEY.to_string(),
2095                "5".to_string(),
2096            )])),
2097        ]);
2098
2099        let arrow_schema = {
2100            let fields = vec![
2101                Field::new_map(
2102                    "col0",
2103                    DEFAULT_MAP_FIELD_NAME,
2104                    map_key_field_schema.clone(),
2105                    map_value_field_schema.clone(),
2106                    false,
2107                    false,
2108                )
2109                .with_metadata(HashMap::from([(
2110                    PARQUET_FIELD_ID_META_KEY.to_string(),
2111                    "0".to_string(),
2112                )])),
2113                Field::new_struct("col1", schema_struct_map_field.clone(), true)
2114                    .with_metadata(HashMap::from([(
2115                        PARQUET_FIELD_ID_META_KEY.to_string(),
2116                        "3".to_string(),
2117                    )]))
2118                    .clone(),
2119            ];
2120            Arc::new(arrow_schema::Schema::new(fields))
2121        };
2122
2123        let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2124
2125        let struct_map_arr =
2126            construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2127
2128        let struct_list_float_field_col = Arc::new(StructArray::new(
2129            schema_struct_map_field,
2130            vec![struct_map_arr],
2131            None,
2132        )) as ArrayRef;
2133
2134        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2135            map_array,
2136            struct_list_float_field_col,
2137        ])
2138        .expect("Could not form record batch");
2139        let output_file = file_io.new_output(
2140            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2141        )?;
2142
2143        // write data
2144        let mut pw = ParquetWriterBuilder::new(
2145            WriterProperties::builder().build(),
2146            Arc::new(
2147                to_write
2148                    .schema()
2149                    .as_ref()
2150                    .try_into()
2151                    .expect("Could not convert iceberg schema"),
2152            ),
2153        )
2154        .build(output_file)
2155        .await?;
2156
2157        pw.write(&to_write).await?;
2158        let res = pw.close().await?;
2159        assert_eq!(res.len(), 1);
2160        let data_file = res
2161            .into_iter()
2162            .next()
2163            .unwrap()
2164            .content(crate::spec::DataContentType::Data)
2165            .partition(Struct::empty())
2166            .partition_spec_id(0)
2167            .build()
2168            .unwrap();
2169
2170        // check data file
2171        assert_eq!(data_file.record_count(), 4);
2172        assert_eq!(
2173            *data_file.value_counts(),
2174            HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2175        );
2176        assert_eq!(
2177            *data_file.lower_bounds(),
2178            HashMap::from([
2179                (1, Datum::int(1)),
2180                (2, Datum::float(1.0)),
2181                (6, Datum::int(1)),
2182                (7, Datum::float(1.0))
2183            ])
2184        );
2185        assert_eq!(
2186            *data_file.upper_bounds(),
2187            HashMap::from([
2188                (1, Datum::int(4)),
2189                (2, Datum::float(2.0)),
2190                (6, Datum::int(4)),
2191                (7, Datum::float(2.0))
2192            ])
2193        );
2194        assert_eq!(
2195            *data_file.null_value_counts(),
2196            HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2197        );
2198        assert_eq!(
2199            *data_file.nan_value_counts(),
2200            HashMap::from([(2, 1), (7, 1)])
2201        );
2202
2203        // check the written file
2204        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2205        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2206
2207        Ok(())
2208    }
2209
2210    #[tokio::test]
2211    async fn test_write_empty_parquet_file() {
2212        let temp_dir = TempDir::new().unwrap();
2213        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2214        let location_gen = DefaultLocationGenerator::with_data_location(
2215            temp_dir.path().to_str().unwrap().to_string(),
2216        );
2217        let file_name_gen =
2218            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2219        let output_file = file_io
2220            .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
2221            .unwrap();
2222
2223        // write data
2224        let pw = ParquetWriterBuilder::new(
2225            WriterProperties::builder().build(),
2226            Arc::new(
2227                Schema::builder()
2228                    .with_schema_id(1)
2229                    .with_fields(vec![
2230                        NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2231                            .with_id(0)
2232                            .into(),
2233                    ])
2234                    .build()
2235                    .expect("Failed to create schema"),
2236            ),
2237        )
2238        .build(output_file)
2239        .await
2240        .unwrap();
2241
2242        let res = pw.close().await.unwrap();
2243        assert_eq!(res.len(), 0);
2244
2245        // Check that the file has been deleted.
2246        assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2247    }
2248
2249    #[test]
2250    fn test_min_max_aggregator() {
2251        let schema = Arc::new(
2252            Schema::builder()
2253                .with_schema_id(1)
2254                .with_fields(vec![
2255                    NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2256                        .with_id(0)
2257                        .into(),
2258                ])
2259                .build()
2260                .expect("Failed to create schema"),
2261        );
2262        let mut min_max_agg = MinMaxColAggregator::new(schema);
2263        let create_statistics =
2264            |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
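        // Missing (None) min/max statistics must be ignored; only present values feed the aggregate.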
2265        min_max_agg
2266            .update(0, create_statistics(None, Some(42)))
2267            .unwrap();
2268        min_max_agg
2269            .update(0, create_statistics(Some(0), Some(i32::MAX)))
2270            .unwrap();
2271        min_max_agg
2272            .update(0, create_statistics(Some(i32::MIN), None))
2273            .unwrap();
2274        min_max_agg
2275            .update(0, create_statistics(None, None))
2276            .unwrap();
2277
2278        let (lower_bounds, upper_bounds) = min_max_agg.produce();
2279
2280        assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2281        assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2282    }
2283}