
iceberg/writer/file_writer/parquet_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the file writer for the Parquet file format.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::RecordBatch;
24use arrow_schema::{DataType, Field, Schema as ArrowSchema};
25use bytes::Bytes;
26use futures::future::BoxFuture;
27use itertools::Itertools;
28use parquet::arrow::AsyncArrowWriter;
29use parquet::arrow::async_reader::AsyncFileReader;
30use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
31use parquet::file::metadata::ParquetMetaData;
32use parquet::file::properties::WriterProperties;
33use parquet::file::statistics::Statistics;
34
35use super::{FileWriter, FileWriterBuilder};
36use crate::arrow::{
37    ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
38    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
39};
40use crate::io::{FileIO, FileWrite, OutputFile};
41use crate::spec::{
42    DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
43    NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
44    StructType, TableMetadata, Type, visit_schema,
45};
46use crate::transform::create_transform_function;
47use crate::writer::{CurrentFileStatus, DataFile};
48use crate::{Error, ErrorKind, Result};
49
50/// `ParquetWriterBuilder` is used to build a [`ParquetWriter`].
51#[derive(Clone, Debug)]
52pub struct ParquetWriterBuilder {
53    props: WriterProperties,
54    schema: SchemaRef,
55    match_mode: FieldMatchMode,
56    /// Optional Arrow schema to use for writing. If provided, this schema is used
57    /// directly for the Parquet writer instead of converting from the Iceberg schema.
58    /// This allows preserving Arrow extension metadata in the output.
59    arrow_schema: Option<Arc<ArrowSchema>>,
60}
61
62impl ParquetWriterBuilder {
63    /// Create a new `ParquetWriterBuilder`.
64    /// To construct the write result correctly, each field of the schema should carry the `PARQUET_FIELD_ID_META_KEY` metadata.
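    ///
    /// A minimal usage sketch, modeled on the tests in this module (the Iceberg `schema`
    /// is assumed to already exist in the caller's context):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use parquet::file::properties::WriterProperties;
    ///
    /// // Writer properties control Parquet options such as the row group size.
    /// let props = WriterProperties::builder()
    ///     .set_max_row_group_size(128)
    ///     .build();
    /// let builder = ParquetWriterBuilder::new(props, Arc::new(schema));
    /// ```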
65    pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
66        Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
67    }
68
69    /// Create a new `ParquetWriterBuilder` with custom match mode
70    pub fn new_with_match_mode(
71        props: WriterProperties,
72        schema: SchemaRef,
73        match_mode: FieldMatchMode,
74    ) -> Self {
75        Self {
76            props,
77            schema,
78            match_mode,
79            arrow_schema: None,
80        }
81    }
82
83    /// Set an Arrow schema to use for writing instead of deriving one from the Iceberg schema.
84    /// The Arrow schema's structure must match the Iceberg schema (field names, types, nullability),
85    /// but may contain additional metadata (e.g., Arrow extension types).
86    /// Returns an error if the schemas don't match structurally.
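    ///
    /// A minimal sketch, assuming `props`, `iceberg_schema` and a structurally matching
    /// `custom_arrow_schema` (e.g. one carrying Arrow extension metadata) already exist:
    ///
    /// ```ignore
    /// let builder = ParquetWriterBuilder::new(props, iceberg_schema)
    ///     .with_arrow_schema(custom_arrow_schema)?;
    /// // The provided Arrow schema is used as-is when the Parquet writer is created.
    /// ```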
87    pub fn with_arrow_schema(mut self, arrow_schema: Arc<ArrowSchema>) -> Result<Self> {
88        validate_arrow_schema_matches_iceberg(&arrow_schema, &self.schema)?;
89        self.arrow_schema = Some(arrow_schema);
90        Ok(self)
91    }
92}
93
94impl FileWriterBuilder for ParquetWriterBuilder {
95    type R = ParquetWriter;
96
97    async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
98        Ok(ParquetWriter {
99            schema: self.schema.clone(),
100            arrow_schema: self.arrow_schema.clone(),
101            inner_writer: None,
102            writer_properties: self.props.clone(),
103            current_row_num: 0,
104            output_file,
105            nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
106        })
107    }
108}
109
110/// A mapping from Parquet column path names to internal field id
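///
/// A sketch of how the index is built and queried, based on the `test_index_by_parquet_path`
/// test below (the nested `schema` is assumed to exist):
///
/// ```ignore
/// let mut visitor = IndexByParquetPathName::new();
/// visit_schema(&schema, &mut visitor)?;
/// // Nested paths follow Parquet naming, e.g. `list.element` for list elements and
/// // `key_value.key` / `key_value.value` for map entries.
/// assert_eq!(visitor.get("col1.col_1_5"), Some(&5));
/// ```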
111struct IndexByParquetPathName {
112    name_to_id: HashMap<String, i32>,
113
114    field_names: Vec<String>,
115
116    field_id: i32,
117}
118
119impl IndexByParquetPathName {
120    /// Creates a new, empty `IndexByParquetPathName`
121    pub fn new() -> Self {
122        Self {
123            name_to_id: HashMap::new(),
124            field_names: Vec::new(),
125            field_id: 0,
126        }
127    }
128
129    /// Retrieves the internal field ID for the given Parquet column path, if any
130    pub fn get(&self, name: &str) -> Option<&i32> {
131        self.name_to_id.get(name)
132    }
133}
134
135impl Default for IndexByParquetPathName {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141impl SchemaVisitor for IndexByParquetPathName {
142    type T = ();
143
144    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
145        self.field_names.push(field.name.to_string());
146        self.field_id = field.id;
147        Ok(())
148    }
149
150    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
151        self.field_names.pop();
152        Ok(())
153    }
154
155    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
156        self.field_names.push(format!("list.{}", field.name));
157        self.field_id = field.id;
158        Ok(())
159    }
160
161    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
162        self.field_names.pop();
163        Ok(())
164    }
165
166    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
167        self.field_names
168            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
169        self.field_id = field.id;
170        Ok(())
171    }
172
173    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
174        self.field_names.pop();
175        Ok(())
176    }
177
178    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
179        self.field_names
180            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
181        self.field_id = field.id;
182        Ok(())
183    }
184
185    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
186        self.field_names.pop();
187        Ok(())
188    }
189
190    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
191        Ok(())
192    }
193
194    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
195        Ok(())
196    }
197
198    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
199        Ok(())
200    }
201
202    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
203        Ok(())
204    }
205
206    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
207        Ok(())
208    }
209
210    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
211        let full_name = self.field_names.iter().map(String::as_str).join(".");
212        let field_id = self.field_id;
213        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
214            return Err(Error::new(
215                ErrorKind::DataInvalid,
216                format!(
217                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
218                ),
219            ));
220        } else {
221            self.name_to_id.insert(full_name, field_id);
222        }
223
224        Ok(())
225    }
226}
227
228/// `ParquetWriter` is used to write Arrow data into a Parquet file on storage.
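///
/// A minimal write/close sketch mirroring the tests below (the `props`, `schema`,
/// `output_file` and Arrow `batch` are assumed to exist):
///
/// ```ignore
/// let mut writer = ParquetWriterBuilder::new(props, schema)
///     .build(output_file)
///     .await?;
/// writer.write(&batch).await?;
/// // `close` returns one `DataFileBuilder` describing the written file, or none if nothing was written.
/// let data_file_builders = writer.close().await?;
/// ```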
229pub struct ParquetWriter {
230    schema: SchemaRef,
231    /// Optional Arrow schema to use for writing. If provided, this is used directly
232    /// instead of converting from the Iceberg schema, preserving any extension metadata.
233    arrow_schema: Option<Arc<ArrowSchema>>,
234    output_file: OutputFile,
235    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
236    writer_properties: WriterProperties,
237    current_row_num: usize,
238    nan_value_count_visitor: NanValueCountVisitor,
239}
240
241/// Used to aggregate the min and max values of each column.
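///
/// A sketch of the intended flow (the `schema`, `field_id` and per-chunk `statistics`
/// are assumed to come from the surrounding Parquet metadata walk):
///
/// ```ignore
/// let mut agg = MinMaxColAggregator::new(schema);
/// // Called once per column chunk while iterating the row groups.
/// agg.update(field_id, statistics.clone())?;
/// let (lower_bounds, upper_bounds) = agg.produce();
/// ```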
242struct MinMaxColAggregator {
243    lower_bounds: HashMap<i32, Datum>,
244    upper_bounds: HashMap<i32, Datum>,
245    schema: SchemaRef,
246}
247
248impl MinMaxColAggregator {
249    /// Creates a new, empty `MinMaxColAggregator`
250    fn new(schema: SchemaRef) -> Self {
251        Self {
252            lower_bounds: HashMap::new(),
253            upper_bounds: HashMap::new(),
254            schema,
255        }
256    }
257
258    fn update_state_min(&mut self, field_id: i32, datum: Datum) {
259        self.lower_bounds
260            .entry(field_id)
261            .and_modify(|e| {
262                if *e > datum {
263                    *e = datum.clone()
264                }
265            })
266            .or_insert(datum);
267    }
268
269    fn update_state_max(&mut self, field_id: i32, datum: Datum) {
270        self.upper_bounds
271            .entry(field_id)
272            .and_modify(|e| {
273                if *e < datum {
274                    *e = datum.clone()
275                }
276            })
277            .or_insert(datum);
278    }
279
280    /// Updates the aggregator with the column chunk statistics for `field_id`
281    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
282        let Some(ty) = self
283            .schema
284            .field_by_id(field_id)
285            .map(|f| f.field_type.as_ref())
286        else {
287            // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
288            // Ignore the field if it is not in schema.
289            return Ok(());
290        };
291        let Type::Primitive(ty) = ty.clone() else {
292            return Err(Error::new(
293                ErrorKind::Unexpected,
294                format!("Compound type {ty} is not supported for min-max aggregation."),
295            ));
296        };
297
298        if value.min_is_exact() {
299            let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
300                return Err(Error::new(
301                    ErrorKind::Unexpected,
302                    format!("Statistics {value} do not match field type {ty}."),
303                ));
304            };
305
306            self.update_state_min(field_id, min_datum);
307        }
308
309        if value.max_is_exact() {
310            let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
311                return Err(Error::new(
312                    ErrorKind::Unexpected,
313                    format!("Statistics {value} do not match field type {ty}."),
314                ));
315            };
316
317            self.update_state_max(field_id, max_datum);
318        }
319
320        Ok(())
321    }
322
323    /// Returns lower and upper bounds
324    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
325        (self.lower_bounds, self.upper_bounds)
326    }
327}
328
329impl ParquetWriter {
330    /// Converts parquet files to data files
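    ///
    /// A minimal sketch (the file path is illustrative; `file_io` and `table_metadata`
    /// are assumed to exist):
    ///
    /// ```ignore
    /// let data_files = ParquetWriter::parquet_files_to_data_files(
    ///     &file_io,
    ///     vec!["/path/to/existing.parquet".to_string()],
    ///     &table_metadata,
    /// )
    /// .await?;
    /// ```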
331    #[allow(dead_code)]
332    pub(crate) async fn parquet_files_to_data_files(
333        file_io: &FileIO,
334        file_paths: Vec<String>,
335        table_metadata: &TableMetadata,
336    ) -> Result<Vec<DataFile>> {
337        // TODO: support adding to partitioned table
338        let mut data_files: Vec<DataFile> = Vec::new();
339
340        for file_path in file_paths {
341            let input_file = file_io.new_input(&file_path)?;
342            let file_metadata = input_file.metadata().await?;
343            let file_size_in_bytes = file_metadata.size as usize;
344            let reader = input_file.reader().await?;
345
346            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
347            let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
348                Error::new(
349                    ErrorKind::DataInvalid,
350                    format!("Error reading Parquet metadata: {err}"),
351                )
352            })?;
353            let mut builder = ParquetWriter::parquet_to_data_file_builder(
354                table_metadata.current_schema().clone(),
355                parquet_metadata,
356                file_size_in_bytes,
357                file_path,
358                // TODO: Implement nan_value_counts here
359                HashMap::new(),
360            )?;
361            builder.partition_spec_id(table_metadata.default_partition_spec_id());
362            let data_file = builder.build().unwrap();
363            data_files.push(data_file);
364        }
365
366        Ok(data_files)
367    }
368
369    /// Builds a [`DataFileBuilder`] from [`ParquetMetaData`]
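    ///
    /// A sketch of how the returned builder is typically finished, mirroring
    /// `parquet_files_to_data_files` above (all inputs are assumed to exist):
    ///
    /// ```ignore
    /// let mut builder = ParquetWriter::parquet_to_data_file_builder(
    ///     schema,
    ///     parquet_metadata,
    ///     file_size_in_bytes,
    ///     file_path,
    ///     nan_value_counts,
    /// )?;
    /// builder.partition_spec_id(table_metadata.default_partition_spec_id());
    /// let data_file = builder.build().unwrap();
    /// ```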
370    pub(crate) fn parquet_to_data_file_builder(
371        schema: SchemaRef,
372        metadata: Arc<ParquetMetaData>,
373        written_size: usize,
374        file_path: String,
375        nan_value_counts: HashMap<i32, u64>,
376    ) -> Result<DataFileBuilder> {
377        let index_by_parquet_path = {
378            let mut visitor = IndexByParquetPathName::new();
379            visit_schema(&schema, &mut visitor)?;
380            visitor
381        };
382
383        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
384            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
385            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
386            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
387            let mut min_max_agg = MinMaxColAggregator::new(schema);
388
389            for row_group in metadata.row_groups() {
390                for column_chunk_metadata in row_group.columns() {
391                    let parquet_path = column_chunk_metadata.column_descr().path().string();
392
393                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
394                        continue;
395                    };
396
397                    *per_col_size.entry(field_id).or_insert(0) +=
398                        column_chunk_metadata.compressed_size() as u64;
399                    *per_col_val_num.entry(field_id).or_insert(0) +=
400                        column_chunk_metadata.num_values() as u64;
401
402                    if let Some(statistics) = column_chunk_metadata.statistics() {
403                        if let Some(null_count) = statistics.null_count_opt() {
404                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
405                        }
406
407                        min_max_agg.update(field_id, statistics.clone())?;
408                    }
409                }
410            }
411            (
412                per_col_size,
413                per_col_val_num,
414                per_col_null_val_num,
415                min_max_agg.produce(),
416            )
417        };
418
419        let mut builder = DataFileBuilder::default();
420        builder
421            .content(DataContentType::Data)
422            .file_path(file_path)
423            .file_format(DataFileFormat::Parquet)
424            .partition(Struct::empty())
425            .record_count(metadata.file_metadata().num_rows() as u64)
426            .file_size_in_bytes(written_size as u64)
427            .column_sizes(column_sizes)
428            .value_counts(value_counts)
429            .null_value_counts(null_value_counts)
430            .nan_value_counts(nan_value_counts)
431            // # NOTE:
432            // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd
433            .lower_bounds(lower_bounds)
434            .upper_bounds(upper_bounds)
435            .split_offsets(Some(
436                metadata
437                    .row_groups()
438                    .iter()
439                    .filter_map(|group| group.file_offset())
440                    .collect(),
441            ));
442
443        Ok(builder)
444    }
445
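    /// Infers a single partition value from per-column lower/upper bounds.
    ///
    /// For each partition field whose source column has both bounds, the transform must
    /// preserve order and the two bounds must be equal; otherwise an error is returned.
    /// Fields without bounds get a null partition value.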
446    #[allow(dead_code)]
447    fn partition_value_from_bounds(
448        table_spec: Arc<PartitionSpec>,
449        lower_bounds: &HashMap<i32, Datum>,
450        upper_bounds: &HashMap<i32, Datum>,
451    ) -> Result<Struct> {
452        let mut partition_literals: Vec<Option<Literal>> = Vec::new();
453
454        for field in table_spec.fields() {
455            if let (Some(lower), Some(upper)) = (
456                lower_bounds.get(&field.source_id),
457                upper_bounds.get(&field.source_id),
458            ) {
459                if !field.transform.preserves_order() {
460                    return Err(Error::new(
461                        ErrorKind::DataInvalid,
462                        format!(
463                            "cannot infer partition value for non-linear partition field (transform must preserve order): {} with transform {}",
464                            field.name, field.transform
465                        ),
466                    ));
467                }
468
469                if lower != upper {
470                    return Err(Error::new(
471                        ErrorKind::DataInvalid,
472                        format!(
473                            "multiple partition values for field {}: lower: {:?}, upper: {:?}",
474                            field.name, lower, upper
475                        ),
476                    ));
477                }
478
479                let transform_fn = create_transform_function(&field.transform)?;
480                let transform_literal =
481                    Literal::from(transform_fn.transform_literal_result(lower)?);
482
483                partition_literals.push(Some(transform_literal));
484            } else {
485                partition_literals.push(None);
486            }
487        }
488
489        let partition_struct = Struct::from_iter(partition_literals);
490
491        Ok(partition_struct)
492    }
493}
494
495/// Validate that an Arrow schema's structure matches an Iceberg schema.
496/// This checks field names, types, and nullability, but ignores metadata differences.
497fn validate_arrow_schema_matches_iceberg(
498    arrow_schema: &ArrowSchema,
499    iceberg_schema: &crate::spec::Schema,
500) -> Result<()> {
501    let expected: ArrowSchema = iceberg_schema.try_into()?;
502    validate_schemas_match(&expected, arrow_schema, "")
503}
504
505fn validate_schemas_match(expected: &ArrowSchema, actual: &ArrowSchema, path: &str) -> Result<()> {
506    if expected.fields().len() != actual.fields().len() {
507        return Err(Error::new(
508            ErrorKind::DataInvalid,
509            format!(
510                "Schema mismatch at '{}': expected {} fields, got {}",
511                path,
512                expected.fields().len(),
513                actual.fields().len()
514            ),
515        ));
516    }
517    for (e, a) in expected.fields().iter().zip(actual.fields().iter()) {
518        validate_fields_match(e, a, path)?;
519    }
520    Ok(())
521}
522
523fn validate_fields_match(expected: &Field, actual: &Field, parent_path: &str) -> Result<()> {
524    let path = if parent_path.is_empty() {
525        expected.name().to_string()
526    } else {
527        format!("{}.{}", parent_path, expected.name())
528    };
529
530    if expected.name() != actual.name() {
531        return Err(Error::new(
532            ErrorKind::DataInvalid,
533            format!(
534                "Field name mismatch at '{}': expected '{}', got '{}'",
535                parent_path,
536                expected.name(),
537                actual.name()
538            ),
539        ));
540    }
541
542    if expected.is_nullable() != actual.is_nullable() {
543        return Err(Error::new(
544            ErrorKind::DataInvalid,
545            format!(
546                "Nullability mismatch at '{}': expected {}, got {}",
547                path,
548                expected.is_nullable(),
549                actual.is_nullable()
550            ),
551        ));
552    }
553
554    validate_datatypes_match(expected.data_type(), actual.data_type(), &path)
555}
556
557fn validate_datatypes_match(expected: &DataType, actual: &DataType, path: &str) -> Result<()> {
558    // Compare types structurally, ignoring metadata
559    match (expected, actual) {
560        (DataType::Struct(e_fields), DataType::Struct(a_fields)) => {
561            if e_fields.len() != a_fields.len() {
562                return Err(Error::new(
563                    ErrorKind::DataInvalid,
564                    format!(
565                        "Struct field count mismatch at '{}': expected {}, got {}",
566                        path,
567                        e_fields.len(),
568                        a_fields.len()
569                    ),
570                ));
571            }
572            for (e, a) in e_fields.iter().zip(a_fields.iter()) {
573                validate_fields_match(e, a, path)?;
574            }
575            Ok(())
576        }
577        (DataType::List(e_inner), DataType::List(a_inner))
578        | (DataType::LargeList(e_inner), DataType::LargeList(a_inner)) => {
579            validate_fields_match(e_inner, a_inner, path)
580        }
581        (DataType::Map(e_entries, _), DataType::Map(a_entries, _)) => {
582            validate_fields_match(e_entries, a_entries, path)
583        }
584        _ => {
585            // For remaining (mostly primitive) types, compare only the enum variant via discriminant (metadata is not relevant here)
586            if std::mem::discriminant(expected) != std::mem::discriminant(actual) {
587                return Err(Error::new(
588                    ErrorKind::DataInvalid,
589                    format!(
590                        "Type mismatch at '{}': expected {:?}, got {:?}",
591                        path, expected, actual
592                    ),
593                ));
594            }
595            Ok(())
596        }
597    }
598}
599
600impl FileWriter for ParquetWriter {
601    async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
602        // Skip empty batch
603        if batch.num_rows() == 0 {
604            return Ok(());
605        }
606
607        self.current_row_num += batch.num_rows();
608
609        self.nan_value_count_visitor
610            .compute(self.schema.clone(), batch.clone())?;
611
612        let writer = if let Some(writer) = &mut self.inner_writer {
613            writer
614        } else {
615            // Use the provided Arrow schema if available, otherwise derive from Iceberg schema.
616            // Using a provided Arrow schema allows preserving extension metadata in the output.
617            let arrow_schema: Arc<ArrowSchema> = match &self.arrow_schema {
618                Some(schema) => Arc::clone(schema),
619                None => Arc::new(self.schema.as_ref().try_into()?),
620            };
621            let inner_writer = self.output_file.writer().await?;
622            let async_writer = AsyncFileWriter::new(inner_writer);
623            let writer = AsyncArrowWriter::try_new(
624                async_writer,
625                arrow_schema,
626                Some(self.writer_properties.clone()),
627            )
628            .map_err(|err| {
629                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
630                    .with_source(err)
631            })?;
632            self.inner_writer = Some(writer);
633            self.inner_writer.as_mut().unwrap()
634        };
635
636        writer.write(batch).await.map_err(|err| {
637            Error::new(
638                ErrorKind::Unexpected,
639                "Failed to write using parquet writer.",
640            )
641            .with_source(err)
642        })?;
643
644        Ok(())
645    }
646
647    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
648        let mut writer = match self.inner_writer.take() {
649            Some(writer) => writer,
650            None => return Ok(vec![]),
651        };
652
653        let metadata = writer.finish().await.map_err(|err| {
654            Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
655        })?;
656
657        let written_size = writer.bytes_written();
658
659        if self.current_row_num == 0 {
660            self.output_file.delete().await.map_err(|err| {
661                Error::new(
662                    ErrorKind::Unexpected,
663                    "Failed to delete empty parquet file.",
664                )
665                .with_source(err)
666            })?;
667            Ok(vec![])
668        } else {
669            let parquet_metadata = Arc::new(metadata);
670
671            Ok(vec![Self::parquet_to_data_file_builder(
672                self.schema,
673                parquet_metadata,
674                written_size,
675                self.output_file.location().to_string(),
676                self.nan_value_count_visitor.nan_value_counts,
677            )?])
678        }
679    }
680}
681
682impl CurrentFileStatus for ParquetWriter {
683    fn current_file_path(&self) -> String {
684        self.output_file.location().to_string()
685    }
686
687    fn current_row_num(&self) -> usize {
688        self.current_row_num
689    }
690
691    fn current_written_size(&self) -> usize {
692        if let Some(inner) = self.inner_writer.as_ref() {
693            // inner/AsyncArrowWriter contains sync and async writers
694            // written size = bytes flushed to inner's async writer + bytes buffered in the inner's sync writer
695            inner.bytes_written() + inner.in_progress_size()
696        } else {
697            // inner writer is not initialized yet
698            0
699        }
700    }
701
702    fn current_schema(&self) -> SchemaRef {
703        self.schema.clone()
704    }
705}
706
707/// AsyncFileWriter is a wrapper of FileWrite to make it compatible with the parquet crate's `AsyncFileWriter` trait.
708///
709/// # NOTES
710///
711/// This wrapper is kept for internal use only.
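///
/// A sketch of how it is wired into the Arrow writer (see `ParquetWriter::write` above):
///
/// ```ignore
/// let file_write = output_file.writer().await?;        // Box<dyn FileWrite>
/// let async_writer = AsyncFileWriter::new(file_write); // usable with AsyncArrowWriter
/// let arrow_writer = AsyncArrowWriter::try_new(async_writer, arrow_schema, Some(props))?;
/// ```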
712struct AsyncFileWriter<W: FileWrite>(W);
713
714impl<W: FileWrite> AsyncFileWriter<W> {
715    /// Create a new `AsyncFileWriter` with the given writer.
716    pub fn new(writer: W) -> Self {
717        Self(writer)
718    }
719}
720
721impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
722    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
723        Box::pin(async {
724            self.0
725                .write(bs)
726                .await
727                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
728        })
729    }
730
731    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
732        Box::pin(async {
733            self.0
734                .close()
735                .await
736                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
737        })
738    }
739}
740
741#[cfg(test)]
742mod tests {
743    use std::collections::HashMap;
744    use std::sync::Arc;
745
746    use anyhow::Result;
747    use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
748    use arrow_array::types::{Float32Type, Int64Type};
749    use arrow_array::{
750        Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
751        Int64Array, ListArray, MapArray, RecordBatch, StructArray,
752    };
753    use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
754    use arrow_select::concat::concat_batches;
755    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
756    use parquet::file::statistics::ValueStatistics;
757    use rust_decimal::Decimal;
758    use tempfile::TempDir;
759    use uuid::Uuid;
760
761    use super::*;
762    use crate::arrow::schema_to_arrow_schema;
763    use crate::io::FileIOBuilder;
764    use crate::spec::{PrimitiveLiteral, Struct, *};
765    use crate::writer::file_writer::location_generator::{
766        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
767    };
768    use crate::writer::tests::check_parquet_data_file;
769
770    fn schema_for_all_type() -> Schema {
771        Schema::builder()
772            .with_schema_id(1)
773            .with_fields(vec![
774                NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
775                NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
776                NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
777                NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
778                NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
779                NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
780                NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
781                NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
782                NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
783                NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
784                    .into(),
785                NestedField::optional(
786                    10,
787                    "timestamptz",
788                    Type::Primitive(PrimitiveType::Timestamptz),
789                )
790                .into(),
791                NestedField::optional(
792                    11,
793                    "timestamp_ns",
794                    Type::Primitive(PrimitiveType::TimestampNs),
795                )
796                .into(),
797                NestedField::optional(
798                    12,
799                    "timestamptz_ns",
800                    Type::Primitive(PrimitiveType::TimestamptzNs),
801                )
802                .into(),
803                NestedField::optional(
804                    13,
805                    "decimal",
806                    Type::Primitive(PrimitiveType::Decimal {
807                        precision: 10,
808                        scale: 5,
809                    }),
810                )
811                .into(),
812                NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
813                NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
814                    .into(),
815                // Parquet Statistics will use a different representation for Decimal with precision 38 and scale 5,
816                // so we need to add a new field for it.
817                NestedField::optional(
818                    16,
819                    "decimal_38",
820                    Type::Primitive(PrimitiveType::Decimal {
821                        precision: 38,
822                        scale: 5,
823                    }),
824                )
825                .into(),
826            ])
827            .build()
828            .unwrap()
829    }
830
831    fn nested_schema_for_test() -> Schema {
832        // Int, Struct(Int,Int), String, List(Int), Struct(Struct(Int)), Map(String, List(Int))
833        Schema::builder()
834            .with_schema_id(1)
835            .with_fields(vec![
836                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
837                NestedField::required(
838                    1,
839                    "col1",
840                    Type::Struct(StructType::new(vec![
841                        NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
842                            .into(),
843                        NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
844                            .into(),
845                    ])),
846                )
847                .into(),
848                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
849                NestedField::required(
850                    3,
851                    "col3",
852                    Type::List(ListType::new(
853                        NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
854                            .into(),
855                    )),
856                )
857                .into(),
858                NestedField::required(
859                    4,
860                    "col4",
861                    Type::Struct(StructType::new(vec![
862                        NestedField::required(
863                            8,
864                            "col_4_8",
865                            Type::Struct(StructType::new(vec![
866                                NestedField::required(
867                                    9,
868                                    "col_4_8_9",
869                                    Type::Primitive(PrimitiveType::Long),
870                                )
871                                .into(),
872                            ])),
873                        )
874                        .into(),
875                    ])),
876                )
877                .into(),
878                NestedField::required(
879                    10,
880                    "col5",
881                    Type::Map(MapType::new(
882                        NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
883                            .into(),
884                        NestedField::required(
885                            12,
886                            "value",
887                            Type::List(ListType::new(
888                                NestedField::required(
889                                    13,
890                                    "item",
891                                    Type::Primitive(PrimitiveType::Long),
892                                )
893                                .into(),
894                            )),
895                        )
896                        .into(),
897                    )),
898                )
899                .into(),
900            ])
901            .build()
902            .unwrap()
903    }
904
905    #[tokio::test]
906    async fn test_index_by_parquet_path() {
907        let expect = HashMap::from([
908            ("col0".to_string(), 0),
909            ("col1.col_1_5".to_string(), 5),
910            ("col1.col_1_6".to_string(), 6),
911            ("col2".to_string(), 2),
912            ("col3.list.element".to_string(), 7),
913            ("col4.col_4_8.col_4_8_9".to_string(), 9),
914            ("col5.key_value.key".to_string(), 11),
915            ("col5.key_value.value.list.item".to_string(), 13),
916        ]);
917        let mut visitor = IndexByParquetPathName::new();
918        visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
919        assert_eq!(visitor.name_to_id, expect);
920    }
921
922    #[tokio::test]
923    async fn test_parquet_writer() -> Result<()> {
924        let temp_dir = TempDir::new().unwrap();
925        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
926        let location_gen = DefaultLocationGenerator::with_data_location(
927            temp_dir.path().to_str().unwrap().to_string(),
928        );
929        let file_name_gen =
930            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
931
932        // prepare data
933        let schema = {
934            let fields =
935                vec![
936                    Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
937                        PARQUET_FIELD_ID_META_KEY.to_string(),
938                        "0".to_string(),
939                    )])),
940                ];
941            Arc::new(arrow_schema::Schema::new(fields))
942        };
943        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
944        let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
945        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
946        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
947
948        let output_file = file_io.new_output(
949            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
950        )?;
951
952        // write data
953        let mut pw = ParquetWriterBuilder::new(
954            WriterProperties::builder()
955                .set_max_row_group_size(128)
956                .build(),
957            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
958        )
959        .build(output_file)
960        .await?;
961        pw.write(&to_write).await?;
962        pw.write(&to_write_null).await?;
963        let res = pw.close().await?;
964        assert_eq!(res.len(), 1);
965        let data_file = res
966            .into_iter()
967            .next()
968            .unwrap()
969            // Set dummy fields so the build succeeds.
970            .content(DataContentType::Data)
971            .partition(Struct::empty())
972            .partition_spec_id(0)
973            .build()
974            .unwrap();
975
976        // check data file
977        assert_eq!(data_file.record_count(), 2048);
978        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
979        assert_eq!(
980            *data_file.lower_bounds(),
981            HashMap::from([(0, Datum::long(0))])
982        );
983        assert_eq!(
984            *data_file.upper_bounds(),
985            HashMap::from([(0, Datum::long(1023))])
986        );
987        assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
988
989        // check the written file
990        let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
991        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
992
993        Ok(())
994    }
995
996    #[tokio::test]
997    async fn test_parquet_writer_with_complex_schema() -> Result<()> {
998        let temp_dir = TempDir::new().unwrap();
999        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1000        let location_gen = DefaultLocationGenerator::with_data_location(
1001            temp_dir.path().to_str().unwrap().to_string(),
1002        );
1003        let file_name_gen =
1004            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1005
1006        // prepare data
1007        let schema = nested_schema_for_test();
1008        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1009        let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1010        let col1 = Arc::new(StructArray::new(
1011            {
1012                if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1013                    fields.clone()
1014                } else {
1015                    unreachable!()
1016                }
1017            },
1018            vec![
1019                Arc::new(Int64Array::from_iter_values(0..1024)),
1020                Arc::new(Int64Array::from_iter_values(0..1024)),
1021            ],
1022            None,
1023        ));
1024        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
1025            (0..1024).map(|n| n.to_string()),
1026        )) as ArrayRef;
1027        let col3 = Arc::new({
1028            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
1029                (0..1024).map(|n| Some(vec![Some(n)])),
1030            )
1031            .into_parts();
1032            arrow_array::ListArray::new(
1033                {
1034                    if let DataType::List(field) = arrow_schema.field(3).data_type() {
1035                        field.clone()
1036                    } else {
1037                        unreachable!()
1038                    }
1039                },
1040                list_parts.1,
1041                list_parts.2,
1042                list_parts.3,
1043            )
1044        }) as ArrayRef;
1045        let col4 = Arc::new(StructArray::new(
1046            {
1047                if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
1048                    fields.clone()
1049                } else {
1050                    unreachable!()
1051                }
1052            },
1053            vec![Arc::new(StructArray::new(
1054                {
1055                    if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
1056                        if let DataType::Struct(fields) = fields[0].data_type() {
1057                            fields.clone()
1058                        } else {
1059                            unreachable!()
1060                        }
1061                    } else {
1062                        unreachable!()
1063                    }
1064                },
1065                vec![Arc::new(Int64Array::from_iter_values(0..1024))],
1066                None,
1067            ))],
1068            None,
1069        ));
1070        let col5 = Arc::new({
1071            let mut map_array_builder = MapBuilder::new(
1072                None,
1073                arrow_array::builder::StringBuilder::new(),
1074                arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
1075                    Int64Type,
1076                >::new()),
1077            );
1078            for i in 0..1024 {
1079                map_array_builder.keys().append_value(i.to_string());
1080                map_array_builder
1081                    .values()
1082                    .append_value(vec![Some(i as i64); i + 1]);
1083                map_array_builder.append(true)?;
1084            }
1085            let (_, offset_buffer, struct_array, null_buffer, ordered) =
1086                map_array_builder.finish().into_parts();
1087            let struct_array = {
1088                let (_, mut arrays, nulls) = struct_array.into_parts();
1089                let list_array = {
1090                    let list_array = arrays[1]
1091                        .as_any()
1092                        .downcast_ref::<ListArray>()
1093                        .unwrap()
1094                        .clone();
1095                    let (_, offsets, array, nulls) = list_array.into_parts();
1096                    let list_field = {
1097                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1098                            if let DataType::Struct(fields) = map_field.data_type() {
1099                                if let DataType::List(list_field) = fields[1].data_type() {
1100                                    list_field.clone()
1101                                } else {
1102                                    unreachable!()
1103                                }
1104                            } else {
1105                                unreachable!()
1106                            }
1107                        } else {
1108                            unreachable!()
1109                        }
1110                    };
1111                    ListArray::new(list_field, offsets, array, nulls)
1112                };
1113                arrays[1] = Arc::new(list_array) as ArrayRef;
1114                StructArray::new(
1115                    {
1116                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1117                            if let DataType::Struct(fields) = map_field.data_type() {
1118                                fields.clone()
1119                            } else {
1120                                unreachable!()
1121                            }
1122                        } else {
1123                            unreachable!()
1124                        }
1125                    },
1126                    arrays,
1127                    nulls,
1128                )
1129            };
1130            arrow_array::MapArray::new(
1131                {
1132                    if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1133                        map_field.clone()
1134                    } else {
1135                        unreachable!()
1136                    }
1137                },
1138                offset_buffer,
1139                struct_array,
1140                null_buffer,
1141                ordered,
1142            )
1143        }) as ArrayRef;
1144        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1145            col0, col1, col2, col3, col4, col5,
1146        ])
1147        .unwrap();
1148        let output_file = file_io.new_output(
1149            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1150        )?;
1151
1152        // write data
1153        let mut pw =
1154            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1155                .build(output_file)
1156                .await?;
1157        pw.write(&to_write).await?;
1158        let res = pw.close().await?;
1159        assert_eq!(res.len(), 1);
1160        let data_file = res
1161            .into_iter()
1162            .next()
1163            .unwrap()
1164            // Set dummy fields so the build succeeds.
1165            .content(crate::spec::DataContentType::Data)
1166            .partition(Struct::empty())
1167            .partition_spec_id(0)
1168            .build()
1169            .unwrap();
1170
1171        // check data file
1172        assert_eq!(data_file.record_count(), 1024);
1173        assert_eq!(
1174            *data_file.value_counts(),
1175            HashMap::from([
1176                (0, 1024),
1177                (5, 1024),
1178                (6, 1024),
1179                (2, 1024),
1180                (7, 1024),
1181                (9, 1024),
1182                (11, 1024),
1183                (13, (1..1025).sum()),
1184            ])
1185        );
1186        assert_eq!(
1187            *data_file.lower_bounds(),
1188            HashMap::from([
1189                (0, Datum::long(0)),
1190                (5, Datum::long(0)),
1191                (6, Datum::long(0)),
1192                (2, Datum::string("0")),
1193                (7, Datum::long(0)),
1194                (9, Datum::long(0)),
1195                (11, Datum::string("0")),
1196                (13, Datum::long(0))
1197            ])
1198        );
1199        assert_eq!(
1200            *data_file.upper_bounds(),
1201            HashMap::from([
1202                (0, Datum::long(1023)),
1203                (5, Datum::long(1023)),
1204                (6, Datum::long(1023)),
1205                (2, Datum::string("999")),
1206                (7, Datum::long(1023)),
1207                (9, Datum::long(1023)),
1208                (11, Datum::string("999")),
1209                (13, Datum::long(1023))
1210            ])
1211        );
1212
1213        // check the written file
1214        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1215
1216        Ok(())
1217    }
1218
1219    #[tokio::test]
1220    async fn test_all_type_for_write() -> Result<()> {
1221        let temp_dir = TempDir::new().unwrap();
1222        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1223        let location_gen = DefaultLocationGenerator::with_data_location(
1224            temp_dir.path().to_str().unwrap().to_string(),
1225        );
1226        let file_name_gen =
1227            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1228
1229        // prepare data
1230        // generate iceberg schema for all type
1231        let schema = schema_for_all_type();
1232        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1233        let col0 = Arc::new(BooleanArray::from(vec![
1234            Some(true),
1235            Some(false),
1236            None,
1237            Some(true),
1238        ])) as ArrayRef;
1239        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1240        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1241        let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1242            Some(0.5),
1243            Some(2.0),
1244            None,
1245            Some(3.5),
1246        ])) as ArrayRef;
1247        let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1248            Some(0.5),
1249            Some(2.0),
1250            None,
1251            Some(3.5),
1252        ])) as ArrayRef;
1253        let col5 = Arc::new(arrow_array::StringArray::from(vec![
1254            Some("a"),
1255            Some("b"),
1256            None,
1257            Some("d"),
1258        ])) as ArrayRef;
1259        let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1260            Some(b"one"),
1261            None,
1262            Some(b""),
1263            Some(b"zzzz"),
1264        ])) as ArrayRef;
1265        let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1266            Some(0),
1267            Some(1),
1268            None,
1269            Some(3),
1270        ])) as ArrayRef;
1271        let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1272            Some(0),
1273            Some(1),
1274            None,
1275            Some(3),
1276        ])) as ArrayRef;
1277        let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1278            Some(0),
1279            Some(1),
1280            None,
1281            Some(3),
1282        ])) as ArrayRef;
1283        let col10 = Arc::new(
1284            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1285                .with_timezone_utc(),
1286        ) as ArrayRef;
1287        let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1288            Some(0),
1289            Some(1),
1290            None,
1291            Some(3),
1292        ])) as ArrayRef;
1293        let col12 = Arc::new(
1294            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1295                .with_timezone_utc(),
1296        ) as ArrayRef;
1297        let col13 = Arc::new(
1298            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1299                .with_precision_and_scale(10, 5)
1300                .unwrap(),
1301        ) as ArrayRef;
1302        let col14 = Arc::new(
1303            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1304                vec![
1305                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
1306                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
1307                    None,
1308                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
1309                ]
1310                .into_iter(),
1311                16,
1312            )
1313            .unwrap(),
1314        ) as ArrayRef;
1315        let col15 = Arc::new(
1316            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1317                vec![
1318                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1319                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1320                    None,
1321                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1322                ]
1323                .into_iter(),
1324                10,
1325            )
1326            .unwrap(),
1327        ) as ArrayRef;
1328        let col16 = Arc::new(
1329            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1330                .with_precision_and_scale(38, 5)
1331                .unwrap(),
1332        ) as ArrayRef;
1333        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1334            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1335            col14, col15, col16,
1336        ])
1337        .unwrap();
1338        let output_file = file_io.new_output(
1339            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1340        )?;
1341
1342        // write data
1343        let mut pw =
1344            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1345                .build(output_file)
1346                .await?;
1347        pw.write(&to_write).await?;
1348        let res = pw.close().await?;
1349        assert_eq!(res.len(), 1);
1350        let data_file = res
1351            .into_iter()
1352            .next()
1353            .unwrap()
1354            // Set dummy fields so the build succeeds.
1355            .content(crate::spec::DataContentType::Data)
1356            .partition(Struct::empty())
1357            .partition_spec_id(0)
1358            .build()
1359            .unwrap();
1360
1361        // check data file
1362        assert_eq!(data_file.record_count(), 4);
1363        assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1364        assert!(
1365            data_file
1366                .null_value_counts()
1367                .iter()
1368                .all(|(_, &v)| { v == 1 })
1369        );
1370        assert_eq!(
1371            *data_file.lower_bounds(),
1372            HashMap::from([
1373                (0, Datum::bool(false)),
1374                (1, Datum::int(1)),
1375                (2, Datum::long(1)),
1376                (3, Datum::float(0.5)),
1377                (4, Datum::double(0.5)),
1378                (5, Datum::string("a")),
1379                (6, Datum::binary(vec![])),
1380                (7, Datum::date(0)),
1381                (8, Datum::time_micros(0).unwrap()),
1382                (9, Datum::timestamp_micros(0)),
1383                (10, Datum::timestamptz_micros(0)),
1384                (11, Datum::timestamp_nanos(0)),
1385                (12, Datum::timestamptz_nanos(0)),
1386                (
1387                    13,
1388                    Datum::new(
1389                        PrimitiveType::Decimal {
1390                            precision: 10,
1391                            scale: 5
1392                        },
1393                        PrimitiveLiteral::Int128(1)
1394                    )
1395                ),
1396                (14, Datum::uuid(Uuid::from_u128(0))),
1397                (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1398                (
1399                    16,
1400                    Datum::new(
1401                        PrimitiveType::Decimal {
1402                            precision: 38,
1403                            scale: 5
1404                        },
1405                        PrimitiveLiteral::Int128(1)
1406                    )
1407                ),
1408            ])
1409        );
1410        assert_eq!(
1411            *data_file.upper_bounds(),
1412            HashMap::from([
1413                (0, Datum::bool(true)),
1414                (1, Datum::int(4)),
1415                (2, Datum::long(4)),
1416                (3, Datum::float(3.5)),
1417                (4, Datum::double(3.5)),
1418                (5, Datum::string("d")),
1419                (6, Datum::binary(vec![122, 122, 122, 122])),
1420                (7, Datum::date(3)),
1421                (8, Datum::time_micros(3).unwrap()),
1422                (9, Datum::timestamp_micros(3)),
1423                (10, Datum::timestamptz_micros(3)),
1424                (11, Datum::timestamp_nanos(3)),
1425                (12, Datum::timestamptz_nanos(3)),
1426                (
1427                    13,
1428                    Datum::new(
1429                        PrimitiveType::Decimal {
1430                            precision: 10,
1431                            scale: 5
1432                        },
1433                        PrimitiveLiteral::Int128(100)
1434                    )
1435                ),
1436                (14, Datum::uuid(Uuid::from_u128(3))),
1437                (
1438                    15,
1439                    Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1440                ),
1441                (
1442                    16,
1443                    Datum::new(
1444                        PrimitiveType::Decimal {
1445                            precision: 38,
1446                            scale: 5
1447                        },
1448                        PrimitiveLiteral::Int128(100)
1449                    )
1450                ),
1451            ])
1452        );
1453
1454        // check the written file
1455        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1456
1457        Ok(())
1458    }
1459
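    // Verifies that decimal lower/upper bounds are taken from the Parquet column
    // statistics: positive values, negative values, and rust_decimal's MAX/MIN all
    // round-trip into the data file metrics.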
1460    #[tokio::test]
1461    async fn test_decimal_bound() -> Result<()> {
1462        let temp_dir = TempDir::new().unwrap();
1463        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1464        let location_gen = DefaultLocationGenerator::with_data_location(
1465            temp_dir.path().to_str().unwrap().to_string(),
1466        );
1467        let file_name_gen =
1468            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1469
1470        // test 1.1 and 2.2
1471        let schema = Arc::new(
1472            Schema::builder()
1473                .with_fields(vec![
1474                    NestedField::optional(
1475                        0,
1476                        "decimal",
1477                        Type::Primitive(PrimitiveType::Decimal {
1478                            precision: 28,
1479                            scale: 10,
1480                        }),
1481                    )
1482                    .into(),
1483                ])
1484                .build()
1485                .unwrap(),
1486        );
1487        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1488        let output_file = file_io.new_output(
1489            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1490        )?;
1491        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1492            .build(output_file)
1493            .await?;
1494        let col0 = Arc::new(
1495            Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1496                .with_data_type(DataType::Decimal128(28, 10)),
1497        ) as ArrayRef;
1498        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1499        pw.write(&to_write).await?;
1500        let res = pw.close().await?;
1501        assert_eq!(res.len(), 1);
1502        let data_file = res
1503            .into_iter()
1504            .next()
1505            .unwrap()
1506            .content(crate::spec::DataContentType::Data)
1507            .partition(Struct::empty())
1508            .partition_spec_id(0)
1509            .build()
1510            .unwrap();
1511        assert_eq!(
1512            data_file.upper_bounds().get(&0),
1513            Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1514                .as_ref()
1515        );
1516        assert_eq!(
1517            data_file.lower_bounds().get(&0),
1518            Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1519                .as_ref()
1520        );
1521
1522        // test -1.1 and -2.2
1523        let schema = Arc::new(
1524            Schema::builder()
1525                .with_fields(vec![
1526                    NestedField::optional(
1527                        0,
1528                        "decimal",
1529                        Type::Primitive(PrimitiveType::Decimal {
1530                            precision: 28,
1531                            scale: 10,
1532                        }),
1533                    )
1534                    .into(),
1535                ])
1536                .build()
1537                .unwrap(),
1538        );
1539        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1540        let output_file = file_io.new_output(
1541            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1542        )?;
1543        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1544            .build(output_file)
1545            .await?;
1546        let col0 = Arc::new(
1547            Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1548                .with_data_type(DataType::Decimal128(28, 10)),
1549        ) as ArrayRef;
1550        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1551        pw.write(&to_write).await?;
1552        let res = pw.close().await?;
1553        assert_eq!(res.len(), 1);
1554        let data_file = res
1555            .into_iter()
1556            .next()
1557            .unwrap()
1558            .content(crate::spec::DataContentType::Data)
1559            .partition(Struct::empty())
1560            .partition_spec_id(0)
1561            .build()
1562            .unwrap();
1563        assert_eq!(
1564            data_file.upper_bounds().get(&0),
1565            Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1566                .as_ref()
1567        );
1568        assert_eq!(
1569            data_file.lower_bounds().get(&0),
1570            Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1571                .as_ref()
1572        );
1573
1574        // test max and min of rust_decimal
1575        let decimal_max = Decimal::MAX;
1576        let decimal_min = Decimal::MIN;
1577        assert_eq!(decimal_max.scale(), decimal_min.scale());
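        // rust_decimal's MAX and MIN share the same scale (0 in the current
        // implementation), so a single Iceberg decimal type with precision 38 can
        // hold both mantissas.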
1578        let schema = Arc::new(
1579            Schema::builder()
1580                .with_fields(vec![
1581                    NestedField::optional(
1582                        0,
1583                        "decimal",
1584                        Type::Primitive(PrimitiveType::Decimal {
1585                            precision: 38,
1586                            scale: decimal_max.scale(),
1587                        }),
1588                    )
1589                    .into(),
1590                ])
1591                .build()
1592                .unwrap(),
1593        );
1594        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1595        let output_file = file_io.new_output(
1596            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1597        )?;
1598        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1599            .build(output_file)
1600            .await?;
1601        let col0 = Arc::new(
1602            Decimal128Array::from(vec![
1603                Some(decimal_max.mantissa()),
1604                Some(decimal_min.mantissa()),
1605            ])
1606            .with_data_type(DataType::Decimal128(38, 0)),
1607        ) as ArrayRef;
1608        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1609        pw.write(&to_write).await?;
1610        let res = pw.close().await?;
1611        assert_eq!(res.len(), 1);
1612        let data_file = res
1613            .into_iter()
1614            .next()
1615            .unwrap()
1616            .content(crate::spec::DataContentType::Data)
1617            .partition(Struct::empty())
1618            .partition_spec_id(0)
1619            .build()
1620            .unwrap();
1621        assert_eq!(
1622            data_file.upper_bounds().get(&0),
1623            Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1624        );
1625        assert_eq!(
1626            data_file.lower_bounds().get(&0),
1627            Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1628        );
1629
1630        // test max and min for precision 38, scale 0
1631        // TODO: re-add this case once
1632        // https://github.com/apache/iceberg-rust/issues/669 is resolved
1633        // let schema = Arc::new(
1634        //     Schema::builder()
1635        //         .with_fields(vec![NestedField::optional(
1636        //             0,
1637        //             "decimal",
1638        //             Type::Primitive(PrimitiveType::Decimal {
1639        //                 precision: 38,
1640        //                 scale: 0,
1641        //             }),
1642        //         )
1643        //         .into()])
1644        //         .build()
1645        //         .unwrap(),
1646        // );
1647        // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1648        // let output_file = file_io.new_output(
1649        //     location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1650        // )?;
1651        // let mut pw = ParquetWriterBuilder::new(
1652        //     WriterProperties::builder().build(),
1653        //     schema,
1654        // )
1655        // .build(output_file)
1656        // .await?;
1657        // let col0 = Arc::new(
1658        //     Decimal128Array::from(vec![
1659        //         Some(99999999999999999999999999999999999999_i128),
1660        //         Some(-99999999999999999999999999999999999999_i128),
1661        //     ])
1662        //     .with_data_type(DataType::Decimal128(38, 0)),
1663        // ) as ArrayRef;
1664        // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1665        // pw.write(&to_write).await?;
1666        // let res = pw.close().await?;
1667        // assert_eq!(res.len(), 1);
1668        // let data_file = res
1669        //     .into_iter()
1670        //     .next()
1671        //     .unwrap()
1672        //     .content(crate::spec::DataContentType::Data)
1673        //     .partition(Struct::empty())
1674        //     .build()
1675        //     .unwrap();
1676        // assert_eq!(
1677        //     data_file.upper_bounds().get(&0),
1678        //     Some(Datum::new(
1679        //         PrimitiveType::Decimal {
1680        //             precision: 38,
1681        //             scale: 0
1682        //         },
1683        //         PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128)
1684        //     ))
1685        //     .as_ref()
1686        // );
1687        // assert_eq!(
1688        //     data_file.lower_bounds().get(&0),
1689        //     Some(Datum::new(
1690        //         PrimitiveType::Decimal {
1691        //             precision: 38,
1692        //             scale: 0
1693        //         },
1694        //         PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128)
1695        //     ))
1696        //     .as_ref()
1697        // );
1698
1699        Ok(())
1700    }
1701
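    // The writer creates an output file only after data has been written; closing a
    // writer that never received a batch must not leave an empty file behind.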
1702    #[tokio::test]
1703    async fn test_empty_write() -> Result<()> {
1704        let temp_dir = TempDir::new().unwrap();
1705        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1706        let location_gen = DefaultLocationGenerator::with_data_location(
1707            temp_dir.path().to_str().unwrap().to_string(),
1708        );
1709        let file_name_gen =
1710            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1711
1712        // Test that a file is created when there is data to write
1713        let schema = {
1714            let fields = vec![
1715                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1716                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1717                ),
1718            ];
1719            Arc::new(arrow_schema::Schema::new(fields))
1720        };
1721        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1722        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1723        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1724        let output_file = file_io.new_output(&file_path)?;
1725        let mut pw = ParquetWriterBuilder::new(
1726            WriterProperties::builder().build(),
1727            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1728        )
1729        .build(output_file)
1730        .await?;
1731        pw.write(&to_write).await?;
1732        pw.close().await.unwrap();
1733        assert!(file_io.exists(&file_path).await.unwrap());
1734
1735        // Test that no file is created when there is no data to write
1736        let file_name_gen =
1737            DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1738        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1739        let output_file = file_io.new_output(&file_path)?;
1740        let pw = ParquetWriterBuilder::new(
1741            WriterProperties::builder().build(),
1742            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1743        )
1744        .build(output_file)
1745        .await?;
1746        pw.close().await.unwrap();
1747        assert!(!file_io.exists(&file_path).await.unwrap());
1748
1749        Ok(())
1750    }
1751
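    // NaN value counts are tracked per field id: a single NaN in each float column is
    // reported in nan_value_counts while being excluded from the min/max bounds.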
1752    #[tokio::test]
1753    async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1754        let temp_dir = TempDir::new().unwrap();
1755        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1756        let location_gen = DefaultLocationGenerator::with_data_location(
1757            temp_dir.path().to_str().unwrap().to_string(),
1758        );
1759        let file_name_gen =
1760            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1761
1762        // prepare data
1763        let arrow_schema = {
1764            let fields = vec![
1765                Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1766                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1767                ),
1768                Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1769                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1770                ),
1771            ];
1772            Arc::new(arrow_schema::Schema::new(fields))
1773        };
1774
1775        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1776            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1777            None,
1778        )) as ArrayRef;
1779
1780        let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1781            [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1782            None,
1783        )) as ArrayRef;
1784
1785        let to_write =
1786            RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1787        let output_file = file_io.new_output(
1788            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1789        )?;
1790
1791        // write data
1792        let mut pw = ParquetWriterBuilder::new(
1793            WriterProperties::builder().build(),
1794            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1795        )
1796        .build(output_file)
1797        .await?;
1798
1799        pw.write(&to_write).await?;
1800        let res = pw.close().await?;
1801        assert_eq!(res.len(), 1);
1802        let data_file = res
1803            .into_iter()
1804            .next()
1805            .unwrap()
1806            // Set dummy values for the required fields so the build succeeds.
1807            .content(crate::spec::DataContentType::Data)
1808            .partition(Struct::empty())
1809            .partition_spec_id(0)
1810            .build()
1811            .unwrap();
1812
1813        // check data file
1814        assert_eq!(data_file.record_count(), 4);
1815        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1816        assert_eq!(
1817            *data_file.lower_bounds(),
1818            HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1819        );
1820        assert_eq!(
1821            *data_file.upper_bounds(),
1822            HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1823        );
1824        assert_eq!(
1825            *data_file.null_value_counts(),
1826            HashMap::from([(0, 0), (1, 0)])
1827        );
1828        assert_eq!(
1829            *data_file.nan_value_counts(),
1830            HashMap::from([(0, 1), (1, 1)])
1831        );
1832
1833        // check the written file
1834        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1835        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1836
1837        Ok(())
1838    }
1839
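    // Same NaN accounting as above, but for float fields nested one and two struct
    // levels deep; counts are keyed by the leaf field ids (4 and 7).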
1840    #[tokio::test]
1841    async fn test_nan_val_cnts_struct_type() -> Result<()> {
1842        let temp_dir = TempDir::new().unwrap();
1843        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1844        let location_gen = DefaultLocationGenerator::with_data_location(
1845            temp_dir.path().to_str().unwrap().to_string(),
1846        );
1847        let file_name_gen =
1848            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1849
1850        let schema_struct_float_fields = Fields::from(vec![
1851            Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1852                PARQUET_FIELD_ID_META_KEY.to_string(),
1853                "4".to_string(),
1854            )])),
1855        ]);
1856
1857        let schema_struct_nested_float_fields = Fields::from(vec![
1858            Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1859                PARQUET_FIELD_ID_META_KEY.to_string(),
1860                "7".to_string(),
1861            )])),
1862        ]);
1863
1864        let schema_struct_nested_fields = Fields::from(vec![
1865            Field::new(
1866                "col6",
1867                arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1868                false,
1869            )
1870            .with_metadata(HashMap::from([(
1871                PARQUET_FIELD_ID_META_KEY.to_string(),
1872                "6".to_string(),
1873            )])),
1874        ]);
1875
1876        // prepare data
1877        let arrow_schema = {
1878            let fields = vec![
1879                Field::new(
1880                    "col3",
1881                    arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1882                    false,
1883                )
1884                .with_metadata(HashMap::from([(
1885                    PARQUET_FIELD_ID_META_KEY.to_string(),
1886                    "3".to_string(),
1887                )])),
1888                Field::new(
1889                    "col5",
1890                    arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1891                    false,
1892                )
1893                .with_metadata(HashMap::from([(
1894                    PARQUET_FIELD_ID_META_KEY.to_string(),
1895                    "5".to_string(),
1896                )])),
1897            ];
1898            Arc::new(arrow_schema::Schema::new(fields))
1899        };
1900
1901        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1902            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1903            None,
1904        )) as ArrayRef;
1905
1906        let struct_float_field_col = Arc::new(StructArray::new(
1907            schema_struct_float_fields,
1908            vec![float_32_col.clone()],
1909            None,
1910        )) as ArrayRef;
1911
1912        let struct_nested_float_field_col = Arc::new(StructArray::new(
1913            schema_struct_nested_fields,
1914            vec![Arc::new(StructArray::new(
1915                schema_struct_nested_float_fields,
1916                vec![float_32_col.clone()],
1917                None,
1918            )) as ArrayRef],
1919            None,
1920        )) as ArrayRef;
1921
1922        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1923            struct_float_field_col,
1924            struct_nested_float_field_col,
1925        ])
1926        .unwrap();
1927        let output_file = file_io.new_output(
1928            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1929        )?;
1930
1931        // write data
1932        let mut pw = ParquetWriterBuilder::new(
1933            WriterProperties::builder().build(),
1934            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1935        )
1936        .build(output_file)
1937        .await?;
1938
1939        pw.write(&to_write).await?;
1940        let res = pw.close().await?;
1941        assert_eq!(res.len(), 1);
1942        let data_file = res
1943            .into_iter()
1944            .next()
1945            .unwrap()
1946            // Set dummy values for the required fields so the build succeeds.
1947            .content(crate::spec::DataContentType::Data)
1948            .partition(Struct::empty())
1949            .partition_spec_id(0)
1950            .build()
1951            .unwrap();
1952
1953        // check data file
1954        assert_eq!(data_file.record_count(), 4);
1955        assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1956        assert_eq!(
1957            *data_file.lower_bounds(),
1958            HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1959        );
1960        assert_eq!(
1961            *data_file.upper_bounds(),
1962            HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1963        );
1964        assert_eq!(
1965            *data_file.null_value_counts(),
1966            HashMap::from([(4, 0), (7, 0)])
1967        );
1968        assert_eq!(
1969            *data_file.nan_value_counts(),
1970            HashMap::from([(4, 1), (7, 1)])
1971        );
1972
1973        // check the written file
1974        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1975        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1976
1977        Ok(())
1978    }
1979
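    // NaN accounting for list element fields: a top-level list column (element id 1)
    // and a list nested inside a struct (element id 4).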
1980    #[tokio::test]
1981    async fn test_nan_val_cnts_list_type() -> Result<()> {
1982        let temp_dir = TempDir::new().unwrap();
1983        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1984        let location_gen = DefaultLocationGenerator::with_data_location(
1985            temp_dir.path().to_str().unwrap().to_string(),
1986        );
1987        let file_name_gen =
1988            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1989
1990        let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1991            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1992        );
1993
1994        let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1995            .with_metadata(HashMap::from([(
1996                PARQUET_FIELD_ID_META_KEY.to_string(),
1997                "4".to_string(),
1998            )]));
1999
2000        let schema_struct_list_field = Fields::from(vec![
2001            Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
2002                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
2003            ),
2004        ]);
2005
2006        let arrow_schema = {
2007            let fields = vec![
2008                Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
2009                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
2010                ),
2011                Field::new_struct("col1", schema_struct_list_field.clone(), true)
2012                    .with_metadata(HashMap::from([(
2013                        PARQUET_FIELD_ID_META_KEY.to_string(),
2014                        "2".to_string(),
2015                    )]))
2016                    .clone(),
2017                // Field::new_large_list("col3", schema_large_list_float_field.clone(), true).with_metadata(
2018                //     HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
2019                // ).clone(),
2020            ];
2021            Arc::new(arrow_schema::Schema::new(fields))
2022        };
2023
2024        let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
2025            Some(1.0_f32),
2026            Some(f32::NAN),
2027            Some(2.0),
2028            Some(2.0),
2029        ])])
2030        .into_parts();
2031
2032        let list_float_field_col = Arc::new({
2033            let list_parts = list_parts.clone();
2034            ListArray::new(
2035                {
2036                    if let DataType::List(field) = arrow_schema.field(0).data_type() {
2037                        field.clone()
2038                    } else {
2039                        unreachable!()
2040                    }
2041                },
2042                list_parts.1,
2043                list_parts.2,
2044                list_parts.3,
2045            )
2046        }) as ArrayRef;
2047
2048        let struct_list_fields_schema =
2049            if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
2050                fields.clone()
2051            } else {
2052                unreachable!()
2053            };
2054
2055        let struct_list_float_field_col = Arc::new({
2056            ListArray::new(
2057                {
2058                    if let DataType::List(field) = struct_list_fields_schema
2059                        .first()
2060                        .expect("could not find first list field")
2061                        .data_type()
2062                    {
2063                        field.clone()
2064                    } else {
2065                        unreachable!()
2066                    }
2067                },
2068                list_parts.1,
2069                list_parts.2,
2070                list_parts.3,
2071            )
2072        }) as ArrayRef;
2073
2074        let struct_list_float_field_col = Arc::new(StructArray::new(
2075            struct_list_fields_schema,
2076            vec![struct_list_float_field_col.clone()],
2077            None,
2078        )) as ArrayRef;
2079
2080        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2081            list_float_field_col,
2082            struct_list_float_field_col,
2083            // large_list_float_field_col,
2084        ])
2085        .expect("Could not form record batch");
2086        let output_file = file_io.new_output(
2087            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2088        )?;
2089
2090        // write data
2091        let mut pw = ParquetWriterBuilder::new(
2092            WriterProperties::builder().build(),
2093            Arc::new(
2094                to_write
2095                    .schema()
2096                    .as_ref()
2097                    .try_into()
2098                    .expect("Could not convert iceberg schema"),
2099            ),
2100        )
2101        .build(output_file)
2102        .await?;
2103
2104        pw.write(&to_write).await?;
2105        let res = pw.close().await?;
2106        assert_eq!(res.len(), 1);
2107        let data_file = res
2108            .into_iter()
2109            .next()
2110            .unwrap()
2111            .content(crate::spec::DataContentType::Data)
2112            .partition(Struct::empty())
2113            .partition_spec_id(0)
2114            .build()
2115            .unwrap();
2116
2117        // check data file
2118        assert_eq!(data_file.record_count(), 1);
2119        assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
2120        assert_eq!(
2121            *data_file.lower_bounds(),
2122            HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
2123        );
2124        assert_eq!(
2125            *data_file.upper_bounds(),
2126            HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
2127        );
2128        assert_eq!(
2129            *data_file.null_value_counts(),
2130            HashMap::from([(1, 0), (4, 0)])
2131        );
2132        assert_eq!(
2133            *data_file.nan_value_counts(),
2134            HashMap::from([(1, 1), (4, 1)])
2135        );
2136
2137        // check the written file
2138        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2139        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2140
2141        Ok(())
2142    }
2143
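    // Builds a MapArray with four int -> float entries (one value is NaN), reusing the
    // field-id-annotated key/value schemas so the same data backs both the top-level
    // map column and the map nested inside a struct.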
2144    macro_rules! construct_map_arr {
2145        ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
2146            let int_builder = Int32Builder::new();
2147            let float_builder = Float32Builder::with_capacity(4);
2148            let mut builder = MapBuilder::new(None, int_builder, float_builder);
2149            builder.keys().append_value(1);
2150            builder.values().append_value(1.0_f32);
2151            builder.append(true).unwrap();
2152            builder.keys().append_value(2);
2153            builder.values().append_value(f32::NAN);
2154            builder.append(true).unwrap();
2155            builder.keys().append_value(3);
2156            builder.values().append_value(2.0);
2157            builder.append(true).unwrap();
2158            builder.keys().append_value(4);
2159            builder.values().append_value(2.0);
2160            builder.append(true).unwrap();
2161            let array = builder.finish();
2162
2163            let (_field, offsets, entries, nulls, ordered) = array.into_parts();
2164            let new_struct_fields_schema =
2165                Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
2166
2167            let entries = {
2168                let (_, arrays, nulls) = entries.into_parts();
2169                StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
2170            };
2171
2172            let field = Arc::new(Field::new(
2173                DEFAULT_MAP_FIELD_NAME,
2174                DataType::Struct(new_struct_fields_schema),
2175                false,
2176            ));
2177
2178            Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
2179        }};
2180    }
2181
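    // NaN accounting for map value fields: the top-level map value (field id 2) and the
    // struct-nested map value (field id 7) each report one NaN; the int keys never do.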
2182    #[tokio::test]
2183    async fn test_nan_val_cnts_map_type() -> Result<()> {
2184        let temp_dir = TempDir::new().unwrap();
2185        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2186        let location_gen = DefaultLocationGenerator::with_data_location(
2187            temp_dir.path().to_str().unwrap().to_string(),
2188        );
2189        let file_name_gen =
2190            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2191
2192        let map_key_field_schema =
2193            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2194                (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2195            ]));
2196
2197        let map_value_field_schema =
2198            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2199                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2200            ));
2201
2202        let struct_map_key_field_schema =
2203            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2204                (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2205            ]));
2206
2207        let struct_map_value_field_schema =
2208            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2209                [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2210            ));
2211
2212        let schema_struct_map_field = Fields::from(vec![
2213            Field::new_map(
2214                "col3",
2215                DEFAULT_MAP_FIELD_NAME,
2216                struct_map_key_field_schema.clone(),
2217                struct_map_value_field_schema.clone(),
2218                false,
2219                false,
2220            )
2221            .with_metadata(HashMap::from([(
2222                PARQUET_FIELD_ID_META_KEY.to_string(),
2223                "5".to_string(),
2224            )])),
2225        ]);
2226
2227        let arrow_schema = {
2228            let fields = vec![
2229                Field::new_map(
2230                    "col0",
2231                    DEFAULT_MAP_FIELD_NAME,
2232                    map_key_field_schema.clone(),
2233                    map_value_field_schema.clone(),
2234                    false,
2235                    false,
2236                )
2237                .with_metadata(HashMap::from([(
2238                    PARQUET_FIELD_ID_META_KEY.to_string(),
2239                    "0".to_string(),
2240                )])),
2241                Field::new_struct("col1", schema_struct_map_field.clone(), true)
2242                    .with_metadata(HashMap::from([(
2243                        PARQUET_FIELD_ID_META_KEY.to_string(),
2244                        "3".to_string(),
2245                    )]))
2246                    .clone(),
2247            ];
2248            Arc::new(arrow_schema::Schema::new(fields))
2249        };
2250
2251        let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2252
2253        let struct_map_arr =
2254            construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2255
2256        let struct_list_float_field_col = Arc::new(StructArray::new(
2257            schema_struct_map_field,
2258            vec![struct_map_arr],
2259            None,
2260        )) as ArrayRef;
2261
2262        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2263            map_array,
2264            struct_list_float_field_col,
2265        ])
2266        .expect("Could not form record batch");
2267        let output_file = file_io.new_output(
2268            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2269        )?;
2270
2271        // write data
2272        let mut pw = ParquetWriterBuilder::new(
2273            WriterProperties::builder().build(),
2274            Arc::new(
2275                to_write
2276                    .schema()
2277                    .as_ref()
2278                    .try_into()
2279                    .expect("Could not convert iceberg schema"),
2280            ),
2281        )
2282        .build(output_file)
2283        .await?;
2284
2285        pw.write(&to_write).await?;
2286        let res = pw.close().await?;
2287        assert_eq!(res.len(), 1);
2288        let data_file = res
2289            .into_iter()
2290            .next()
2291            .unwrap()
2292            .content(crate::spec::DataContentType::Data)
2293            .partition(Struct::empty())
2294            .partition_spec_id(0)
2295            .build()
2296            .unwrap();
2297
2298        // check data file
2299        assert_eq!(data_file.record_count(), 4);
2300        assert_eq!(
2301            *data_file.value_counts(),
2302            HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2303        );
2304        assert_eq!(
2305            *data_file.lower_bounds(),
2306            HashMap::from([
2307                (1, Datum::int(1)),
2308                (2, Datum::float(1.0)),
2309                (6, Datum::int(1)),
2310                (7, Datum::float(1.0))
2311            ])
2312        );
2313        assert_eq!(
2314            *data_file.upper_bounds(),
2315            HashMap::from([
2316                (1, Datum::int(4)),
2317                (2, Datum::float(2.0)),
2318                (6, Datum::int(4)),
2319                (7, Datum::float(2.0))
2320            ])
2321        );
2322        assert_eq!(
2323            *data_file.null_value_counts(),
2324            HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2325        );
2326        assert_eq!(
2327            *data_file.nan_value_counts(),
2328            HashMap::from([(2, 1), (7, 1)])
2329        );
2330
2331        // check the written file
2332        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2333        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2334
2335        Ok(())
2336    }
2337
2338    #[tokio::test]
2339    async fn test_write_empty_parquet_file() {
2340        let temp_dir = TempDir::new().unwrap();
2341        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2342        let location_gen = DefaultLocationGenerator::with_data_location(
2343            temp_dir.path().to_str().unwrap().to_string(),
2344        );
2345        let file_name_gen =
2346            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2347        let output_file = file_io
2348            .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
2349            .unwrap();
2350
2351        // write data
2352        let pw = ParquetWriterBuilder::new(
2353            WriterProperties::builder().build(),
2354            Arc::new(
2355                Schema::builder()
2356                    .with_schema_id(1)
2357                    .with_fields(vec![
2358                        NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2359                            .with_id(0)
2360                            .into(),
2361                    ])
2362                    .build()
2363                    .expect("Failed to create schema"),
2364            ),
2365        )
2366        .build(output_file)
2367        .await
2368        .unwrap();
2369
2370        let res = pw.close().await.unwrap();
2371        assert_eq!(res.len(), 0);
2372
2373        // Check that no file was left behind.
2374        assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2375    }
2376
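    // The aggregator must tolerate statistics with a missing min or max half and still
    // produce the overall bounds for the column's field id across all updates.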
2377    #[test]
2378    fn test_min_max_aggregator() {
2379        let schema = Arc::new(
2380            Schema::builder()
2381                .with_schema_id(1)
2382                .with_fields(vec![
2383                    NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2384                        .with_id(0)
2385                        .into(),
2386                ])
2387                .build()
2388                .expect("Failed to create schema"),
2389        );
2390        let mut min_max_agg = MinMaxColAggregator::new(schema);
2391        let create_statistics =
2392            |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
2393        min_max_agg
2394            .update(0, create_statistics(None, Some(42)))
2395            .unwrap();
2396        min_max_agg
2397            .update(0, create_statistics(Some(0), Some(i32::MAX)))
2398            .unwrap();
2399        min_max_agg
2400            .update(0, create_statistics(Some(i32::MIN), None))
2401            .unwrap();
2402        min_max_agg
2403            .update(0, create_statistics(None, None))
2404            .unwrap();
2405
2406        let (lower_bounds, upper_bounds) = min_max_agg.produce();
2407
2408        assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2409        assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2410    }
2411}