Skip to main content

iceberg/writer/file_writer/
parquet_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! The module contains the file writer for parquet file format.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::RecordBatch;
24use arrow_schema::{DataType, Field, Schema as ArrowSchema};
25use bytes::Bytes;
26use futures::future::BoxFuture;
27use itertools::Itertools;
28use parquet::arrow::AsyncArrowWriter;
29use parquet::arrow::async_reader::AsyncFileReader;
30use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
31use parquet::file::metadata::ParquetMetaData;
32use parquet::file::properties::WriterProperties;
33use parquet::file::statistics::Statistics;
34
35use super::{FileWriter, FileWriterBuilder};
36use crate::arrow::{
37    ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
38    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
39};
40use crate::io::{FileIO, FileWrite, OutputFile};
41use crate::spec::{
42    DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
43    NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
44    StructType, TableMetadata, Type, visit_schema,
45};
46use crate::transform::create_transform_function;
47use crate::writer::{CurrentFileStatus, DataFile};
48use crate::{Error, ErrorKind, Result};
49
/// ParquetWriterBuilder is used to build a [`ParquetWriter`].
#[derive(Clone, Debug)]
pub struct ParquetWriterBuilder {
    /// Parquet writer properties; cloned into every writer produced by `build`.
    props: WriterProperties,
    /// Iceberg schema handed to every writer produced by `build`.
    schema: SchemaRef,
    /// Field match mode forwarded to the writer's `NanValueCountVisitor`.
    match_mode: FieldMatchMode,
    /// Optional Arrow schema to use for writing. If provided, this schema is used
    /// directly for the Parquet writer instead of converting from the Iceberg schema.
    /// This allows preserving Arrow extension metadata in the output.
    arrow_schema: Option<Arc<ArrowSchema>>,
}
61
62impl ParquetWriterBuilder {
63    /// Create a new `ParquetWriterBuilder`
64    /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
65    pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
66        Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
67    }
68
69    /// Create a new `ParquetWriterBuilder` with custom match mode
70    pub fn new_with_match_mode(
71        props: WriterProperties,
72        schema: SchemaRef,
73        match_mode: FieldMatchMode,
74    ) -> Self {
75        Self {
76            props,
77            schema,
78            match_mode,
79            arrow_schema: None,
80        }
81    }
82
83    /// Set an Arrow schema to use for writing instead of deriving one from the Iceberg schema.
84    /// The Arrow schema's structure must match the Iceberg schema (field names, types, nullability),
85    /// but may contain additional metadata (e.g., Arrow extension types).
86    /// Returns an error if the schemas don't match structurally.
87    pub fn with_arrow_schema(mut self, arrow_schema: Arc<ArrowSchema>) -> Result<Self> {
88        validate_arrow_schema_matches_iceberg(&arrow_schema, &self.schema)?;
89        self.arrow_schema = Some(arrow_schema);
90        Ok(self)
91    }
92}
93
94impl FileWriterBuilder for ParquetWriterBuilder {
95    type R = ParquetWriter;
96
97    async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
98        Ok(ParquetWriter {
99            schema: self.schema.clone(),
100            arrow_schema: self.arrow_schema.clone(),
101            inner_writer: None,
102            writer_properties: self.props.clone(),
103            current_row_num: 0,
104            output_file,
105            nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
106        })
107    }
108}
109
/// Lookup table from a Parquet column path name to the internal field id.
struct IndexByParquetPathName {
    /// Finished entries, keyed by the dot-joined column path.
    name_to_id: HashMap<String, i32>,

    /// Path segments of the field currently being visited (used as a stack).
    field_names: Vec<String>,

    /// Id of the field most recently entered by the visitor.
    field_id: i32,
}

impl IndexByParquetPathName {
    /// Returns an index with no entries, an empty path stack and field id `0`.
    pub fn new() -> Self {
        Self {
            field_id: 0,
            field_names: vec![],
            name_to_id: HashMap::default(),
        }
    }

    /// Looks up the field id registered for the column path `name`, if any.
    pub fn get(&self, name: &str) -> Option<&i32> {
        let Self { name_to_id, .. } = self;
        name_to_id.get(name)
    }
}

