1use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::RecordBatch;
24use arrow_schema::{DataType, Field, Schema as ArrowSchema};
25use bytes::Bytes;
26use futures::future::BoxFuture;
27use itertools::Itertools;
28use parquet::arrow::AsyncArrowWriter;
29use parquet::arrow::async_reader::AsyncFileReader;
30use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
31use parquet::file::metadata::ParquetMetaData;
32use parquet::file::properties::WriterProperties;
33use parquet::file::statistics::Statistics;
34
35use super::{FileWriter, FileWriterBuilder};
36use crate::arrow::{
37 ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
38 get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
39};
40use crate::io::{FileIO, FileWrite, OutputFile};
41use crate::spec::{
42 DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
43 NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
44 StructType, TableMetadata, Type, visit_schema,
45};
46use crate::transform::create_transform_function;
47use crate::writer::{CurrentFileStatus, DataFile};
48use crate::{Error, ErrorKind, Result};
49
50#[derive(Clone, Debug)]
52pub struct ParquetWriterBuilder {
53 props: WriterProperties,
54 schema: SchemaRef,
55 match_mode: FieldMatchMode,
56 arrow_schema: Option<Arc<ArrowSchema>>,
60}
61
62impl ParquetWriterBuilder {
63 pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
66 Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
67 }
68
69 pub fn new_with_match_mode(
71 props: WriterProperties,
72 schema: SchemaRef,
73 match_mode: FieldMatchMode,
74 ) -> Self {
75 Self {
76 props,
77 schema,
78 match_mode,
79 arrow_schema: None,
80 }
81 }
82
83 pub fn with_arrow_schema(mut self, arrow_schema: Arc<ArrowSchema>) -> Result<Self> {
88 validate_arrow_schema_matches_iceberg(&arrow_schema, &self.schema)?;
89 self.arrow_schema = Some(arrow_schema);
90 Ok(self)
91 }
92}
93
94impl FileWriterBuilder for ParquetWriterBuilder {
95 type R = ParquetWriter;
96
97 async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
98 Ok(ParquetWriter {
99 schema: self.schema.clone(),
100 arrow_schema: self.arrow_schema.clone(),
101 inner_writer: None,
102 writer_properties: self.props.clone(),
103 current_row_num: 0,
104 output_file,
105 nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
106 })
107 }
108}
109
110struct IndexByParquetPathName {
112 name_to_id: HashMap<String, i32>,
113
114 field_names: Vec<String>,
115
116 field_id: i32,
117}
118
119impl IndexByParquetPathName {
120 pub fn new() -> Self {
122 Self {
123 name_to_id: HashMap::new(),
124 field_names: Vec::new(),
125 field_id: 0,
126 }
127 }
128
129 pub fn get(&self, name: &str) -> Option<&i32> {
131 self.name_to_id.get(name)
132 }
133}
134
135impl Default for IndexByParquetPathName {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141impl SchemaVisitor for IndexByParquetPathName {
142 type T = ();
143
144 fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
145 self.field_names.push(field.name.to_string());
146 self.field_id = field.id;
147 Ok(())
148 }
149
150 fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
151 self.field_names.pop();
152 Ok(())
153 }
154
155 fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
156 self.field_names.push(format!("list.{}", field.name));
157 self.field_id = field.id;
158 Ok(())
159 }
160
161 fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
162 self.field_names.pop();
163 Ok(())
164 }
165
166 fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
167 self.field_names
168 .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
169 self.field_id = field.id;
170 Ok(())
171 }
172
173 fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
174 self.field_names.pop();
175 Ok(())
176 }
177
178 fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
179 self.field_names
180 .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
181 self.field_id = field.id;
182 Ok(())
183 }
184
185 fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
186 self.field_names.pop();
187 Ok(())
188 }
189
190 fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
191 Ok(())
192 }
193
194 fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
195 Ok(())
196 }
197
198 fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
199 Ok(())
200 }
201
202 fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
203 Ok(())
204 }
205
206 fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
207 Ok(())
208 }
209
210 fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
211 let full_name = self.field_names.iter().map(String::as_str).join(".");
212 let field_id = self.field_id;
213 if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
214 return Err(Error::new(
215 ErrorKind::DataInvalid,
216 format!(
217 "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
218 ),
219 ));
220 } else {
221 self.name_to_id.insert(full_name, field_id);
222 }
223
224 Ok(())
225 }
226}
227
228pub struct ParquetWriter {
230 schema: SchemaRef,
231 arrow_schema: Option<Arc<ArrowSchema>>,
234 output_file: OutputFile,
235 inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
236 writer_properties: WriterProperties,
237 current_row_num: usize,
238 nan_value_count_visitor: NanValueCountVisitor,
239}
240
241struct MinMaxColAggregator {
243 lower_bounds: HashMap<i32, Datum>,
244 upper_bounds: HashMap<i32, Datum>,
245 schema: SchemaRef,
246}
247
248impl MinMaxColAggregator {
249 fn new(schema: SchemaRef) -> Self {
251 Self {
252 lower_bounds: HashMap::new(),
253 upper_bounds: HashMap::new(),
254 schema,
255 }
256 }
257
258 fn update_state_min(&mut self, field_id: i32, datum: Datum) {
259 self.lower_bounds
260 .entry(field_id)
261 .and_modify(|e| {
262 if *e > datum {
263 *e = datum.clone()
264 }
265 })
266 .or_insert(datum);
267 }
268
269 fn update_state_max(&mut self, field_id: i32, datum: Datum) {
270 self.upper_bounds
271 .entry(field_id)
272 .and_modify(|e| {
273 if *e < datum {
274 *e = datum.clone()
275 }
276 })
277 .or_insert(datum);
278 }
279
280 fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
282 let Some(ty) = self
283 .schema
284 .field_by_id(field_id)
285 .map(|f| f.field_type.as_ref())
286 else {
287 return Ok(());
290 };
291 let Type::Primitive(ty) = ty.clone() else {
292 return Err(Error::new(
293 ErrorKind::Unexpected,
294 format!("Composed type {ty} is not supported for min max aggregation."),
295 ));
296 };
297
298 if value.min_is_exact() {
299 let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
300 return Err(Error::new(
301 ErrorKind::Unexpected,
302 format!("Statistics {value} is not match with field type {ty}."),
303 ));
304 };
305
306 self.update_state_min(field_id, min_datum);
307 }
308
309 if value.max_is_exact() {
310 let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
311 return Err(Error::new(
312 ErrorKind::Unexpected,
313 format!("Statistics {value} is not match with field type {ty}."),
314 ));
315 };
316
317 self.update_state_max(field_id, max_datum);
318 }
319
320 Ok(())
321 }
322
323 fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
325 (self.lower_bounds, self.upper_bounds)
326 }
327}
328
329impl ParquetWriter {
330 #[allow(dead_code)]
332 pub(crate) async fn parquet_files_to_data_files(
333 file_io: &FileIO,
334 file_paths: Vec<String>,
335 table_metadata: &TableMetadata,
336 ) -> Result<Vec<DataFile>> {
337 let mut data_files: Vec<DataFile> = Vec::new();
339
340 for file_path in file_paths {
341 let input_file = file_io.new_input(&file_path)?;
342 let file_metadata = input_file.metadata().await?;
343 let file_size_in_bytes = file_metadata.size as usize;
344 let reader = input_file.reader().await?;
345
346 let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
347 let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
348 Error::new(
349 ErrorKind::DataInvalid,
350 format!("Error reading Parquet metadata: {err}"),
351 )
352 })?;
353 let mut builder = ParquetWriter::parquet_to_data_file_builder(
354 table_metadata.current_schema().clone(),
355 parquet_metadata,
356 file_size_in_bytes,
357 file_path,
358 HashMap::new(),
360 )?;
361 builder.partition_spec_id(table_metadata.default_partition_spec_id());
362 let data_file = builder.build().unwrap();
363 data_files.push(data_file);
364 }
365
366 Ok(data_files)
367 }
368
369 pub(crate) fn parquet_to_data_file_builder(
371 schema: SchemaRef,
372 metadata: Arc<ParquetMetaData>,
373 written_size: usize,
374 file_path: String,
375 nan_value_counts: HashMap<i32, u64>,
376 ) -> Result<DataFileBuilder> {
377 let index_by_parquet_path = {
378 let mut visitor = IndexByParquetPathName::new();
379 visit_schema(&schema, &mut visitor)?;
380 visitor
381 };
382
383 let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
384 let mut per_col_size: HashMap<i32, u64> = HashMap::new();
385 let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
386 let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
387 let mut min_max_agg = MinMaxColAggregator::new(schema);
388
389 for row_group in metadata.row_groups() {
390 for column_chunk_metadata in row_group.columns() {
391 let parquet_path = column_chunk_metadata.column_descr().path().string();
392
393 let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
394 continue;
395 };
396
397 *per_col_size.entry(field_id).or_insert(0) +=
398 column_chunk_metadata.compressed_size() as u64;
399 *per_col_val_num.entry(field_id).or_insert(0) +=
400 column_chunk_metadata.num_values() as u64;
401
402 if let Some(statistics) = column_chunk_metadata.statistics() {
403 if let Some(null_count) = statistics.null_count_opt() {
404 *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
405 }
406
407 min_max_agg.update(field_id, statistics.clone())?;
408 }
409 }
410 }
411 (
412 per_col_size,
413 per_col_val_num,
414 per_col_null_val_num,
415 min_max_agg.produce(),
416 )
417 };
418
419 let mut builder = DataFileBuilder::default();
420 builder
421 .content(DataContentType::Data)
422 .file_path(file_path)
423 .file_format(DataFileFormat::Parquet)
424 .partition(Struct::empty())
425 .record_count(metadata.file_metadata().num_rows() as u64)
426 .file_size_in_bytes(written_size as u64)
427 .column_sizes(column_sizes)
428 .value_counts(value_counts)
429 .null_value_counts(null_value_counts)
430 .nan_value_counts(nan_value_counts)
431 .lower_bounds(lower_bounds)
434 .upper_bounds(upper_bounds)
435 .split_offsets(Some(
436 metadata
437 .row_groups()
438 .iter()
439 .filter_map(|group| group.file_offset())
440 .collect(),
441 ));
442
443 Ok(builder)
444 }
445
446 #[allow(dead_code)]
447 fn partition_value_from_bounds(
448 table_spec: Arc<PartitionSpec>,
449 lower_bounds: &HashMap<i32, Datum>,
450 upper_bounds: &HashMap<i32, Datum>,
451 ) -> Result<Struct> {
452 let mut partition_literals: Vec<Option<Literal>> = Vec::new();
453
454 for field in table_spec.fields() {
455 if let (Some(lower), Some(upper)) = (
456 lower_bounds.get(&field.source_id),
457 upper_bounds.get(&field.source_id),
458 ) {
459 if !field.transform.preserves_order() {
460 return Err(Error::new(
461 ErrorKind::DataInvalid,
462 format!(
463 "cannot infer partition value for non linear partition field (needs to preserve order): {} with transform {}",
464 field.name, field.transform
465 ),
466 ));
467 }
468
469 if lower != upper {
470 return Err(Error::new(
471 ErrorKind::DataInvalid,
472 format!(
473 "multiple partition values for field {}: lower: {:?}, upper: {:?}",
474 field.name, lower, upper
475 ),
476 ));
477 }
478
479 let transform_fn = create_transform_function(&field.transform)?;
480 let transform_literal =
481 Literal::from(transform_fn.transform_literal_result(lower)?);
482
483 partition_literals.push(Some(transform_literal));
484 } else {
485 partition_literals.push(None);
486 }
487 }
488
489 let partition_struct = Struct::from_iter(partition_literals);
490
491 Ok(partition_struct)
492 }
493}
494
495fn validate_arrow_schema_matches_iceberg(
498 arrow_schema: &ArrowSchema,
499 iceberg_schema: &crate::spec::Schema,
500) -> Result<()> {
501 let expected: ArrowSchema = iceberg_schema.try_into()?;
502 validate_schemas_match(&expected, arrow_schema, "")
503}
504
505fn validate_schemas_match(expected: &ArrowSchema, actual: &ArrowSchema, path: &str) -> Result<()> {
506 if expected.fields().len() != actual.fields().len() {
507 return Err(Error::new(
508 ErrorKind::DataInvalid,
509 format!(
510 "Schema mismatch at '{}': expected {} fields, got {}",
511 path,
512 expected.fields().len(),
513 actual.fields().len()
514 ),
515 ));
516 }
517 for (e, a) in expected.fields().iter().zip(actual.fields().iter()) {
518 validate_fields_match(e, a, path)?;
519 }
520 Ok(())
521}
522
523fn validate_fields_match(expected: &Field, actual: &Field, parent_path: &str) -> Result<()> {
524 let path = if parent_path.is_empty() {
525 expected.name().to_string()
526 } else {
527 format!("{}.{}", parent_path, expected.name())
528 };
529
530 if expected.name() != actual.name() {
531 return Err(Error::new(
532 ErrorKind::DataInvalid,
533 format!(
534 "Field name mismatch at '{}': expected '{}', got '{}'",
535 parent_path,
536 expected.name(),
537 actual.name()
538 ),
539 ));
540 }
541
542 if expected.is_nullable() != actual.is_nullable() {
543 return Err(Error::new(
544 ErrorKind::DataInvalid,
545 format!(
546 "Nullability mismatch at '{}': expected {}, got {}",
547 path,
548 expected.is_nullable(),
549 actual.is_nullable()
550 ),
551 ));
552 }
553
554 validate_datatypes_match(expected.data_type(), actual.data_type(), &path)
555}
556
557fn validate_datatypes_match(expected: &DataType, actual: &DataType, path: &str) -> Result<()> {
558 match (expected, actual) {
560 (DataType::Struct(e_fields), DataType::Struct(a_fields)) => {
561 if e_fields.len() != a_fields.len() {
562 return Err(Error::new(
563 ErrorKind::DataInvalid,
564 format!(
565 "Struct field count mismatch at '{}': expected {}, got {}",
566 path,
567 e_fields.len(),
568 a_fields.len()
569 ),
570 ));
571 }
572 for (e, a) in e_fields.iter().zip(a_fields.iter()) {
573 validate_fields_match(e, a, path)?;
574 }
575 Ok(())
576 }
577 (DataType::List(e_inner), DataType::List(a_inner))
578 | (DataType::LargeList(e_inner), DataType::LargeList(a_inner)) => {
579 validate_fields_match(e_inner, a_inner, path)
580 }
581 (DataType::Map(e_entries, _), DataType::Map(a_entries, _)) => {
582 validate_fields_match(e_entries, a_entries, path)
583 }
584 _ => {
585 if std::mem::discriminant(expected) != std::mem::discriminant(actual) {
587 return Err(Error::new(
588 ErrorKind::DataInvalid,
589 format!(
590 "Type mismatch at '{}': expected {:?}, got {:?}",
591 path, expected, actual
592 ),
593 ));
594 }
595 Ok(())
596 }
597 }
598}
599
600impl FileWriter for ParquetWriter {
601 async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
602 if batch.num_rows() == 0 {
604 return Ok(());
605 }
606
607 self.current_row_num += batch.num_rows();
608
609 self.nan_value_count_visitor
610 .compute(self.schema.clone(), batch.clone())?;
611
612 let writer = if let Some(writer) = &mut self.inner_writer {
613 writer
614 } else {
615 let arrow_schema: Arc<ArrowSchema> = match &self.arrow_schema {
618 Some(schema) => Arc::clone(schema),
619 None => Arc::new(self.schema.as_ref().try_into()?),
620 };
621 let inner_writer = self.output_file.writer().await?;
622 let async_writer = AsyncFileWriter::new(inner_writer);
623 let writer = AsyncArrowWriter::try_new(
624 async_writer,
625 arrow_schema,
626 Some(self.writer_properties.clone()),
627 )
628 .map_err(|err| {
629 Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
630 .with_source(err)
631 })?;
632 self.inner_writer = Some(writer);
633 self.inner_writer.as_mut().unwrap()
634 };
635
636 writer.write(batch).await.map_err(|err| {
637 Error::new(
638 ErrorKind::Unexpected,
639 "Failed to write using parquet writer.",
640 )
641 .with_source(err)
642 })?;
643
644 Ok(())
645 }
646
647 async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
648 let mut writer = match self.inner_writer.take() {
649 Some(writer) => writer,
650 None => return Ok(vec![]),
651 };
652
653 let metadata = writer.finish().await.map_err(|err| {
654 Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
655 })?;
656
657 let written_size = writer.bytes_written();
658
659 if self.current_row_num == 0 {
660 self.output_file.delete().await.map_err(|err| {
661 Error::new(
662 ErrorKind::Unexpected,
663 "Failed to delete empty parquet file.",
664 )
665 .with_source(err)
666 })?;
667 Ok(vec![])
668 } else {
669 let parquet_metadata = Arc::new(metadata);
670
671 Ok(vec![Self::parquet_to_data_file_builder(
672 self.schema,
673 parquet_metadata,
674 written_size,
675 self.output_file.location().to_string(),
676 self.nan_value_count_visitor.nan_value_counts,
677 )?])
678 }
679 }
680}
681
682impl CurrentFileStatus for ParquetWriter {
683 fn current_file_path(&self) -> String {
684 self.output_file.location().to_string()
685 }
686
687 fn current_row_num(&self) -> usize {
688 self.current_row_num
689 }
690
691 fn current_written_size(&self) -> usize {
692 if let Some(inner) = self.inner_writer.as_ref() {
693 inner.bytes_written() + inner.in_progress_size()
696 } else {
697 0
699 }
700 }
701
702 fn current_schema(&self) -> SchemaRef {
703 self.schema.clone()
704 }
705}
706
707struct AsyncFileWriter<W: FileWrite>(W);
713
714impl<W: FileWrite> AsyncFileWriter<W> {
715 pub fn new(writer: W) -> Self {
717 Self(writer)
718 }
719}
720
721impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
722 fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
723 Box::pin(async {
724 self.0
725 .write(bs)
726 .await
727 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
728 })
729 }
730
731 fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
732 Box::pin(async {
733 self.0
734 .close()
735 .await
736 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
737 })
738 }
739}
740
741#[cfg(test)]
742mod tests {
743 use std::collections::HashMap;
744 use std::sync::Arc;
745
746 use anyhow::Result;
747 use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
748 use arrow_array::types::{Float32Type, Int64Type};
749 use arrow_array::{
750 Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
751 Int64Array, ListArray, MapArray, RecordBatch, StructArray,
752 };
753 use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
754 use arrow_select::concat::concat_batches;
755 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
756 use parquet::file::statistics::ValueStatistics;
757 use rust_decimal::Decimal;
758 use tempfile::TempDir;
759 use uuid::Uuid;
760
761 use super::*;
762 use crate::arrow::schema_to_arrow_schema;
763 use crate::io::FileIOBuilder;
764 use crate::spec::{PrimitiveLiteral, Struct, *};
765 use crate::writer::file_writer::location_generator::{
766 DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
767 };
768 use crate::writer::tests::check_parquet_data_file;
769
770 fn schema_for_all_type() -> Schema {
771 Schema::builder()
772 .with_schema_id(1)
773 .with_fields(vec![
774 NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
775 NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
776 NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
777 NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
778 NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
779 NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
780 NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
781 NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
782 NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
783 NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
784 .into(),
785 NestedField::optional(
786 10,
787 "timestamptz",
788 Type::Primitive(PrimitiveType::Timestamptz),
789 )
790 .into(),
791 NestedField::optional(
792 11,
793 "timestamp_ns",
794 Type::Primitive(PrimitiveType::TimestampNs),
795 )
796 .into(),
797 NestedField::optional(
798 12,
799 "timestamptz_ns",
800 Type::Primitive(PrimitiveType::TimestamptzNs),
801 )
802 .into(),
803 NestedField::optional(
804 13,
805 "decimal",
806 Type::Primitive(PrimitiveType::Decimal {
807 precision: 10,
808 scale: 5,
809 }),
810 )
811 .into(),
812 NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
813 NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
814 .into(),
815 NestedField::optional(
818 16,
819 "decimal_38",
820 Type::Primitive(PrimitiveType::Decimal {
821 precision: 38,
822 scale: 5,
823 }),
824 )
825 .into(),
826 ])
827 .build()
828 .unwrap()
829 }
830
831 fn nested_schema_for_test() -> Schema {
832 Schema::builder()
834 .with_schema_id(1)
835 .with_fields(vec![
836 NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
837 NestedField::required(
838 1,
839 "col1",
840 Type::Struct(StructType::new(vec![
841 NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
842 .into(),
843 NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
844 .into(),
845 ])),
846 )
847 .into(),
848 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
849 NestedField::required(
850 3,
851 "col3",
852 Type::List(ListType::new(
853 NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
854 .into(),
855 )),
856 )
857 .into(),
858 NestedField::required(
859 4,
860 "col4",
861 Type::Struct(StructType::new(vec![
862 NestedField::required(
863 8,
864 "col_4_8",
865 Type::Struct(StructType::new(vec![
866 NestedField::required(
867 9,
868 "col_4_8_9",
869 Type::Primitive(PrimitiveType::Long),
870 )
871 .into(),
872 ])),
873 )
874 .into(),
875 ])),
876 )
877 .into(),
878 NestedField::required(
879 10,
880 "col5",
881 Type::Map(MapType::new(
882 NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
883 .into(),
884 NestedField::required(
885 12,
886 "value",
887 Type::List(ListType::new(
888 NestedField::required(
889 13,
890 "item",
891 Type::Primitive(PrimitiveType::Long),
892 )
893 .into(),
894 )),
895 )
896 .into(),
897 )),
898 )
899 .into(),
900 ])
901 .build()
902 .unwrap()
903 }
904
905 #[tokio::test]
906 async fn test_index_by_parquet_path() {
907 let expect = HashMap::from([
908 ("col0".to_string(), 0),
909 ("col1.col_1_5".to_string(), 5),
910 ("col1.col_1_6".to_string(), 6),
911 ("col2".to_string(), 2),
912 ("col3.list.element".to_string(), 7),
913 ("col4.col_4_8.col_4_8_9".to_string(), 9),
914 ("col5.key_value.key".to_string(), 11),
915 ("col5.key_value.value.list.item".to_string(), 13),
916 ]);
917 let mut visitor = IndexByParquetPathName::new();
918 visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
919 assert_eq!(visitor.name_to_id, expect);
920 }
921
922 #[tokio::test]
923 async fn test_parquet_writer() -> Result<()> {
924 let temp_dir = TempDir::new().unwrap();
925 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
926 let location_gen = DefaultLocationGenerator::with_data_location(
927 temp_dir.path().to_str().unwrap().to_string(),
928 );
929 let file_name_gen =
930 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
931
932 let schema = {
934 let fields =
935 vec![
936 Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
937 PARQUET_FIELD_ID_META_KEY.to_string(),
938 "0".to_string(),
939 )])),
940 ];
941 Arc::new(arrow_schema::Schema::new(fields))
942 };
943 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
944 let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
945 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
946 let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
947
948 let output_file = file_io.new_output(
949 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
950 )?;
951
952 let mut pw = ParquetWriterBuilder::new(
954 WriterProperties::builder()
955 .set_max_row_group_size(128)
956 .build(),
957 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
958 )
959 .build(output_file)
960 .await?;
961 pw.write(&to_write).await?;
962 pw.write(&to_write_null).await?;
963 let res = pw.close().await?;
964 assert_eq!(res.len(), 1);
965 let data_file = res
966 .into_iter()
967 .next()
968 .unwrap()
969 .content(DataContentType::Data)
971 .partition(Struct::empty())
972 .partition_spec_id(0)
973 .build()
974 .unwrap();
975
976 assert_eq!(data_file.record_count(), 2048);
978 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
979 assert_eq!(
980 *data_file.lower_bounds(),
981 HashMap::from([(0, Datum::long(0))])
982 );
983 assert_eq!(
984 *data_file.upper_bounds(),
985 HashMap::from([(0, Datum::long(1023))])
986 );
987 assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
988
989 let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
991 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
992
993 Ok(())
994 }
995
996 #[tokio::test]
997 async fn test_parquet_writer_with_complex_schema() -> Result<()> {
998 let temp_dir = TempDir::new().unwrap();
999 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1000 let location_gen = DefaultLocationGenerator::with_data_location(
1001 temp_dir.path().to_str().unwrap().to_string(),
1002 );
1003 let file_name_gen =
1004 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1005
1006 let schema = nested_schema_for_test();
1008 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1009 let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1010 let col1 = Arc::new(StructArray::new(
1011 {
1012 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1013 fields.clone()
1014 } else {
1015 unreachable!()
1016 }
1017 },
1018 vec![
1019 Arc::new(Int64Array::from_iter_values(0..1024)),
1020 Arc::new(Int64Array::from_iter_values(0..1024)),
1021 ],
1022 None,
1023 ));
1024 let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
1025 (0..1024).map(|n| n.to_string()),
1026 )) as ArrayRef;
1027 let col3 = Arc::new({
1028 let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
1029 (0..1024).map(|n| Some(vec![Some(n)])),
1030 )
1031 .into_parts();
1032 arrow_array::ListArray::new(
1033 {
1034 if let DataType::List(field) = arrow_schema.field(3).data_type() {
1035 field.clone()
1036 } else {
1037 unreachable!()
1038 }
1039 },
1040 list_parts.1,
1041 list_parts.2,
1042 list_parts.3,
1043 )
1044 }) as ArrayRef;
1045 let col4 = Arc::new(StructArray::new(
1046 {
1047 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
1048 fields.clone()
1049 } else {
1050 unreachable!()
1051 }
1052 },
1053 vec![Arc::new(StructArray::new(
1054 {
1055 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
1056 if let DataType::Struct(fields) = fields[0].data_type() {
1057 fields.clone()
1058 } else {
1059 unreachable!()
1060 }
1061 } else {
1062 unreachable!()
1063 }
1064 },
1065 vec![Arc::new(Int64Array::from_iter_values(0..1024))],
1066 None,
1067 ))],
1068 None,
1069 ));
1070 let col5 = Arc::new({
1071 let mut map_array_builder = MapBuilder::new(
1072 None,
1073 arrow_array::builder::StringBuilder::new(),
1074 arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
1075 Int64Type,
1076 >::new()),
1077 );
1078 for i in 0..1024 {
1079 map_array_builder.keys().append_value(i.to_string());
1080 map_array_builder
1081 .values()
1082 .append_value(vec![Some(i as i64); i + 1]);
1083 map_array_builder.append(true)?;
1084 }
1085 let (_, offset_buffer, struct_array, null_buffer, ordered) =
1086 map_array_builder.finish().into_parts();
1087 let struct_array = {
1088 let (_, mut arrays, nulls) = struct_array.into_parts();
1089 let list_array = {
1090 let list_array = arrays[1]
1091 .as_any()
1092 .downcast_ref::<ListArray>()
1093 .unwrap()
1094 .clone();
1095 let (_, offsets, array, nulls) = list_array.into_parts();
1096 let list_field = {
1097 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1098 if let DataType::Struct(fields) = map_field.data_type() {
1099 if let DataType::List(list_field) = fields[1].data_type() {
1100 list_field.clone()
1101 } else {
1102 unreachable!()
1103 }
1104 } else {
1105 unreachable!()
1106 }
1107 } else {
1108 unreachable!()
1109 }
1110 };
1111 ListArray::new(list_field, offsets, array, nulls)
1112 };
1113 arrays[1] = Arc::new(list_array) as ArrayRef;
1114 StructArray::new(
1115 {
1116 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1117 if let DataType::Struct(fields) = map_field.data_type() {
1118 fields.clone()
1119 } else {
1120 unreachable!()
1121 }
1122 } else {
1123 unreachable!()
1124 }
1125 },
1126 arrays,
1127 nulls,
1128 )
1129 };
1130 arrow_array::MapArray::new(
1131 {
1132 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1133 map_field.clone()
1134 } else {
1135 unreachable!()
1136 }
1137 },
1138 offset_buffer,
1139 struct_array,
1140 null_buffer,
1141 ordered,
1142 )
1143 }) as ArrayRef;
1144 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1145 col0, col1, col2, col3, col4, col5,
1146 ])
1147 .unwrap();
1148 let output_file = file_io.new_output(
1149 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1150 )?;
1151
1152 let mut pw =
1154 ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1155 .build(output_file)
1156 .await?;
1157 pw.write(&to_write).await?;
1158 let res = pw.close().await?;
1159 assert_eq!(res.len(), 1);
1160 let data_file = res
1161 .into_iter()
1162 .next()
1163 .unwrap()
1164 .content(crate::spec::DataContentType::Data)
1166 .partition(Struct::empty())
1167 .partition_spec_id(0)
1168 .build()
1169 .unwrap();
1170
1171 assert_eq!(data_file.record_count(), 1024);
1173 assert_eq!(
1174 *data_file.value_counts(),
1175 HashMap::from([
1176 (0, 1024),
1177 (5, 1024),
1178 (6, 1024),
1179 (2, 1024),
1180 (7, 1024),
1181 (9, 1024),
1182 (11, 1024),
1183 (13, (1..1025).sum()),
1184 ])
1185 );
1186 assert_eq!(
1187 *data_file.lower_bounds(),
1188 HashMap::from([
1189 (0, Datum::long(0)),
1190 (5, Datum::long(0)),
1191 (6, Datum::long(0)),
1192 (2, Datum::string("0")),
1193 (7, Datum::long(0)),
1194 (9, Datum::long(0)),
1195 (11, Datum::string("0")),
1196 (13, Datum::long(0))
1197 ])
1198 );
1199 assert_eq!(
1200 *data_file.upper_bounds(),
1201 HashMap::from([
1202 (0, Datum::long(1023)),
1203 (5, Datum::long(1023)),
1204 (6, Datum::long(1023)),
1205 (2, Datum::string("999")),
1206 (7, Datum::long(1023)),
1207 (9, Datum::long(1023)),
1208 (11, Datum::string("999")),
1209 (13, Datum::long(1023))
1210 ])
1211 );
1212
1213 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1215
1216 Ok(())
1217 }
1218
1219 #[tokio::test]
1220 async fn test_all_type_for_write() -> Result<()> {
1221 let temp_dir = TempDir::new().unwrap();
1222 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1223 let location_gen = DefaultLocationGenerator::with_data_location(
1224 temp_dir.path().to_str().unwrap().to_string(),
1225 );
1226 let file_name_gen =
1227 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1228
1229 let schema = schema_for_all_type();
1232 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1233 let col0 = Arc::new(BooleanArray::from(vec![
1234 Some(true),
1235 Some(false),
1236 None,
1237 Some(true),
1238 ])) as ArrayRef;
1239 let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1240 let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1241 let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1242 Some(0.5),
1243 Some(2.0),
1244 None,
1245 Some(3.5),
1246 ])) as ArrayRef;
1247 let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1248 Some(0.5),
1249 Some(2.0),
1250 None,
1251 Some(3.5),
1252 ])) as ArrayRef;
1253 let col5 = Arc::new(arrow_array::StringArray::from(vec![
1254 Some("a"),
1255 Some("b"),
1256 None,
1257 Some("d"),
1258 ])) as ArrayRef;
1259 let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1260 Some(b"one"),
1261 None,
1262 Some(b""),
1263 Some(b"zzzz"),
1264 ])) as ArrayRef;
1265 let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1266 Some(0),
1267 Some(1),
1268 None,
1269 Some(3),
1270 ])) as ArrayRef;
1271 let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1272 Some(0),
1273 Some(1),
1274 None,
1275 Some(3),
1276 ])) as ArrayRef;
1277 let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1278 Some(0),
1279 Some(1),
1280 None,
1281 Some(3),
1282 ])) as ArrayRef;
1283 let col10 = Arc::new(
1284 arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1285 .with_timezone_utc(),
1286 ) as ArrayRef;
1287 let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1288 Some(0),
1289 Some(1),
1290 None,
1291 Some(3),
1292 ])) as ArrayRef;
1293 let col12 = Arc::new(
1294 arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1295 .with_timezone_utc(),
1296 ) as ArrayRef;
1297 let col13 = Arc::new(
1298 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1299 .with_precision_and_scale(10, 5)
1300 .unwrap(),
1301 ) as ArrayRef;
1302 let col14 = Arc::new(
1303 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1304 vec![
1305 Some(Uuid::from_u128(0).as_bytes().to_vec()),
1306 Some(Uuid::from_u128(1).as_bytes().to_vec()),
1307 None,
1308 Some(Uuid::from_u128(3).as_bytes().to_vec()),
1309 ]
1310 .into_iter(),
1311 16,
1312 )
1313 .unwrap(),
1314 ) as ArrayRef;
1315 let col15 = Arc::new(
1316 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1317 vec![
1318 Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1319 Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1320 None,
1321 Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1322 ]
1323 .into_iter(),
1324 10,
1325 )
1326 .unwrap(),
1327 ) as ArrayRef;
1328 let col16 = Arc::new(
1329 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1330 .with_precision_and_scale(38, 5)
1331 .unwrap(),
1332 ) as ArrayRef;
1333 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1334 col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1335 col14, col15, col16,
1336 ])
1337 .unwrap();
1338 let output_file = file_io.new_output(
1339 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1340 )?;
1341
1342 let mut pw =
1344 ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1345 .build(output_file)
1346 .await?;
1347 pw.write(&to_write).await?;
1348 let res = pw.close().await?;
1349 assert_eq!(res.len(), 1);
1350 let data_file = res
1351 .into_iter()
1352 .next()
1353 .unwrap()
1354 .content(crate::spec::DataContentType::Data)
1356 .partition(Struct::empty())
1357 .partition_spec_id(0)
1358 .build()
1359 .unwrap();
1360
1361 assert_eq!(data_file.record_count(), 4);
1363 assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1364 assert!(
1365 data_file
1366 .null_value_counts()
1367 .iter()
1368 .all(|(_, &v)| { v == 1 })
1369 );
1370 assert_eq!(
1371 *data_file.lower_bounds(),
1372 HashMap::from([
1373 (0, Datum::bool(false)),
1374 (1, Datum::int(1)),
1375 (2, Datum::long(1)),
1376 (3, Datum::float(0.5)),
1377 (4, Datum::double(0.5)),
1378 (5, Datum::string("a")),
1379 (6, Datum::binary(vec![])),
1380 (7, Datum::date(0)),
1381 (8, Datum::time_micros(0).unwrap()),
1382 (9, Datum::timestamp_micros(0)),
1383 (10, Datum::timestamptz_micros(0)),
1384 (11, Datum::timestamp_nanos(0)),
1385 (12, Datum::timestamptz_nanos(0)),
1386 (
1387 13,
1388 Datum::new(
1389 PrimitiveType::Decimal {
1390 precision: 10,
1391 scale: 5
1392 },
1393 PrimitiveLiteral::Int128(1)
1394 )
1395 ),
1396 (14, Datum::uuid(Uuid::from_u128(0))),
1397 (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1398 (
1399 16,
1400 Datum::new(
1401 PrimitiveType::Decimal {
1402 precision: 38,
1403 scale: 5
1404 },
1405 PrimitiveLiteral::Int128(1)
1406 )
1407 ),
1408 ])
1409 );
1410 assert_eq!(
1411 *data_file.upper_bounds(),
1412 HashMap::from([
1413 (0, Datum::bool(true)),
1414 (1, Datum::int(4)),
1415 (2, Datum::long(4)),
1416 (3, Datum::float(3.5)),
1417 (4, Datum::double(3.5)),
1418 (5, Datum::string("d")),
1419 (6, Datum::binary(vec![122, 122, 122, 122])),
1420 (7, Datum::date(3)),
1421 (8, Datum::time_micros(3).unwrap()),
1422 (9, Datum::timestamp_micros(3)),
1423 (10, Datum::timestamptz_micros(3)),
1424 (11, Datum::timestamp_nanos(3)),
1425 (12, Datum::timestamptz_nanos(3)),
1426 (
1427 13,
1428 Datum::new(
1429 PrimitiveType::Decimal {
1430 precision: 10,
1431 scale: 5
1432 },
1433 PrimitiveLiteral::Int128(100)
1434 )
1435 ),
1436 (14, Datum::uuid(Uuid::from_u128(3))),
1437 (
1438 15,
1439 Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1440 ),
1441 (
1442 16,
1443 Datum::new(
1444 PrimitiveType::Decimal {
1445 precision: 38,
1446 scale: 5
1447 },
1448 PrimitiveLiteral::Int128(100)
1449 )
1450 ),
1451 ])
1452 );
1453
1454 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1456
1457 Ok(())
1458 }
1459
1460 #[tokio::test]
1461 async fn test_decimal_bound() -> Result<()> {
1462 let temp_dir = TempDir::new().unwrap();
1463 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1464 let location_gen = DefaultLocationGenerator::with_data_location(
1465 temp_dir.path().to_str().unwrap().to_string(),
1466 );
1467 let file_name_gen =
1468 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1469
1470 let schema = Arc::new(
1472 Schema::builder()
1473 .with_fields(vec![
1474 NestedField::optional(
1475 0,
1476 "decimal",
1477 Type::Primitive(PrimitiveType::Decimal {
1478 precision: 28,
1479 scale: 10,
1480 }),
1481 )
1482 .into(),
1483 ])
1484 .build()
1485 .unwrap(),
1486 );
1487 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1488 let output_file = file_io.new_output(
1489 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1490 )?;
1491 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1492 .build(output_file)
1493 .await?;
1494 let col0 = Arc::new(
1495 Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1496 .with_data_type(DataType::Decimal128(28, 10)),
1497 ) as ArrayRef;
1498 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1499 pw.write(&to_write).await?;
1500 let res = pw.close().await?;
1501 assert_eq!(res.len(), 1);
1502 let data_file = res
1503 .into_iter()
1504 .next()
1505 .unwrap()
1506 .content(crate::spec::DataContentType::Data)
1507 .partition(Struct::empty())
1508 .partition_spec_id(0)
1509 .build()
1510 .unwrap();
1511 assert_eq!(
1512 data_file.upper_bounds().get(&0),
1513 Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1514 .as_ref()
1515 );
1516 assert_eq!(
1517 data_file.lower_bounds().get(&0),
1518 Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1519 .as_ref()
1520 );
1521
1522 let schema = Arc::new(
1524 Schema::builder()
1525 .with_fields(vec![
1526 NestedField::optional(
1527 0,
1528 "decimal",
1529 Type::Primitive(PrimitiveType::Decimal {
1530 precision: 28,
1531 scale: 10,
1532 }),
1533 )
1534 .into(),
1535 ])
1536 .build()
1537 .unwrap(),
1538 );
1539 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1540 let output_file = file_io.new_output(
1541 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1542 )?;
1543 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1544 .build(output_file)
1545 .await?;
1546 let col0 = Arc::new(
1547 Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1548 .with_data_type(DataType::Decimal128(28, 10)),
1549 ) as ArrayRef;
1550 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1551 pw.write(&to_write).await?;
1552 let res = pw.close().await?;
1553 assert_eq!(res.len(), 1);
1554 let data_file = res
1555 .into_iter()
1556 .next()
1557 .unwrap()
1558 .content(crate::spec::DataContentType::Data)
1559 .partition(Struct::empty())
1560 .partition_spec_id(0)
1561 .build()
1562 .unwrap();
1563 assert_eq!(
1564 data_file.upper_bounds().get(&0),
1565 Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1566 .as_ref()
1567 );
1568 assert_eq!(
1569 data_file.lower_bounds().get(&0),
1570 Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1571 .as_ref()
1572 );
1573
1574 let decimal_max = Decimal::MAX;
1576 let decimal_min = Decimal::MIN;
1577 assert_eq!(decimal_max.scale(), decimal_min.scale());
1578 let schema = Arc::new(
1579 Schema::builder()
1580 .with_fields(vec![
1581 NestedField::optional(
1582 0,
1583 "decimal",
1584 Type::Primitive(PrimitiveType::Decimal {
1585 precision: 38,
1586 scale: decimal_max.scale(),
1587 }),
1588 )
1589 .into(),
1590 ])
1591 .build()
1592 .unwrap(),
1593 );
1594 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1595 let output_file = file_io.new_output(
1596 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1597 )?;
1598 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1599 .build(output_file)
1600 .await?;
1601 let col0 = Arc::new(
1602 Decimal128Array::from(vec![
1603 Some(decimal_max.mantissa()),
1604 Some(decimal_min.mantissa()),
1605 ])
1606 .with_data_type(DataType::Decimal128(38, 0)),
1607 ) as ArrayRef;
1608 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1609 pw.write(&to_write).await?;
1610 let res = pw.close().await?;
1611 assert_eq!(res.len(), 1);
1612 let data_file = res
1613 .into_iter()
1614 .next()
1615 .unwrap()
1616 .content(crate::spec::DataContentType::Data)
1617 .partition(Struct::empty())
1618 .partition_spec_id(0)
1619 .build()
1620 .unwrap();
1621 assert_eq!(
1622 data_file.upper_bounds().get(&0),
1623 Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1624 );
1625 assert_eq!(
1626 data_file.lower_bounds().get(&0),
1627 Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1628 );
1629
1630 Ok(())
1700 }
1701
1702 #[tokio::test]
1703 async fn test_empty_write() -> Result<()> {
1704 let temp_dir = TempDir::new().unwrap();
1705 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1706 let location_gen = DefaultLocationGenerator::with_data_location(
1707 temp_dir.path().to_str().unwrap().to_string(),
1708 );
1709 let file_name_gen =
1710 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1711
1712 let schema = {
1714 let fields = vec![
1715 arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1716 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1717 ),
1718 ];
1719 Arc::new(arrow_schema::Schema::new(fields))
1720 };
1721 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1722 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1723 let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1724 let output_file = file_io.new_output(&file_path)?;
1725 let mut pw = ParquetWriterBuilder::new(
1726 WriterProperties::builder().build(),
1727 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1728 )
1729 .build(output_file)
1730 .await?;
1731 pw.write(&to_write).await?;
1732 pw.close().await.unwrap();
1733 assert!(file_io.exists(&file_path).await.unwrap());
1734
1735 let file_name_gen =
1737 DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1738 let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1739 let output_file = file_io.new_output(&file_path)?;
1740 let pw = ParquetWriterBuilder::new(
1741 WriterProperties::builder().build(),
1742 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1743 )
1744 .build(output_file)
1745 .await?;
1746 pw.close().await.unwrap();
1747 assert!(!file_io.exists(&file_path).await.unwrap());
1748
1749 Ok(())
1750 }
1751
1752 #[tokio::test]
1753 async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1754 let temp_dir = TempDir::new().unwrap();
1755 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1756 let location_gen = DefaultLocationGenerator::with_data_location(
1757 temp_dir.path().to_str().unwrap().to_string(),
1758 );
1759 let file_name_gen =
1760 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1761
1762 let arrow_schema = {
1764 let fields = vec![
1765 Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1766 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1767 ),
1768 Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1769 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1770 ),
1771 ];
1772 Arc::new(arrow_schema::Schema::new(fields))
1773 };
1774
1775 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1776 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1777 None,
1778 )) as ArrayRef;
1779
1780 let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1781 [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1782 None,
1783 )) as ArrayRef;
1784
1785 let to_write =
1786 RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1787 let output_file = file_io.new_output(
1788 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1789 )?;
1790
1791 let mut pw = ParquetWriterBuilder::new(
1793 WriterProperties::builder().build(),
1794 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1795 )
1796 .build(output_file)
1797 .await?;
1798
1799 pw.write(&to_write).await?;
1800 let res = pw.close().await?;
1801 assert_eq!(res.len(), 1);
1802 let data_file = res
1803 .into_iter()
1804 .next()
1805 .unwrap()
1806 .content(crate::spec::DataContentType::Data)
1808 .partition(Struct::empty())
1809 .partition_spec_id(0)
1810 .build()
1811 .unwrap();
1812
1813 assert_eq!(data_file.record_count(), 4);
1815 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1816 assert_eq!(
1817 *data_file.lower_bounds(),
1818 HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1819 );
1820 assert_eq!(
1821 *data_file.upper_bounds(),
1822 HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1823 );
1824 assert_eq!(
1825 *data_file.null_value_counts(),
1826 HashMap::from([(0, 0), (1, 0)])
1827 );
1828 assert_eq!(
1829 *data_file.nan_value_counts(),
1830 HashMap::from([(0, 1), (1, 1)])
1831 );
1832
1833 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1835 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1836
1837 Ok(())
1838 }
1839
1840 #[tokio::test]
1841 async fn test_nan_val_cnts_struct_type() -> Result<()> {
1842 let temp_dir = TempDir::new().unwrap();
1843 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1844 let location_gen = DefaultLocationGenerator::with_data_location(
1845 temp_dir.path().to_str().unwrap().to_string(),
1846 );
1847 let file_name_gen =
1848 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1849
1850 let schema_struct_float_fields = Fields::from(vec![
1851 Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1852 PARQUET_FIELD_ID_META_KEY.to_string(),
1853 "4".to_string(),
1854 )])),
1855 ]);
1856
1857 let schema_struct_nested_float_fields = Fields::from(vec![
1858 Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1859 PARQUET_FIELD_ID_META_KEY.to_string(),
1860 "7".to_string(),
1861 )])),
1862 ]);
1863
1864 let schema_struct_nested_fields = Fields::from(vec![
1865 Field::new(
1866 "col6",
1867 arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1868 false,
1869 )
1870 .with_metadata(HashMap::from([(
1871 PARQUET_FIELD_ID_META_KEY.to_string(),
1872 "6".to_string(),
1873 )])),
1874 ]);
1875
1876 let arrow_schema = {
1878 let fields = vec![
1879 Field::new(
1880 "col3",
1881 arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1882 false,
1883 )
1884 .with_metadata(HashMap::from([(
1885 PARQUET_FIELD_ID_META_KEY.to_string(),
1886 "3".to_string(),
1887 )])),
1888 Field::new(
1889 "col5",
1890 arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1891 false,
1892 )
1893 .with_metadata(HashMap::from([(
1894 PARQUET_FIELD_ID_META_KEY.to_string(),
1895 "5".to_string(),
1896 )])),
1897 ];
1898 Arc::new(arrow_schema::Schema::new(fields))
1899 };
1900
1901 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1902 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1903 None,
1904 )) as ArrayRef;
1905
1906 let struct_float_field_col = Arc::new(StructArray::new(
1907 schema_struct_float_fields,
1908 vec![float_32_col.clone()],
1909 None,
1910 )) as ArrayRef;
1911
1912 let struct_nested_float_field_col = Arc::new(StructArray::new(
1913 schema_struct_nested_fields,
1914 vec![Arc::new(StructArray::new(
1915 schema_struct_nested_float_fields,
1916 vec![float_32_col.clone()],
1917 None,
1918 )) as ArrayRef],
1919 None,
1920 )) as ArrayRef;
1921
1922 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1923 struct_float_field_col,
1924 struct_nested_float_field_col,
1925 ])
1926 .unwrap();
1927 let output_file = file_io.new_output(
1928 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1929 )?;
1930
1931 let mut pw = ParquetWriterBuilder::new(
1933 WriterProperties::builder().build(),
1934 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1935 )
1936 .build(output_file)
1937 .await?;
1938
1939 pw.write(&to_write).await?;
1940 let res = pw.close().await?;
1941 assert_eq!(res.len(), 1);
1942 let data_file = res
1943 .into_iter()
1944 .next()
1945 .unwrap()
1946 .content(crate::spec::DataContentType::Data)
1948 .partition(Struct::empty())
1949 .partition_spec_id(0)
1950 .build()
1951 .unwrap();
1952
1953 assert_eq!(data_file.record_count(), 4);
1955 assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1956 assert_eq!(
1957 *data_file.lower_bounds(),
1958 HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1959 );
1960 assert_eq!(
1961 *data_file.upper_bounds(),
1962 HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1963 );
1964 assert_eq!(
1965 *data_file.null_value_counts(),
1966 HashMap::from([(4, 0), (7, 0)])
1967 );
1968 assert_eq!(
1969 *data_file.nan_value_counts(),
1970 HashMap::from([(4, 1), (7, 1)])
1971 );
1972
1973 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1975 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1976
1977 Ok(())
1978 }
1979
1980 #[tokio::test]
1981 async fn test_nan_val_cnts_list_type() -> Result<()> {
1982 let temp_dir = TempDir::new().unwrap();
1983 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1984 let location_gen = DefaultLocationGenerator::with_data_location(
1985 temp_dir.path().to_str().unwrap().to_string(),
1986 );
1987 let file_name_gen =
1988 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1989
1990 let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1991 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1992 );
1993
1994 let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1995 .with_metadata(HashMap::from([(
1996 PARQUET_FIELD_ID_META_KEY.to_string(),
1997 "4".to_string(),
1998 )]));
1999
2000 let schema_struct_list_field = Fields::from(vec![
2001 Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
2002 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
2003 ),
2004 ]);
2005
2006 let arrow_schema = {
2007 let fields = vec![
2008 Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
2009 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
2010 ),
2011 Field::new_struct("col1", schema_struct_list_field.clone(), true)
2012 .with_metadata(HashMap::from([(
2013 PARQUET_FIELD_ID_META_KEY.to_string(),
2014 "2".to_string(),
2015 )]))
2016 .clone(),
2017 ];
2021 Arc::new(arrow_schema::Schema::new(fields))
2022 };
2023
2024 let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
2025 Some(1.0_f32),
2026 Some(f32::NAN),
2027 Some(2.0),
2028 Some(2.0),
2029 ])])
2030 .into_parts();
2031
2032 let list_float_field_col = Arc::new({
2033 let list_parts = list_parts.clone();
2034 ListArray::new(
2035 {
2036 if let DataType::List(field) = arrow_schema.field(0).data_type() {
2037 field.clone()
2038 } else {
2039 unreachable!()
2040 }
2041 },
2042 list_parts.1,
2043 list_parts.2,
2044 list_parts.3,
2045 )
2046 }) as ArrayRef;
2047
2048 let struct_list_fields_schema =
2049 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
2050 fields.clone()
2051 } else {
2052 unreachable!()
2053 };
2054
2055 let struct_list_float_field_col = Arc::new({
2056 ListArray::new(
2057 {
2058 if let DataType::List(field) = struct_list_fields_schema
2059 .first()
2060 .expect("could not find first list field")
2061 .data_type()
2062 {
2063 field.clone()
2064 } else {
2065 unreachable!()
2066 }
2067 },
2068 list_parts.1,
2069 list_parts.2,
2070 list_parts.3,
2071 )
2072 }) as ArrayRef;
2073
2074 let struct_list_float_field_col = Arc::new(StructArray::new(
2075 struct_list_fields_schema,
2076 vec![struct_list_float_field_col.clone()],
2077 None,
2078 )) as ArrayRef;
2079
2080 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2081 list_float_field_col,
2082 struct_list_float_field_col,
2083 ])
2085 .expect("Could not form record batch");
2086 let output_file = file_io.new_output(
2087 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2088 )?;
2089
2090 let mut pw = ParquetWriterBuilder::new(
2092 WriterProperties::builder().build(),
2093 Arc::new(
2094 to_write
2095 .schema()
2096 .as_ref()
2097 .try_into()
2098 .expect("Could not convert iceberg schema"),
2099 ),
2100 )
2101 .build(output_file)
2102 .await?;
2103
2104 pw.write(&to_write).await?;
2105 let res = pw.close().await?;
2106 assert_eq!(res.len(), 1);
2107 let data_file = res
2108 .into_iter()
2109 .next()
2110 .unwrap()
2111 .content(crate::spec::DataContentType::Data)
2112 .partition(Struct::empty())
2113 .partition_spec_id(0)
2114 .build()
2115 .unwrap();
2116
2117 assert_eq!(data_file.record_count(), 1);
2119 assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
2120 assert_eq!(
2121 *data_file.lower_bounds(),
2122 HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
2123 );
2124 assert_eq!(
2125 *data_file.upper_bounds(),
2126 HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
2127 );
2128 assert_eq!(
2129 *data_file.null_value_counts(),
2130 HashMap::from([(1, 0), (4, 0)])
2131 );
2132 assert_eq!(
2133 *data_file.nan_value_counts(),
2134 HashMap::from([(1, 1), (4, 1)])
2135 );
2136
2137 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2139 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2140
2141 Ok(())
2142 }
2143
    macro_rules! construct_map_arr {
        ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
            let int_builder = Int32Builder::new();
            let float_builder = Float32Builder::with_capacity(4);
            let mut builder = MapBuilder::new(None, int_builder, float_builder);
            builder.keys().append_value(1);
            builder.values().append_value(1.0_f32);
            builder.append(true).unwrap();
            builder.keys().append_value(2);
            builder.values().append_value(f32::NAN);
            builder.append(true).unwrap();
            builder.keys().append_value(3);
            builder.values().append_value(2.0);
            builder.append(true).unwrap();
            builder.keys().append_value(4);
            builder.values().append_value(2.0);
            builder.append(true).unwrap();
            let array = builder.finish();

            let (_field, offsets, entries, nulls, ordered) = array.into_parts();
            let new_struct_fields_schema =
                Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);

            let entries = {
                let (_, arrays, nulls) = entries.into_parts();
                StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
            };

            let field = Arc::new(Field::new(
                DEFAULT_MAP_FIELD_NAME,
                DataType::Struct(new_struct_fields_schema),
                false,
            ));

            Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
        }};
    }

    #[tokio::test]
    async fn test_nan_val_cnts_map_type() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let map_key_field_schema =
            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
                (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
            ]));

        let map_value_field_schema =
            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
            ));

        let struct_map_key_field_schema =
            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
                (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
            ]));

        let struct_map_value_field_schema =
            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
                [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
            ));

        let schema_struct_map_field = Fields::from(vec![
            Field::new_map(
                "col3",
                DEFAULT_MAP_FIELD_NAME,
                struct_map_key_field_schema.clone(),
                struct_map_value_field_schema.clone(),
                false,
                false,
            )
            .with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "5".to_string(),
            )])),
        ]);

        let arrow_schema = {
            let fields = vec![
                Field::new_map(
                    "col0",
                    DEFAULT_MAP_FIELD_NAME,
                    map_key_field_schema.clone(),
                    map_value_field_schema.clone(),
                    false,
                    false,
                )
                .with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "0".to_string(),
                )])),
                Field::new_struct("col1", schema_struct_map_field.clone(), true)
                    .with_metadata(HashMap::from([(
                        PARQUET_FIELD_ID_META_KEY.to_string(),
                        "3".to_string(),
                    )])),
            ];
            Arc::new(arrow_schema::Schema::new(fields))
        };

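        // Both map columns reuse the same data: four entries whose float values include a
        // single NaN, so each map value field should report a NaN count of one.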
        let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);

        let struct_map_arr =
            construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);

        let struct_map_field_col = Arc::new(StructArray::new(
            schema_struct_map_field,
            vec![struct_map_arr],
            None,
        )) as ArrayRef;

        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
            map_array,
            struct_map_field_col,
        ])
        .expect("Could not form record batch");
        let output_file = file_io.new_output(
            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
        )?;

        let mut pw = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(
                to_write
                    .schema()
                    .as_ref()
                    .try_into()
                    .expect("Could not convert Arrow schema to Iceberg schema"),
            ),
        )
        .build(output_file)
        .await?;

        pw.write(&to_write).await?;
        let res = pw.close().await?;
        assert_eq!(res.len(), 1);
        let data_file = res
            .into_iter()
            .next()
            .unwrap()
            .content(crate::spec::DataContentType::Data)
            .partition(Struct::empty())
            .partition_spec_id(0)
            .build()
            .unwrap();

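        // Field ids follow the PARQUET_FIELD_ID metadata set above: 1/2 are the key/value
        // of the top-level map `col0`, 6/7 the key/value of the map nested inside `col1`.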
        assert_eq!(data_file.record_count(), 4);
        assert_eq!(
            *data_file.value_counts(),
            HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
        );
        assert_eq!(
            *data_file.lower_bounds(),
            HashMap::from([
                (1, Datum::int(1)),
                (2, Datum::float(1.0)),
                (6, Datum::int(1)),
                (7, Datum::float(1.0))
            ])
        );
        assert_eq!(
            *data_file.upper_bounds(),
            HashMap::from([
                (1, Datum::int(4)),
                (2, Datum::float(2.0)),
                (6, Datum::int(4)),
                (7, Datum::float(2.0))
            ])
        );
        assert_eq!(
            *data_file.null_value_counts(),
            HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
        );
        assert_eq!(
            *data_file.nan_value_counts(),
            HashMap::from([(2, 1), (7, 1)])
        );

        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;

        Ok(())
    }

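    // Closing a writer that never received any rows should yield no data files and must
    // not leave an empty Parquet file behind on disk.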
    #[tokio::test]
    async fn test_write_empty_parquet_file() {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
        let output_file = file_io
            .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
            .unwrap();

        let pw = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(
                Schema::builder()
                    .with_schema_id(1)
                    .with_fields(vec![
                        NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
                            .with_id(0)
                            .into(),
                    ])
                    .build()
                    .expect("Failed to create schema"),
            ),
        )
        .build(output_file)
        .await
        .unwrap();

        let res = pw.close().await.unwrap();
        assert_eq!(res.len(), 0);

        assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
    }

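    // MinMaxColAggregator folds Parquet column statistics (one `Statistics` per update)
    // into overall per-field bounds; updates with a missing min or max should not disturb
    // the aggregated result.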
    #[test]
    fn test_min_max_aggregator() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
                        .with_id(0)
                        .into(),
                ])
                .build()
                .expect("Failed to create schema"),
        );
        let mut min_max_agg = MinMaxColAggregator::new(schema);
        let create_statistics =
            |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
        min_max_agg
            .update(0, create_statistics(None, Some(42)))
            .unwrap();
        min_max_agg
            .update(0, create_statistics(Some(0), Some(i32::MAX)))
            .unwrap();
        min_max_agg
            .update(0, create_statistics(Some(i32::MIN), None))
            .unwrap();
        min_max_agg
            .update(0, create_statistics(None, None))
            .unwrap();

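        // Missing min/max entries are ignored; the surviving bounds are the observed extremes.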
        let (lower_bounds, upper_bounds) = min_max_agg.produce();

        assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
        assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
    }
}