use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use bytes::Bytes;
use futures::future::BoxFuture;
use itertools::Itertools;
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics;

use super::{FileWriter, FileWriterBuilder};
use crate::arrow::{
    ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
};
use crate::io::{FileIO, FileWrite, OutputFile};
use crate::spec::{
    DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
    NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
    StructType, TableMetadata, Type, visit_schema,
};
use crate::transform::create_transform_function;
use crate::writer::{CurrentFileStatus, DataFile};
use crate::{Error, ErrorKind, Result};

/// A builder for [`ParquetWriter`], pairing Parquet [`WriterProperties`] with an
/// Iceberg schema.
#[derive(Clone, Debug)]
pub struct ParquetWriterBuilder {
    props: WriterProperties,
    schema: SchemaRef,
    match_mode: FieldMatchMode,
}

impl ParquetWriterBuilder {
    /// Creates a new `ParquetWriterBuilder` that matches Arrow and Iceberg fields by field id.
    pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
        Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
    }

    /// Creates a new `ParquetWriterBuilder` with an explicit [`FieldMatchMode`].
    pub fn new_with_match_mode(
        props: WriterProperties,
        schema: SchemaRef,
        match_mode: FieldMatchMode,
    ) -> Self {
        Self {
            props,
            schema,
            match_mode,
        }
    }
}
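
// A minimal usage sketch (illustrative only, not part of the module): `schema`,
// `output_file`, and `batch` are assumed to exist in the caller's context. The
// builder pairs Parquet `WriterProperties` with an Iceberg schema, and
// `FileWriterBuilder::build` binds it to a single output file:
//
//     let mut writer = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
//         .build(output_file)
//         .await?;
//     writer.write(&batch).await?;
//     let data_file_builders = writer.close().await?;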

impl FileWriterBuilder for ParquetWriterBuilder {
    type R = ParquetWriter;

    async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
        Ok(ParquetWriter {
            schema: self.schema.clone(),
            inner_writer: None,
            writer_properties: self.props.clone(),
            current_row_num: 0,
            output_file,
            nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
        })
    }
}

/// A [`SchemaVisitor`] that builds a mapping from Parquet column path names to
/// Iceberg field ids.
struct IndexByParquetPathName {
    name_to_id: HashMap<String, i32>,

    field_names: Vec<String>,

    field_id: i32,
}

impl IndexByParquetPathName {
    /// Creates a new, empty `IndexByParquetPathName`.
    pub fn new() -> Self {
        Self {
            name_to_id: HashMap::new(),
            field_names: Vec::new(),
            field_id: 0,
        }
    }

    /// Returns the field id for a full Parquet column path name, if present.
    pub fn get(&self, name: &str) -> Option<&i32> {
        self.name_to_id.get(name)
    }
}

impl Default for IndexByParquetPathName {
    fn default() -> Self {
        Self::new()
    }
}

impl SchemaVisitor for IndexByParquetPathName {
    type T = ();

    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
        self.field_names.push(field.name.to_string());
        self.field_id = field.id;
        Ok(())
    }

    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
        self.field_names.pop();
        Ok(())
    }

    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
        self.field_names.push(format!("list.{}", field.name));
        self.field_id = field.id;
        Ok(())
    }

    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
        self.field_names.pop();
        Ok(())
    }

    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
        self.field_names
            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
        self.field_id = field.id;
        Ok(())
    }

    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
        self.field_names.pop();
        Ok(())
    }

    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
        self.field_names
            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
        self.field_id = field.id;
        Ok(())
    }

    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
        self.field_names.pop();
        Ok(())
    }

    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
        Ok(())
    }

    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
        Ok(())
    }

    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
        Ok(())
    }

    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
        Ok(())
    }

    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
        Ok(())
    }

    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
        let full_name = self.field_names.iter().map(String::as_str).join(".");
        let field_id = self.field_id;
        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
                ),
            ));
        } else {
            self.name_to_id.insert(full_name, field_id);
        }

        Ok(())
    }
}
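
// For reference, the visitor above flattens nested Iceberg fields into Parquet
// column path names. For the nested test schema defined at the bottom of this
// file it produces entries such as:
//
//     "col0"                           -> 0
//     "col1.col_1_5"                   -> 5
//     "col3.list.element"              -> 7
//     "col5.key_value.key"             -> 11
//     "col5.key_value.value.list.item" -> 13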

/// `ParquetWriter` writes Arrow [`arrow_array::RecordBatch`]es to a single Parquet file
/// and collects the statistics needed to build Iceberg [`DataFile`] metadata.
pub struct ParquetWriter {
    schema: SchemaRef,
    output_file: OutputFile,
    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
    writer_properties: WriterProperties,
    current_row_num: usize,
    nan_value_count_visitor: NanValueCountVisitor,
}

/// Aggregates per-column lower and upper bounds from Parquet row-group statistics.
struct MinMaxColAggregator {
    lower_bounds: HashMap<i32, Datum>,
    upper_bounds: HashMap<i32, Datum>,
    schema: SchemaRef,
}

impl MinMaxColAggregator {
    /// Creates a new `MinMaxColAggregator` for the given schema.
    fn new(schema: SchemaRef) -> Self {
        Self {
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            schema,
        }
    }

    fn update_state_min(&mut self, field_id: i32, datum: Datum) {
        self.lower_bounds
            .entry(field_id)
            .and_modify(|e| {
                if *e > datum {
                    *e = datum.clone()
                }
            })
            .or_insert(datum);
    }

    fn update_state_max(&mut self, field_id: i32, datum: Datum) {
        self.upper_bounds
            .entry(field_id)
            .and_modify(|e| {
                if *e < datum {
                    *e = datum.clone()
                }
            })
            .or_insert(datum);
    }

    /// Folds one column chunk's statistics into the running min/max state.
    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
        let Some(ty) = self
            .schema
            .field_by_id(field_id)
            .map(|f| f.field_type.as_ref())
        else {
            return Ok(());
        };
        let Type::Primitive(ty) = ty.clone() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                format!("Composed type {ty} is not supported for min max aggregation."),
            ));
        };

        if value.min_is_exact() {
            let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    format!("Statistics {value} does not match the field type {ty}."),
                ));
            };

            self.update_state_min(field_id, min_datum);
        }

        if value.max_is_exact() {
            let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    format!("Statistics {value} does not match the field type {ty}."),
                ));
            };

            self.update_state_max(field_id, max_datum);
        }

        Ok(())
    }

    /// Returns the accumulated `(lower_bounds, upper_bounds)` maps.
    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
        (self.lower_bounds, self.upper_bounds)
    }
}
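
// A short sketch of how the aggregator is driven (mirroring the unit test at the
// bottom of this file): each column chunk's statistics are folded in, halves with
// no exact min/max are skipped, and `produce` yields the surviving extremes.
//
//     let mut agg = MinMaxColAggregator::new(schema);
//     agg.update(0, Statistics::Int32(ValueStatistics::new(Some(1), Some(42), None, None, false)))?;
//     agg.update(0, Statistics::Int32(ValueStatistics::new(Some(-7), None, None, None, false)))?;
//     let (lower, upper) = agg.produce();
//     // lower[&0] == Datum::int(-7), upper[&0] == Datum::int(42)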

impl ParquetWriter {
    /// Converts existing Parquet files into Iceberg [`DataFile`] objects, reading each
    /// file's footer to derive column statistics against the table's current schema.
    #[allow(dead_code)]
    pub(crate) async fn parquet_files_to_data_files(
        file_io: &FileIO,
        file_paths: Vec<String>,
        table_metadata: &TableMetadata,
    ) -> Result<Vec<DataFile>> {
        let mut data_files: Vec<DataFile> = Vec::new();

        for file_path in file_paths {
            let input_file = file_io.new_input(&file_path)?;
            let file_metadata = input_file.metadata().await?;
            let file_size_in_bytes = file_metadata.size as usize;
            let reader = input_file.reader().await?;

            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
            let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Error reading Parquet metadata: {err}"),
                )
            })?;
            let mut builder = ParquetWriter::parquet_to_data_file_builder(
                table_metadata.current_schema().clone(),
                parquet_metadata,
                file_size_in_bytes,
                file_path,
                HashMap::new(),
            )?;
            builder.partition_spec_id(table_metadata.default_partition_spec_id());
            let data_file = builder.build().unwrap();
            data_files.push(data_file);
        }

        Ok(data_files)
    }

    /// Builds a [`DataFileBuilder`] from a Parquet file's footer metadata, computing
    /// per-column sizes, value counts, null counts, and lower/upper bounds.
    pub(crate) fn parquet_to_data_file_builder(
        schema: SchemaRef,
        metadata: Arc<ParquetMetaData>,
        written_size: usize,
        file_path: String,
        nan_value_counts: HashMap<i32, u64>,
    ) -> Result<DataFileBuilder> {
        let index_by_parquet_path = {
            let mut visitor = IndexByParquetPathName::new();
            visit_schema(&schema, &mut visitor)?;
            visitor
        };

        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
            let mut min_max_agg = MinMaxColAggregator::new(schema);

            for row_group in metadata.row_groups() {
                for column_chunk_metadata in row_group.columns() {
                    let parquet_path = column_chunk_metadata.column_descr().path().string();

                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
                        continue;
                    };

                    *per_col_size.entry(field_id).or_insert(0) +=
                        column_chunk_metadata.compressed_size() as u64;
                    *per_col_val_num.entry(field_id).or_insert(0) +=
                        column_chunk_metadata.num_values() as u64;

                    if let Some(statistics) = column_chunk_metadata.statistics() {
                        if let Some(null_count) = statistics.null_count_opt() {
                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
                        }

                        min_max_agg.update(field_id, statistics.clone())?;
                    }
                }
            }
            (
                per_col_size,
                per_col_val_num,
                per_col_null_val_num,
                min_max_agg.produce(),
            )
        };

        let mut builder = DataFileBuilder::default();
        builder
            .content(DataContentType::Data)
            .file_path(file_path)
            .file_format(DataFileFormat::Parquet)
            .partition(Struct::empty())
            .record_count(metadata.file_metadata().num_rows() as u64)
            .file_size_in_bytes(written_size as u64)
            .column_sizes(column_sizes)
            .value_counts(value_counts)
            .null_value_counts(null_value_counts)
            .nan_value_counts(nan_value_counts)
            .lower_bounds(lower_bounds)
            .upper_bounds(upper_bounds)
            .split_offsets(Some(
                metadata
                    .row_groups()
                    .iter()
                    .filter_map(|group| group.file_offset())
                    .collect(),
            ));

        Ok(builder)
    }

    /// Infers a file's partition value from its per-column lower/upper bounds.
    /// A value is only produced for a partition field when the source column's lower
    /// and upper bounds are equal and the transform preserves order; fields without
    /// bounds yield a null partition value.
    #[allow(dead_code)]
    fn partition_value_from_bounds(
        table_spec: Arc<PartitionSpec>,
        lower_bounds: &HashMap<i32, Datum>,
        upper_bounds: &HashMap<i32, Datum>,
    ) -> Result<Struct> {
        let mut partition_literals: Vec<Option<Literal>> = Vec::new();

        for field in table_spec.fields() {
            if let (Some(lower), Some(upper)) = (
                lower_bounds.get(&field.source_id),
                upper_bounds.get(&field.source_id),
            ) {
                if !field.transform.preserves_order() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "cannot infer partition value for non linear partition field (needs to preserve order): {} with transform {}",
                            field.name, field.transform
                        ),
                    ));
                }

                if lower != upper {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "multiple partition values for field {}: lower: {:?}, upper: {:?}",
                            field.name, lower, upper
                        ),
                    ));
                }

                let transform_fn = create_transform_function(&field.transform)?;
                let transform_literal =
                    Literal::from(transform_fn.transform_literal_result(lower)?);

                partition_literals.push(Some(transform_literal));
            } else {
                partition_literals.push(None);
            }
        }

        let partition_struct = Struct::from_iter(partition_literals);

        Ok(partition_struct)
    }
}
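
// Illustrative sketch of `partition_value_from_bounds` with hypothetical bounds:
// for an identity-transformed partition field whose source column has equal lower
// and upper bounds within the file, the partition value is inferred from that bound.
//
//     let partition = ParquetWriter::partition_value_from_bounds(
//         partition_spec,                          // Arc<PartitionSpec>, assumed in scope
//         &HashMap::from([(1, Datum::int(2024))]), // lower bounds keyed by source field id
//         &HashMap::from([(1, Datum::int(2024))]), // upper bounds keyed by source field id
//     )?;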

impl FileWriter for ParquetWriter {
    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
        // Skip empty batches; the inner writer is created lazily on the first non-empty batch.
        if batch.num_rows() == 0 {
            return Ok(());
        }

        self.current_row_num += batch.num_rows();

        let batch_c = batch.clone();
        self.nan_value_count_visitor
            .compute(self.schema.clone(), batch_c)?;

        let writer = if let Some(writer) = &mut self.inner_writer {
            writer
        } else {
            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
            let inner_writer = self.output_file.writer().await?;
            let async_writer = AsyncFileWriter::new(inner_writer);
            let writer = AsyncArrowWriter::try_new(
                async_writer,
                arrow_schema.clone(),
                Some(self.writer_properties.clone()),
            )
            .map_err(|err| {
                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
                    .with_source(err)
            })?;
            self.inner_writer = Some(writer);
            self.inner_writer.as_mut().unwrap()
        };

        writer.write(batch).await.map_err(|err| {
            Error::new(
                ErrorKind::Unexpected,
                "Failed to write using parquet writer.",
            )
            .with_source(err)
        })?;

        Ok(())
    }

    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
        let mut writer = match self.inner_writer.take() {
            Some(writer) => writer,
            None => return Ok(vec![]),
        };

        let metadata = writer.finish().await.map_err(|err| {
            Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
        })?;

        let written_size = writer.bytes_written();

        if self.current_row_num == 0 {
            // Nothing was written: remove the empty file and report no data files.
            self.output_file.delete().await.map_err(|err| {
                Error::new(
                    ErrorKind::Unexpected,
                    "Failed to delete empty parquet file.",
                )
                .with_source(err)
            })?;
            Ok(vec![])
        } else {
            let parquet_metadata = Arc::new(metadata);

            Ok(vec![Self::parquet_to_data_file_builder(
                self.schema,
                parquet_metadata,
                written_size,
                self.output_file.location().to_string(),
                self.nan_value_count_visitor.nan_value_counts,
            )?])
        }
    }
}

impl CurrentFileStatus for ParquetWriter {
    fn current_file_path(&self) -> String {
        self.output_file.location().to_string()
    }

    fn current_row_num(&self) -> usize {
        self.current_row_num
    }

    fn current_written_size(&self) -> usize {
        if let Some(inner) = self.inner_writer.as_ref() {
            // Bytes already flushed to the file plus bytes still buffered in memory.
            inner.bytes_written() + inner.in_progress_size()
        } else {
            // The inner writer is created lazily, so nothing has been written yet.
            0
        }
    }

    fn current_schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

/// `AsyncFileWriter` bridges this crate's [`FileWrite`] trait to the
/// [`ArrowAsyncFileWriter`] trait expected by [`AsyncArrowWriter`].
struct AsyncFileWriter<W: FileWrite>(W);

impl<W: FileWrite> AsyncFileWriter<W> {
    /// Creates a new `AsyncFileWriter` wrapping the given writer.
    pub fn new(writer: W) -> Self {
        Self(writer)
    }
}

impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
        Box::pin(async {
            self.0
                .write(bs)
                .await
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
        Box::pin(async {
            self.0
                .close()
                .await
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
        })
    }
}

613#[cfg(test)]
614mod tests {
615 use std::collections::HashMap;
616 use std::sync::Arc;
617
618 use anyhow::Result;
619 use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
620 use arrow_array::types::{Float32Type, Int64Type};
621 use arrow_array::{
622 Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
623 Int64Array, ListArray, MapArray, RecordBatch, StructArray,
624 };
625 use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
626 use arrow_select::concat::concat_batches;
627 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
628 use parquet::file::statistics::ValueStatistics;
629 use rust_decimal::Decimal;
630 use tempfile::TempDir;
631 use uuid::Uuid;
632
633 use super::*;
634 use crate::arrow::schema_to_arrow_schema;
635 use crate::io::FileIOBuilder;
636 use crate::spec::{PrimitiveLiteral, Struct, *};
637 use crate::writer::file_writer::location_generator::{
638 DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
639 };
640 use crate::writer::tests::check_parquet_data_file;
641
642 fn schema_for_all_type() -> Schema {
643 Schema::builder()
644 .with_schema_id(1)
645 .with_fields(vec![
646 NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
647 NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
648 NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
649 NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
650 NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
651 NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
652 NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
653 NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
654 NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
655 NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
656 .into(),
657 NestedField::optional(
658 10,
659 "timestamptz",
660 Type::Primitive(PrimitiveType::Timestamptz),
661 )
662 .into(),
663 NestedField::optional(
664 11,
665 "timestamp_ns",
666 Type::Primitive(PrimitiveType::TimestampNs),
667 )
668 .into(),
669 NestedField::optional(
670 12,
671 "timestamptz_ns",
672 Type::Primitive(PrimitiveType::TimestamptzNs),
673 )
674 .into(),
675 NestedField::optional(
676 13,
677 "decimal",
678 Type::Primitive(PrimitiveType::Decimal {
679 precision: 10,
680 scale: 5,
681 }),
682 )
683 .into(),
684 NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
685 NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
686 .into(),
687 NestedField::optional(
690 16,
691 "decimal_38",
692 Type::Primitive(PrimitiveType::Decimal {
693 precision: 38,
694 scale: 5,
695 }),
696 )
697 .into(),
698 ])
699 .build()
700 .unwrap()
701 }
702
703 fn nested_schema_for_test() -> Schema {
704 Schema::builder()
706 .with_schema_id(1)
707 .with_fields(vec![
708 NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
709 NestedField::required(
710 1,
711 "col1",
712 Type::Struct(StructType::new(vec![
713 NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
714 .into(),
715 NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
716 .into(),
717 ])),
718 )
719 .into(),
720 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
721 NestedField::required(
722 3,
723 "col3",
724 Type::List(ListType::new(
725 NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
726 .into(),
727 )),
728 )
729 .into(),
730 NestedField::required(
731 4,
732 "col4",
733 Type::Struct(StructType::new(vec![
734 NestedField::required(
735 8,
736 "col_4_8",
737 Type::Struct(StructType::new(vec![
738 NestedField::required(
739 9,
740 "col_4_8_9",
741 Type::Primitive(PrimitiveType::Long),
742 )
743 .into(),
744 ])),
745 )
746 .into(),
747 ])),
748 )
749 .into(),
750 NestedField::required(
751 10,
752 "col5",
753 Type::Map(MapType::new(
754 NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
755 .into(),
756 NestedField::required(
757 12,
758 "value",
759 Type::List(ListType::new(
760 NestedField::required(
761 13,
762 "item",
763 Type::Primitive(PrimitiveType::Long),
764 )
765 .into(),
766 )),
767 )
768 .into(),
769 )),
770 )
771 .into(),
772 ])
773 .build()
774 .unwrap()
775 }
776
777 #[tokio::test]
778 async fn test_index_by_parquet_path() {
779 let expect = HashMap::from([
780 ("col0".to_string(), 0),
781 ("col1.col_1_5".to_string(), 5),
782 ("col1.col_1_6".to_string(), 6),
783 ("col2".to_string(), 2),
784 ("col3.list.element".to_string(), 7),
785 ("col4.col_4_8.col_4_8_9".to_string(), 9),
786 ("col5.key_value.key".to_string(), 11),
787 ("col5.key_value.value.list.item".to_string(), 13),
788 ]);
789 let mut visitor = IndexByParquetPathName::new();
790 visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
791 assert_eq!(visitor.name_to_id, expect);
792 }
793
794 #[tokio::test]
795 async fn test_parquet_writer() -> Result<()> {
796 let temp_dir = TempDir::new().unwrap();
797 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
798 let location_gen = DefaultLocationGenerator::with_data_location(
799 temp_dir.path().to_str().unwrap().to_string(),
800 );
801 let file_name_gen =
802 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
803
804 let schema = {
806 let fields =
807 vec![
808 Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
809 PARQUET_FIELD_ID_META_KEY.to_string(),
810 "0".to_string(),
811 )])),
812 ];
813 Arc::new(arrow_schema::Schema::new(fields))
814 };
815 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
816 let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
817 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
818 let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
819
820 let output_file = file_io.new_output(
821 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
822 )?;
823
824 let mut pw = ParquetWriterBuilder::new(
826 WriterProperties::builder()
827 .set_max_row_group_size(128)
828 .build(),
829 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
830 )
831 .build(output_file)
832 .await?;
833 pw.write(&to_write).await?;
834 pw.write(&to_write_null).await?;
835 let res = pw.close().await?;
836 assert_eq!(res.len(), 1);
837 let data_file = res
838 .into_iter()
839 .next()
840 .unwrap()
841 .content(DataContentType::Data)
843 .partition(Struct::empty())
844 .partition_spec_id(0)
845 .build()
846 .unwrap();
847
848 assert_eq!(data_file.record_count(), 2048);
850 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
851 assert_eq!(
852 *data_file.lower_bounds(),
853 HashMap::from([(0, Datum::long(0))])
854 );
855 assert_eq!(
856 *data_file.upper_bounds(),
857 HashMap::from([(0, Datum::long(1023))])
858 );
859 assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
860
861 let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
863 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
864
865 Ok(())
866 }
867
868 #[tokio::test]
869 async fn test_parquet_writer_with_complex_schema() -> Result<()> {
870 let temp_dir = TempDir::new().unwrap();
871 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
872 let location_gen = DefaultLocationGenerator::with_data_location(
873 temp_dir.path().to_str().unwrap().to_string(),
874 );
875 let file_name_gen =
876 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
877
878 let schema = nested_schema_for_test();
880 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
881 let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
882 let col1 = Arc::new(StructArray::new(
883 {
884 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
885 fields.clone()
886 } else {
887 unreachable!()
888 }
889 },
890 vec![
891 Arc::new(Int64Array::from_iter_values(0..1024)),
892 Arc::new(Int64Array::from_iter_values(0..1024)),
893 ],
894 None,
895 ));
896 let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
897 (0..1024).map(|n| n.to_string()),
898 )) as ArrayRef;
899 let col3 = Arc::new({
900 let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
901 (0..1024).map(|n| Some(vec![Some(n)])),
902 )
903 .into_parts();
904 arrow_array::ListArray::new(
905 {
906 if let DataType::List(field) = arrow_schema.field(3).data_type() {
907 field.clone()
908 } else {
909 unreachable!()
910 }
911 },
912 list_parts.1,
913 list_parts.2,
914 list_parts.3,
915 )
916 }) as ArrayRef;
917 let col4 = Arc::new(StructArray::new(
918 {
919 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
920 fields.clone()
921 } else {
922 unreachable!()
923 }
924 },
925 vec![Arc::new(StructArray::new(
926 {
927 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
928 if let DataType::Struct(fields) = fields[0].data_type() {
929 fields.clone()
930 } else {
931 unreachable!()
932 }
933 } else {
934 unreachable!()
935 }
936 },
937 vec![Arc::new(Int64Array::from_iter_values(0..1024))],
938 None,
939 ))],
940 None,
941 ));
942 let col5 = Arc::new({
943 let mut map_array_builder = MapBuilder::new(
944 None,
945 arrow_array::builder::StringBuilder::new(),
946 arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
947 Int64Type,
948 >::new()),
949 );
950 for i in 0..1024 {
951 map_array_builder.keys().append_value(i.to_string());
952 map_array_builder
953 .values()
954 .append_value(vec![Some(i as i64); i + 1]);
955 map_array_builder.append(true)?;
956 }
957 let (_, offset_buffer, struct_array, null_buffer, ordered) =
958 map_array_builder.finish().into_parts();
959 let struct_array = {
960 let (_, mut arrays, nulls) = struct_array.into_parts();
961 let list_array = {
962 let list_array = arrays[1]
963 .as_any()
964 .downcast_ref::<ListArray>()
965 .unwrap()
966 .clone();
967 let (_, offsets, array, nulls) = list_array.into_parts();
968 let list_field = {
969 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
970 if let DataType::Struct(fields) = map_field.data_type() {
971 if let DataType::List(list_field) = fields[1].data_type() {
972 list_field.clone()
973 } else {
974 unreachable!()
975 }
976 } else {
977 unreachable!()
978 }
979 } else {
980 unreachable!()
981 }
982 };
983 ListArray::new(list_field, offsets, array, nulls)
984 };
985 arrays[1] = Arc::new(list_array) as ArrayRef;
986 StructArray::new(
987 {
988 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
989 if let DataType::Struct(fields) = map_field.data_type() {
990 fields.clone()
991 } else {
992 unreachable!()
993 }
994 } else {
995 unreachable!()
996 }
997 },
998 arrays,
999 nulls,
1000 )
1001 };
1002 arrow_array::MapArray::new(
1003 {
1004 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1005 map_field.clone()
1006 } else {
1007 unreachable!()
1008 }
1009 },
1010 offset_buffer,
1011 struct_array,
1012 null_buffer,
1013 ordered,
1014 )
1015 }) as ArrayRef;
1016 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1017 col0, col1, col2, col3, col4, col5,
1018 ])
1019 .unwrap();
1020 let output_file = file_io.new_output(
1021 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1022 )?;
1023
1024 let mut pw =
1026 ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1027 .build(output_file)
1028 .await?;
1029 pw.write(&to_write).await?;
1030 let res = pw.close().await?;
1031 assert_eq!(res.len(), 1);
1032 let data_file = res
1033 .into_iter()
1034 .next()
1035 .unwrap()
1036 .content(crate::spec::DataContentType::Data)
1038 .partition(Struct::empty())
1039 .partition_spec_id(0)
1040 .build()
1041 .unwrap();
1042
1043 assert_eq!(data_file.record_count(), 1024);
1045 assert_eq!(
1046 *data_file.value_counts(),
1047 HashMap::from([
1048 (0, 1024),
1049 (5, 1024),
1050 (6, 1024),
1051 (2, 1024),
1052 (7, 1024),
1053 (9, 1024),
1054 (11, 1024),
1055 (13, (1..1025).sum()),
1056 ])
1057 );
1058 assert_eq!(
1059 *data_file.lower_bounds(),
1060 HashMap::from([
1061 (0, Datum::long(0)),
1062 (5, Datum::long(0)),
1063 (6, Datum::long(0)),
1064 (2, Datum::string("0")),
1065 (7, Datum::long(0)),
1066 (9, Datum::long(0)),
1067 (11, Datum::string("0")),
1068 (13, Datum::long(0))
1069 ])
1070 );
1071 assert_eq!(
1072 *data_file.upper_bounds(),
1073 HashMap::from([
1074 (0, Datum::long(1023)),
1075 (5, Datum::long(1023)),
1076 (6, Datum::long(1023)),
1077 (2, Datum::string("999")),
1078 (7, Datum::long(1023)),
1079 (9, Datum::long(1023)),
1080 (11, Datum::string("999")),
1081 (13, Datum::long(1023))
1082 ])
1083 );
1084
1085 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1087
1088 Ok(())
1089 }
1090
1091 #[tokio::test]
1092 async fn test_all_type_for_write() -> Result<()> {
1093 let temp_dir = TempDir::new().unwrap();
1094 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1095 let location_gen = DefaultLocationGenerator::with_data_location(
1096 temp_dir.path().to_str().unwrap().to_string(),
1097 );
1098 let file_name_gen =
1099 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1100
1101 let schema = schema_for_all_type();
1104 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1105 let col0 = Arc::new(BooleanArray::from(vec![
1106 Some(true),
1107 Some(false),
1108 None,
1109 Some(true),
1110 ])) as ArrayRef;
1111 let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1112 let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1113 let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1114 Some(0.5),
1115 Some(2.0),
1116 None,
1117 Some(3.5),
1118 ])) as ArrayRef;
1119 let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1120 Some(0.5),
1121 Some(2.0),
1122 None,
1123 Some(3.5),
1124 ])) as ArrayRef;
1125 let col5 = Arc::new(arrow_array::StringArray::from(vec![
1126 Some("a"),
1127 Some("b"),
1128 None,
1129 Some("d"),
1130 ])) as ArrayRef;
1131 let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1132 Some(b"one"),
1133 None,
1134 Some(b""),
1135 Some(b"zzzz"),
1136 ])) as ArrayRef;
1137 let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1138 Some(0),
1139 Some(1),
1140 None,
1141 Some(3),
1142 ])) as ArrayRef;
1143 let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1144 Some(0),
1145 Some(1),
1146 None,
1147 Some(3),
1148 ])) as ArrayRef;
1149 let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1150 Some(0),
1151 Some(1),
1152 None,
1153 Some(3),
1154 ])) as ArrayRef;
1155 let col10 = Arc::new(
1156 arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1157 .with_timezone_utc(),
1158 ) as ArrayRef;
1159 let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1160 Some(0),
1161 Some(1),
1162 None,
1163 Some(3),
1164 ])) as ArrayRef;
1165 let col12 = Arc::new(
1166 arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1167 .with_timezone_utc(),
1168 ) as ArrayRef;
1169 let col13 = Arc::new(
1170 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1171 .with_precision_and_scale(10, 5)
1172 .unwrap(),
1173 ) as ArrayRef;
1174 let col14 = Arc::new(
1175 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1176 vec![
1177 Some(Uuid::from_u128(0).as_bytes().to_vec()),
1178 Some(Uuid::from_u128(1).as_bytes().to_vec()),
1179 None,
1180 Some(Uuid::from_u128(3).as_bytes().to_vec()),
1181 ]
1182 .into_iter(),
1183 16,
1184 )
1185 .unwrap(),
1186 ) as ArrayRef;
1187 let col15 = Arc::new(
1188 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1189 vec![
1190 Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1191 Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1192 None,
1193 Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1194 ]
1195 .into_iter(),
1196 10,
1197 )
1198 .unwrap(),
1199 ) as ArrayRef;
1200 let col16 = Arc::new(
1201 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1202 .with_precision_and_scale(38, 5)
1203 .unwrap(),
1204 ) as ArrayRef;
1205 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1206 col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1207 col14, col15, col16,
1208 ])
1209 .unwrap();
1210 let output_file = file_io.new_output(
1211 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1212 )?;
1213
1214 let mut pw =
1216 ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1217 .build(output_file)
1218 .await?;
1219 pw.write(&to_write).await?;
1220 let res = pw.close().await?;
1221 assert_eq!(res.len(), 1);
1222 let data_file = res
1223 .into_iter()
1224 .next()
1225 .unwrap()
1226 .content(crate::spec::DataContentType::Data)
1228 .partition(Struct::empty())
1229 .partition_spec_id(0)
1230 .build()
1231 .unwrap();
1232
1233 assert_eq!(data_file.record_count(), 4);
1235 assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1236 assert!(
1237 data_file
1238 .null_value_counts()
1239 .iter()
1240 .all(|(_, &v)| { v == 1 })
1241 );
1242 assert_eq!(
1243 *data_file.lower_bounds(),
1244 HashMap::from([
1245 (0, Datum::bool(false)),
1246 (1, Datum::int(1)),
1247 (2, Datum::long(1)),
1248 (3, Datum::float(0.5)),
1249 (4, Datum::double(0.5)),
1250 (5, Datum::string("a")),
1251 (6, Datum::binary(vec![])),
1252 (7, Datum::date(0)),
1253 (8, Datum::time_micros(0).unwrap()),
1254 (9, Datum::timestamp_micros(0)),
1255 (10, Datum::timestamptz_micros(0)),
1256 (11, Datum::timestamp_nanos(0)),
1257 (12, Datum::timestamptz_nanos(0)),
1258 (
1259 13,
1260 Datum::new(
1261 PrimitiveType::Decimal {
1262 precision: 10,
1263 scale: 5
1264 },
1265 PrimitiveLiteral::Int128(1)
1266 )
1267 ),
1268 (14, Datum::uuid(Uuid::from_u128(0))),
1269 (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1270 (
1271 16,
1272 Datum::new(
1273 PrimitiveType::Decimal {
1274 precision: 38,
1275 scale: 5
1276 },
1277 PrimitiveLiteral::Int128(1)
1278 )
1279 ),
1280 ])
1281 );
1282 assert_eq!(
1283 *data_file.upper_bounds(),
1284 HashMap::from([
1285 (0, Datum::bool(true)),
1286 (1, Datum::int(4)),
1287 (2, Datum::long(4)),
1288 (3, Datum::float(3.5)),
1289 (4, Datum::double(3.5)),
1290 (5, Datum::string("d")),
1291 (6, Datum::binary(vec![122, 122, 122, 122])),
1292 (7, Datum::date(3)),
1293 (8, Datum::time_micros(3).unwrap()),
1294 (9, Datum::timestamp_micros(3)),
1295 (10, Datum::timestamptz_micros(3)),
1296 (11, Datum::timestamp_nanos(3)),
1297 (12, Datum::timestamptz_nanos(3)),
1298 (
1299 13,
1300 Datum::new(
1301 PrimitiveType::Decimal {
1302 precision: 10,
1303 scale: 5
1304 },
1305 PrimitiveLiteral::Int128(100)
1306 )
1307 ),
1308 (14, Datum::uuid(Uuid::from_u128(3))),
1309 (
1310 15,
1311 Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1312 ),
1313 (
1314 16,
1315 Datum::new(
1316 PrimitiveType::Decimal {
1317 precision: 38,
1318 scale: 5
1319 },
1320 PrimitiveLiteral::Int128(100)
1321 )
1322 ),
1323 ])
1324 );
1325
1326 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1328
1329 Ok(())
1330 }
1331
1332 #[tokio::test]
1333 async fn test_decimal_bound() -> Result<()> {
1334 let temp_dir = TempDir::new().unwrap();
1335 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1336 let location_gen = DefaultLocationGenerator::with_data_location(
1337 temp_dir.path().to_str().unwrap().to_string(),
1338 );
1339 let file_name_gen =
1340 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1341
1342 let schema = Arc::new(
1344 Schema::builder()
1345 .with_fields(vec![
1346 NestedField::optional(
1347 0,
1348 "decimal",
1349 Type::Primitive(PrimitiveType::Decimal {
1350 precision: 28,
1351 scale: 10,
1352 }),
1353 )
1354 .into(),
1355 ])
1356 .build()
1357 .unwrap(),
1358 );
1359 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1360 let output_file = file_io.new_output(
1361 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1362 )?;
1363 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1364 .build(output_file)
1365 .await?;
1366 let col0 = Arc::new(
1367 Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1368 .with_data_type(DataType::Decimal128(28, 10)),
1369 ) as ArrayRef;
1370 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1371 pw.write(&to_write).await?;
1372 let res = pw.close().await?;
1373 assert_eq!(res.len(), 1);
1374 let data_file = res
1375 .into_iter()
1376 .next()
1377 .unwrap()
1378 .content(crate::spec::DataContentType::Data)
1379 .partition(Struct::empty())
1380 .partition_spec_id(0)
1381 .build()
1382 .unwrap();
1383 assert_eq!(
1384 data_file.upper_bounds().get(&0),
1385 Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1386 .as_ref()
1387 );
1388 assert_eq!(
1389 data_file.lower_bounds().get(&0),
1390 Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1391 .as_ref()
1392 );
1393
1394 let schema = Arc::new(
1396 Schema::builder()
1397 .with_fields(vec![
1398 NestedField::optional(
1399 0,
1400 "decimal",
1401 Type::Primitive(PrimitiveType::Decimal {
1402 precision: 28,
1403 scale: 10,
1404 }),
1405 )
1406 .into(),
1407 ])
1408 .build()
1409 .unwrap(),
1410 );
1411 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1412 let output_file = file_io.new_output(
1413 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1414 )?;
1415 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1416 .build(output_file)
1417 .await?;
1418 let col0 = Arc::new(
1419 Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1420 .with_data_type(DataType::Decimal128(28, 10)),
1421 ) as ArrayRef;
1422 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1423 pw.write(&to_write).await?;
1424 let res = pw.close().await?;
1425 assert_eq!(res.len(), 1);
1426 let data_file = res
1427 .into_iter()
1428 .next()
1429 .unwrap()
1430 .content(crate::spec::DataContentType::Data)
1431 .partition(Struct::empty())
1432 .partition_spec_id(0)
1433 .build()
1434 .unwrap();
1435 assert_eq!(
1436 data_file.upper_bounds().get(&0),
1437 Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1438 .as_ref()
1439 );
1440 assert_eq!(
1441 data_file.lower_bounds().get(&0),
1442 Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1443 .as_ref()
1444 );
1445
1446 let decimal_max = Decimal::MAX;
1448 let decimal_min = Decimal::MIN;
1449 assert_eq!(decimal_max.scale(), decimal_min.scale());
1450 let schema = Arc::new(
1451 Schema::builder()
1452 .with_fields(vec![
1453 NestedField::optional(
1454 0,
1455 "decimal",
1456 Type::Primitive(PrimitiveType::Decimal {
1457 precision: 38,
1458 scale: decimal_max.scale(),
1459 }),
1460 )
1461 .into(),
1462 ])
1463 .build()
1464 .unwrap(),
1465 );
1466 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1467 let output_file = file_io.new_output(
1468 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1469 )?;
1470 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1471 .build(output_file)
1472 .await?;
1473 let col0 = Arc::new(
1474 Decimal128Array::from(vec![
1475 Some(decimal_max.mantissa()),
1476 Some(decimal_min.mantissa()),
1477 ])
1478 .with_data_type(DataType::Decimal128(38, 0)),
1479 ) as ArrayRef;
1480 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1481 pw.write(&to_write).await?;
1482 let res = pw.close().await?;
1483 assert_eq!(res.len(), 1);
1484 let data_file = res
1485 .into_iter()
1486 .next()
1487 .unwrap()
1488 .content(crate::spec::DataContentType::Data)
1489 .partition(Struct::empty())
1490 .partition_spec_id(0)
1491 .build()
1492 .unwrap();
1493 assert_eq!(
1494 data_file.upper_bounds().get(&0),
1495 Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1496 );
1497 assert_eq!(
1498 data_file.lower_bounds().get(&0),
1499 Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1500 );
1501
1502 Ok(())
1572 }
1573
1574 #[tokio::test]
1575 async fn test_empty_write() -> Result<()> {
1576 let temp_dir = TempDir::new().unwrap();
1577 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1578 let location_gen = DefaultLocationGenerator::with_data_location(
1579 temp_dir.path().to_str().unwrap().to_string(),
1580 );
1581 let file_name_gen =
1582 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1583
1584 let schema = {
1586 let fields = vec![
1587 arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1588 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1589 ),
1590 ];
1591 Arc::new(arrow_schema::Schema::new(fields))
1592 };
1593 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1594 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1595 let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1596 let output_file = file_io.new_output(&file_path)?;
1597 let mut pw = ParquetWriterBuilder::new(
1598 WriterProperties::builder().build(),
1599 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1600 )
1601 .build(output_file)
1602 .await?;
1603 pw.write(&to_write).await?;
1604 pw.close().await.unwrap();
1605 assert!(file_io.exists(&file_path).await.unwrap());
1606
1607 let file_name_gen =
1609 DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1610 let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1611 let output_file = file_io.new_output(&file_path)?;
1612 let pw = ParquetWriterBuilder::new(
1613 WriterProperties::builder().build(),
1614 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1615 )
1616 .build(output_file)
1617 .await?;
1618 pw.close().await.unwrap();
1619 assert!(!file_io.exists(&file_path).await.unwrap());
1620
1621 Ok(())
1622 }
1623
1624 #[tokio::test]
1625 async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1626 let temp_dir = TempDir::new().unwrap();
1627 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1628 let location_gen = DefaultLocationGenerator::with_data_location(
1629 temp_dir.path().to_str().unwrap().to_string(),
1630 );
1631 let file_name_gen =
1632 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1633
1634 let arrow_schema = {
1636 let fields = vec![
1637 Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1638 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1639 ),
1640 Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1641 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1642 ),
1643 ];
1644 Arc::new(arrow_schema::Schema::new(fields))
1645 };
1646
1647 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1648 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1649 None,
1650 )) as ArrayRef;
1651
1652 let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1653 [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1654 None,
1655 )) as ArrayRef;
1656
1657 let to_write =
1658 RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1659 let output_file = file_io.new_output(
1660 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1661 )?;
1662
1663 let mut pw = ParquetWriterBuilder::new(
1665 WriterProperties::builder().build(),
1666 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1667 )
1668 .build(output_file)
1669 .await?;
1670
1671 pw.write(&to_write).await?;
1672 let res = pw.close().await?;
1673 assert_eq!(res.len(), 1);
1674 let data_file = res
1675 .into_iter()
1676 .next()
1677 .unwrap()
1678 .content(crate::spec::DataContentType::Data)
1680 .partition(Struct::empty())
1681 .partition_spec_id(0)
1682 .build()
1683 .unwrap();
1684
1685 assert_eq!(data_file.record_count(), 4);
1687 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1688 assert_eq!(
1689 *data_file.lower_bounds(),
1690 HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1691 );
1692 assert_eq!(
1693 *data_file.upper_bounds(),
1694 HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1695 );
1696 assert_eq!(
1697 *data_file.null_value_counts(),
1698 HashMap::from([(0, 0), (1, 0)])
1699 );
1700 assert_eq!(
1701 *data_file.nan_value_counts(),
1702 HashMap::from([(0, 1), (1, 1)])
1703 );
1704
1705 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1707 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1708
1709 Ok(())
1710 }
1711
1712 #[tokio::test]
1713 async fn test_nan_val_cnts_struct_type() -> Result<()> {
1714 let temp_dir = TempDir::new().unwrap();
1715 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1716 let location_gen = DefaultLocationGenerator::with_data_location(
1717 temp_dir.path().to_str().unwrap().to_string(),
1718 );
1719 let file_name_gen =
1720 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1721
1722 let schema_struct_float_fields = Fields::from(vec![
1723 Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1724 PARQUET_FIELD_ID_META_KEY.to_string(),
1725 "4".to_string(),
1726 )])),
1727 ]);
1728
1729 let schema_struct_nested_float_fields = Fields::from(vec![
1730 Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1731 PARQUET_FIELD_ID_META_KEY.to_string(),
1732 "7".to_string(),
1733 )])),
1734 ]);
1735
1736 let schema_struct_nested_fields = Fields::from(vec![
1737 Field::new(
1738 "col6",
1739 arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1740 false,
1741 )
1742 .with_metadata(HashMap::from([(
1743 PARQUET_FIELD_ID_META_KEY.to_string(),
1744 "6".to_string(),
1745 )])),
1746 ]);
1747
1748 let arrow_schema = {
1750 let fields = vec![
1751 Field::new(
1752 "col3",
1753 arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1754 false,
1755 )
1756 .with_metadata(HashMap::from([(
1757 PARQUET_FIELD_ID_META_KEY.to_string(),
1758 "3".to_string(),
1759 )])),
1760 Field::new(
1761 "col5",
1762 arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1763 false,
1764 )
1765 .with_metadata(HashMap::from([(
1766 PARQUET_FIELD_ID_META_KEY.to_string(),
1767 "5".to_string(),
1768 )])),
1769 ];
1770 Arc::new(arrow_schema::Schema::new(fields))
1771 };
1772
1773 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1774 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1775 None,
1776 )) as ArrayRef;
1777
1778 let struct_float_field_col = Arc::new(StructArray::new(
1779 schema_struct_float_fields,
1780 vec![float_32_col.clone()],
1781 None,
1782 )) as ArrayRef;
1783
1784 let struct_nested_float_field_col = Arc::new(StructArray::new(
1785 schema_struct_nested_fields,
1786 vec![Arc::new(StructArray::new(
1787 schema_struct_nested_float_fields,
1788 vec![float_32_col.clone()],
1789 None,
1790 )) as ArrayRef],
1791 None,
1792 )) as ArrayRef;
1793
1794 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1795 struct_float_field_col,
1796 struct_nested_float_field_col,
1797 ])
1798 .unwrap();
1799 let output_file = file_io.new_output(
1800 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1801 )?;
1802
1803 let mut pw = ParquetWriterBuilder::new(
1805 WriterProperties::builder().build(),
1806 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1807 )
1808 .build(output_file)
1809 .await?;
1810
1811 pw.write(&to_write).await?;
1812 let res = pw.close().await?;
1813 assert_eq!(res.len(), 1);
1814 let data_file = res
1815 .into_iter()
1816 .next()
1817 .unwrap()
1818 .content(crate::spec::DataContentType::Data)
1820 .partition(Struct::empty())
1821 .partition_spec_id(0)
1822 .build()
1823 .unwrap();
1824
1825 assert_eq!(data_file.record_count(), 4);
1827 assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1828 assert_eq!(
1829 *data_file.lower_bounds(),
1830 HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1831 );
1832 assert_eq!(
1833 *data_file.upper_bounds(),
1834 HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1835 );
1836 assert_eq!(
1837 *data_file.null_value_counts(),
1838 HashMap::from([(4, 0), (7, 0)])
1839 );
1840 assert_eq!(
1841 *data_file.nan_value_counts(),
1842 HashMap::from([(4, 1), (7, 1)])
1843 );
1844
1845 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1847 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1848
1849 Ok(())
1850 }
1851
1852 #[tokio::test]
1853 async fn test_nan_val_cnts_list_type() -> Result<()> {
1854 let temp_dir = TempDir::new().unwrap();
1855 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1856 let location_gen = DefaultLocationGenerator::with_data_location(
1857 temp_dir.path().to_str().unwrap().to_string(),
1858 );
1859 let file_name_gen =
1860 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1861
1862 let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1863 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1864 );
1865
1866 let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1867 .with_metadata(HashMap::from([(
1868 PARQUET_FIELD_ID_META_KEY.to_string(),
1869 "4".to_string(),
1870 )]));
1871
1872 let schema_struct_list_field = Fields::from(vec![
1873 Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
1874 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
1875 ),
1876 ]);
1877
1878 let arrow_schema = {
1879 let fields = vec![
1880 Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
1881 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1882 ),
1883 Field::new_struct("col1", schema_struct_list_field.clone(), true)
1884 .with_metadata(HashMap::from([(
1885 PARQUET_FIELD_ID_META_KEY.to_string(),
1886 "2".to_string(),
1887 )]))
1888 .clone(),
1889 ];
1893 Arc::new(arrow_schema::Schema::new(fields))
1894 };
1895
1896 let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
1897 Some(1.0_f32),
1898 Some(f32::NAN),
1899 Some(2.0),
1900 Some(2.0),
1901 ])])
1902 .into_parts();
1903
1904 let list_float_field_col = Arc::new({
1905 let list_parts = list_parts.clone();
1906 ListArray::new(
1907 {
1908 if let DataType::List(field) = arrow_schema.field(0).data_type() {
1909 field.clone()
1910 } else {
1911 unreachable!()
1912 }
1913 },
1914 list_parts.1,
1915 list_parts.2,
1916 list_parts.3,
1917 )
1918 }) as ArrayRef;
1919
1920 let struct_list_fields_schema =
1921 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1922 fields.clone()
1923 } else {
1924 unreachable!()
1925 };
1926
1927 let struct_list_float_field_col = Arc::new({
1928 ListArray::new(
1929 {
1930 if let DataType::List(field) = struct_list_fields_schema
1931 .first()
1932 .expect("could not find first list field")
1933 .data_type()
1934 {
1935 field.clone()
1936 } else {
1937 unreachable!()
1938 }
1939 },
1940 list_parts.1,
1941 list_parts.2,
1942 list_parts.3,
1943 )
1944 }) as ArrayRef;
1945
1946 let struct_list_float_field_col = Arc::new(StructArray::new(
1947 struct_list_fields_schema,
1948 vec![struct_list_float_field_col.clone()],
1949 None,
1950 )) as ArrayRef;
1951
1952 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1953 list_float_field_col,
1954 struct_list_float_field_col,
1955 ])
1957 .expect("Could not form record batch");
1958 let output_file = file_io.new_output(
1959 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1960 )?;
1961
1962 let mut pw = ParquetWriterBuilder::new(
1964 WriterProperties::builder().build(),
1965 Arc::new(
1966 to_write
1967 .schema()
1968 .as_ref()
1969 .try_into()
1970 .expect("Could not convert iceberg schema"),
1971 ),
1972 )
1973 .build(output_file)
1974 .await?;
1975
1976 pw.write(&to_write).await?;
1977 let res = pw.close().await?;
1978 assert_eq!(res.len(), 1);
1979 let data_file = res
1980 .into_iter()
1981 .next()
1982 .unwrap()
1983 .content(crate::spec::DataContentType::Data)
1984 .partition(Struct::empty())
1985 .partition_spec_id(0)
1986 .build()
1987 .unwrap();
1988
1989 assert_eq!(data_file.record_count(), 1);
1991 assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
1992 assert_eq!(
1993 *data_file.lower_bounds(),
1994 HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
1995 );
1996 assert_eq!(
1997 *data_file.upper_bounds(),
1998 HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
1999 );
2000 assert_eq!(
2001 *data_file.null_value_counts(),
2002 HashMap::from([(1, 0), (4, 0)])
2003 );
2004 assert_eq!(
2005 *data_file.nan_value_counts(),
2006 HashMap::from([(1, 1), (4, 1)])
2007 );
2008
2009 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2011 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2012
2013 Ok(())
2014 }
2015
2016 macro_rules! construct_map_arr {
2017 ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
2018 let int_builder = Int32Builder::new();
2019 let float_builder = Float32Builder::with_capacity(4);
2020 let mut builder = MapBuilder::new(None, int_builder, float_builder);
2021 builder.keys().append_value(1);
2022 builder.values().append_value(1.0_f32);
2023 builder.append(true).unwrap();
2024 builder.keys().append_value(2);
2025 builder.values().append_value(f32::NAN);
2026 builder.append(true).unwrap();
2027 builder.keys().append_value(3);
2028 builder.values().append_value(2.0);
2029 builder.append(true).unwrap();
2030 builder.keys().append_value(4);
2031 builder.values().append_value(2.0);
2032 builder.append(true).unwrap();
2033 let array = builder.finish();
2034
2035 let (_field, offsets, entries, nulls, ordered) = array.into_parts();
2036 let new_struct_fields_schema =
2037 Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
2038
2039 let entries = {
2040 let (_, arrays, nulls) = entries.into_parts();
2041 StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
2042 };
2043
2044 let field = Arc::new(Field::new(
2045 DEFAULT_MAP_FIELD_NAME,
2046 DataType::Struct(new_struct_fields_schema),
2047 false,
2048 ));
2049
2050 Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
2051 }};
2052 }
2053
2054 #[tokio::test]
2055 async fn test_nan_val_cnts_map_type() -> Result<()> {
2056 let temp_dir = TempDir::new().unwrap();
2057 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2058 let location_gen = DefaultLocationGenerator::with_data_location(
2059 temp_dir.path().to_str().unwrap().to_string(),
2060 );
2061 let file_name_gen =
2062 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2063
2064 let map_key_field_schema =
2065 Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2066 (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2067 ]));
2068
2069 let map_value_field_schema =
2070 Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2071 [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2072 ));
2073
2074 let struct_map_key_field_schema =
2075 Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2076 (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2077 ]));
2078
2079 let struct_map_value_field_schema =
2080 Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2081 [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2082 ));
2083
2084 let schema_struct_map_field = Fields::from(vec![
2085 Field::new_map(
2086 "col3",
2087 DEFAULT_MAP_FIELD_NAME,
2088 struct_map_key_field_schema.clone(),
2089 struct_map_value_field_schema.clone(),
2090 false,
2091 false,
2092 )
2093 .with_metadata(HashMap::from([(
2094 PARQUET_FIELD_ID_META_KEY.to_string(),
2095 "5".to_string(),
2096 )])),
2097 ]);
2098
2099 let arrow_schema = {
2100 let fields = vec![
2101 Field::new_map(
2102 "col0",
2103 DEFAULT_MAP_FIELD_NAME,
2104 map_key_field_schema.clone(),
2105 map_value_field_schema.clone(),
2106 false,
2107 false,
2108 )
2109 .with_metadata(HashMap::from([(
2110 PARQUET_FIELD_ID_META_KEY.to_string(),
2111 "0".to_string(),
2112 )])),
2113 Field::new_struct("col1", schema_struct_map_field.clone(), true)
2114 .with_metadata(HashMap::from([(
2115 PARQUET_FIELD_ID_META_KEY.to_string(),
2116 "3".to_string(),
2117 )]))
2118 .clone(),
2119 ];
2120 Arc::new(arrow_schema::Schema::new(fields))
2121 };
2122
2123 let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2124
2125 let struct_map_arr =
2126 construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2127
2128 let struct_list_float_field_col = Arc::new(StructArray::new(
2129 schema_struct_map_field,
2130 vec![struct_map_arr],
2131 None,
2132 )) as ArrayRef;
2133
2134 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2135 map_array,
2136 struct_list_float_field_col,
2137 ])
2138 .expect("Could not form record batch");
2139 let output_file = file_io.new_output(
2140 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2141 )?;
2142
2143 let mut pw = ParquetWriterBuilder::new(
2145 WriterProperties::builder().build(),
2146 Arc::new(
2147 to_write
2148 .schema()
2149 .as_ref()
2150 .try_into()
2151 .expect("Could not convert iceberg schema"),
2152 ),
2153 )
2154 .build(output_file)
2155 .await?;
2156
2157 pw.write(&to_write).await?;
2158 let res = pw.close().await?;
2159 assert_eq!(res.len(), 1);
2160 let data_file = res
2161 .into_iter()
2162 .next()
2163 .unwrap()
2164 .content(crate::spec::DataContentType::Data)
2165 .partition(Struct::empty())
2166 .partition_spec_id(0)
2167 .build()
2168 .unwrap();
2169
2170 assert_eq!(data_file.record_count(), 4);
2172 assert_eq!(
2173 *data_file.value_counts(),
2174 HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2175 );
2176 assert_eq!(
2177 *data_file.lower_bounds(),
2178 HashMap::from([
2179 (1, Datum::int(1)),
2180 (2, Datum::float(1.0)),
2181 (6, Datum::int(1)),
2182 (7, Datum::float(1.0))
2183 ])
2184 );
2185 assert_eq!(
2186 *data_file.upper_bounds(),
2187 HashMap::from([
2188 (1, Datum::int(4)),
2189 (2, Datum::float(2.0)),
2190 (6, Datum::int(4)),
2191 (7, Datum::float(2.0))
2192 ])
2193 );
2194 assert_eq!(
2195 *data_file.null_value_counts(),
2196 HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2197 );
2198 assert_eq!(
2199 *data_file.nan_value_counts(),
2200 HashMap::from([(2, 1), (7, 1)])
2201 );
2202
2203 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2205 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2206
2207 Ok(())
2208 }
2209
2210 #[tokio::test]
2211 async fn test_write_empty_parquet_file() {
2212 let temp_dir = TempDir::new().unwrap();
2213 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2214 let location_gen = DefaultLocationGenerator::with_data_location(
2215 temp_dir.path().to_str().unwrap().to_string(),
2216 );
2217 let file_name_gen =
2218 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2219 let output_file = file_io
2220 .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
2221 .unwrap();
2222
2223 let pw = ParquetWriterBuilder::new(
2225 WriterProperties::builder().build(),
2226 Arc::new(
2227 Schema::builder()
2228 .with_schema_id(1)
2229 .with_fields(vec![
2230 NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2231 .with_id(0)
2232 .into(),
2233 ])
2234 .build()
2235 .expect("Failed to create schema"),
2236 ),
2237 )
2238 .build(output_file)
2239 .await
2240 .unwrap();
2241
2242 let res = pw.close().await.unwrap();
2243 assert_eq!(res.len(), 0);
2244
2245 assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2247 }
2248
2249 #[test]
2250 fn test_min_max_aggregator() {
2251 let schema = Arc::new(
2252 Schema::builder()
2253 .with_schema_id(1)
2254 .with_fields(vec![
2255 NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2256 .with_id(0)
2257 .into(),
2258 ])
2259 .build()
2260 .expect("Failed to create schema"),
2261 );
2262 let mut min_max_agg = MinMaxColAggregator::new(schema);
2263 let create_statistics =
2264 |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
2265 min_max_agg
2266 .update(0, create_statistics(None, Some(42)))
2267 .unwrap();
2268 min_max_agg
2269 .update(0, create_statistics(Some(0), Some(i32::MAX)))
2270 .unwrap();
2271 min_max_agg
2272 .update(0, create_statistics(Some(i32::MIN), None))
2273 .unwrap();
2274 min_max_agg
2275 .update(0, create_statistics(None, None))
2276 .unwrap();
2277
2278 let (lower_bounds, upper_bounds) = min_max_agg.produce();
2279
2280 assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2281 assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2282 }
2283}