impl Default for IndexByParquetPathName {
    /// Same as [`IndexByParquetPathName::new`].
    fn default() -> Self {
        Self::new()
    }
}
140
141impl SchemaVisitor for IndexByParquetPathName {
142    type T = ();
143
144    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
145        self.field_names.push(field.name.to_string());
146        self.field_id = field.id;
147        Ok(())
148    }
149
150    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
151        self.field_names.pop();
152        Ok(())
153    }
154
155    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
156        self.field_names.push(format!("list.{}", field.name));
157        self.field_id = field.id;
158        Ok(())
159    }
160
161    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
162        self.field_names.pop();
163        Ok(())
164    }
165
166    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
167        self.field_names
168            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
169        self.field_id = field.id;
170        Ok(())
171    }
172
173    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
174        self.field_names.pop();
175        Ok(())
176    }
177
178    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
179        self.field_names
180            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
181        self.field_id = field.id;
182        Ok(())
183    }
184
185    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
186        self.field_names.pop();
187        Ok(())
188    }
189
190    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
191        Ok(())
192    }
193
194    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
195        Ok(())
196    }
197
198    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
199        Ok(())
200    }
201
202    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
203        Ok(())
204    }
205
206    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
207        Ok(())
208    }
209
210    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
211        let full_name = self.field_names.iter().map(String::as_str).join(".");
212        let field_id = self.field_id;
213        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
214            return Err(Error::new(
215                ErrorKind::DataInvalid,
216                format!(
217                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
218                ),
219            ));
220        } else {
221            self.name_to_id.insert(full_name, field_id);
222        }
223
224        Ok(())
225    }
226}
227
/// `ParquetWriter` is used to write arrow data into parquet file on storage.
pub struct ParquetWriter {
    /// Iceberg schema; passed to the NaN counter on every write and to the
    /// data-file builder on close.
    schema: SchemaRef,
    /// Optional Arrow schema to use for writing. If provided, this is used directly
    /// instead of converting from the Iceberg schema, preserving any extension metadata.
    arrow_schema: Option<Arc<ArrowSchema>>,
    /// Destination file; its writer is opened when the first non-empty batch arrives.
    output_file: OutputFile,
    /// Underlying Arrow-to-Parquet writer; `None` until the first non-empty batch is written.
    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter>>,
    /// Properties handed to the Arrow writer when it is created.
    writer_properties: WriterProperties,
    /// Running total of rows in the non-empty batches passed to `write`.
    current_row_num: usize,
    /// Accumulates per-field NaN counts across written batches.
    nan_value_count_visitor: NanValueCountVisitor,
}
240
/// Used to aggregate min and max value of each column.
struct MinMaxColAggregator {
    /// Smallest value seen so far, keyed by field id.
    lower_bounds: HashMap<i32, Datum>,
    /// Largest value seen so far, keyed by field id.
    upper_bounds: HashMap<i32, Datum>,
    /// Schema used to resolve a field id to its type when statistics are folded in.
    schema: SchemaRef,
}
247
248impl MinMaxColAggregator {
249    /// Creates new and empty `MinMaxColAggregator`
250    fn new(schema: SchemaRef) -> Self {
251        Self {
252            lower_bounds: HashMap::new(),
253            upper_bounds: HashMap::new(),
254            schema,
255        }
256    }
257
258    fn update_state_min(&mut self, field_id: i32, datum: Datum) {
259        self.lower_bounds
260            .entry(field_id)
261            .and_modify(|e| {
262                if *e > datum {
263                    *e = datum.clone()
264                }
265            })
266            .or_insert(datum);
267    }
268
269    fn update_state_max(&mut self, field_id: i32, datum: Datum) {
270        self.upper_bounds
271            .entry(field_id)
272            .and_modify(|e| {
273                if *e < datum {
274                    *e = datum.clone()
275                }
276            })
277            .or_insert(datum);
278    }
279
280    /// Update statistics
281    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
282        let Some(ty) = self
283            .schema
284            .field_by_id(field_id)
285            .map(|f| f.field_type.as_ref())
286        else {
287            // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
288            // Ignore the field if it is not in schema.
289            return Ok(());
290        };
291        let Type::Primitive(ty) = ty.clone() else {
292            return Err(Error::new(
293                ErrorKind::Unexpected,
294                format!("Composed type {ty} is not supported for min max aggregation."),
295            ));
296        };
297
298        if value.min_is_exact() {
299            let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
300                return Err(Error::new(
301                    ErrorKind::Unexpected,
302                    format!("Statistics {value} is not match with field type {ty}."),
303                ));
304            };
305
306            self.update_state_min(field_id, min_datum);
307        }
308
309        if value.max_is_exact() {
310            let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
311                return Err(Error::new(
312                    ErrorKind::Unexpected,
313                    format!("Statistics {value} is not match with field type {ty}."),
314                ));
315            };
316
317            self.update_state_max(field_id, max_datum);
318        }
319
320        Ok(())
321    }
322
323    /// Returns lower and upper bounds
324    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
325        (self.lower_bounds, self.upper_bounds)
326    }
327}
328
329impl ParquetWriter {
330    /// Converts parquet files to data files
331    #[allow(dead_code)]
332    pub(crate) async fn parquet_files_to_data_files(
333        file_io: &FileIO,
334        file_paths: Vec<String>,
335        table_metadata: &TableMetadata,
336    ) -> Result<Vec<DataFile>> {
337        // TODO: support adding to partitioned table
338        let mut data_files: Vec<DataFile> = Vec::new();
339
340        for file_path in file_paths {
341            let input_file = file_io.new_input(&file_path)?;
342            let file_metadata = input_file.metadata().await?;
343            let file_size_in_bytes = file_metadata.size as usize;
344            let reader = input_file.reader().await?;
345
346            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
347            let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
348                Error::new(
349                    ErrorKind::DataInvalid,
350                    format!("Error reading Parquet metadata: {err}"),
351                )
352            })?;
353            let mut builder = ParquetWriter::parquet_to_data_file_builder(
354                table_metadata.current_schema().clone(),
355                parquet_metadata,
356                file_size_in_bytes,
357                file_path,
358                // TODO: Implement nan_value_counts here
359                HashMap::new(),
360            )?;
361            builder.partition_spec_id(table_metadata.default_partition_spec_id());
362            let data_file = builder.build().unwrap();
363            data_files.push(data_file);
364        }
365
366        Ok(data_files)
367    }
368
369    /// `ParquetMetadata` to data file builder
370    pub(crate) fn parquet_to_data_file_builder(
371        schema: SchemaRef,
372        metadata: Arc<ParquetMetaData>,
373        written_size: usize,
374        file_path: String,
375        nan_value_counts: HashMap<i32, u64>,
376    ) -> Result<DataFileBuilder> {
377        let index_by_parquet_path = {
378            let mut visitor = IndexByParquetPathName::new();
379            visit_schema(&schema, &mut visitor)?;
380            visitor
381        };
382
383        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
384            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
385            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
386            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
387            let mut min_max_agg = MinMaxColAggregator::new(schema);
388
389            for row_group in metadata.row_groups() {
390                for column_chunk_metadata in row_group.columns() {
391                    let parquet_path = column_chunk_metadata.column_descr().path().string();
392
393                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
394                        continue;
395                    };
396
397                    *per_col_size.entry(field_id).or_insert(0) +=
398                        column_chunk_metadata.compressed_size() as u64;
399                    *per_col_val_num.entry(field_id).or_insert(0) +=
400                        column_chunk_metadata.num_values() as u64;
401
402                    if let Some(statistics) = column_chunk_metadata.statistics() {
403                        if let Some(null_count) = statistics.null_count_opt() {
404                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
405                        }
406
407                        min_max_agg.update(field_id, statistics.clone())?;
408                    }
409                }
410            }
411            (
412                per_col_size,
413                per_col_val_num,
414                per_col_null_val_num,
415                min_max_agg.produce(),
416            )
417        };
418
419        let mut builder = DataFileBuilder::default();
420        builder
421            .content(DataContentType::Data)
422            .file_path(file_path)
423            .file_format(DataFileFormat::Parquet)
424            .partition(Struct::empty())
425            .record_count(metadata.file_metadata().num_rows() as u64)
426            .file_size_in_bytes(written_size as u64)
427            .column_sizes(column_sizes)
428            .value_counts(value_counts)
429            .null_value_counts(null_value_counts)
430            .nan_value_counts(nan_value_counts)
431            // # NOTE:
432            // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd
433            .lower_bounds(lower_bounds)
434            .upper_bounds(upper_bounds)
435            .split_offsets(Some(
436                metadata
437                    .row_groups()
438                    .iter()
439                    .filter_map(|group| group.file_offset())
440                    .collect(),
441            ));
442
443        Ok(builder)
444    }
445
446    #[allow(dead_code)]
447    fn partition_value_from_bounds(
448        table_spec: Arc<PartitionSpec>,
449        lower_bounds: &HashMap<i32, Datum>,
450        upper_bounds: &HashMap<i32, Datum>,
451    ) -> Result<Struct> {
452        let mut partition_literals: Vec<Option<Literal>> = Vec::new();
453
454        for field in table_spec.fields() {
455            if let (Some(lower), Some(upper)) = (
456                lower_bounds.get(&field.source_id),
457                upper_bounds.get(&field.source_id),
458            ) {
459                if !field.transform.preserves_order() {
460                    return Err(Error::new(
461                        ErrorKind::DataInvalid,
462                        format!(
463                            "cannot infer partition value for non linear partition field (needs to preserve order): {} with transform {}",
464                            field.name, field.transform
465                        ),
466                    ));
467                }
468
469                if lower != upper {
470                    return Err(Error::new(
471                        ErrorKind::DataInvalid,
472                        format!(
473                            "multiple partition values for field {}: lower: {:?}, upper: {:?}",
474                            field.name, lower, upper
475                        ),
476                    ));
477                }
478
479                let transform_fn = create_transform_function(&field.transform)?;
480                let transform_literal =
481                    Literal::from(transform_fn.transform_literal_result(lower)?);
482
483                partition_literals.push(Some(transform_literal));
484            } else {
485                partition_literals.push(None);
486            }
487        }
488
489        let partition_struct = Struct::from_iter(partition_literals);
490
491        Ok(partition_struct)
492    }
493}
494
495/// Validate that an Arrow schema's structure matches an Iceberg schema.
496/// This checks field names, types, and nullability, but ignores metadata differences.
497fn validate_arrow_schema_matches_iceberg(
498    arrow_schema: &ArrowSchema,
499    iceberg_schema: &crate::spec::Schema,
500) -> Result<()> {
501    let expected: ArrowSchema = iceberg_schema.try_into()?;
502    validate_schemas_match(&expected, arrow_schema, "")
503}
504
505fn validate_schemas_match(expected: &ArrowSchema, actual: &ArrowSchema, path: &str) -> Result<()> {
506    if expected.fields().len() != actual.fields().len() {
507        return Err(Error::new(
508            ErrorKind::DataInvalid,
509            format!(
510                "Schema mismatch at '{}': expected {} fields, got {}",
511                path,
512                expected.fields().len(),
513                actual.fields().len()
514            ),
515        ));
516    }
517    for (e, a) in expected.fields().iter().zip(actual.fields().iter()) {
518        validate_fields_match(e, a, path)?;
519    }
520    Ok(())
521}
522
523fn validate_fields_match(expected: &Field, actual: &Field, parent_path: &str) -> Result<()> {
524    let path = if parent_path.is_empty() {
525        expected.name().to_string()
526    } else {
527        format!("{}.{}", parent_path, expected.name())
528    };
529
530    if expected.name() != actual.name() {
531        return Err(Error::new(
532            ErrorKind::DataInvalid,
533            format!(
534                "Field name mismatch at '{}': expected '{}', got '{}'",
535                parent_path,
536                expected.name(),
537                actual.name()
538            ),
539        ));
540    }
541
542    if expected.is_nullable() != actual.is_nullable() {
543        return Err(Error::new(
544            ErrorKind::DataInvalid,
545            format!(
546                "Nullability mismatch at '{}': expected {}, got {}",
547                path,
548                expected.is_nullable(),
549                actual.is_nullable()
550            ),
551        ));
552    }
553
554    validate_datatypes_match(expected.data_type(), actual.data_type(), &path)
555}
556
557fn validate_datatypes_match(expected: &DataType, actual: &DataType, path: &str) -> Result<()> {
558    // Compare types structurally, ignoring metadata
559    match (expected, actual) {
560        (DataType::Struct(e_fields), DataType::Struct(a_fields)) => {
561            if e_fields.len() != a_fields.len() {
562                return Err(Error::new(
563                    ErrorKind::DataInvalid,
564                    format!(
565                        "Struct field count mismatch at '{}': expected {}, got {}",
566                        path,
567                        e_fields.len(),
568                        a_fields.len()
569                    ),
570                ));
571            }
572            for (e, a) in e_fields.iter().zip(a_fields.iter()) {
573                validate_fields_match(e, a, path)?;
574            }
575            Ok(())
576        }
577        (DataType::List(e_inner), DataType::List(a_inner))
578        | (DataType::LargeList(e_inner), DataType::LargeList(a_inner)) => {
579            validate_fields_match(e_inner, a_inner, path)
580        }
581        (DataType::Map(e_entries, _), DataType::Map(a_entries, _)) => {
582            validate_fields_match(e_entries, a_entries, path)
583        }
584        _ => {
585            // For primitive types, use direct comparison (metadata not relevant)
586            if std::mem::discriminant(expected) != std::mem::discriminant(actual) {
587                return Err(Error::new(
588                    ErrorKind::DataInvalid,
589                    format!(
590                        "Type mismatch at '{}': expected {:?}, got {:?}",
591                        path, expected, actual
592                    ),
593                ));
594            }
595            Ok(())
596        }
597    }
598}
599
600impl FileWriter for ParquetWriter {
601    async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
602        // Skip empty batch
603        if batch.num_rows() == 0 {
604            return Ok(());
605        }
606
607        self.current_row_num += batch.num_rows();
608
609        self.nan_value_count_visitor
610            .compute(self.schema.clone(), batch.clone())?;
611
612        let writer = if let Some(writer) = &mut self.inner_writer {
613            writer
614        } else {
615            // Use the provided Arrow schema if available, otherwise derive from Iceberg schema.
616            // Using a provided Arrow schema allows preserving extension metadata in the output.
617            let arrow_schema: Arc<ArrowSchema> = match &self.arrow_schema {
618                Some(schema) => Arc::clone(schema),
619                None => Arc::new(self.schema.as_ref().try_into()?),
620            };
621            let inner_writer = self.output_file.writer().await?;
622            let async_writer = AsyncFileWriter::new(inner_writer);
623            let writer = AsyncArrowWriter::try_new(
624                async_writer,
625                arrow_schema,
626                Some(self.writer_properties.clone()),
627            )
628            .map_err(|err| {
629                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
630                    .with_source(err)
631            })?;
632            self.inner_writer = Some(writer);
633            self.inner_writer.as_mut().unwrap()
634        };
635
636        writer.write(batch).await.map_err(|err| {
637            Error::new(
638                ErrorKind::Unexpected,
639                "Failed to write using parquet writer.",
640            )
641            .with_source(err)
642        })?;
643
644        Ok(())
645    }
646
647    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
648        let mut writer = match self.inner_writer.take() {
649            Some(writer) => writer,
650            None => return Ok(vec![]),
651        };
652
653        let metadata = writer.finish().await.map_err(|err| {
654            Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
655        })?;
656
657        let written_size = writer.bytes_written();
658
659        if self.current_row_num == 0 {
660            self.output_file.delete().await.map_err(|err| {
661                Error::new(
662                    ErrorKind::Unexpected,
663                    "Failed to delete empty parquet file.",
664                )
665                .with_source(err)
666            })?;
667            Ok(vec![])
668        } else {
669            let parquet_metadata = Arc::new(metadata);
670
671            Ok(vec![Self::parquet_to_data_file_builder(
672                self.schema,
673                parquet_metadata,
674                written_size,
675                self.output_file.location().to_string(),
676                self.nan_value_count_visitor.nan_value_counts,
677            )?])
678        }
679    }
680}
681
682impl CurrentFileStatus for ParquetWriter {
683    fn current_file_path(&self) -> String {
684        self.output_file.location().to_string()
685    }
686
687    fn current_row_num(&self) -> usize {
688        self.current_row_num
689    }
690
691    fn current_written_size(&self) -> usize {
692        if let Some(inner) = self.inner_writer.as_ref() {
693            // inner/AsyncArrowWriter contains sync and async writers
694            // written size = bytes flushed to inner's async writer + bytes buffered in the inner's sync writer
695            inner.bytes_written() + inner.in_progress_size()
696        } else {
697            // inner writer is not initialized yet
698            0
699        }
700    }
701
702    fn current_schema(&self) -> SchemaRef {
703        self.schema.clone()
704    }
705}
706
707/// AsyncFileWriter is a wrapper of FileWrite to make it compatible with tokio::io::AsyncWrite.
708///
709/// # NOTES
710///
711/// We keep this wrapper been used inside only.
712struct AsyncFileWriter(Box<dyn FileWrite>);
713
714impl AsyncFileWriter {
715    /// Create a new `AsyncFileWriter` with the given writer.
716    pub fn new(writer: Box<dyn FileWrite>) -> Self {
717        Self(writer)
718    }
719}
720
721impl ArrowAsyncFileWriter for AsyncFileWriter {
722    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
723        Box::pin(async {
724            self.0
725                .write(bs)
726                .await
727                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
728        })
729    }
730
731    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
732        Box::pin(async {
733            self.0
734                .close()
735                .await
736                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
737        })
738    }
739}
740
741#[cfg(test)]
742mod tests {
743    use std::collections::HashMap;
744    use std::sync::Arc;
745
746    use anyhow::Result;
747    use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
748    use arrow_array::types::{Float32Type, Int64Type};
749    use arrow_array::{
750        Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
751        Int64Array, ListArray, MapArray, RecordBatch, StructArray,
752    };
753    use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
754    use arrow_select::concat::concat_batches;
755    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
756    use parquet::file::statistics::ValueStatistics;
757    use tempfile::TempDir;
758    use uuid::Uuid;
759
760    use super::*;
761    use crate::arrow::schema_to_arrow_schema;
762    use crate::io::FileIO;
763    use crate::spec::decimal_utils::{decimal_mantissa, decimal_new, decimal_scale};
764    use crate::spec::{PrimitiveLiteral, Struct, *};
765    use crate::writer::file_writer::location_generator::{
766        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
767    };
768    use crate::writer::tests::check_parquet_data_file;
769
770    fn schema_for_all_type() -> Schema {
771        Schema::builder()
772            .with_schema_id(1)
773            .with_fields(vec![
774                NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
775                NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
776                NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
777                NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
778                NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
779                NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
780                NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
781                NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
782                NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
783                NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
784                    .into(),
785                NestedField::optional(
786                    10,
787                    "timestamptz",
788                    Type::Primitive(PrimitiveType::Timestamptz),
789                )
790                .into(),
791                NestedField::optional(
792                    11,
793                    "timestamp_ns",
794                    Type::Primitive(PrimitiveType::TimestampNs),
795                )
796                .into(),
797                NestedField::optional(
798                    12,
799                    "timestamptz_ns",
800                    Type::Primitive(PrimitiveType::TimestamptzNs),
801                )
802                .into(),
803                NestedField::optional(
804                    13,
805                    "decimal",
806                    Type::Primitive(PrimitiveType::Decimal {
807                        precision: 10,
808                        scale: 5,
809                    }),
810                )
811                .into(),
812                NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
813                NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
814                    .into(),
815                // Parquet Statistics will use different representation for Decimal with precision 38 and scale 5,
816                // so we need to add a new field for it.
817                NestedField::optional(
818                    16,
819                    "decimal_38",
820                    Type::Primitive(PrimitiveType::Decimal {
821                        precision: 38,
822                        scale: 5,
823                    }),
824                )
825                .into(),
826            ])
827            .build()
828            .unwrap()
829    }
830
831    fn nested_schema_for_test() -> Schema {
832        // Int, Struct(Int,Int), String, List(Int), Struct(Struct(Int)), Map(String, List(Int))
833        Schema::builder()
834            .with_schema_id(1)
835            .with_fields(vec![
836                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
837                NestedField::required(
838                    1,
839                    "col1",
840                    Type::Struct(StructType::new(vec![
841                        NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
842                            .into(),
843                        NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
844                            .into(),
845                    ])),
846                )
847                .into(),
848                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
849                NestedField::required(
850                    3,
851                    "col3",
852                    Type::List(ListType::new(
853                        NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
854                            .into(),
855                    )),
856                )
857                .into(),
858                NestedField::required(
859                    4,
860                    "col4",
861                    Type::Struct(StructType::new(vec![
862                        NestedField::required(
863                            8,
864                            "col_4_8",
865                            Type::Struct(StructType::new(vec![
866                                NestedField::required(
867                                    9,
868                                    "col_4_8_9",
869                                    Type::Primitive(PrimitiveType::Long),
870                                )
871                                .into(),
872                            ])),
873                        )
874                        .into(),
875                    ])),
876                )
877                .into(),
878                NestedField::required(
879                    10,
880                    "col5",
881                    Type::Map(MapType::new(
882                        NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
883                            .into(),
884                        NestedField::required(
885                            12,
886                            "value",
887                            Type::List(ListType::new(
888                                NestedField::required(
889                                    13,
890                                    "item",
891                                    Type::Primitive(PrimitiveType::Long),
892                                )
893                                .into(),
894                            )),
895                        )
896                        .into(),
897                    )),
898                )
899                .into(),
900            ])
901            .build()
902            .unwrap()
903    }
904
/// Checks that `IndexByParquetPathName` maps each leaf column of the nested
/// test schema from its dotted Parquet path to the matching field id.
///
/// Nothing in this test awaits, so it is a plain synchronous `#[test]` and
/// does not start an async runtime.
#[test]
fn test_index_by_parquet_path() {
    // List elements sit under `<col>.list.<element>` and map entries under
    // `<col>.key_value.{key,value}` in the expected paths.
    let expect = HashMap::from([
        ("col0".to_string(), 0),
        ("col1.col_1_5".to_string(), 5),
        ("col1.col_1_6".to_string(), 6),
        ("col2".to_string(), 2),
        ("col3.list.element".to_string(), 7),
        ("col4.col_4_8.col_4_8_9".to_string(), 9),
        ("col5.key_value.key".to_string(), 11),
        ("col5.key_value.value.list.item".to_string(), 13),
    ]);

    let mut visitor = IndexByParquetPathName::new();
    visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();

    assert_eq!(visitor.name_to_id, expect);
}
921
922    #[tokio::test]
923    async fn test_parquet_writer() -> Result<()> {
924        let temp_dir = TempDir::new().unwrap();
925        let file_io = FileIO::new_with_fs();
926        let location_gen = DefaultLocationGenerator::with_data_location(
927            temp_dir.path().to_str().unwrap().to_string(),
928        );
929        let file_name_gen =
930            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
931
932        // prepare data
933        let schema = {
934            let fields =
935                vec![
936                    Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
937                        PARQUET_FIELD_ID_META_KEY.to_string(),
938                        "0".to_string(),
939                    )])),
940                ];
941            Arc::new(arrow_schema::Schema::new(fields))
942        };
943        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
944        let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
945        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
946        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
947
948        let output_file = file_io.new_output(
949            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
950        )?;
951
952        // write data
953        let mut pw = ParquetWriterBuilder::new(
954            WriterProperties::builder()
955                .set_max_row_group_size(128)
956                .build(),
957            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
958        )
959        .build(output_file)
960        .await?;
961        pw.write(&to_write).await?;
962        pw.write(&to_write_null).await?;
963        let res = pw.close().await?;
964        assert_eq!(res.len(), 1);
965        let data_file = res
966            .into_iter()
967            .next()
968            .unwrap()
969            // Put dummy field for build successfully.
970            .content(DataContentType::Data)
971            .partition(Struct::empty())
972            .partition_spec_id(0)
973            .build()
974            .unwrap();
975
976        // check data file
977        assert_eq!(data_file.record_count(), 2048);
978        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
979        assert_eq!(
980            *data_file.lower_bounds(),
981            HashMap::from([(0, Datum::long(0))])
982        );
983        assert_eq!(
984            *data_file.upper_bounds(),
985            HashMap::from([(0, Datum::long(1023))])
986        );
987        assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
988
989        // check the written file
990        let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
991        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
992
993        Ok(())
994    }
995
996    #[tokio::test]
997    async fn test_parquet_writer_with_complex_schema() -> Result<()> {
998        let temp_dir = TempDir::new().unwrap();
999        let file_io = FileIO::new_with_fs();
1000        let location_gen = DefaultLocationGenerator::with_data_location(
1001            temp_dir.path().to_str().unwrap().to_string(),
1002        );
1003        let file_name_gen =
1004            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1005
1006        // prepare data
1007        let schema = nested_schema_for_test();
1008        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1009        let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1010        let col1 = Arc::new(StructArray::new(
1011            {
1012                if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1013                    fields.clone()
1014                } else {
1015                    unreachable!()
1016                }
1017            },
1018            vec![
1019                Arc::new(Int64Array::from_iter_values(0..1024)),
1020                Arc::new(Int64Array::from_iter_values(0..1024)),
1021            ],
1022            None,
1023        ));
1024        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
1025            (0..1024).map(|n| n.to_string()),
1026        )) as ArrayRef;
1027        let col3 = Arc::new({
1028            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
1029                (0..1024).map(|n| Some(vec![Some(n)])),
1030            )
1031            .into_parts();
1032            arrow_array::ListArray::new(
1033                {
1034                    if let DataType::List(field) = arrow_schema.field(3).data_type() {
1035                        field.clone()
1036                    } else {
1037                        unreachable!()
1038                    }
1039                },
1040                list_parts.1,
1041                list_parts.2,
1042                list_parts.3,
1043            )
1044        }) as ArrayRef;
1045        let col4 = Arc::new(StructArray::new(
1046            {
1047                if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
1048                    fields.clone()
1049                } else {
1050                    unreachable!()
1051                }
1052            },
1053            vec![Arc::new(StructArray::new(
1054                {
1055                    if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
1056                        if let DataType::Struct(fields) = fields[0].data_type() {
1057                            fields.clone()
1058                        } else {
1059                            unreachable!()
1060                        }
1061                    } else {
1062                        unreachable!()
1063                    }
1064                },
1065                vec![Arc::new(Int64Array::from_iter_values(0..1024))],
1066                None,
1067            ))],
1068            None,
1069        ));
1070        let col5 = Arc::new({
1071            let mut map_array_builder = MapBuilder::new(
1072                None,
1073                arrow_array::builder::StringBuilder::new(),
1074                arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
1075                    Int64Type,
1076                >::new()),
1077            );
1078            for i in 0..1024 {
1079                map_array_builder.keys().append_value(i.to_string());
1080                map_array_builder
1081                    .values()
1082                    .append_value(vec![Some(i as i64); i + 1]);
1083                map_array_builder.append(true)?;
1084            }
1085            let (_, offset_buffer, struct_array, null_buffer, ordered) =
1086                map_array_builder.finish().into_parts();
1087            let struct_array = {
1088                let (_, mut arrays, nulls) = struct_array.into_parts();
1089                let list_array = {
1090                    let list_array = arrays[1]
1091                        .as_any()
1092                        .downcast_ref::<ListArray>()
1093                        .unwrap()
1094                        .clone();
1095                    let (_, offsets, array, nulls) = list_array.into_parts();
1096                    let list_field = {
1097                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1098                            if let DataType::Struct(fields) = map_field.data_type() {
1099                                if let DataType::List(list_field) = fields[1].data_type() {
1100                                    list_field.clone()
1101                                } else {
1102                                    unreachable!()
1103                                }
1104                            } else {
1105                                unreachable!()
1106                            }
1107                        } else {
1108                            unreachable!()
1109                        }
1110                    };
1111                    ListArray::new(list_field, offsets, array, nulls)
1112                };
1113                arrays[1] = Arc::new(list_array) as ArrayRef;
1114                StructArray::new(
1115                    {
1116                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1117                            if let DataType::Struct(fields) = map_field.data_type() {
1118                                fields.clone()
1119                            } else {
1120                                unreachable!()
1121                            }
1122                        } else {
1123                            unreachable!()
1124                        }
1125                    },
1126                    arrays,
1127                    nulls,
1128                )
1129            };
1130            arrow_array::MapArray::new(
1131                {
1132                    if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1133                        map_field.clone()
1134                    } else {
1135                        unreachable!()
1136                    }
1137                },
1138                offset_buffer,
1139                struct_array,
1140                null_buffer,
1141                ordered,
1142            )
1143        }) as ArrayRef;
1144        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1145            col0, col1, col2, col3, col4, col5,
1146        ])
1147        .unwrap();
1148        let output_file = file_io.new_output(
1149            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1150        )?;
1151
1152        // write data
1153        let mut pw =
1154            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1155                .build(output_file)
1156                .await?;
1157        pw.write(&to_write).await?;
1158        let res = pw.close().await?;
1159        assert_eq!(res.len(), 1);
1160        let data_file = res
1161            .into_iter()
1162            .next()
1163            .unwrap()
1164            // Put dummy field for build successfully.
1165            .content(crate::spec::DataContentType::Data)
1166            .partition(Struct::empty())
1167            .partition_spec_id(0)
1168            .build()
1169            .unwrap();
1170
1171        // check data file
1172        assert_eq!(data_file.record_count(), 1024);
1173        assert_eq!(
1174            *data_file.value_counts(),
1175            HashMap::from([
1176                (0, 1024),
1177                (5, 1024),
1178                (6, 1024),
1179                (2, 1024),
1180                (7, 1024),
1181                (9, 1024),
1182                (11, 1024),
1183                (13, (1..1025).sum()),
1184            ])
1185        );
1186        assert_eq!(
1187            *data_file.lower_bounds(),
1188            HashMap::from([
1189                (0, Datum::long(0)),
1190                (5, Datum::long(0)),
1191                (6, Datum::long(0)),
1192                (2, Datum::string("0")),
1193                (7, Datum::long(0)),
1194                (9, Datum::long(0)),
1195                (11, Datum::string("0")),
1196                (13, Datum::long(0))
1197            ])
1198        );
1199        assert_eq!(
1200            *data_file.upper_bounds(),
1201            HashMap::from([
1202                (0, Datum::long(1023)),
1203                (5, Datum::long(1023)),
1204                (6, Datum::long(1023)),
1205                (2, Datum::string("999")),
1206                (7, Datum::long(1023)),
1207                (9, Datum::long(1023)),
1208                (11, Datum::string("999")),
1209                (13, Datum::long(1023))
1210            ])
1211        );
1212
1213        // check the written file
1214        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1215
1216        Ok(())
1217    }
1218
1219    #[tokio::test]
1220    async fn test_all_type_for_write() -> Result<()> {
1221        let temp_dir = TempDir::new().unwrap();
1222        let file_io = FileIO::new_with_fs();
1223        let location_gen = DefaultLocationGenerator::with_data_location(
1224            temp_dir.path().to_str().unwrap().to_string(),
1225        );
1226        let file_name_gen =
1227            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1228
1229        // prepare data
1230        // generate iceberg schema for all type
1231        let schema = schema_for_all_type();
1232        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1233        let col0 = Arc::new(BooleanArray::from(vec![
1234            Some(true),
1235            Some(false),
1236            None,
1237            Some(true),
1238        ])) as ArrayRef;
1239        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1240        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1241        let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1242            Some(0.5),
1243            Some(2.0),
1244            None,
1245            Some(3.5),
1246        ])) as ArrayRef;
1247        let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1248            Some(0.5),
1249            Some(2.0),
1250            None,
1251            Some(3.5),
1252        ])) as ArrayRef;
1253        let col5 = Arc::new(arrow_array::StringArray::from(vec![
1254            Some("a"),
1255            Some("b"),
1256            None,
1257            Some("d"),
1258        ])) as ArrayRef;
1259        let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1260            Some(b"one"),
1261            None,
1262            Some(b""),
1263            Some(b"zzzz"),
1264        ])) as ArrayRef;
1265        let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1266            Some(0),
1267            Some(1),
1268            None,
1269            Some(3),
1270        ])) as ArrayRef;
1271        let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1272            Some(0),
1273            Some(1),
1274            None,
1275            Some(3),
1276        ])) as ArrayRef;
1277        let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1278            Some(0),
1279            Some(1),
1280            None,
1281            Some(3),
1282        ])) as ArrayRef;
1283        let col10 = Arc::new(
1284            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1285                .with_timezone_utc(),
1286        ) as ArrayRef;
1287        let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1288            Some(0),
1289            Some(1),
1290            None,
1291            Some(3),
1292        ])) as ArrayRef;
1293        let col12 = Arc::new(
1294            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1295                .with_timezone_utc(),
1296        ) as ArrayRef;
1297        let col13 = Arc::new(
1298            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1299                .with_precision_and_scale(10, 5)
1300                .unwrap(),
1301        ) as ArrayRef;
1302        let col14 = Arc::new(
1303            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1304                vec![
1305                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
1306                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
1307                    None,
1308                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
1309                ]
1310                .into_iter(),
1311                16,
1312            )
1313            .unwrap(),
1314        ) as ArrayRef;
1315        let col15 = Arc::new(
1316            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1317                vec![
1318                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1319                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1320                    None,
1321                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1322                ]
1323                .into_iter(),
1324                10,
1325            )
1326            .unwrap(),
1327        ) as ArrayRef;
1328        let col16 = Arc::new(
1329            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1330                .with_precision_and_scale(38, 5)
1331                .unwrap(),
1332        ) as ArrayRef;
1333        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1334            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1335            col14, col15, col16,
1336        ])
1337        .unwrap();
1338        let output_file = file_io.new_output(
1339            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1340        )?;
1341
1342        // write data
1343        let mut pw =
1344            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1345                .build(output_file)
1346                .await?;
1347        pw.write(&to_write).await?;
1348        let res = pw.close().await?;
1349        assert_eq!(res.len(), 1);
1350        let data_file = res
1351            .into_iter()
1352            .next()
1353            .unwrap()
1354            // Put dummy field for build successfully.
1355            .content(crate::spec::DataContentType::Data)
1356            .partition(Struct::empty())
1357            .partition_spec_id(0)
1358            .build()
1359            .unwrap();
1360
1361        // check data file
1362        assert_eq!(data_file.record_count(), 4);
1363        assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1364        assert!(
1365            data_file
1366                .null_value_counts()
1367                .iter()
1368                .all(|(_, &v)| { v == 1 })
1369        );
1370        assert_eq!(
1371            *data_file.lower_bounds(),
1372            HashMap::from([
1373                (0, Datum::bool(false)),
1374                (1, Datum::int(1)),
1375                (2, Datum::long(1)),
1376                (3, Datum::float(0.5)),
1377                (4, Datum::double(0.5)),
1378                (5, Datum::string("a")),
1379                (6, Datum::binary(vec![])),
1380                (7, Datum::date(0)),
1381                (8, Datum::time_micros(0).unwrap()),
1382                (9, Datum::timestamp_micros(0)),
1383                (10, Datum::timestamptz_micros(0)),
1384                (11, Datum::timestamp_nanos(0)),
1385                (12, Datum::timestamptz_nanos(0)),
1386                (
1387                    13,
1388                    Datum::new(
1389                        PrimitiveType::Decimal {
1390                            precision: 10,
1391                            scale: 5
1392                        },
1393                        PrimitiveLiteral::Int128(1)
1394                    )
1395                ),
1396                (14, Datum::uuid(Uuid::from_u128(0))),
1397                (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1398                (
1399                    16,
1400                    Datum::new(
1401                        PrimitiveType::Decimal {
1402                            precision: 38,
1403                            scale: 5
1404                        },
1405                        PrimitiveLiteral::Int128(1)
1406                    )
1407                ),
1408            ])
1409        );
1410        assert_eq!(
1411            *data_file.upper_bounds(),
1412            HashMap::from([
1413                (0, Datum::bool(true)),
1414                (1, Datum::int(4)),
1415                (2, Datum::long(4)),
1416                (3, Datum::float(3.5)),
1417                (4, Datum::double(3.5)),
1418                (5, Datum::string("d")),
1419                (6, Datum::binary(vec![122, 122, 122, 122])),
1420                (7, Datum::date(3)),
1421                (8, Datum::time_micros(3).unwrap()),
1422                (9, Datum::timestamp_micros(3)),
1423                (10, Datum::timestamptz_micros(3)),
1424                (11, Datum::timestamp_nanos(3)),
1425                (12, Datum::timestamptz_nanos(3)),
1426                (
1427                    13,
1428                    Datum::new(
1429                        PrimitiveType::Decimal {
1430                            precision: 10,
1431                            scale: 5
1432                        },
1433                        PrimitiveLiteral::Int128(100)
1434                    )
1435                ),
1436                (14, Datum::uuid(Uuid::from_u128(3))),
1437                (
1438                    15,
1439                    Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1440                ),
1441                (
1442                    16,
1443                    Datum::new(
1444                        PrimitiveType::Decimal {
1445                            precision: 38,
1446                            scale: 5
1447                        },
1448                        PrimitiveLiteral::Int128(100)
1449                    )
1450                ),
1451            ])
1452        );
1453
1454        // check the written file
1455        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1456
1457        Ok(())
1458    }
1459
1460    #[tokio::test]
1461    async fn test_decimal_bound() -> Result<()> {
1462        let temp_dir = TempDir::new().unwrap();
1463        let file_io = FileIO::new_with_fs();
1464        let location_gen = DefaultLocationGenerator::with_data_location(
1465            temp_dir.path().to_str().unwrap().to_string(),
1466        );
1467        let file_name_gen =
1468            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1469
1470        // test 1.1 and 2.2
1471        let schema = Arc::new(
1472            Schema::builder()
1473                .with_fields(vec![
1474                    NestedField::optional(
1475                        0,
1476                        "decimal",
1477                        Type::Primitive(PrimitiveType::Decimal {
1478                            precision: 28,
1479                            scale: 10,
1480                        }),
1481                    )
1482                    .into(),
1483                ])
1484                .build()
1485                .unwrap(),
1486        );
1487        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1488        let output_file = file_io.new_output(
1489            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1490        )?;
1491        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1492            .build(output_file)
1493            .await?;
1494        let col0 = Arc::new(
1495            Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1496                .with_data_type(DataType::Decimal128(28, 10)),
1497        ) as ArrayRef;
1498        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1499        pw.write(&to_write).await?;
1500        let res = pw.close().await?;
1501        assert_eq!(res.len(), 1);
1502        let data_file = res
1503            .into_iter()
1504            .next()
1505            .unwrap()
1506            .content(crate::spec::DataContentType::Data)
1507            .partition(Struct::empty())
1508            .partition_spec_id(0)
1509            .build()
1510            .unwrap();
1511        assert_eq!(
1512            data_file.upper_bounds().get(&0),
1513            Some(Datum::decimal_with_precision(decimal_new(22000000000_i64, 10), 28).unwrap())
1514                .as_ref()
1515        );
1516        assert_eq!(
1517            data_file.lower_bounds().get(&0),
1518            Some(Datum::decimal_with_precision(decimal_new(11000000000_i64, 10), 28).unwrap())
1519                .as_ref()
1520        );
1521
1522        // test -1.1 and -2.2
1523        let schema = Arc::new(
1524            Schema::builder()
1525                .with_fields(vec![
1526                    NestedField::optional(
1527                        0,
1528                        "decimal",
1529                        Type::Primitive(PrimitiveType::Decimal {
1530                            precision: 28,
1531                            scale: 10,
1532                        }),
1533                    )
1534                    .into(),
1535                ])
1536                .build()
1537                .unwrap(),
1538        );
1539        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1540        let output_file = file_io.new_output(
1541            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1542        )?;
1543        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1544            .build(output_file)
1545            .await?;
1546        let col0 = Arc::new(
1547            Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1548                .with_data_type(DataType::Decimal128(28, 10)),
1549        ) as ArrayRef;
1550        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1551        pw.write(&to_write).await?;
1552        let res = pw.close().await?;
1553        assert_eq!(res.len(), 1);
1554        let data_file = res
1555            .into_iter()
1556            .next()
1557            .unwrap()
1558            .content(crate::spec::DataContentType::Data)
1559            .partition(Struct::empty())
1560            .partition_spec_id(0)
1561            .build()
1562            .unwrap();
1563        assert_eq!(
1564            data_file.upper_bounds().get(&0),
1565            Some(Datum::decimal_with_precision(decimal_new(-11000000000_i64, 10), 28).unwrap())
1566                .as_ref()
1567        );
1568        assert_eq!(
1569            data_file.lower_bounds().get(&0),
1570            Some(Datum::decimal_with_precision(decimal_new(-22000000000_i64, 10), 28).unwrap())
1571                .as_ref()
1572        );
1573
1574        // test 38-digit precision decimal values (Iceberg spec max)
1575        // Note: fastnum D128::MAX/MIN have impractical exponents, so we use meaningful values
1576        use crate::spec::decimal_utils::decimal_from_str_exact;
1577        let decimal_max = decimal_from_str_exact("99999999999999999999999999999999999999").unwrap();
1578        let decimal_min =
1579            decimal_from_str_exact("-99999999999999999999999999999999999999").unwrap();
1580        assert_eq!(decimal_scale(&decimal_max), decimal_scale(&decimal_min));
1581        let schema = Arc::new(
1582            Schema::builder()
1583                .with_fields(vec![
1584                    NestedField::optional(
1585                        0,
1586                        "decimal",
1587                        Type::Primitive(PrimitiveType::Decimal {
1588                            precision: 38,
1589                            scale: decimal_scale(&decimal_max),
1590                        }),
1591                    )
1592                    .into(),
1593                ])
1594                .build()
1595                .unwrap(),
1596        );
1597        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1598        let output_file = file_io.new_output(
1599            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1600        )?;
1601        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1602            .build(output_file)
1603            .await?;
1604        let col0 = Arc::new(
1605            Decimal128Array::from(vec![
1606                Some(decimal_mantissa(&decimal_max)),
1607                Some(decimal_mantissa(&decimal_min)),
1608            ])
1609            .with_data_type(DataType::Decimal128(38, 0)),
1610        ) as ArrayRef;
1611        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1612        pw.write(&to_write).await?;
1613        let res = pw.close().await?;
1614        assert_eq!(res.len(), 1);
1615        let data_file = res
1616            .into_iter()
1617            .next()
1618            .unwrap()
1619            .content(crate::spec::DataContentType::Data)
1620            .partition(Struct::empty())
1621            .partition_spec_id(0)
1622            .build()
1623            .unwrap();
1624        assert_eq!(
1625            data_file.upper_bounds().get(&0),
1626            Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1627        );
1628        assert_eq!(
1629            data_file.lower_bounds().get(&0),
1630            Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1631        );
1632
1633        // test max and min for scale 38
1634        // TODO:
1635        // Re-add this case once https://github.com/apache/iceberg-rust/issues/669 is resolved.
1636        // let schema = Arc::new(
1637        //     Schema::builder()
1638        //         .with_fields(vec![NestedField::optional(
1639        //             0,
1640        //             "decimal",
1641        //             Type::Primitive(PrimitiveType::Decimal {
1642        //                 precision: 38,
1643        //                 scale: 0,
1644        //             }),
1645        //         )
1646        //         .into()])
1647        //         .build()
1648        //         .unwrap(),
1649        // );
1650        // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1651        // let mut pw = ParquetWriterBuilder::new(
1652        //     WriterProperties::builder().build(),
1653        //     schema,
1654        //     file_io.clone(),
1655        //     loccation_gen,
1656        //     file_name_gen,
1657        // )
1658        // .build()
1659        // .await?;
1660        // let col0 = Arc::new(
1661        //     Decimal128Array::from(vec![
1662        //         Some(99999999999999999999999999999999999999_i128),
1663        //         Some(-99999999999999999999999999999999999999_i128),
1664        //     ])
1665        //     .with_data_type(DataType::Decimal128(38, 0)),
1666        // ) as ArrayRef;
1667        // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1668        // pw.write(&to_write).await?;
1669        // let res = pw.close().await?;
1670        // assert_eq!(res.len(), 1);
1671        // let data_file = res
1672        //     .into_iter()
1673        //     .next()
1674        //     .unwrap()
1675        //     .content(crate::spec::DataContentType::Data)
1676        //     .partition(Struct::empty())
1677        //     .build()
1678        //     .unwrap();
1679        // assert_eq!(
1680        //     data_file.upper_bounds().get(&0),
1681        //     Some(Datum::new(
1682        //         PrimitiveType::Decimal {
1683        //             precision: 38,
1684        //             scale: 0
1685        //         },
1686        //         PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128)
1687        //     ))
1688        //     .as_ref()
1689        // );
1690        // assert_eq!(
1691        //     data_file.lower_bounds().get(&0),
1692        //     Some(Datum::new(
1693        //         PrimitiveType::Decimal {
1694        //             precision: 38,
1695        //             scale: 0
1696        //         },
1697        //         PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128)
1698        //     ))
1699        //     .as_ref()
1700        // );
1701
1702        Ok(())
1703    }
1704
1705    #[tokio::test]
1706    async fn test_empty_write() -> Result<()> {
1707        let temp_dir = TempDir::new().unwrap();
1708        let file_io = FileIO::new_with_fs();
1709        let location_gen = DefaultLocationGenerator::with_data_location(
1710            temp_dir.path().to_str().unwrap().to_string(),
1711        );
1712        let file_name_gen =
1713            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1714
1715        // Test that file will create if data to write
1716        let schema = {
1717            let fields = vec![
1718                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1719                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1720                ),
1721            ];
1722            Arc::new(arrow_schema::Schema::new(fields))
1723        };
1724        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1725        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1726        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1727        let output_file = file_io.new_output(&file_path)?;
1728        let mut pw = ParquetWriterBuilder::new(
1729            WriterProperties::builder().build(),
1730            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1731        )
1732        .build(output_file)
1733        .await?;
1734        pw.write(&to_write).await?;
1735        pw.close().await.unwrap();
1736        assert!(file_io.exists(&file_path).await.unwrap());
1737
1738        // Test that file will not create if no data to write
1739        let file_name_gen =
1740            DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1741        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1742        let output_file = file_io.new_output(&file_path)?;
1743        let pw = ParquetWriterBuilder::new(
1744            WriterProperties::builder().build(),
1745            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1746        )
1747        .build(output_file)
1748        .await?;
1749        pw.close().await.unwrap();
1750        assert!(!file_io.exists(&file_path).await.unwrap());
1751
1752        Ok(())
1753    }
1754
1755    #[tokio::test]
1756    async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1757        let temp_dir = TempDir::new().unwrap();
1758        let file_io = FileIO::new_with_fs();
1759        let location_gen = DefaultLocationGenerator::with_data_location(
1760            temp_dir.path().to_str().unwrap().to_string(),
1761        );
1762        let file_name_gen =
1763            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1764
1765        // prepare data
1766        let arrow_schema = {
1767            let fields = vec![
1768                Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1769                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1770                ),
1771                Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1772                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1773                ),
1774            ];
1775            Arc::new(arrow_schema::Schema::new(fields))
1776        };
1777
1778        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1779            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1780            None,
1781        )) as ArrayRef;
1782
1783        let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1784            [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1785            None,
1786        )) as ArrayRef;
1787
1788        let to_write =
1789            RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1790        let output_file = file_io.new_output(
1791            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1792        )?;
1793
1794        // write data
1795        let mut pw = ParquetWriterBuilder::new(
1796            WriterProperties::builder().build(),
1797            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1798        )
1799        .build(output_file)
1800        .await?;
1801
1802        pw.write(&to_write).await?;
1803        let res = pw.close().await?;
1804        assert_eq!(res.len(), 1);
1805        let data_file = res
1806            .into_iter()
1807            .next()
1808            .unwrap()
1809            // Put dummy field for build successfully.
1810            .content(crate::spec::DataContentType::Data)
1811            .partition(Struct::empty())
1812            .partition_spec_id(0)
1813            .build()
1814            .unwrap();
1815
1816        // check data file
1817        assert_eq!(data_file.record_count(), 4);
1818        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1819        assert_eq!(
1820            *data_file.lower_bounds(),
1821            HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1822        );
1823        assert_eq!(
1824            *data_file.upper_bounds(),
1825            HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1826        );
1827        assert_eq!(
1828            *data_file.null_value_counts(),
1829            HashMap::from([(0, 0), (1, 0)])
1830        );
1831        assert_eq!(
1832            *data_file.nan_value_counts(),
1833            HashMap::from([(0, 1), (1, 1)])
1834        );
1835
1836        // check the written file
1837        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1838        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1839
1840        Ok(())
1841    }
1842
1843    #[tokio::test]
1844    async fn test_nan_val_cnts_struct_type() -> Result<()> {
1845        let temp_dir = TempDir::new().unwrap();
1846        let file_io = FileIO::new_with_fs();
1847        let location_gen = DefaultLocationGenerator::with_data_location(
1848            temp_dir.path().to_str().unwrap().to_string(),
1849        );
1850        let file_name_gen =
1851            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1852
1853        let schema_struct_float_fields = Fields::from(vec![
1854            Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1855                PARQUET_FIELD_ID_META_KEY.to_string(),
1856                "4".to_string(),
1857            )])),
1858        ]);
1859
1860        let schema_struct_nested_float_fields = Fields::from(vec![
1861            Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1862                PARQUET_FIELD_ID_META_KEY.to_string(),
1863                "7".to_string(),
1864            )])),
1865        ]);
1866
1867        let schema_struct_nested_fields = Fields::from(vec![
1868            Field::new(
1869                "col6",
1870                arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1871                false,
1872            )
1873            .with_metadata(HashMap::from([(
1874                PARQUET_FIELD_ID_META_KEY.to_string(),
1875                "6".to_string(),
1876            )])),
1877        ]);
1878
1879        // prepare data
1880        let arrow_schema = {
1881            let fields = vec![
1882                Field::new(
1883                    "col3",
1884                    arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1885                    false,
1886                )
1887                .with_metadata(HashMap::from([(
1888                    PARQUET_FIELD_ID_META_KEY.to_string(),
1889                    "3".to_string(),
1890                )])),
1891                Field::new(
1892                    "col5",
1893                    arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1894                    false,
1895                )
1896                .with_metadata(HashMap::from([(
1897                    PARQUET_FIELD_ID_META_KEY.to_string(),
1898                    "5".to_string(),
1899                )])),
1900            ];
1901            Arc::new(arrow_schema::Schema::new(fields))
1902        };
1903
1904        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1905            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1906            None,
1907        )) as ArrayRef;
1908
1909        let struct_float_field_col = Arc::new(StructArray::new(
1910            schema_struct_float_fields,
1911            vec![float_32_col.clone()],
1912            None,
1913        )) as ArrayRef;
1914
1915        let struct_nested_float_field_col = Arc::new(StructArray::new(
1916            schema_struct_nested_fields,
1917            vec![Arc::new(StructArray::new(
1918                schema_struct_nested_float_fields,
1919                vec![float_32_col.clone()],
1920                None,
1921            )) as ArrayRef],
1922            None,
1923        )) as ArrayRef;
1924
1925        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1926            struct_float_field_col,
1927            struct_nested_float_field_col,
1928        ])
1929        .unwrap();
1930        let output_file = file_io.new_output(
1931            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1932        )?;
1933
1934        // write data
1935        let mut pw = ParquetWriterBuilder::new(
1936            WriterProperties::builder().build(),
1937            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1938        )
1939        .build(output_file)
1940        .await?;
1941
1942        pw.write(&to_write).await?;
1943        let res = pw.close().await?;
1944        assert_eq!(res.len(), 1);
1945        let data_file = res
1946            .into_iter()
1947            .next()
1948            .unwrap()
1949            // Put dummy field for build successfully.
1950            .content(crate::spec::DataContentType::Data)
1951            .partition(Struct::empty())
1952            .partition_spec_id(0)
1953            .build()
1954            .unwrap();
1955
1956        // check data file
1957        assert_eq!(data_file.record_count(), 4);
1958        assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1959        assert_eq!(
1960            *data_file.lower_bounds(),
1961            HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1962        );
1963        assert_eq!(
1964            *data_file.upper_bounds(),
1965            HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1966        );
1967        assert_eq!(
1968            *data_file.null_value_counts(),
1969            HashMap::from([(4, 0), (7, 0)])
1970        );
1971        assert_eq!(
1972            *data_file.nan_value_counts(),
1973            HashMap::from([(4, 1), (7, 1)])
1974        );
1975
1976        // check the written file
1977        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1978        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1979
1980        Ok(())
1981    }
1982
1983    #[tokio::test]
1984    async fn test_nan_val_cnts_list_type() -> Result<()> {
1985        let temp_dir = TempDir::new().unwrap();
1986        let file_io = FileIO::new_with_fs();
1987        let location_gen = DefaultLocationGenerator::with_data_location(
1988            temp_dir.path().to_str().unwrap().to_string(),
1989        );
1990        let file_name_gen =
1991            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1992
1993        let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1994            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1995        );
1996
1997        let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1998            .with_metadata(HashMap::from([(
1999                PARQUET_FIELD_ID_META_KEY.to_string(),
2000                "4".to_string(),
2001            )]));
2002
2003        let schema_struct_list_field = Fields::from(vec![
2004            Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
2005                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
2006            ),
2007        ]);
2008
2009        let arrow_schema = {
2010            let fields = vec![
2011                Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
2012                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
2013                ),
2014                Field::new_struct("col1", schema_struct_list_field.clone(), true)
2015                    .with_metadata(HashMap::from([(
2016                        PARQUET_FIELD_ID_META_KEY.to_string(),
2017                        "2".to_string(),
2018                    )]))
2019                    .clone(),
2020                // Field::new_large_list("col3", schema_large_list_float_field.clone(), true).with_metadata(
2021                //     HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
2022                // ).clone(),
2023            ];
2024            Arc::new(arrow_schema::Schema::new(fields))
2025        };
2026
2027        let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
2028            Some(1.0_f32),
2029            Some(f32::NAN),
2030            Some(2.0),
2031            Some(2.0),
2032        ])])
2033        .into_parts();
2034
2035        let list_float_field_col = Arc::new({
2036            let list_parts = list_parts.clone();
2037            ListArray::new(
2038                {
2039                    if let DataType::List(field) = arrow_schema.field(0).data_type() {
2040                        field.clone()
2041                    } else {
2042                        unreachable!()
2043                    }
2044                },
2045                list_parts.1,
2046                list_parts.2,
2047                list_parts.3,
2048            )
2049        }) as ArrayRef;
2050
2051        let struct_list_fields_schema =
2052            if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
2053                fields.clone()
2054            } else {
2055                unreachable!()
2056            };
2057
2058        let struct_list_float_field_col = Arc::new({
2059            ListArray::new(
2060                {
2061                    if let DataType::List(field) = struct_list_fields_schema
2062                        .first()
2063                        .expect("could not find first list field")
2064                        .data_type()
2065                    {
2066                        field.clone()
2067                    } else {
2068                        unreachable!()
2069                    }
2070                },
2071                list_parts.1,
2072                list_parts.2,
2073                list_parts.3,
2074            )
2075        }) as ArrayRef;
2076
2077        let struct_list_float_field_col = Arc::new(StructArray::new(
2078            struct_list_fields_schema,
2079            vec![struct_list_float_field_col.clone()],
2080            None,
2081        )) as ArrayRef;
2082
2083        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2084            list_float_field_col,
2085            struct_list_float_field_col,
2086            // large_list_float_field_col,
2087        ])
2088        .expect("Could not form record batch");
2089        let output_file = file_io.new_output(
2090            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2091        )?;
2092
2093        // write data
2094        let mut pw = ParquetWriterBuilder::new(
2095            WriterProperties::builder().build(),
2096            Arc::new(
2097                to_write
2098                    .schema()
2099                    .as_ref()
2100                    .try_into()
2101                    .expect("Could not convert iceberg schema"),
2102            ),
2103        )
2104        .build(output_file)
2105        .await?;
2106
2107        pw.write(&to_write).await?;
2108        let res = pw.close().await?;
2109        assert_eq!(res.len(), 1);
2110        let data_file = res
2111            .into_iter()
2112            .next()
2113            .unwrap()
2114            .content(crate::spec::DataContentType::Data)
2115            .partition(Struct::empty())
2116            .partition_spec_id(0)
2117            .build()
2118            .unwrap();
2119
2120        // check data file
2121        assert_eq!(data_file.record_count(), 1);
2122        assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
2123        assert_eq!(
2124            *data_file.lower_bounds(),
2125            HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
2126        );
2127        assert_eq!(
2128            *data_file.upper_bounds(),
2129            HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
2130        );
2131        assert_eq!(
2132            *data_file.null_value_counts(),
2133            HashMap::from([(1, 0), (4, 0)])
2134        );
2135        assert_eq!(
2136            *data_file.nan_value_counts(),
2137            HashMap::from([(1, 1), (4, 1)])
2138        );
2139
2140        // check the written file
2141        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2142        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2143
2144        Ok(())
2145    }
2146
/// Builds an `Arc<MapArray>` with four single-entry rows
/// (`1 -> 1.0`, `2 -> NaN`, `3 -> 2.0`, `4 -> 2.0`).
///
/// The array is first produced by a `MapBuilder`, then taken apart and
/// reassembled so that its entries struct uses the two given fields (key,
/// value) instead of the fields the builder generated. Both arguments are
/// identifiers of local `Field` values and are moved into the result.
macro_rules! construct_map_arr {
    ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
        let mut map_builder =
            MapBuilder::new(None, Int32Builder::new(), Float32Builder::with_capacity(4));
        for (key, value) in [(1, 1.0_f32), (2, f32::NAN), (3, 2.0), (4, 2.0)] {
            map_builder.keys().append_value(key);
            map_builder.values().append_value(value);
            map_builder.append(true).unwrap();
        }

        // Keep the buffers, discard the builder-generated entries field.
        let (_builder_field, offsets, builder_entries, map_nulls, ordered) =
            map_builder.finish().into_parts();
        let (_, entry_columns, entry_nulls) = builder_entries.into_parts();

        let entry_fields = Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
        let entries = StructArray::new(entry_fields.clone(), entry_columns, entry_nulls);
        let entries_field = Arc::new(Field::new(
            DEFAULT_MAP_FIELD_NAME,
            DataType::Struct(entry_fields),
            false,
        ));

        Arc::new(MapArray::new(
            entries_field,
            offsets,
            entries,
            map_nulls,
            ordered,
        ))
    }};
}
2184
2185    #[tokio::test]
2186    async fn test_nan_val_cnts_map_type() -> Result<()> {
2187        let temp_dir = TempDir::new().unwrap();
2188        let file_io = FileIO::new_with_fs();
2189        let location_gen = DefaultLocationGenerator::with_data_location(
2190            temp_dir.path().to_str().unwrap().to_string(),
2191        );
2192        let file_name_gen =
2193            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2194
2195        let map_key_field_schema =
2196            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2197                (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2198            ]));
2199
2200        let map_value_field_schema =
2201            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2202                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2203            ));
2204
2205        let struct_map_key_field_schema =
2206            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2207                (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2208            ]));
2209
2210        let struct_map_value_field_schema =
2211            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2212                [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2213            ));
2214
2215        let schema_struct_map_field = Fields::from(vec![
2216            Field::new_map(
2217                "col3",
2218                DEFAULT_MAP_FIELD_NAME,
2219                struct_map_key_field_schema.clone(),
2220                struct_map_value_field_schema.clone(),
2221                false,
2222                false,
2223            )
2224            .with_metadata(HashMap::from([(
2225                PARQUET_FIELD_ID_META_KEY.to_string(),
2226                "5".to_string(),
2227            )])),
2228        ]);
2229
2230        let arrow_schema = {
2231            let fields = vec![
2232                Field::new_map(
2233                    "col0",
2234                    DEFAULT_MAP_FIELD_NAME,
2235                    map_key_field_schema.clone(),
2236                    map_value_field_schema.clone(),
2237                    false,
2238                    false,
2239                )
2240                .with_metadata(HashMap::from([(
2241                    PARQUET_FIELD_ID_META_KEY.to_string(),
2242                    "0".to_string(),
2243                )])),
2244                Field::new_struct("col1", schema_struct_map_field.clone(), true)
2245                    .with_metadata(HashMap::from([(
2246                        PARQUET_FIELD_ID_META_KEY.to_string(),
2247                        "3".to_string(),
2248                    )]))
2249                    .clone(),
2250            ];
2251            Arc::new(arrow_schema::Schema::new(fields))
2252        };
2253
2254        let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2255
2256        let struct_map_arr =
2257            construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2258
2259        let struct_list_float_field_col = Arc::new(StructArray::new(
2260            schema_struct_map_field,
2261            vec![struct_map_arr],
2262            None,
2263        )) as ArrayRef;
2264
2265        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2266            map_array,
2267            struct_list_float_field_col,
2268        ])
2269        .expect("Could not form record batch");
2270        let output_file = file_io.new_output(
2271            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2272        )?;
2273
2274        // write data
2275        let mut pw = ParquetWriterBuilder::new(
2276            WriterProperties::builder().build(),
2277            Arc::new(
2278                to_write
2279                    .schema()
2280                    .as_ref()
2281                    .try_into()
2282                    .expect("Could not convert iceberg schema"),
2283            ),
2284        )
2285        .build(output_file)
2286        .await?;
2287
2288        pw.write(&to_write).await?;
2289        let res = pw.close().await?;
2290        assert_eq!(res.len(), 1);
2291        let data_file = res
2292            .into_iter()
2293            .next()
2294            .unwrap()
2295            .content(crate::spec::DataContentType::Data)
2296            .partition(Struct::empty())
2297            .partition_spec_id(0)
2298            .build()
2299            .unwrap();
2300
2301        // check data file
2302        assert_eq!(data_file.record_count(), 4);
2303        assert_eq!(
2304            *data_file.value_counts(),
2305            HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2306        );
2307        assert_eq!(
2308            *data_file.lower_bounds(),
2309            HashMap::from([
2310                (1, Datum::int(1)),
2311                (2, Datum::float(1.0)),
2312                (6, Datum::int(1)),
2313                (7, Datum::float(1.0))
2314            ])
2315        );
2316        assert_eq!(
2317            *data_file.upper_bounds(),
2318            HashMap::from([
2319                (1, Datum::int(4)),
2320                (2, Datum::float(2.0)),
2321                (6, Datum::int(4)),
2322                (7, Datum::float(2.0))
2323            ])
2324        );
2325        assert_eq!(
2326            *data_file.null_value_counts(),
2327            HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2328        );
2329        assert_eq!(
2330            *data_file.nan_value_counts(),
2331            HashMap::from([(2, 1), (7, 1)])
2332        );
2333
2334        // check the written file
2335        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2336        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2337
2338        Ok(())
2339    }
2340
2341    #[tokio::test]
2342    async fn test_write_empty_parquet_file() {
2343        let temp_dir = TempDir::new().unwrap();
2344        let file_io = FileIO::new_with_fs();
2345        let location_gen = DefaultLocationGenerator::with_data_location(
2346            temp_dir.path().to_str().unwrap().to_string(),
2347        );
2348        let file_name_gen =
2349            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2350        let output_file = file_io
2351            .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
2352            .unwrap();
2353
2354        // write data
2355        let pw = ParquetWriterBuilder::new(
2356            WriterProperties::builder().build(),
2357            Arc::new(
2358                Schema::builder()
2359                    .with_schema_id(1)
2360                    .with_fields(vec![
2361                        NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2362                            .with_id(0)
2363                            .into(),
2364                    ])
2365                    .build()
2366                    .expect("Failed to create schema"),
2367            ),
2368        )
2369        .build(output_file)
2370        .await
2371        .unwrap();
2372
2373        let res = pw.close().await.unwrap();
2374        assert_eq!(res.len(), 0);
2375
2376        // Check that file should have been deleted.
2377        assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2378    }
2379
/// Feeds `MinMaxColAggregator` a sequence of Int32 statistics in which the
/// minimum and/or maximum is sometimes absent, and checks that the produced
/// bounds are the overall smallest minimum and largest maximum that were seen.
#[test]
fn test_min_max_aggregator() {
    let int_col = NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
        .with_id(0)
        .into();
    let schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![int_col])
        .build()
        .expect("Failed to create schema");
    let mut aggregator = MinMaxColAggregator::new(Arc::new(schema));

    // (min, max) pairs, applied to field 0 in this order.
    let updates = [
        (None, Some(42)),
        (Some(0), Some(i32::MAX)),
        (Some(i32::MIN), None),
        (None, None),
    ];
    for (min, max) in updates {
        let stats = Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
        aggregator.update(0, stats).unwrap();
    }

    let (lower_bounds, upper_bounds) = aggregator.produce();

    assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
    assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
}
2414}