iceberg/writer/file_writer/parquet_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the file writer for the Parquet file format.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_schema::SchemaRef as ArrowSchemaRef;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use itertools::Itertools;
27use parquet::arrow::AsyncArrowWriter;
28use parquet::arrow::async_reader::AsyncFileReader;
29use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
30use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
31use parquet::file::properties::WriterProperties;
32use parquet::file::statistics::Statistics;
33use parquet::format::FileMetaData;
34use parquet::thrift::{TCompactOutputProtocol, TSerializable};
35use thrift::protocol::TOutputProtocol;
36
37use super::location_generator::{FileNameGenerator, LocationGenerator};
38use super::{FileWriter, FileWriterBuilder};
39use crate::arrow::{
40    ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
41    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
42};
43use crate::io::{FileIO, FileWrite, OutputFile};
44use crate::spec::{
45    DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
46    NestedFieldRef, PartitionKey, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor,
47    Struct, StructType, TableMetadata, Type, visit_schema,
48};
49use crate::transform::create_transform_function;
50use crate::writer::{CurrentFileStatus, DataFile};
51use crate::{Error, ErrorKind, Result};
52
53/// ParquetWriterBuilder is used to build a [`ParquetWriter`].
54#[derive(Clone, Debug)]
55pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
56    props: WriterProperties,
57    schema: SchemaRef,
58    partition_key: Option<PartitionKey>,
59    match_mode: FieldMatchMode,
60
61    file_io: FileIO,
62    location_generator: T,
63    file_name_generator: F,
64}
65
66impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
67    /// Create a new `ParquetWriterBuilder`.
68    /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
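    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doc test). It assumes a local
    /// filesystem `FileIO`, the default location and file-name generators from this
    /// crate, and an Iceberg `schema` plus a `record_batch` built elsewhere; the
    /// `"data"` directory and `"part"` prefix are illustrative only.
    ///
    /// ```ignore
    /// let file_io = FileIOBuilder::new_fs_io().build()?;
    /// let location_gen = DefaultLocationGenerator::with_data_location("data".to_string());
    /// let file_name_gen =
    ///     DefaultFileNameGenerator::new("part".to_string(), None, DataFileFormat::Parquet);
    ///
    /// let mut writer = ParquetWriterBuilder::new(
    ///     WriterProperties::builder().build(),
    ///     Arc::new(schema),
    ///     None, // unpartitioned
    ///     file_io,
    ///     location_gen,
    ///     file_name_gen,
    /// )
    /// .build()
    /// .await?;
    ///
    /// writer.write(&record_batch).await?;
    /// let data_file_builders = writer.close().await?;
    /// ```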
69    pub fn new(
70        props: WriterProperties,
71        schema: SchemaRef,
72        partition_key: Option<PartitionKey>,
73        file_io: FileIO,
74        location_generator: T,
75        file_name_generator: F,
76    ) -> Self {
77        Self::new_with_match_mode(
78            props,
79            schema,
80            partition_key,
81            FieldMatchMode::Id,
82            file_io,
83            location_generator,
84            file_name_generator,
85        )
86    }
87
88    /// Create a new `ParquetWriterBuilder` with a custom match mode.
89    pub fn new_with_match_mode(
90        props: WriterProperties,
91        schema: SchemaRef,
92        partition_key: Option<PartitionKey>,
93        match_mode: FieldMatchMode,
94        file_io: FileIO,
95        location_generator: T,
96        file_name_generator: F,
97    ) -> Self {
98        Self {
99            props,
100            schema,
101            partition_key,
102            match_mode,
103            file_io,
104            location_generator,
105            file_name_generator,
106        }
107    }
108}
109
110impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
111    type R = ParquetWriter;
112
113    async fn build(self) -> Result<Self::R> {
114        let out_file = self
115            .file_io
116            .new_output(self.location_generator.generate_location(
117                self.partition_key.as_ref(),
118                &self.file_name_generator.generate_file_name(),
119            ))?;
120
121        Ok(ParquetWriter {
122            schema: self.schema.clone(),
123            inner_writer: None,
124            writer_properties: self.props,
125            current_row_num: 0,
126            out_file,
127            nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
128        })
129    }
130}
131
132/// A mapping from Parquet column path names to internal field IDs.
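///
/// Path names are the dotted Parquet column paths produced by visiting an Iceberg
/// schema with [`visit_schema`], e.g. `"col1.col_1_5"`, `"col3.list.element"`, or
/// `"col5.key_value.key"` for the nested schema used in the tests below.
///
/// A small usage sketch:
///
/// ```ignore
/// let mut index = IndexByParquetPathName::new();
/// visit_schema(&schema, &mut index)?;
/// let field_id: Option<&i32> = index.get("col1.col_1_5");
/// ```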
133struct IndexByParquetPathName {
134    name_to_id: HashMap<String, i32>,
135
136    field_names: Vec<String>,
137
138    field_id: i32,
139}
140
141impl IndexByParquetPathName {
142    /// Creates a new, empty `IndexByParquetPathName`
143    pub fn new() -> Self {
144        Self {
145            name_to_id: HashMap::new(),
146            field_names: Vec::new(),
147            field_id: 0,
148        }
149    }
150
151    /// Retrieves the internal field ID
152    pub fn get(&self, name: &str) -> Option<&i32> {
153        self.name_to_id.get(name)
154    }
155}
156
157impl Default for IndexByParquetPathName {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163impl SchemaVisitor for IndexByParquetPathName {
164    type T = ();
165
166    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
167        self.field_names.push(field.name.to_string());
168        self.field_id = field.id;
169        Ok(())
170    }
171
172    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
173        self.field_names.pop();
174        Ok(())
175    }
176
177    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
178        self.field_names.push(format!("list.{}", field.name));
179        self.field_id = field.id;
180        Ok(())
181    }
182
183    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
184        self.field_names.pop();
185        Ok(())
186    }
187
188    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
189        self.field_names
190            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
191        self.field_id = field.id;
192        Ok(())
193    }
194
195    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
196        self.field_names.pop();
197        Ok(())
198    }
199
200    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
201        self.field_names
202            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
203        self.field_id = field.id;
204        Ok(())
205    }
206
207    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
208        self.field_names.pop();
209        Ok(())
210    }
211
212    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
213        Ok(())
214    }
215
216    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
217        Ok(())
218    }
219
220    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
221        Ok(())
222    }
223
224    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
225        Ok(())
226    }
227
228    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
229        Ok(())
230    }
231
232    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
233        let full_name = self.field_names.iter().map(String::as_str).join(".");
234        let field_id = self.field_id;
235        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
236            return Err(Error::new(
237                ErrorKind::DataInvalid,
238                format!(
239                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
240                ),
241            ));
242        } else {
243            self.name_to_id.insert(full_name, field_id);
244        }
245
246        Ok(())
247    }
248}
249
250/// `ParquetWriter` is used to write Arrow data into a Parquet file on storage.
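///
/// The inner [`AsyncArrowWriter`] is created lazily on the first [`write`](FileWriter::write)
/// call; if no rows were written, [`close`](FileWriter::close) removes the partially created
/// file (if any) and returns no builders.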
251pub struct ParquetWriter {
252    schema: SchemaRef,
253    out_file: OutputFile,
254    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
255    writer_properties: WriterProperties,
256    current_row_num: usize,
257    nan_value_count_visitor: NanValueCountVisitor,
258}
259
260/// Used to aggregate the min and max values of each column.
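///
/// Bounds are keyed by Iceberg field id and are only updated from Parquet statistics whose
/// min/max values are exact; columns that do not correspond to a schema field are skipped.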
261struct MinMaxColAggregator {
262    lower_bounds: HashMap<i32, Datum>,
263    upper_bounds: HashMap<i32, Datum>,
264    schema: SchemaRef,
265}
266
267impl MinMaxColAggregator {
268    /// Creates new and empty `MinMaxColAggregator`
269    fn new(schema: SchemaRef) -> Self {
270        Self {
271            lower_bounds: HashMap::new(),
272            upper_bounds: HashMap::new(),
273            schema,
274        }
275    }
276
277    fn update_state_min(&mut self, field_id: i32, datum: Datum) {
278        self.lower_bounds
279            .entry(field_id)
280            .and_modify(|e| {
281                if *e > datum {
282                    *e = datum.clone()
283                }
284            })
285            .or_insert(datum);
286    }
287
288    fn update_state_max(&mut self, field_id: i32, datum: Datum) {
289        self.upper_bounds
290            .entry(field_id)
291            .and_modify(|e| {
292                if *e < datum {
293                    *e = datum.clone()
294                }
295            })
296            .or_insert(datum);
297    }
298
299    /// Update statistics
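    /// for the column identified by `field_id` from a Parquet column chunk's [`Statistics`],
    /// widening the aggregated lower and upper bounds as needed.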
300    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
301        let Some(ty) = self
302            .schema
303            .field_by_id(field_id)
304            .map(|f| f.field_type.as_ref())
305        else {
306            // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
307            // Ignore the field if it is not in schema.
308            return Ok(());
309        };
310        let Type::Primitive(ty) = ty.clone() else {
311            return Err(Error::new(
312                ErrorKind::Unexpected,
313                format!(
314                    "Compound type {} is not supported for min/max aggregation.",
315                    ty
316                ),
317            ));
318        };
319
320        if value.min_is_exact() {
321            let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
322                return Err(Error::new(
323                    ErrorKind::Unexpected,
324                    format!("Statistics {} does not match field type {}.", value, ty),
325                ));
326            };
327
328            self.update_state_min(field_id, min_datum);
329        }
330
331        if value.max_is_exact() {
332            let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
333                return Err(Error::new(
334                    ErrorKind::Unexpected,
335                    format!("Statistics {} does not match field type {}.", value, ty),
336                ));
337            };
338
339            self.update_state_max(field_id, max_datum);
340        }
341
342        Ok(())
343    }
344
345    /// Returns lower and upper bounds
346    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
347        (self.lower_bounds, self.upper_bounds)
348    }
349}
350
351impl ParquetWriter {
352    /// Converts Parquet files into [`DataFile`]s.
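    ///
    /// A minimal usage sketch (not compiled as a doc test); the file path is illustrative
    /// and `file_io` / `table_metadata` are assumed to exist already:
    ///
    /// ```ignore
    /// let data_files = ParquetWriter::parquet_files_to_data_files(
    ///     &file_io,
    ///     vec!["s3://bucket/warehouse/tbl/data/part-0.parquet".to_string()],
    ///     &table_metadata,
    /// )
    /// .await?;
    /// ```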
353    #[allow(dead_code)]
354    pub(crate) async fn parquet_files_to_data_files(
355        file_io: &FileIO,
356        file_paths: Vec<String>,
357        table_metadata: &TableMetadata,
358    ) -> Result<Vec<DataFile>> {
359        // TODO: support adding to partitioned table
360        let mut data_files: Vec<DataFile> = Vec::new();
361
362        for file_path in file_paths {
363            let input_file = file_io.new_input(&file_path)?;
364            let file_metadata = input_file.metadata().await?;
365            let file_size_in_bytes = file_metadata.size as usize;
366            let reader = input_file.reader().await?;
367
368            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
369            let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
370                Error::new(
371                    ErrorKind::DataInvalid,
372                    format!("Error reading Parquet metadata: {}", err),
373                )
374            })?;
375            let mut builder = ParquetWriter::parquet_to_data_file_builder(
376                table_metadata.current_schema().clone(),
377                parquet_metadata,
378                file_size_in_bytes,
379                file_path,
380                // TODO: Implement nan_value_counts here
381                HashMap::new(),
382            )?;
383            builder.partition_spec_id(table_metadata.default_partition_spec_id());
384            let data_file = builder.build().unwrap();
385            data_files.push(data_file);
386        }
387
388        Ok(data_files)
389    }
390
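    /// Converts the thrift [`FileMetaData`] returned by the Arrow writer into parquet's
    /// [`ParquetMetaData`] by serializing it with the thrift compact protocol and decoding
    /// it again with [`ParquetMetaDataReader`].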
391    fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result<ParquetMetaData> {
392        let mut buffer = Vec::new();
393        {
394            let mut protocol = TCompactOutputProtocol::new(&mut buffer);
395            file_metadata
396                .write_to_out_protocol(&mut protocol)
397                .map_err(|err| {
398                    Error::new(ErrorKind::Unexpected, "Failed to write parquet metadata")
399                        .with_source(err)
400                })?;
401
402            protocol.flush().map_err(|err| {
403                Error::new(ErrorKind::Unexpected, "Failed to flush protocol").with_source(err)
404            })?;
405        }
406
407        let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).map_err(|err| {
408            Error::new(ErrorKind::Unexpected, "Failed to decode parquet metadata").with_source(err)
409        })?;
410
411        Ok(parquet_metadata)
412    }
413
414    /// Converts [`ParquetMetaData`] into a [`DataFileBuilder`].
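    ///
    /// Column sizes, value counts, null value counts, and lower/upper bounds are derived from
    /// the row-group metadata and keyed by Iceberg field id via [`IndexByParquetPathName`];
    /// columns that do not map to a schema field are skipped.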
415    pub(crate) fn parquet_to_data_file_builder(
416        schema: SchemaRef,
417        metadata: Arc<ParquetMetaData>,
418        written_size: usize,
419        file_path: String,
420        nan_value_counts: HashMap<i32, u64>,
421    ) -> Result<DataFileBuilder> {
422        let index_by_parquet_path = {
423            let mut visitor = IndexByParquetPathName::new();
424            visit_schema(&schema, &mut visitor)?;
425            visitor
426        };
427
428        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
429            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
430            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
431            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
432            let mut min_max_agg = MinMaxColAggregator::new(schema);
433
434            for row_group in metadata.row_groups() {
435                for column_chunk_metadata in row_group.columns() {
436                    let parquet_path = column_chunk_metadata.column_descr().path().string();
437
438                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
439                        continue;
440                    };
441
442                    *per_col_size.entry(field_id).or_insert(0) +=
443                        column_chunk_metadata.compressed_size() as u64;
444                    *per_col_val_num.entry(field_id).or_insert(0) +=
445                        column_chunk_metadata.num_values() as u64;
446
447                    if let Some(statistics) = column_chunk_metadata.statistics() {
448                        if let Some(null_count) = statistics.null_count_opt() {
449                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
450                        }
451
452                        min_max_agg.update(field_id, statistics.clone())?;
453                    }
454                }
455            }
456            (
457                per_col_size,
458                per_col_val_num,
459                per_col_null_val_num,
460                min_max_agg.produce(),
461            )
462        };
463
464        let mut builder = DataFileBuilder::default();
465        builder
466            .content(DataContentType::Data)
467            .file_path(file_path)
468            .file_format(DataFileFormat::Parquet)
469            .partition(Struct::empty())
470            .record_count(metadata.file_metadata().num_rows() as u64)
471            .file_size_in_bytes(written_size as u64)
472            .column_sizes(column_sizes)
473            .value_counts(value_counts)
474            .null_value_counts(null_value_counts)
475            .nan_value_counts(nan_value_counts)
476            // # NOTE:
477            // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd
478            .lower_bounds(lower_bounds)
479            .upper_bounds(upper_bounds)
480            .split_offsets(
481                metadata
482                    .row_groups()
483                    .iter()
484                    .filter_map(|group| group.file_offset())
485                    .collect(),
486            );
487
488        Ok(builder)
489    }
490
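    /// Infers a partition value from per-column lower and upper bounds: every partition field
    /// must use an order-preserving transform and have equal lower and upper bounds on its
    /// source column, otherwise an error is returned; fields without bounds map to null.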
491    #[allow(dead_code)]
492    fn partition_value_from_bounds(
493        table_spec: Arc<PartitionSpec>,
494        lower_bounds: &HashMap<i32, Datum>,
495        upper_bounds: &HashMap<i32, Datum>,
496    ) -> Result<Struct> {
497        let mut partition_literals: Vec<Option<Literal>> = Vec::new();
498
499        for field in table_spec.fields() {
500            if let (Some(lower), Some(upper)) = (
501                lower_bounds.get(&field.source_id),
502                upper_bounds.get(&field.source_id),
503            ) {
504                if !field.transform.preserves_order() {
505                    return Err(Error::new(
506                        ErrorKind::DataInvalid,
507                        format!(
508                            "cannot infer partition value for non-linear partition field (needs to preserve order): {} with transform {}",
509                            field.name, field.transform
510                        ),
511                    ));
512                }
513
514                if lower != upper {
515                    return Err(Error::new(
516                        ErrorKind::DataInvalid,
517                        format!(
518                            "multiple partition values for field {}: lower: {:?}, upper: {:?}",
519                            field.name, lower, upper
520                        ),
521                    ));
522                }
523
524                let transform_fn = create_transform_function(&field.transform)?;
525                let transform_literal =
526                    Literal::from(transform_fn.transform_literal_result(lower)?);
527
528                partition_literals.push(Some(transform_literal));
529            } else {
530                partition_literals.push(None);
531            }
532        }
533
534        let partition_struct = Struct::from_iter(partition_literals);
535
536        Ok(partition_struct)
537    }
538}
539
540impl FileWriter for ParquetWriter {
541    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
542        // Skip empty batch
543        if batch.num_rows() == 0 {
544            return Ok(());
545        }
546
547        self.current_row_num += batch.num_rows();
548
549        let batch_c = batch.clone();
550        self.nan_value_count_visitor
551            .compute(self.schema.clone(), batch_c)?;
552
553        // Lazily initialize the writer
554        let writer = if let Some(writer) = &mut self.inner_writer {
555            writer
556        } else {
557            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
558            let inner_writer = self.out_file.writer().await?;
559            let async_writer = AsyncFileWriter::new(inner_writer);
560            let writer = AsyncArrowWriter::try_new(
561                async_writer,
562                arrow_schema.clone(),
563                Some(self.writer_properties.clone()),
564            )
565            .map_err(|err| {
566                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
567                    .with_source(err)
568            })?;
569            self.inner_writer = Some(writer);
570            self.inner_writer.as_mut().unwrap()
571        };
572
573        writer.write(batch).await.map_err(|err| {
574            Error::new(
575                ErrorKind::Unexpected,
576                "Failed to write using parquet writer.",
577            )
578            .with_source(err)
579        })?;
580
581        Ok(())
582    }
583
584    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
585        let mut writer = match self.inner_writer.take() {
586            Some(writer) => writer,
587            None => return Ok(vec![]),
588        };
589
590        let metadata = writer.finish().await.map_err(|err| {
591            Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
592        })?;
593
594        let written_size = writer.bytes_written();
595
596        if self.current_row_num == 0 {
597            self.out_file.delete().await.map_err(|err| {
598                Error::new(
599                    ErrorKind::Unexpected,
600                    "Failed to delete empty parquet file.",
601                )
602                .with_source(err)
603            })?;
604            Ok(vec![])
605        } else {
606            let parquet_metadata =
607                Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
608                    Error::new(
609                        ErrorKind::Unexpected,
610                        "Failed to convert metadata from thrift to parquet.",
611                    )
612                    .with_source(err)
613                })?);
614
615            Ok(vec![Self::parquet_to_data_file_builder(
616                self.schema,
617                parquet_metadata,
618                written_size,
619                self.out_file.location().to_string(),
620                self.nan_value_count_visitor.nan_value_counts,
621            )?])
622        }
623    }
624}
625
626impl CurrentFileStatus for ParquetWriter {
627    fn current_file_path(&self) -> String {
628        self.out_file.location().to_string()
629    }
630
631    fn current_row_num(&self) -> usize {
632        self.current_row_num
633    }
634
635    fn current_written_size(&self) -> usize {
636        if let Some(inner) = self.inner_writer.as_ref() {
637            // inner/AsyncArrowWriter contains sync and async writers
638            // written size = bytes flushed to inner's async writer + bytes buffered in the inner's sync writer
639            inner.bytes_written() + inner.in_progress_size()
640        } else {
641            // inner writer is not initialized yet
642            0
643        }
644    }
645
646    fn current_schema(&self) -> SchemaRef {
647        self.schema.clone()
648    }
649}
650
651/// `AsyncFileWriter` wraps a [`FileWrite`] to implement parquet's [`AsyncFileWriter`](ArrowAsyncFileWriter) trait.
652///
653/// # NOTES
654///
655/// This wrapper is intended for internal use only.
656struct AsyncFileWriter<W: FileWrite>(W);
657
658impl<W: FileWrite> AsyncFileWriter<W> {
659    /// Create a new `AsyncFileWriter` with the given writer.
660    pub fn new(writer: W) -> Self {
661        Self(writer)
662    }
663}
664
665impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
666    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
667        Box::pin(async {
668            self.0
669                .write(bs)
670                .await
671                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
672        })
673    }
674
675    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
676        Box::pin(async {
677            self.0
678                .close()
679                .await
680                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
681        })
682    }
683}
684
685#[cfg(test)]
686mod tests {
687    use std::collections::HashMap;
688    use std::sync::Arc;
689
690    use anyhow::Result;
691    use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
692    use arrow_array::types::{Float32Type, Int64Type};
693    use arrow_array::{
694        Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
695        Int64Array, ListArray, MapArray, RecordBatch, StructArray,
696    };
697    use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
698    use arrow_select::concat::concat_batches;
699    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
700    use parquet::file::statistics::ValueStatistics;
701    use rust_decimal::Decimal;
702    use tempfile::TempDir;
703    use uuid::Uuid;
704
705    use super::*;
706    use crate::arrow::schema_to_arrow_schema;
707    use crate::io::FileIOBuilder;
708    use crate::spec::{PrimitiveLiteral, Struct, *};
709    use crate::writer::file_writer::location_generator::{
710        DefaultFileNameGenerator, DefaultLocationGenerator,
711    };
712    use crate::writer::tests::check_parquet_data_file;
713
714    fn schema_for_all_type() -> Schema {
715        Schema::builder()
716            .with_schema_id(1)
717            .with_fields(vec![
718                NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
719                NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
720                NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
721                NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
722                NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
723                NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
724                NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
725                NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
726                NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
727                NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
728                    .into(),
729                NestedField::optional(
730                    10,
731                    "timestamptz",
732                    Type::Primitive(PrimitiveType::Timestamptz),
733                )
734                .into(),
735                NestedField::optional(
736                    11,
737                    "timestamp_ns",
738                    Type::Primitive(PrimitiveType::TimestampNs),
739                )
740                .into(),
741                NestedField::optional(
742                    12,
743                    "timestamptz_ns",
744                    Type::Primitive(PrimitiveType::TimestamptzNs),
745                )
746                .into(),
747                NestedField::optional(
748                    13,
749                    "decimal",
750                    Type::Primitive(PrimitiveType::Decimal {
751                        precision: 10,
752                        scale: 5,
753                    }),
754                )
755                .into(),
756                NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
757                NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
758                    .into(),
759                // Parquet Statistics will use a different representation for Decimal with precision 38 and scale 5,
760                // so we need to add a separate field for it.
761                NestedField::optional(
762                    16,
763                    "decimal_38",
764                    Type::Primitive(PrimitiveType::Decimal {
765                        precision: 38,
766                        scale: 5,
767                    }),
768                )
769                .into(),
770            ])
771            .build()
772            .unwrap()
773    }
774
775    fn nested_schema_for_test() -> Schema {
776        // Int, Struct(Int,Int), String, List(Int), Struct(Struct(Int)), Map(String, List(Int))
777        Schema::builder()
778            .with_schema_id(1)
779            .with_fields(vec![
780                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
781                NestedField::required(
782                    1,
783                    "col1",
784                    Type::Struct(StructType::new(vec![
785                        NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
786                            .into(),
787                        NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
788                            .into(),
789                    ])),
790                )
791                .into(),
792                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
793                NestedField::required(
794                    3,
795                    "col3",
796                    Type::List(ListType::new(
797                        NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
798                            .into(),
799                    )),
800                )
801                .into(),
802                NestedField::required(
803                    4,
804                    "col4",
805                    Type::Struct(StructType::new(vec![
806                        NestedField::required(
807                            8,
808                            "col_4_8",
809                            Type::Struct(StructType::new(vec![
810                                NestedField::required(
811                                    9,
812                                    "col_4_8_9",
813                                    Type::Primitive(PrimitiveType::Long),
814                                )
815                                .into(),
816                            ])),
817                        )
818                        .into(),
819                    ])),
820                )
821                .into(),
822                NestedField::required(
823                    10,
824                    "col5",
825                    Type::Map(MapType::new(
826                        NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
827                            .into(),
828                        NestedField::required(
829                            12,
830                            "value",
831                            Type::List(ListType::new(
832                                NestedField::required(
833                                    13,
834                                    "item",
835                                    Type::Primitive(PrimitiveType::Long),
836                                )
837                                .into(),
838                            )),
839                        )
840                        .into(),
841                    )),
842                )
843                .into(),
844            ])
845            .build()
846            .unwrap()
847    }
848
849    #[tokio::test]
850    async fn test_index_by_parquet_path() {
851        let expect = HashMap::from([
852            ("col0".to_string(), 0),
853            ("col1.col_1_5".to_string(), 5),
854            ("col1.col_1_6".to_string(), 6),
855            ("col2".to_string(), 2),
856            ("col3.list.element".to_string(), 7),
857            ("col4.col_4_8.col_4_8_9".to_string(), 9),
858            ("col5.key_value.key".to_string(), 11),
859            ("col5.key_value.value.list.item".to_string(), 13),
860        ]);
861        let mut visitor = IndexByParquetPathName::new();
862        visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
863        assert_eq!(visitor.name_to_id, expect);
864    }
865
866    #[tokio::test]
867    async fn test_parquet_writer() -> Result<()> {
868        let temp_dir = TempDir::new().unwrap();
869        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
870        let location_gen = DefaultLocationGenerator::with_data_location(
871            temp_dir.path().to_str().unwrap().to_string(),
872        );
873        let file_name_gen =
874            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
875
876        // prepare data
877        let schema = {
878            let fields = vec![
879                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
880                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
881                ),
882            ];
883            Arc::new(arrow_schema::Schema::new(fields))
884        };
885        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
886        let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
887        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
888        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
889
890        // write data
891        let mut pw = ParquetWriterBuilder::new(
892            WriterProperties::builder()
893                .set_max_row_group_size(128)
894                .build(),
895            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
896            None,
897            file_io.clone(),
898            location_gen,
899            file_name_gen,
900        )
901        .build()
902        .await?;
903        pw.write(&to_write).await?;
904        pw.write(&to_write_null).await?;
905        let res = pw.close().await?;
906        assert_eq!(res.len(), 1);
907        let data_file = res
908            .into_iter()
909            .next()
910            .unwrap()
911            // Put in dummy fields so the build succeeds.
912            .content(crate::spec::DataContentType::Data)
913            .partition(Struct::empty())
914            .partition_spec_id(0)
915            .build()
916            .unwrap();
917
918        // check data file
919        assert_eq!(data_file.record_count(), 2048);
920        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
921        assert_eq!(
922            *data_file.lower_bounds(),
923            HashMap::from([(0, Datum::long(0))])
924        );
925        assert_eq!(
926            *data_file.upper_bounds(),
927            HashMap::from([(0, Datum::long(1023))])
928        );
929        assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
930
931        // check the written file
932        let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
933        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
934
935        Ok(())
936    }
937
938    #[tokio::test]
939    async fn test_parquet_writer_with_complex_schema() -> Result<()> {
940        let temp_dir = TempDir::new().unwrap();
941        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
942        let location_gen = DefaultLocationGenerator::with_data_location(
943            temp_dir.path().to_str().unwrap().to_string(),
944        );
945        let file_name_gen =
946            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
947
948        // prepare data
949        let schema = nested_schema_for_test();
950        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
951        let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
952        let col1 = Arc::new(StructArray::new(
953            {
954                if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
955                    fields.clone()
956                } else {
957                    unreachable!()
958                }
959            },
960            vec![
961                Arc::new(Int64Array::from_iter_values(0..1024)),
962                Arc::new(Int64Array::from_iter_values(0..1024)),
963            ],
964            None,
965        ));
966        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
967            (0..1024).map(|n| n.to_string()),
968        )) as ArrayRef;
969        let col3 = Arc::new({
970            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
971                (0..1024).map(|n| Some(vec![Some(n)])),
972            )
973            .into_parts();
974            arrow_array::ListArray::new(
975                {
976                    if let DataType::List(field) = arrow_schema.field(3).data_type() {
977                        field.clone()
978                    } else {
979                        unreachable!()
980                    }
981                },
982                list_parts.1,
983                list_parts.2,
984                list_parts.3,
985            )
986        }) as ArrayRef;
987        let col4 = Arc::new(StructArray::new(
988            {
989                if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
990                    fields.clone()
991                } else {
992                    unreachable!()
993                }
994            },
995            vec![Arc::new(StructArray::new(
996                {
997                    if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
998                        if let DataType::Struct(fields) = fields[0].data_type() {
999                            fields.clone()
1000                        } else {
1001                            unreachable!()
1002                        }
1003                    } else {
1004                        unreachable!()
1005                    }
1006                },
1007                vec![Arc::new(Int64Array::from_iter_values(0..1024))],
1008                None,
1009            ))],
1010            None,
1011        ));
1012        let col5 = Arc::new({
1013            let mut map_array_builder = arrow_array::builder::MapBuilder::new(
1014                None,
1015                arrow_array::builder::StringBuilder::new(),
1016                arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
1017                    Int64Type,
1018                >::new()),
1019            );
1020            for i in 0..1024 {
1021                map_array_builder.keys().append_value(i.to_string());
1022                map_array_builder
1023                    .values()
1024                    .append_value(vec![Some(i as i64); i + 1]);
1025                map_array_builder.append(true)?;
1026            }
1027            let (_, offset_buffer, struct_array, null_buffer, ordered) =
1028                map_array_builder.finish().into_parts();
1029            let struct_array = {
1030                let (_, mut arrays, nulls) = struct_array.into_parts();
1031                let list_array = {
1032                    let list_array = arrays[1]
1033                        .as_any()
1034                        .downcast_ref::<ListArray>()
1035                        .unwrap()
1036                        .clone();
1037                    let (_, offsets, array, nulls) = list_array.into_parts();
1038                    let list_field = {
1039                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1040                            if let DataType::Struct(fields) = map_field.data_type() {
1041                                if let DataType::List(list_field) = fields[1].data_type() {
1042                                    list_field.clone()
1043                                } else {
1044                                    unreachable!()
1045                                }
1046                            } else {
1047                                unreachable!()
1048                            }
1049                        } else {
1050                            unreachable!()
1051                        }
1052                    };
1053                    ListArray::new(list_field, offsets, array, nulls)
1054                };
1055                arrays[1] = Arc::new(list_array) as ArrayRef;
1056                StructArray::new(
1057                    {
1058                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1059                            if let DataType::Struct(fields) = map_field.data_type() {
1060                                fields.clone()
1061                            } else {
1062                                unreachable!()
1063                            }
1064                        } else {
1065                            unreachable!()
1066                        }
1067                    },
1068                    arrays,
1069                    nulls,
1070                )
1071            };
1072            arrow_array::MapArray::new(
1073                {
1074                    if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1075                        map_field.clone()
1076                    } else {
1077                        unreachable!()
1078                    }
1079                },
1080                offset_buffer,
1081                struct_array,
1082                null_buffer,
1083                ordered,
1084            )
1085        }) as ArrayRef;
1086        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1087            col0, col1, col2, col3, col4, col5,
1088        ])
1089        .unwrap();
1090
1091        // write data
1092        let mut pw = ParquetWriterBuilder::new(
1093            WriterProperties::builder().build(),
1094            Arc::new(schema),
1095            None,
1096            file_io.clone(),
1097            location_gen,
1098            file_name_gen,
1099        )
1100        .build()
1101        .await?;
1102        pw.write(&to_write).await?;
1103        let res = pw.close().await?;
1104        assert_eq!(res.len(), 1);
1105        let data_file = res
1106            .into_iter()
1107            .next()
1108            .unwrap()
1109            // Put in dummy fields so the build succeeds.
1110            .content(crate::spec::DataContentType::Data)
1111            .partition(Struct::empty())
1112            .partition_spec_id(0)
1113            .build()
1114            .unwrap();
1115
1116        // check data file
1117        assert_eq!(data_file.record_count(), 1024);
1118        assert_eq!(
1119            *data_file.value_counts(),
1120            HashMap::from([
1121                (0, 1024),
1122                (5, 1024),
1123                (6, 1024),
1124                (2, 1024),
1125                (7, 1024),
1126                (9, 1024),
1127                (11, 1024),
1128                (13, (1..1025).sum()),
1129            ])
1130        );
1131        assert_eq!(
1132            *data_file.lower_bounds(),
1133            HashMap::from([
1134                (0, Datum::long(0)),
1135                (5, Datum::long(0)),
1136                (6, Datum::long(0)),
1137                (2, Datum::string("0")),
1138                (7, Datum::long(0)),
1139                (9, Datum::long(0)),
1140                (11, Datum::string("0")),
1141                (13, Datum::long(0))
1142            ])
1143        );
1144        assert_eq!(
1145            *data_file.upper_bounds(),
1146            HashMap::from([
1147                (0, Datum::long(1023)),
1148                (5, Datum::long(1023)),
1149                (6, Datum::long(1023)),
1150                (2, Datum::string("999")),
1151                (7, Datum::long(1023)),
1152                (9, Datum::long(1023)),
1153                (11, Datum::string("999")),
1154                (13, Datum::long(1023))
1155            ])
1156        );
1157
1158        // check the written file
1159        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1160
1161        Ok(())
1162    }
1163
1164    #[tokio::test]
1165    async fn test_all_type_for_write() -> Result<()> {
1166        let temp_dir = TempDir::new().unwrap();
1167        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1168        let location_gen = DefaultLocationGenerator::with_data_location(
1169            temp_dir.path().to_str().unwrap().to_string(),
1170        );
1171        let file_name_gen =
1172            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1173
1174        // prepare data
1175        // generate an Iceberg schema covering all types
1176        let schema = schema_for_all_type();
1177        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1178        let col0 = Arc::new(BooleanArray::from(vec![
1179            Some(true),
1180            Some(false),
1181            None,
1182            Some(true),
1183        ])) as ArrayRef;
1184        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1185        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1186        let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1187            Some(0.5),
1188            Some(2.0),
1189            None,
1190            Some(3.5),
1191        ])) as ArrayRef;
1192        let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1193            Some(0.5),
1194            Some(2.0),
1195            None,
1196            Some(3.5),
1197        ])) as ArrayRef;
1198        let col5 = Arc::new(arrow_array::StringArray::from(vec![
1199            Some("a"),
1200            Some("b"),
1201            None,
1202            Some("d"),
1203        ])) as ArrayRef;
1204        let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1205            Some(b"one"),
1206            None,
1207            Some(b""),
1208            Some(b"zzzz"),
1209        ])) as ArrayRef;
1210        let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1211            Some(0),
1212            Some(1),
1213            None,
1214            Some(3),
1215        ])) as ArrayRef;
1216        let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1217            Some(0),
1218            Some(1),
1219            None,
1220            Some(3),
1221        ])) as ArrayRef;
1222        let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1223            Some(0),
1224            Some(1),
1225            None,
1226            Some(3),
1227        ])) as ArrayRef;
1228        let col10 = Arc::new(
1229            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1230                .with_timezone_utc(),
1231        ) as ArrayRef;
1232        let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1233            Some(0),
1234            Some(1),
1235            None,
1236            Some(3),
1237        ])) as ArrayRef;
1238        let col12 = Arc::new(
1239            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1240                .with_timezone_utc(),
1241        ) as ArrayRef;
1242        let col13 = Arc::new(
1243            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1244                .with_precision_and_scale(10, 5)
1245                .unwrap(),
1246        ) as ArrayRef;
1247        let col14 = Arc::new(
1248            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1249                vec![
1250                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
1251                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
1252                    None,
1253                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
1254                ]
1255                .into_iter(),
1256                16,
1257            )
1258            .unwrap(),
1259        ) as ArrayRef;
1260        let col15 = Arc::new(
1261            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1262                vec![
1263                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1264                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1265                    None,
1266                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1267                ]
1268                .into_iter(),
1269                10,
1270            )
1271            .unwrap(),
1272        ) as ArrayRef;
1273        let col16 = Arc::new(
1274            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1275                .with_precision_and_scale(38, 5)
1276                .unwrap(),
1277        ) as ArrayRef;
1278        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1279            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1280            col14, col15, col16,
1281        ])
1282        .unwrap();
1283
1284        // write data
1285        let mut pw = ParquetWriterBuilder::new(
1286            WriterProperties::builder().build(),
1287            Arc::new(schema),
1288            None,
1289            file_io.clone(),
1290            location_gen,
1291            file_name_gen,
1292        )
1293        .build()
1294        .await?;
1295        pw.write(&to_write).await?;
1296        let res = pw.close().await?;
1297        assert_eq!(res.len(), 1);
1298        let data_file = res
1299            .into_iter()
1300            .next()
1301            .unwrap()
1302            // Put in dummy fields so the build succeeds.
1303            .content(crate::spec::DataContentType::Data)
1304            .partition(Struct::empty())
1305            .partition_spec_id(0)
1306            .build()
1307            .unwrap();
1308
1309        // check data file
1310        assert_eq!(data_file.record_count(), 4);
1311        assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1312        assert!(
1313            data_file
1314                .null_value_counts()
1315                .iter()
1316                .all(|(_, &v)| { v == 1 })
1317        );
1318        assert_eq!(
1319            *data_file.lower_bounds(),
1320            HashMap::from([
1321                (0, Datum::bool(false)),
1322                (1, Datum::int(1)),
1323                (2, Datum::long(1)),
1324                (3, Datum::float(0.5)),
1325                (4, Datum::double(0.5)),
1326                (5, Datum::string("a")),
1327                (6, Datum::binary(vec![])),
1328                (7, Datum::date(0)),
1329                (8, Datum::time_micros(0).unwrap()),
1330                (9, Datum::timestamp_micros(0)),
1331                (10, Datum::timestamptz_micros(0)),
1332                (11, Datum::timestamp_nanos(0)),
1333                (12, Datum::timestamptz_nanos(0)),
1334                (
1335                    13,
1336                    Datum::new(
1337                        PrimitiveType::Decimal {
1338                            precision: 10,
1339                            scale: 5
1340                        },
1341                        PrimitiveLiteral::Int128(1)
1342                    )
1343                ),
1344                (14, Datum::uuid(Uuid::from_u128(0))),
1345                (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1346                (
1347                    16,
1348                    Datum::new(
1349                        PrimitiveType::Decimal {
1350                            precision: 38,
1351                            scale: 5
1352                        },
1353                        PrimitiveLiteral::Int128(1)
1354                    )
1355                ),
1356            ])
1357        );
1358        assert_eq!(
1359            *data_file.upper_bounds(),
1360            HashMap::from([
1361                (0, Datum::bool(true)),
1362                (1, Datum::int(4)),
1363                (2, Datum::long(4)),
1364                (3, Datum::float(3.5)),
1365                (4, Datum::double(3.5)),
1366                (5, Datum::string("d")),
1367                (6, Datum::binary(vec![122, 122, 122, 122])),
1368                (7, Datum::date(3)),
1369                (8, Datum::time_micros(3).unwrap()),
1370                (9, Datum::timestamp_micros(3)),
1371                (10, Datum::timestamptz_micros(3)),
1372                (11, Datum::timestamp_nanos(3)),
1373                (12, Datum::timestamptz_nanos(3)),
1374                (
1375                    13,
1376                    Datum::new(
1377                        PrimitiveType::Decimal {
1378                            precision: 10,
1379                            scale: 5
1380                        },
1381                        PrimitiveLiteral::Int128(100)
1382                    )
1383                ),
1384                (14, Datum::uuid(Uuid::from_u128(3))),
1385                (
1386                    15,
1387                    Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1388                ),
1389                (
1390                    16,
1391                    Datum::new(
1392                        PrimitiveType::Decimal {
1393                            precision: 38,
1394                            scale: 5
1395                        },
1396                        PrimitiveLiteral::Int128(100)
1397                    )
1398                ),
1399            ])
1400        );
1401
1402        // check the written file
1403        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1404
1405        Ok(())
1406    }
1407
1408    #[tokio::test]
1409    async fn test_decimal_bound() -> Result<()> {
1410        let temp_dir = TempDir::new().unwrap();
1411        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1412        let location_gen = DefaultLocationGenerator::with_data_location(
1413            temp_dir.path().to_str().unwrap().to_string(),
1414        );
1415        let file_name_gen =
1416            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1417
1418        // test 1.1 and 2.2
1419        let schema = Arc::new(
1420            Schema::builder()
1421                .with_fields(vec![
1422                    NestedField::optional(
1423                        0,
1424                        "decimal",
1425                        Type::Primitive(PrimitiveType::Decimal {
1426                            precision: 28,
1427                            scale: 10,
1428                        }),
1429                    )
1430                    .into(),
1431                ])
1432                .build()
1433                .unwrap(),
1434        );
1435        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1436        let mut pw = ParquetWriterBuilder::new(
1437            WriterProperties::builder().build(),
1438            schema.clone(),
1439            None,
1440            file_io.clone(),
1441            location_gen.clone(),
1442            file_name_gen.clone(),
1443        )
1444        .build()
1445        .await?;
1446        let col0 = Arc::new(
1447            Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1448                .with_data_type(DataType::Decimal128(28, 10)),
1449        ) as ArrayRef;
1450        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1451        pw.write(&to_write).await?;
1452        let res = pw.close().await?;
1453        assert_eq!(res.len(), 1);
1454        let data_file = res
1455            .into_iter()
1456            .next()
1457            .unwrap()
1458            .content(crate::spec::DataContentType::Data)
1459            .partition(Struct::empty())
1460            .partition_spec_id(0)
1461            .build()
1462            .unwrap();
1463        assert_eq!(
1464            data_file.upper_bounds().get(&0),
1465            Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1466                .as_ref()
1467        );
1468        assert_eq!(
1469            data_file.lower_bounds().get(&0),
1470            Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1471                .as_ref()
1472        );
1473
1474        // test -1.1 and -2.2
1475        let schema = Arc::new(
1476            Schema::builder()
1477                .with_fields(vec![
1478                    NestedField::optional(
1479                        0,
1480                        "decimal",
1481                        Type::Primitive(PrimitiveType::Decimal {
1482                            precision: 28,
1483                            scale: 10,
1484                        }),
1485                    )
1486                    .into(),
1487                ])
1488                .build()
1489                .unwrap(),
1490        );
1491        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1492        let mut pw = ParquetWriterBuilder::new(
1493            WriterProperties::builder().build(),
1494            schema.clone(),
1495            None,
1496            file_io.clone(),
1497            location_gen.clone(),
1498            file_name_gen.clone(),
1499        )
1500        .build()
1501        .await?;
1502        let col0 = Arc::new(
1503            Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1504                .with_data_type(DataType::Decimal128(28, 10)),
1505        ) as ArrayRef;
1506        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1507        pw.write(&to_write).await?;
1508        let res = pw.close().await?;
1509        assert_eq!(res.len(), 1);
1510        let data_file = res
1511            .into_iter()
1512            .next()
1513            .unwrap()
1514            .content(crate::spec::DataContentType::Data)
1515            .partition(Struct::empty())
1516            .partition_spec_id(0)
1517            .build()
1518            .unwrap();
1519        assert_eq!(
1520            data_file.upper_bounds().get(&0),
1521            Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1522                .as_ref()
1523        );
1524        assert_eq!(
1525            data_file.lower_bounds().get(&0),
1526            Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1527                .as_ref()
1528        );
1529
1530        // test max and min of rust_decimal
1531        let decimal_max = Decimal::MAX;
1532        let decimal_min = Decimal::MIN;
1533        assert_eq!(decimal_max.scale(), decimal_min.scale());
1534        let schema = Arc::new(
1535            Schema::builder()
1536                .with_fields(vec![
1537                    NestedField::optional(
1538                        0,
1539                        "decimal",
1540                        Type::Primitive(PrimitiveType::Decimal {
1541                            precision: 38,
1542                            scale: decimal_max.scale(),
1543                        }),
1544                    )
1545                    .into(),
1546                ])
1547                .build()
1548                .unwrap(),
1549        );
1550        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1551        let mut pw = ParquetWriterBuilder::new(
1552            WriterProperties::builder().build(),
1553            schema,
1554            None,
1555            file_io.clone(),
1556            location_gen,
1557            file_name_gen,
1558        )
1559        .build()
1560        .await?;
1561        let col0 = Arc::new(
1562            Decimal128Array::from(vec![
1563                Some(decimal_max.mantissa()),
1564                Some(decimal_min.mantissa()),
1565            ])
1566            .with_data_type(DataType::Decimal128(38, 0)),
1567        ) as ArrayRef;
1568        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1569        pw.write(&to_write).await?;
1570        let res = pw.close().await?;
1571        assert_eq!(res.len(), 1);
1572        let data_file = res
1573            .into_iter()
1574            .next()
1575            .unwrap()
1576            .content(crate::spec::DataContentType::Data)
1577            .partition(Struct::empty())
1578            .partition_spec_id(0)
1579            .build()
1580            .unwrap();
1581        assert_eq!(
1582            data_file.upper_bounds().get(&0),
1583            Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1584        );
1585        assert_eq!(
1586            data_file.lower_bounds().get(&0),
1587            Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1588        );
1589
1590        // test max and min for precision 38
1591        // TODO:
1592        // Re-add this case after resolving https://github.com/apache/iceberg-rust/issues/669
1593        // let schema = Arc::new(
1594        //     Schema::builder()
1595        //         .with_fields(vec![NestedField::optional(
1596        //             0,
1597        //             "decimal",
1598        //             Type::Primitive(PrimitiveType::Decimal {
1599        //                 precision: 38,
1600        //                 scale: 0,
1601        //             }),
1602        //         )
1603        //         .into()])
1604        //         .build()
1605        //         .unwrap(),
1606        // );
1607        // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1608        // let mut pw = ParquetWriterBuilder::new(
1609        //     WriterProperties::builder().build(),
1610        //     schema,
1611        //     file_io.clone(),
1612        //     location_gen,
1613        //     file_name_gen,
1614        // )
1615        // .build()
1616        // .await?;
1617        // let col0 = Arc::new(
1618        //     Decimal128Array::from(vec![
1619        //         Some(99999999999999999999999999999999999999_i128),
1620        //         Some(-99999999999999999999999999999999999999_i128),
1621        //     ])
1622        //     .with_data_type(DataType::Decimal128(38, 0)),
1623        // ) as ArrayRef;
1624        // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1625        // pw.write(&to_write).await?;
1626        // let res = pw.close().await?;
1627        // assert_eq!(res.len(), 1);
1628        // let data_file = res
1629        //     .into_iter()
1630        //     .next()
1631        //     .unwrap()
1632        //     .content(crate::spec::DataContentType::Data)
1633        //     .partition(Struct::empty())
1634        //     .build()
1635        //     .unwrap();
1636        // assert_eq!(
1637        //     data_file.upper_bounds().get(&0),
1638        //     Some(Datum::new(
1639        //         PrimitiveType::Decimal {
1640        //             precision: 38,
1641        //             scale: 0
1642        //         },
1643        //         PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128)
1644        //     ))
1645        //     .as_ref()
1646        // );
1647        // assert_eq!(
1648        //     data_file.lower_bounds().get(&0),
1649        //     Some(Datum::new(
1650        //         PrimitiveType::Decimal {
1651        //             precision: 38,
1652        //             scale: 0
1653        //         },
1654        //         PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128)
1655        //     ))
1656        //     .as_ref()
1657        // );
1658
1659        Ok(())
1660    }
1661
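    // Check that closing the writer creates a file when data was written,
    // and creates no file when nothing was written.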
1662    #[tokio::test]
1663    async fn test_empty_write() -> Result<()> {
1664        let temp_dir = TempDir::new().unwrap();
1665        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1666        let location_gen = DefaultLocationGenerator::with_data_location(
1667            temp_dir.path().to_str().unwrap().to_string(),
1668        );
1669        let file_name_gen =
1670            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1671
1672        // Test that a file is created when there is data to write
1673        let schema = {
1674            let fields = vec![
1675                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1676                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1677                ),
1678            ];
1679            Arc::new(arrow_schema::Schema::new(fields))
1680        };
1681        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1682        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1683        let mut pw = ParquetWriterBuilder::new(
1684            WriterProperties::builder().build(),
1685            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1686            None,
1687            file_io.clone(),
1688            location_gen.clone(),
1689            file_name_gen,
1690        )
1691        .build()
1692        .await?;
1693        pw.write(&to_write).await?;
1694        let file_path = pw.out_file.location().to_string();
1695        pw.close().await.unwrap();
1696        assert!(file_io.exists(file_path).await.unwrap());
1697
1698        // Test that no file is created when there is no data to write
1699        let file_name_gen =
1700            DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1701        let pw = ParquetWriterBuilder::new(
1702            WriterProperties::builder().build(),
1703            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1704            None,
1705            file_io.clone(),
1706            location_gen,
1707            file_name_gen,
1708        )
1709        .build()
1710        .await?;
1711        let file_path = pw.out_file.location().to_string();
1712        pw.close().await.unwrap();
1713        assert!(!file_io.exists(file_path).await.unwrap());
1714
1715        Ok(())
1716    }
1717
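    // Check NaN value counts (and other column statistics) for primitive
    // float32/float64 columns.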
1718    #[tokio::test]
1719    async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1720        let temp_dir = TempDir::new().unwrap();
1721        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1722        let location_gen = DefaultLocationGenerator::with_data_location(
1723            temp_dir.path().to_str().unwrap().to_string(),
1724        );
1725        let file_name_gen =
1726            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1727
1728        // prepare data
1729        let arrow_schema = {
1730            let fields = vec![
1731                Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1732                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1733                ),
1734                Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1735                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1736                ),
1737            ];
1738            Arc::new(arrow_schema::Schema::new(fields))
1739        };
1740
1741        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1742            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1743            None,
1744        )) as ArrayRef;
1745
1746        let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1747            [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1748            None,
1749        )) as ArrayRef;
1750
1751        let to_write =
1752            RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1753
1754        // write data
1755        let mut pw = ParquetWriterBuilder::new(
1756            WriterProperties::builder().build(),
1757            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1758            None,
1759            file_io.clone(),
1760            location_gen,
1761            file_name_gen,
1762        )
1763        .build()
1764        .await?;
1765
1766        pw.write(&to_write).await?;
1767        let res = pw.close().await?;
1768        assert_eq!(res.len(), 1);
1769        let data_file = res
1770            .into_iter()
1771            .next()
1772            .unwrap()
1773            // Fill in dummy field values so the builder succeeds.
1774            .content(crate::spec::DataContentType::Data)
1775            .partition(Struct::empty())
1776            .partition_spec_id(0)
1777            .build()
1778            .unwrap();
1779
1780        // check data file
1781        assert_eq!(data_file.record_count(), 4);
1782        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1783        assert_eq!(
1784            *data_file.lower_bounds(),
1785            HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1786        );
1787        assert_eq!(
1788            *data_file.upper_bounds(),
1789            HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1790        );
1791        assert_eq!(
1792            *data_file.null_value_counts(),
1793            HashMap::from([(0, 0), (1, 0)])
1794        );
1795        assert_eq!(
1796            *data_file.nan_value_counts(),
1797            HashMap::from([(0, 1), (1, 1)])
1798        );
1799
1800        // check the written file
1801        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1802        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1803
1804        Ok(())
1805    }
1806
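    // Check NaN value counts for float columns nested inside struct fields,
    // including a struct nested within another struct.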
1807    #[tokio::test]
1808    async fn test_nan_val_cnts_struct_type() -> Result<()> {
1809        let temp_dir = TempDir::new().unwrap();
1810        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1811        let location_gen = DefaultLocationGenerator::with_data_location(
1812            temp_dir.path().to_str().unwrap().to_string(),
1813        );
1814        let file_name_gen =
1815            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1816
1817        let schema_struct_float_fields = Fields::from(vec![
1818            Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1819                PARQUET_FIELD_ID_META_KEY.to_string(),
1820                "4".to_string(),
1821            )])),
1822        ]);
1823
1824        let schema_struct_nested_float_fields = Fields::from(vec![
1825            Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1826                PARQUET_FIELD_ID_META_KEY.to_string(),
1827                "7".to_string(),
1828            )])),
1829        ]);
1830
1831        let schema_struct_nested_fields = Fields::from(vec![
1832            Field::new(
1833                "col6",
1834                arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1835                false,
1836            )
1837            .with_metadata(HashMap::from([(
1838                PARQUET_FIELD_ID_META_KEY.to_string(),
1839                "6".to_string(),
1840            )])),
1841        ]);
1842
1843        // prepare data
1844        let arrow_schema = {
1845            let fields = vec![
1846                Field::new(
1847                    "col3",
1848                    arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1849                    false,
1850                )
1851                .with_metadata(HashMap::from([(
1852                    PARQUET_FIELD_ID_META_KEY.to_string(),
1853                    "3".to_string(),
1854                )])),
1855                Field::new(
1856                    "col5",
1857                    arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1858                    false,
1859                )
1860                .with_metadata(HashMap::from([(
1861                    PARQUET_FIELD_ID_META_KEY.to_string(),
1862                    "5".to_string(),
1863                )])),
1864            ];
1865            Arc::new(arrow_schema::Schema::new(fields))
1866        };
1867
1868        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1869            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1870            None,
1871        )) as ArrayRef;
1872
1873        let struct_float_field_col = Arc::new(StructArray::new(
1874            schema_struct_float_fields,
1875            vec![float_32_col.clone()],
1876            None,
1877        )) as ArrayRef;
1878
1879        let struct_nested_float_field_col = Arc::new(StructArray::new(
1880            schema_struct_nested_fields,
1881            vec![Arc::new(StructArray::new(
1882                schema_struct_nested_float_fields,
1883                vec![float_32_col.clone()],
1884                None,
1885            )) as ArrayRef],
1886            None,
1887        )) as ArrayRef;
1888
1889        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1890            struct_float_field_col,
1891            struct_nested_float_field_col,
1892        ])
1893        .unwrap();
1894
1895        // write data
1896        let mut pw = ParquetWriterBuilder::new(
1897            WriterProperties::builder().build(),
1898            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1899            None,
1900            file_io.clone(),
1901            location_gen,
1902            file_name_gen,
1903        )
1904        .build()
1905        .await?;
1906
1907        pw.write(&to_write).await?;
1908        let res = pw.close().await?;
1909        assert_eq!(res.len(), 1);
1910        let data_file = res
1911            .into_iter()
1912            .next()
1913            .unwrap()
1914            // Fill in dummy field values so the builder succeeds.
1915            .content(crate::spec::DataContentType::Data)
1916            .partition(Struct::empty())
1917            .partition_spec_id(0)
1918            .build()
1919            .unwrap();
1920
1921        // check data file
1922        assert_eq!(data_file.record_count(), 4);
1923        assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1924        assert_eq!(
1925            *data_file.lower_bounds(),
1926            HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1927        );
1928        assert_eq!(
1929            *data_file.upper_bounds(),
1930            HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1931        );
1932        assert_eq!(
1933            *data_file.null_value_counts(),
1934            HashMap::from([(4, 0), (7, 0)])
1935        );
1936        assert_eq!(
1937            *data_file.nan_value_counts(),
1938            HashMap::from([(4, 1), (7, 1)])
1939        );
1940
1941        // check the written file
1942        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1943        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1944
1945        Ok(())
1946    }
1947
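    // Check NaN value counts for float columns inside list fields, including a
    // list nested within a struct.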
1948    #[tokio::test]
1949    async fn test_nan_val_cnts_list_type() -> Result<()> {
1950        let temp_dir = TempDir::new().unwrap();
1951        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1952        let location_gen = DefaultLocationGenerator::with_data_location(
1953            temp_dir.path().to_str().unwrap().to_string(),
1954        );
1955        let file_name_gen =
1956            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1957
1958        let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1959            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1960        );
1961
1962        let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1963            .with_metadata(HashMap::from([(
1964                PARQUET_FIELD_ID_META_KEY.to_string(),
1965                "4".to_string(),
1966            )]));
1967
1968        let schema_struct_list_field = Fields::from(vec![
1969            Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
1970                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
1971            ),
1972        ]);
1973
1974        let arrow_schema = {
1975            let fields = vec![
1976                Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
1977                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1978                ),
1979                Field::new_struct("col1", schema_struct_list_field.clone(), true)
1980                    .with_metadata(HashMap::from([(
1981                        PARQUET_FIELD_ID_META_KEY.to_string(),
1982                        "2".to_string(),
1983                    )]))
1984                    .clone(),
1985                // Field::new_large_list("col3", schema_large_list_float_field.clone(), true).with_metadata(
1986                //     HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
1987                // ).clone(),
1988            ];
1989            Arc::new(arrow_schema::Schema::new(fields))
1990        };
1991
1992        let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
1993            Some(1.0_f32),
1994            Some(f32::NAN),
1995            Some(2.0),
1996            Some(2.0),
1997        ])])
1998        .into_parts();
1999
2000        let list_float_field_col = Arc::new({
2001            let list_parts = list_parts.clone();
2002            ListArray::new(
2003                {
2004                    if let DataType::List(field) = arrow_schema.field(0).data_type() {
2005                        field.clone()
2006                    } else {
2007                        unreachable!()
2008                    }
2009                },
2010                list_parts.1,
2011                list_parts.2,
2012                list_parts.3,
2013            )
2014        }) as ArrayRef;
2015
2016        let struct_list_fields_schema =
2017            if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
2018                fields.clone()
2019            } else {
2020                unreachable!()
2021            };
2022
2023        let struct_list_float_field_col = Arc::new({
2024            ListArray::new(
2025                {
2026                    if let DataType::List(field) = struct_list_fields_schema
2027                        .first()
2028                        .expect("could not find first list field")
2029                        .data_type()
2030                    {
2031                        field.clone()
2032                    } else {
2033                        unreachable!()
2034                    }
2035                },
2036                list_parts.1,
2037                list_parts.2,
2038                list_parts.3,
2039            )
2040        }) as ArrayRef;
2041
2042        let struct_list_float_field_col = Arc::new(StructArray::new(
2043            struct_list_fields_schema,
2044            vec![struct_list_float_field_col.clone()],
2045            None,
2046        )) as ArrayRef;
2047
2048        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2049            list_float_field_col,
2050            struct_list_float_field_col,
2051            // large_list_float_field_col,
2052        ])
2053        .expect("Could not form record batch");
2054
2055        // write data
2056        let mut pw = ParquetWriterBuilder::new(
2057            WriterProperties::builder().build(),
2058            Arc::new(
2059                to_write
2060                    .schema()
2061                    .as_ref()
2062                    .try_into()
2063                    .expect("Could not convert iceberg schema"),
2064            ),
2065            None,
2066            file_io.clone(),
2067            location_gen,
2068            file_name_gen,
2069        )
2070        .build()
2071        .await?;
2072
2073        pw.write(&to_write).await?;
2074        let res = pw.close().await?;
2075        assert_eq!(res.len(), 1);
2076        let data_file = res
2077            .into_iter()
2078            .next()
2079            .unwrap()
2080            .content(crate::spec::DataContentType::Data)
2081            .partition(Struct::empty())
2082            .partition_spec_id(0)
2083            .build()
2084            .unwrap();
2085
2086        // check data file
2087        assert_eq!(data_file.record_count(), 1);
2088        assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
2089        assert_eq!(
2090            *data_file.lower_bounds(),
2091            HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
2092        );
2093        assert_eq!(
2094            *data_file.upper_bounds(),
2095            HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
2096        );
2097        assert_eq!(
2098            *data_file.null_value_counts(),
2099            HashMap::from([(1, 0), (4, 0)])
2100        );
2101        assert_eq!(
2102            *data_file.nan_value_counts(),
2103            HashMap::from([(1, 1), (4, 1)])
2104        );
2105
2106        // check the written file
2107        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2108        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2109
2110        Ok(())
2111    }
2112
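    // Builds a map array of int32 keys to float32 values (one of them NaN),
    // rebuilding the entries struct with the given key/value fields so that the
    // Parquet field-id metadata is carried through.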
2113    macro_rules! construct_map_arr {
2114        ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
2115            let int_builder = Int32Builder::new();
2116            let float_builder = Float32Builder::with_capacity(4);
2117            let mut builder = MapBuilder::new(None, int_builder, float_builder);
2118            builder.keys().append_value(1);
2119            builder.values().append_value(1.0_f32);
2120            builder.append(true).unwrap();
2121            builder.keys().append_value(2);
2122            builder.values().append_value(f32::NAN);
2123            builder.append(true).unwrap();
2124            builder.keys().append_value(3);
2125            builder.values().append_value(2.0);
2126            builder.append(true).unwrap();
2127            builder.keys().append_value(4);
2128            builder.values().append_value(2.0);
2129            builder.append(true).unwrap();
2130            let array = builder.finish();
2131
2132            let (_field, offsets, entries, nulls, ordered) = array.into_parts();
2133            let new_struct_fields_schema =
2134                Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
2135
2136            let entries = {
2137                let (_, arrays, nulls) = entries.into_parts();
2138                StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
2139            };
2140
2141            let field = Arc::new(Field::new(
2142                DEFAULT_MAP_FIELD_NAME,
2143                DataType::Struct(new_struct_fields_schema),
2144                false,
2145            ));
2146
2147            Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
2148        }};
2149    }
2150
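    // Check NaN value counts for float values inside map fields, including a map
    // nested within a struct.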
2151    #[tokio::test]
2152    async fn test_nan_val_cnts_map_type() -> Result<()> {
2153        let temp_dir = TempDir::new().unwrap();
2154        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2155        let location_gen = DefaultLocationGenerator::with_data_location(
2156            temp_dir.path().to_str().unwrap().to_string(),
2157        );
2158        let file_name_gen =
2159            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2160
2161        let map_key_field_schema =
2162            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2163                (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2164            ]));
2165
2166        let map_value_field_schema =
2167            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2168                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2169            ));
2170
2171        let struct_map_key_field_schema =
2172            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2173                (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2174            ]));
2175
2176        let struct_map_value_field_schema =
2177            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2178                [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2179            ));
2180
2181        let schema_struct_map_field = Fields::from(vec![
2182            Field::new_map(
2183                "col3",
2184                DEFAULT_MAP_FIELD_NAME,
2185                struct_map_key_field_schema.clone(),
2186                struct_map_value_field_schema.clone(),
2187                false,
2188                false,
2189            )
2190            .with_metadata(HashMap::from([(
2191                PARQUET_FIELD_ID_META_KEY.to_string(),
2192                "5".to_string(),
2193            )])),
2194        ]);
2195
2196        let arrow_schema = {
2197            let fields = vec![
2198                Field::new_map(
2199                    "col0",
2200                    DEFAULT_MAP_FIELD_NAME,
2201                    map_key_field_schema.clone(),
2202                    map_value_field_schema.clone(),
2203                    false,
2204                    false,
2205                )
2206                .with_metadata(HashMap::from([(
2207                    PARQUET_FIELD_ID_META_KEY.to_string(),
2208                    "0".to_string(),
2209                )])),
2210                Field::new_struct("col1", schema_struct_map_field.clone(), true)
2211                    .with_metadata(HashMap::from([(
2212                        PARQUET_FIELD_ID_META_KEY.to_string(),
2213                        "3".to_string(),
2214                    )]))
2215                    .clone(),
2216            ];
2217            Arc::new(arrow_schema::Schema::new(fields))
2218        };
2219
2220        let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2221
2222        let struct_map_arr =
2223            construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2224
2225        let struct_list_float_field_col = Arc::new(StructArray::new(
2226            schema_struct_map_field,
2227            vec![struct_map_arr],
2228            None,
2229        )) as ArrayRef;
2230
2231        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2232            map_array,
2233            struct_list_float_field_col,
2234        ])
2235        .expect("Could not form record batch");
2236
2237        // write data
2238        let mut pw = ParquetWriterBuilder::new(
2239            WriterProperties::builder().build(),
2240            Arc::new(
2241                to_write
2242                    .schema()
2243                    .as_ref()
2244                    .try_into()
2245                    .expect("Could not convert iceberg schema"),
2246            ),
2247            None,
2248            file_io.clone(),
2249            location_gen,
2250            file_name_gen,
2251        )
2252        .build()
2253        .await?;
2254
2255        pw.write(&to_write).await?;
2256        let res = pw.close().await?;
2257        assert_eq!(res.len(), 1);
2258        let data_file = res
2259            .into_iter()
2260            .next()
2261            .unwrap()
2262            .content(crate::spec::DataContentType::Data)
2263            .partition(Struct::empty())
2264            .partition_spec_id(0)
2265            .build()
2266            .unwrap();
2267
2268        // check data file
2269        assert_eq!(data_file.record_count(), 4);
2270        assert_eq!(
2271            *data_file.value_counts(),
2272            HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2273        );
2274        assert_eq!(
2275            *data_file.lower_bounds(),
2276            HashMap::from([
2277                (1, Datum::int(1)),
2278                (2, Datum::float(1.0)),
2279                (6, Datum::int(1)),
2280                (7, Datum::float(1.0))
2281            ])
2282        );
2283        assert_eq!(
2284            *data_file.upper_bounds(),
2285            HashMap::from([
2286                (1, Datum::int(4)),
2287                (2, Datum::float(2.0)),
2288                (6, Datum::int(4)),
2289                (7, Datum::float(2.0))
2290            ])
2291        );
2292        assert_eq!(
2293            *data_file.null_value_counts(),
2294            HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2295        );
2296        assert_eq!(
2297            *data_file.nan_value_counts(),
2298            HashMap::from([(2, 1), (7, 1)])
2299        );
2300
2301        // check the written file
2302        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2303        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2304
2305        Ok(())
2306    }
2307
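    // Check that closing a writer that never received any data yields no data
    // files and leaves no Parquet file on disk.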
2308    #[tokio::test]
2309    async fn test_write_empty_parquet_file() {
2310        let temp_dir = TempDir::new().unwrap();
2311        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2312        let location_gen = DefaultLocationGenerator::with_data_location(
2313            temp_dir.path().to_str().unwrap().to_string(),
2314        );
2315        let file_name_gen =
2316            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2317
2318        // build a writer but never write any data
2319        let pw = ParquetWriterBuilder::new(
2320            WriterProperties::builder().build(),
2321            Arc::new(
2322                Schema::builder()
2323                    .with_schema_id(1)
2324                    .with_fields(vec![
2325                        NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2326                            .with_id(0)
2327                            .into(),
2328                    ])
2329                    .build()
2330                    .expect("Failed to create schema"),
2331            ),
2332            None,
2333            file_io.clone(),
2334            location_gen,
2335            file_name_gen,
2336        )
2337        .build()
2338        .await
2339        .unwrap();
2340
2341        let res = pw.close().await.unwrap();
2342        assert_eq!(res.len(), 0);
2343
2344        // Check that the file has been deleted.
2345        assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2346    }
2347
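    // Check that MinMaxColAggregator skips missing min/max statistics and keeps
    // the overall extremes across multiple updates.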
2348    #[test]
2349    fn test_min_max_aggregator() {
2350        let schema = Arc::new(
2351            Schema::builder()
2352                .with_schema_id(1)
2353                .with_fields(vec![
2354                    NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2355                        .with_id(0)
2356                        .into(),
2357                ])
2358                .build()
2359                .expect("Failed to create schema"),
2360        );
2361        let mut min_max_agg = MinMaxColAggregator::new(schema);
2362        let create_statistics =
2363            |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
2364        min_max_agg
2365            .update(0, create_statistics(None, Some(42)))
2366            .unwrap();
2367        min_max_agg
2368            .update(0, create_statistics(Some(0), Some(i32::MAX)))
2369            .unwrap();
2370        min_max_agg
2371            .update(0, create_statistics(Some(i32::MIN), None))
2372            .unwrap();
2373        min_max_agg
2374            .update(0, create_statistics(None, None))
2375            .unwrap();
2376
2377        let (lower_bounds, upper_bounds) = min_max_agg.produce();
2378
2379        assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2380        assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2381    }
2382}