use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use bytes::Bytes;
use futures::future::BoxFuture;
use itertools::Itertools;
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics;
use parquet::format::FileMetaData;
use parquet::thrift::{TCompactOutputProtocol, TSerializable};
use thrift::protocol::TOutputProtocol;

use super::location_generator::{FileNameGenerator, LocationGenerator};
use super::{FileWriter, FileWriterBuilder};
use crate::arrow::{
    ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
};
use crate::io::{FileIO, FileWrite, OutputFile};
use crate::spec::{
    DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
    NestedFieldRef, PartitionKey, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor,
    Struct, StructType, TableMetadata, Type, visit_schema,
};
use crate::transform::create_transform_function;
use crate::writer::{CurrentFileStatus, DataFile};
use crate::{Error, ErrorKind, Result};

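/// Builder for [`ParquetWriter`]: bundles the Parquet writer properties, the
/// Iceberg schema, an optional partition key and the `FileIO`/location helpers
/// needed to create the output file.
///
/// The sketch below shows one typical way to wire it up; it is illustrative
/// only and assumes the `DefaultLocationGenerator` and `DefaultFileNameGenerator`
/// helpers from this crate plus an existing `FileIO`, Iceberg schema and Arrow
/// `RecordBatch`.
///
/// ```ignore
/// let mut writer = ParquetWriterBuilder::new(
///     WriterProperties::builder().build(),
///     schema,                                  // Arc<Schema>
///     None,                                    // no partition key
///     file_io,
///     DefaultLocationGenerator::with_data_location("/tmp/data".to_string()),
///     DefaultFileNameGenerator::new("part".to_string(), None, DataFileFormat::Parquet),
/// )
/// .build()
/// .await?;
/// writer.write(&record_batch).await?;
/// let data_file_builders = writer.close().await?;
/// ```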
#[derive(Clone, Debug)]
pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
    props: WriterProperties,
    schema: SchemaRef,
    partition_key: Option<PartitionKey>,
    match_mode: FieldMatchMode,

    file_io: FileIO,
    location_generator: T,
    file_name_generator: F,
}

impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
    /// Creates a new `ParquetWriterBuilder`, matching incoming record-batch
    /// columns to the Iceberg schema by field id ([`FieldMatchMode::Id`]).
    pub fn new(
        props: WriterProperties,
        schema: SchemaRef,
        partition_key: Option<PartitionKey>,
        file_io: FileIO,
        location_generator: T,
        file_name_generator: F,
    ) -> Self {
        Self::new_with_match_mode(
            props,
            schema,
            partition_key,
            FieldMatchMode::Id,
            file_io,
            location_generator,
            file_name_generator,
        )
    }

    /// Creates a new `ParquetWriterBuilder` with an explicit [`FieldMatchMode`].
    pub fn new_with_match_mode(
        props: WriterProperties,
        schema: SchemaRef,
        partition_key: Option<PartitionKey>,
        match_mode: FieldMatchMode,
        file_io: FileIO,
        location_generator: T,
        file_name_generator: F,
    ) -> Self {
        Self {
            props,
            schema,
            partition_key,
            match_mode,
            file_io,
            location_generator,
            file_name_generator,
        }
    }
}

impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
    type R = ParquetWriter;

    async fn build(self) -> Result<Self::R> {
        let out_file = self
            .file_io
            .new_output(self.location_generator.generate_location(
                self.partition_key.as_ref(),
                &self.file_name_generator.generate_file_name(),
            ))?;

        Ok(ParquetWriter {
            schema: self.schema.clone(),
            inner_writer: None,
            writer_properties: self.props,
            current_row_num: 0,
            out_file,
            nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
        })
    }
}

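/// Maps each primitive (leaf) column's full Parquet path name to its Iceberg
/// field id by visiting the Iceberg schema.
///
/// Path names follow Parquet's nesting conventions: list elements become
/// `<list>.list.<element>` and map entries become `<map>.key_value.key` /
/// `<map>.key_value.value`. A small sketch (matching the expectations in
/// `test_index_by_parquet_path` below):
///
/// ```ignore
/// let mut visitor = IndexByParquetPathName::new();
/// visit_schema(&schema, &mut visitor)?;
/// assert_eq!(visitor.get("col1.col_1_5"), Some(&5));
/// assert_eq!(visitor.get("col5.key_value.key"), Some(&11));
/// ```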
struct IndexByParquetPathName {
    /// Full Parquet path name -> Iceberg field id.
    name_to_id: HashMap<String, i32>,

    /// Stack of path segments for the field currently being visited.
    field_names: Vec<String>,

    /// Field id of the field currently being visited.
    field_id: i32,
}

impl IndexByParquetPathName {
    /// Creates a new, empty `IndexByParquetPathName`.
    pub fn new() -> Self {
        Self {
            name_to_id: HashMap::new(),
            field_names: Vec::new(),
            field_id: 0,
        }
    }

    /// Returns the field id for a full Parquet path name, if present.
    pub fn get(&self, name: &str) -> Option<&i32> {
        self.name_to_id.get(name)
    }
}

impl Default for IndexByParquetPathName {
    fn default() -> Self {
        Self::new()
    }
}

impl SchemaVisitor for IndexByParquetPathName {
    type T = ();

    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
        self.field_names.push(field.name.to_string());
        self.field_id = field.id;
        Ok(())
    }

    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
        self.field_names.pop();
        Ok(())
    }

    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
        self.field_names.push(format!("list.{}", field.name));
        self.field_id = field.id;
        Ok(())
    }

    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
        self.field_names.pop();
        Ok(())
    }

    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
        self.field_names
            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
        self.field_id = field.id;
        Ok(())
    }

    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
        self.field_names.pop();
        Ok(())
    }

    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
        self.field_names
            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
        self.field_id = field.id;
        Ok(())
    }

    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
        self.field_names.pop();
        Ok(())
    }

    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
        Ok(())
    }

    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
        Ok(())
    }

    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
        Ok(())
    }

    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
        Ok(())
    }

    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
        Ok(())
    }

    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
        let full_name = self.field_names.iter().map(String::as_str).join(".");
        let field_id = self.field_id;
        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
                ),
            ));
        } else {
            self.name_to_id.insert(full_name, field_id);
        }

        Ok(())
    }
}

/// Writes Arrow record batches to a single Parquet file and collects the
/// per-column metrics needed to build a [`DataFileBuilder`] on close.
pub struct ParquetWriter {
    schema: SchemaRef,
    out_file: OutputFile,
    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
    writer_properties: WriterProperties,
    current_row_num: usize,
    nan_value_count_visitor: NanValueCountVisitor,
}

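/// Aggregates per-column lower and upper bounds ([`Datum`]s keyed by Iceberg
/// field id) from Parquet column-chunk [`Statistics`]. Only exact statistics
/// are folded into the bounds.
///
/// Minimal sketch of the intended flow (mirroring `test_min_max_aggregator`
/// at the bottom of this file):
///
/// ```ignore
/// let mut agg = MinMaxColAggregator::new(schema);
/// for row_group in metadata.row_groups() {
///     for column in row_group.columns() {
///         if let Some(stats) = column.statistics() {
///             // field_id is looked up from the column's Parquet path name
///             agg.update(field_id, stats.clone())?;
///         }
///     }
/// }
/// let (lower_bounds, upper_bounds) = agg.produce();
/// ```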
struct MinMaxColAggregator {
    lower_bounds: HashMap<i32, Datum>,
    upper_bounds: HashMap<i32, Datum>,
    schema: SchemaRef,
}

impl MinMaxColAggregator {
    /// Creates a new, empty `MinMaxColAggregator` for the given schema.
    fn new(schema: SchemaRef) -> Self {
        Self {
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            schema,
        }
    }

    fn update_state_min(&mut self, field_id: i32, datum: Datum) {
        self.lower_bounds
            .entry(field_id)
            .and_modify(|e| {
                if *e > datum {
                    *e = datum.clone()
                }
            })
            .or_insert(datum);
    }

    fn update_state_max(&mut self, field_id: i32, datum: Datum) {
        self.upper_bounds
            .entry(field_id)
            .and_modify(|e| {
                if *e < datum {
                    *e = datum.clone()
                }
            })
            .or_insert(datum);
    }

    /// Updates the aggregated bounds with the statistics of one column chunk.
    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
        let Some(ty) = self
            .schema
            .field_by_id(field_id)
            .map(|f| f.field_type.as_ref())
        else {
            // Skip columns that have no corresponding field in the Iceberg schema.
            return Ok(());
        };
        let Type::Primitive(ty) = ty.clone() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                format!(
                    "Compound type {} is not supported for min-max aggregation.",
                    ty
                ),
            ));
        };

        if value.min_is_exact() {
            let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    format!("Statistics {} does not match field type {}.", value, ty),
                ));
            };

            self.update_state_min(field_id, min_datum);
        }

        if value.max_is_exact() {
            let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    format!("Statistics {} does not match field type {}.", value, ty),
                ));
            };

            self.update_state_max(field_id, max_datum);
        }

        Ok(())
    }

    /// Returns the aggregated (lower bounds, upper bounds), keyed by field id.
    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
        (self.lower_bounds, self.upper_bounds)
    }
}

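// Helpers for turning Parquet footer metadata into Iceberg `DataFile`s, either
// for files this writer just produced or for pre-existing Parquet files. A
// rough usage sketch (illustrative only; assumes an existing `FileIO`,
// `TableMetadata`, and file paths that match the table's current schema):
//
// ```ignore
// let data_files = ParquetWriter::parquet_files_to_data_files(
//     &file_io,
//     vec!["s3://bucket/data/part-00000.parquet".to_string()],
//     &table_metadata,
// )
// .await?;
// ```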
impl ParquetWriter {
    /// Converts existing Parquet files into [`DataFile`]s, reading each file's
    /// footer and using the table's current schema and default partition spec.
    #[allow(dead_code)]
    pub(crate) async fn parquet_files_to_data_files(
        file_io: &FileIO,
        file_paths: Vec<String>,
        table_metadata: &TableMetadata,
    ) -> Result<Vec<DataFile>> {
        let mut data_files: Vec<DataFile> = Vec::new();

        for file_path in file_paths {
            let input_file = file_io.new_input(&file_path)?;
            let file_metadata = input_file.metadata().await?;
            let file_size_in_bytes = file_metadata.size as usize;
            let reader = input_file.reader().await?;

            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
            let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Error reading Parquet metadata: {}", err),
                )
            })?;
            let mut builder = ParquetWriter::parquet_to_data_file_builder(
                table_metadata.current_schema().clone(),
                parquet_metadata,
                file_size_in_bytes,
                file_path,
                HashMap::new(),
            )?;
            builder.partition_spec_id(table_metadata.default_partition_spec_id());
            let data_file = builder.build().unwrap();
            data_files.push(data_file);
        }

        Ok(data_files)
    }

    /// Serializes the Thrift [`FileMetaData`] produced by the Arrow writer and
    /// decodes it back into a [`ParquetMetaData`].
    fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result<ParquetMetaData> {
        let mut buffer = Vec::new();
        {
            let mut protocol = TCompactOutputProtocol::new(&mut buffer);
            file_metadata
                .write_to_out_protocol(&mut protocol)
                .map_err(|err| {
                    Error::new(ErrorKind::Unexpected, "Failed to write parquet metadata")
                        .with_source(err)
                })?;

            protocol.flush().map_err(|err| {
                Error::new(ErrorKind::Unexpected, "Failed to flush protocol").with_source(err)
            })?;
        }

        let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).map_err(|err| {
            Error::new(ErrorKind::Unexpected, "Failed to decode parquet metadata").with_source(err)
        })?;

        Ok(parquet_metadata)
    }

    /// Builds a [`DataFileBuilder`] from decoded Parquet metadata, aggregating
    /// per-column sizes, value counts, null value counts and min/max bounds.
    pub(crate) fn parquet_to_data_file_builder(
        schema: SchemaRef,
        metadata: Arc<ParquetMetaData>,
        written_size: usize,
        file_path: String,
        nan_value_counts: HashMap<i32, u64>,
    ) -> Result<DataFileBuilder> {
        let index_by_parquet_path = {
            let mut visitor = IndexByParquetPathName::new();
            visit_schema(&schema, &mut visitor)?;
            visitor
        };

        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
            let mut min_max_agg = MinMaxColAggregator::new(schema);

            for row_group in metadata.row_groups() {
                for column_chunk_metadata in row_group.columns() {
                    let parquet_path = column_chunk_metadata.column_descr().path().string();

                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
                        continue;
                    };

                    *per_col_size.entry(field_id).or_insert(0) +=
                        column_chunk_metadata.compressed_size() as u64;
                    *per_col_val_num.entry(field_id).or_insert(0) +=
                        column_chunk_metadata.num_values() as u64;

                    if let Some(statistics) = column_chunk_metadata.statistics() {
                        if let Some(null_count) = statistics.null_count_opt() {
                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
                        }

                        min_max_agg.update(field_id, statistics.clone())?;
                    }
                }
            }
            (
                per_col_size,
                per_col_val_num,
                per_col_null_val_num,
                min_max_agg.produce(),
            )
        };

        let mut builder = DataFileBuilder::default();
        builder
            .content(DataContentType::Data)
            .file_path(file_path)
            .file_format(DataFileFormat::Parquet)
            .partition(Struct::empty())
            .record_count(metadata.file_metadata().num_rows() as u64)
            .file_size_in_bytes(written_size as u64)
            .column_sizes(column_sizes)
            .value_counts(value_counts)
            .null_value_counts(null_value_counts)
            .nan_value_counts(nan_value_counts)
            .lower_bounds(lower_bounds)
            .upper_bounds(upper_bounds)
            .split_offsets(
                metadata
                    .row_groups()
                    .iter()
                    .filter_map(|group| group.file_offset())
                    .collect(),
            );

        Ok(builder)
    }

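    /// Infers the partition value of a file from its per-column lower/upper
    /// bounds: for every order-preserving partition transform, the bounds of
    /// the source column must be equal, and the transform of that single value
    /// becomes the partition literal.
    ///
    /// Illustrative sketch only, assuming a spec with one identity partition
    /// field whose source column has field id 1:
    ///
    /// ```ignore
    /// let lower = HashMap::from([(1, Datum::int(2024))]);
    /// let upper = HashMap::from([(1, Datum::int(2024))]);
    /// let partition = ParquetWriter::partition_value_from_bounds(spec, &lower, &upper)?;
    /// // `partition` is a Struct containing the single literal 2024.
    /// ```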
    #[allow(dead_code)]
    fn partition_value_from_bounds(
        table_spec: Arc<PartitionSpec>,
        lower_bounds: &HashMap<i32, Datum>,
        upper_bounds: &HashMap<i32, Datum>,
    ) -> Result<Struct> {
        let mut partition_literals: Vec<Option<Literal>> = Vec::new();

        for field in table_spec.fields() {
            if let (Some(lower), Some(upper)) = (
                lower_bounds.get(&field.source_id),
                upper_bounds.get(&field.source_id),
            ) {
                if !field.transform.preserves_order() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "cannot infer partition value for non-linear partition field (needs to preserve order): {} with transform {}",
                            field.name, field.transform
                        ),
                    ));
                }

                if lower != upper {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "multiple partition values for field {}: lower: {:?}, upper: {:?}",
                            field.name, lower, upper
                        ),
                    ));
                }

                let transform_fn = create_transform_function(&field.transform)?;
                let transform_literal =
                    Literal::from(transform_fn.transform_literal_result(lower)?);

                partition_literals.push(Some(transform_literal));
            } else {
                partition_literals.push(None);
            }
        }

        let partition_struct = Struct::from_iter(partition_literals);

        Ok(partition_struct)
    }
}

impl FileWriter for ParquetWriter {
    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            // Skip writing empty batches.
            return Ok(());
        }

        self.current_row_num += batch.num_rows();

        let batch_c = batch.clone();
        self.nan_value_count_visitor
            .compute(self.schema.clone(), batch_c)?;

        // Lazily create the inner Arrow writer on the first non-empty batch.
        let writer = if let Some(writer) = &mut self.inner_writer {
            writer
        } else {
            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
            let inner_writer = self.out_file.writer().await?;
            let async_writer = AsyncFileWriter::new(inner_writer);
            let writer = AsyncArrowWriter::try_new(
                async_writer,
                arrow_schema.clone(),
                Some(self.writer_properties.clone()),
            )
            .map_err(|err| {
                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
                    .with_source(err)
            })?;
            self.inner_writer = Some(writer);
            self.inner_writer.as_mut().unwrap()
        };

        writer.write(batch).await.map_err(|err| {
            Error::new(
                ErrorKind::Unexpected,
                "Failed to write using parquet writer.",
            )
            .with_source(err)
        })?;

        Ok(())
    }

    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
        let mut writer = match self.inner_writer.take() {
            Some(writer) => writer,
            None => return Ok(vec![]),
        };

        let metadata = writer.finish().await.map_err(|err| {
            Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
        })?;

        let written_size = writer.bytes_written();

        if self.current_row_num == 0 {
            self.out_file.delete().await.map_err(|err| {
                Error::new(
                    ErrorKind::Unexpected,
                    "Failed to delete empty parquet file.",
                )
                .with_source(err)
            })?;
            Ok(vec![])
        } else {
            let parquet_metadata =
                Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
                    Error::new(
                        ErrorKind::Unexpected,
                        "Failed to convert metadata from thrift to parquet.",
                    )
                    .with_source(err)
                })?);

            Ok(vec![Self::parquet_to_data_file_builder(
                self.schema,
                parquet_metadata,
                written_size,
                self.out_file.location().to_string(),
                self.nan_value_count_visitor.nan_value_counts,
            )?])
        }
    }
}

impl CurrentFileStatus for ParquetWriter {
    fn current_file_path(&self) -> String {
        self.out_file.location().to_string()
    }

    fn current_row_num(&self) -> usize {
        self.current_row_num
    }

    fn current_written_size(&self) -> usize {
        if let Some(inner) = self.inner_writer.as_ref() {
            // Count both the bytes already flushed and the in-progress (buffered) bytes.
            inner.bytes_written() + inner.in_progress_size()
        } else {
            // The inner writer is created lazily; nothing has been written yet.
            0
        }
    }

    fn current_schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

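/// Adapter that exposes this crate's [`FileWrite`] as the async sink expected
/// by the parquet crate's `AsyncFileWriter` trait: `write` forwards each byte
/// buffer to the underlying file and `complete` closes it.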
struct AsyncFileWriter<W: FileWrite>(W);

impl<W: FileWrite> AsyncFileWriter<W> {
    pub fn new(writer: W) -> Self {
        Self(writer)
    }
}

impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
        Box::pin(async {
            self.0
                .write(bs)
                .await
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
        Box::pin(async {
            self.0
                .close()
                .await
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
        })
    }
}

685#[cfg(test)]
686mod tests {
687 use std::collections::HashMap;
688 use std::sync::Arc;
689
690 use anyhow::Result;
691 use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
692 use arrow_array::types::{Float32Type, Int64Type};
693 use arrow_array::{
694 Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
695 Int64Array, ListArray, MapArray, RecordBatch, StructArray,
696 };
697 use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
698 use arrow_select::concat::concat_batches;
699 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
700 use parquet::file::statistics::ValueStatistics;
701 use rust_decimal::Decimal;
702 use tempfile::TempDir;
703 use uuid::Uuid;
704
705 use super::*;
706 use crate::arrow::schema_to_arrow_schema;
707 use crate::io::FileIOBuilder;
708 use crate::spec::{PrimitiveLiteral, Struct, *};
709 use crate::writer::file_writer::location_generator::{
710 DefaultFileNameGenerator, DefaultLocationGenerator,
711 };
712 use crate::writer::tests::check_parquet_data_file;
713
714 fn schema_for_all_type() -> Schema {
715 Schema::builder()
716 .with_schema_id(1)
717 .with_fields(vec![
718 NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
719 NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
720 NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
721 NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
722 NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
723 NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
724 NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
725 NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
726 NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
727 NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
728 .into(),
729 NestedField::optional(
730 10,
731 "timestamptz",
732 Type::Primitive(PrimitiveType::Timestamptz),
733 )
734 .into(),
735 NestedField::optional(
736 11,
737 "timestamp_ns",
738 Type::Primitive(PrimitiveType::TimestampNs),
739 )
740 .into(),
741 NestedField::optional(
742 12,
743 "timestamptz_ns",
744 Type::Primitive(PrimitiveType::TimestamptzNs),
745 )
746 .into(),
747 NestedField::optional(
748 13,
749 "decimal",
750 Type::Primitive(PrimitiveType::Decimal {
751 precision: 10,
752 scale: 5,
753 }),
754 )
755 .into(),
756 NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
757 NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
758 .into(),
759 NestedField::optional(
762 16,
763 "decimal_38",
764 Type::Primitive(PrimitiveType::Decimal {
765 precision: 38,
766 scale: 5,
767 }),
768 )
769 .into(),
770 ])
771 .build()
772 .unwrap()
773 }
774
775 fn nested_schema_for_test() -> Schema {
776 Schema::builder()
778 .with_schema_id(1)
779 .with_fields(vec![
780 NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
781 NestedField::required(
782 1,
783 "col1",
784 Type::Struct(StructType::new(vec![
785 NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
786 .into(),
787 NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
788 .into(),
789 ])),
790 )
791 .into(),
792 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
793 NestedField::required(
794 3,
795 "col3",
796 Type::List(ListType::new(
797 NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
798 .into(),
799 )),
800 )
801 .into(),
802 NestedField::required(
803 4,
804 "col4",
805 Type::Struct(StructType::new(vec![
806 NestedField::required(
807 8,
808 "col_4_8",
809 Type::Struct(StructType::new(vec![
810 NestedField::required(
811 9,
812 "col_4_8_9",
813 Type::Primitive(PrimitiveType::Long),
814 )
815 .into(),
816 ])),
817 )
818 .into(),
819 ])),
820 )
821 .into(),
822 NestedField::required(
823 10,
824 "col5",
825 Type::Map(MapType::new(
826 NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
827 .into(),
828 NestedField::required(
829 12,
830 "value",
831 Type::List(ListType::new(
832 NestedField::required(
833 13,
834 "item",
835 Type::Primitive(PrimitiveType::Long),
836 )
837 .into(),
838 )),
839 )
840 .into(),
841 )),
842 )
843 .into(),
844 ])
845 .build()
846 .unwrap()
847 }
848
849 #[tokio::test]
850 async fn test_index_by_parquet_path() {
851 let expect = HashMap::from([
852 ("col0".to_string(), 0),
853 ("col1.col_1_5".to_string(), 5),
854 ("col1.col_1_6".to_string(), 6),
855 ("col2".to_string(), 2),
856 ("col3.list.element".to_string(), 7),
857 ("col4.col_4_8.col_4_8_9".to_string(), 9),
858 ("col5.key_value.key".to_string(), 11),
859 ("col5.key_value.value.list.item".to_string(), 13),
860 ]);
861 let mut visitor = IndexByParquetPathName::new();
862 visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
863 assert_eq!(visitor.name_to_id, expect);
864 }
865
866 #[tokio::test]
867 async fn test_parquet_writer() -> Result<()> {
868 let temp_dir = TempDir::new().unwrap();
869 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
870 let location_gen = DefaultLocationGenerator::with_data_location(
871 temp_dir.path().to_str().unwrap().to_string(),
872 );
873 let file_name_gen =
874 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
875
876 let schema = {
878 let fields = vec![
879 arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
880 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
881 ),
882 ];
883 Arc::new(arrow_schema::Schema::new(fields))
884 };
885 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
886 let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
887 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
888 let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
889
890 let mut pw = ParquetWriterBuilder::new(
892 WriterProperties::builder()
893 .set_max_row_group_size(128)
894 .build(),
895 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
896 None,
897 file_io.clone(),
898 location_gen,
899 file_name_gen,
900 )
901 .build()
902 .await?;
903 pw.write(&to_write).await?;
904 pw.write(&to_write_null).await?;
905 let res = pw.close().await?;
906 assert_eq!(res.len(), 1);
907 let data_file = res
908 .into_iter()
909 .next()
910 .unwrap()
911 .content(crate::spec::DataContentType::Data)
913 .partition(Struct::empty())
914 .partition_spec_id(0)
915 .build()
916 .unwrap();
917
918 assert_eq!(data_file.record_count(), 2048);
920 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
921 assert_eq!(
922 *data_file.lower_bounds(),
923 HashMap::from([(0, Datum::long(0))])
924 );
925 assert_eq!(
926 *data_file.upper_bounds(),
927 HashMap::from([(0, Datum::long(1023))])
928 );
929 assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
930
931 let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
933 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
934
935 Ok(())
936 }
937
938 #[tokio::test]
939 async fn test_parquet_writer_with_complex_schema() -> Result<()> {
940 let temp_dir = TempDir::new().unwrap();
941 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
942 let location_gen = DefaultLocationGenerator::with_data_location(
943 temp_dir.path().to_str().unwrap().to_string(),
944 );
945 let file_name_gen =
946 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
947
948 let schema = nested_schema_for_test();
950 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
951 let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
952 let col1 = Arc::new(StructArray::new(
953 {
954 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
955 fields.clone()
956 } else {
957 unreachable!()
958 }
959 },
960 vec![
961 Arc::new(Int64Array::from_iter_values(0..1024)),
962 Arc::new(Int64Array::from_iter_values(0..1024)),
963 ],
964 None,
965 ));
966 let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
967 (0..1024).map(|n| n.to_string()),
968 )) as ArrayRef;
969 let col3 = Arc::new({
970 let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
971 (0..1024).map(|n| Some(vec![Some(n)])),
972 )
973 .into_parts();
974 arrow_array::ListArray::new(
975 {
976 if let DataType::List(field) = arrow_schema.field(3).data_type() {
977 field.clone()
978 } else {
979 unreachable!()
980 }
981 },
982 list_parts.1,
983 list_parts.2,
984 list_parts.3,
985 )
986 }) as ArrayRef;
987 let col4 = Arc::new(StructArray::new(
988 {
989 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
990 fields.clone()
991 } else {
992 unreachable!()
993 }
994 },
995 vec![Arc::new(StructArray::new(
996 {
997 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
998 if let DataType::Struct(fields) = fields[0].data_type() {
999 fields.clone()
1000 } else {
1001 unreachable!()
1002 }
1003 } else {
1004 unreachable!()
1005 }
1006 },
1007 vec![Arc::new(Int64Array::from_iter_values(0..1024))],
1008 None,
1009 ))],
1010 None,
1011 ));
1012 let col5 = Arc::new({
1013 let mut map_array_builder = arrow_array::builder::MapBuilder::new(
1014 None,
1015 arrow_array::builder::StringBuilder::new(),
1016 arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
1017 Int64Type,
1018 >::new()),
1019 );
1020 for i in 0..1024 {
1021 map_array_builder.keys().append_value(i.to_string());
1022 map_array_builder
1023 .values()
1024 .append_value(vec![Some(i as i64); i + 1]);
1025 map_array_builder.append(true)?;
1026 }
1027 let (_, offset_buffer, struct_array, null_buffer, ordered) =
1028 map_array_builder.finish().into_parts();
1029 let struct_array = {
1030 let (_, mut arrays, nulls) = struct_array.into_parts();
1031 let list_array = {
1032 let list_array = arrays[1]
1033 .as_any()
1034 .downcast_ref::<ListArray>()
1035 .unwrap()
1036 .clone();
1037 let (_, offsets, array, nulls) = list_array.into_parts();
1038 let list_field = {
1039 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1040 if let DataType::Struct(fields) = map_field.data_type() {
1041 if let DataType::List(list_field) = fields[1].data_type() {
1042 list_field.clone()
1043 } else {
1044 unreachable!()
1045 }
1046 } else {
1047 unreachable!()
1048 }
1049 } else {
1050 unreachable!()
1051 }
1052 };
1053 ListArray::new(list_field, offsets, array, nulls)
1054 };
1055 arrays[1] = Arc::new(list_array) as ArrayRef;
1056 StructArray::new(
1057 {
1058 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1059 if let DataType::Struct(fields) = map_field.data_type() {
1060 fields.clone()
1061 } else {
1062 unreachable!()
1063 }
1064 } else {
1065 unreachable!()
1066 }
1067 },
1068 arrays,
1069 nulls,
1070 )
1071 };
1072 arrow_array::MapArray::new(
1073 {
1074 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1075 map_field.clone()
1076 } else {
1077 unreachable!()
1078 }
1079 },
1080 offset_buffer,
1081 struct_array,
1082 null_buffer,
1083 ordered,
1084 )
1085 }) as ArrayRef;
1086 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1087 col0, col1, col2, col3, col4, col5,
1088 ])
1089 .unwrap();
1090
1091 let mut pw = ParquetWriterBuilder::new(
1093 WriterProperties::builder().build(),
1094 Arc::new(schema),
1095 None,
1096 file_io.clone(),
1097 location_gen,
1098 file_name_gen,
1099 )
1100 .build()
1101 .await?;
1102 pw.write(&to_write).await?;
1103 let res = pw.close().await?;
1104 assert_eq!(res.len(), 1);
1105 let data_file = res
1106 .into_iter()
1107 .next()
1108 .unwrap()
1109 .content(crate::spec::DataContentType::Data)
1111 .partition(Struct::empty())
1112 .partition_spec_id(0)
1113 .build()
1114 .unwrap();
1115
1116 assert_eq!(data_file.record_count(), 1024);
1118 assert_eq!(
1119 *data_file.value_counts(),
1120 HashMap::from([
1121 (0, 1024),
1122 (5, 1024),
1123 (6, 1024),
1124 (2, 1024),
1125 (7, 1024),
1126 (9, 1024),
1127 (11, 1024),
1128 (13, (1..1025).sum()),
1129 ])
1130 );
1131 assert_eq!(
1132 *data_file.lower_bounds(),
1133 HashMap::from([
1134 (0, Datum::long(0)),
1135 (5, Datum::long(0)),
1136 (6, Datum::long(0)),
1137 (2, Datum::string("0")),
1138 (7, Datum::long(0)),
1139 (9, Datum::long(0)),
1140 (11, Datum::string("0")),
1141 (13, Datum::long(0))
1142 ])
1143 );
1144 assert_eq!(
1145 *data_file.upper_bounds(),
1146 HashMap::from([
1147 (0, Datum::long(1023)),
1148 (5, Datum::long(1023)),
1149 (6, Datum::long(1023)),
1150 (2, Datum::string("999")),
1151 (7, Datum::long(1023)),
1152 (9, Datum::long(1023)),
1153 (11, Datum::string("999")),
1154 (13, Datum::long(1023))
1155 ])
1156 );
1157
1158 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1160
1161 Ok(())
1162 }
1163
1164 #[tokio::test]
1165 async fn test_all_type_for_write() -> Result<()> {
1166 let temp_dir = TempDir::new().unwrap();
1167 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
1169 temp_dir.path().to_str().unwrap().to_string(),
1170 );
1171 let file_name_gen =
1172 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1173
1174 let schema = schema_for_all_type();
1177 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1178 let col0 = Arc::new(BooleanArray::from(vec![
1179 Some(true),
1180 Some(false),
1181 None,
1182 Some(true),
1183 ])) as ArrayRef;
1184 let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1185 let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1186 let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1187 Some(0.5),
1188 Some(2.0),
1189 None,
1190 Some(3.5),
1191 ])) as ArrayRef;
1192 let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1193 Some(0.5),
1194 Some(2.0),
1195 None,
1196 Some(3.5),
1197 ])) as ArrayRef;
1198 let col5 = Arc::new(arrow_array::StringArray::from(vec![
1199 Some("a"),
1200 Some("b"),
1201 None,
1202 Some("d"),
1203 ])) as ArrayRef;
1204 let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1205 Some(b"one"),
1206 None,
1207 Some(b""),
1208 Some(b"zzzz"),
1209 ])) as ArrayRef;
1210 let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1211 Some(0),
1212 Some(1),
1213 None,
1214 Some(3),
1215 ])) as ArrayRef;
1216 let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1217 Some(0),
1218 Some(1),
1219 None,
1220 Some(3),
1221 ])) as ArrayRef;
1222 let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1223 Some(0),
1224 Some(1),
1225 None,
1226 Some(3),
1227 ])) as ArrayRef;
1228 let col10 = Arc::new(
1229 arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1230 .with_timezone_utc(),
1231 ) as ArrayRef;
1232 let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1233 Some(0),
1234 Some(1),
1235 None,
1236 Some(3),
1237 ])) as ArrayRef;
1238 let col12 = Arc::new(
1239 arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1240 .with_timezone_utc(),
1241 ) as ArrayRef;
1242 let col13 = Arc::new(
1243 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1244 .with_precision_and_scale(10, 5)
1245 .unwrap(),
1246 ) as ArrayRef;
1247 let col14 = Arc::new(
1248 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1249 vec![
1250 Some(Uuid::from_u128(0).as_bytes().to_vec()),
1251 Some(Uuid::from_u128(1).as_bytes().to_vec()),
1252 None,
1253 Some(Uuid::from_u128(3).as_bytes().to_vec()),
1254 ]
1255 .into_iter(),
1256 16,
1257 )
1258 .unwrap(),
1259 ) as ArrayRef;
1260 let col15 = Arc::new(
1261 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1262 vec![
1263 Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1264 Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1265 None,
1266 Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1267 ]
1268 .into_iter(),
1269 10,
1270 )
1271 .unwrap(),
1272 ) as ArrayRef;
1273 let col16 = Arc::new(
1274 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1275 .with_precision_and_scale(38, 5)
1276 .unwrap(),
1277 ) as ArrayRef;
1278 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1279 col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1280 col14, col15, col16,
1281 ])
1282 .unwrap();
1283
1284 let mut pw = ParquetWriterBuilder::new(
1286 WriterProperties::builder().build(),
1287 Arc::new(schema),
1288 None,
1289 file_io.clone(),
            location_gen,
1291 file_name_gen,
1292 )
1293 .build()
1294 .await?;
1295 pw.write(&to_write).await?;
1296 let res = pw.close().await?;
1297 assert_eq!(res.len(), 1);
1298 let data_file = res
1299 .into_iter()
1300 .next()
1301 .unwrap()
1302 .content(crate::spec::DataContentType::Data)
1304 .partition(Struct::empty())
1305 .partition_spec_id(0)
1306 .build()
1307 .unwrap();
1308
1309 assert_eq!(data_file.record_count(), 4);
1311 assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1312 assert!(
1313 data_file
1314 .null_value_counts()
1315 .iter()
1316 .all(|(_, &v)| { v == 1 })
1317 );
1318 assert_eq!(
1319 *data_file.lower_bounds(),
1320 HashMap::from([
1321 (0, Datum::bool(false)),
1322 (1, Datum::int(1)),
1323 (2, Datum::long(1)),
1324 (3, Datum::float(0.5)),
1325 (4, Datum::double(0.5)),
1326 (5, Datum::string("a")),
1327 (6, Datum::binary(vec![])),
1328 (7, Datum::date(0)),
1329 (8, Datum::time_micros(0).unwrap()),
1330 (9, Datum::timestamp_micros(0)),
1331 (10, Datum::timestamptz_micros(0)),
1332 (11, Datum::timestamp_nanos(0)),
1333 (12, Datum::timestamptz_nanos(0)),
1334 (
1335 13,
1336 Datum::new(
1337 PrimitiveType::Decimal {
1338 precision: 10,
1339 scale: 5
1340 },
1341 PrimitiveLiteral::Int128(1)
1342 )
1343 ),
1344 (14, Datum::uuid(Uuid::from_u128(0))),
1345 (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1346 (
1347 16,
1348 Datum::new(
1349 PrimitiveType::Decimal {
1350 precision: 38,
1351 scale: 5
1352 },
1353 PrimitiveLiteral::Int128(1)
1354 )
1355 ),
1356 ])
1357 );
1358 assert_eq!(
1359 *data_file.upper_bounds(),
1360 HashMap::from([
1361 (0, Datum::bool(true)),
1362 (1, Datum::int(4)),
1363 (2, Datum::long(4)),
1364 (3, Datum::float(3.5)),
1365 (4, Datum::double(3.5)),
1366 (5, Datum::string("d")),
1367 (6, Datum::binary(vec![122, 122, 122, 122])),
1368 (7, Datum::date(3)),
1369 (8, Datum::time_micros(3).unwrap()),
1370 (9, Datum::timestamp_micros(3)),
1371 (10, Datum::timestamptz_micros(3)),
1372 (11, Datum::timestamp_nanos(3)),
1373 (12, Datum::timestamptz_nanos(3)),
1374 (
1375 13,
1376 Datum::new(
1377 PrimitiveType::Decimal {
1378 precision: 10,
1379 scale: 5
1380 },
1381 PrimitiveLiteral::Int128(100)
1382 )
1383 ),
1384 (14, Datum::uuid(Uuid::from_u128(3))),
1385 (
1386 15,
1387 Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1388 ),
1389 (
1390 16,
1391 Datum::new(
1392 PrimitiveType::Decimal {
1393 precision: 38,
1394 scale: 5
1395 },
1396 PrimitiveLiteral::Int128(100)
1397 )
1398 ),
1399 ])
1400 );
1401
1402 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1404
1405 Ok(())
1406 }
1407
1408 #[tokio::test]
1409 async fn test_decimal_bound() -> Result<()> {
1410 let temp_dir = TempDir::new().unwrap();
1411 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen = DefaultLocationGenerator::with_data_location(
1413 temp_dir.path().to_str().unwrap().to_string(),
1414 );
1415 let file_name_gen =
1416 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1417
1418 let schema = Arc::new(
1420 Schema::builder()
1421 .with_fields(vec![
1422 NestedField::optional(
1423 0,
1424 "decimal",
1425 Type::Primitive(PrimitiveType::Decimal {
1426 precision: 28,
1427 scale: 10,
1428 }),
1429 )
1430 .into(),
1431 ])
1432 .build()
1433 .unwrap(),
1434 );
1435 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1436 let mut pw = ParquetWriterBuilder::new(
1437 WriterProperties::builder().build(),
1438 schema.clone(),
1439 None,
1440 file_io.clone(),
            location_gen.clone(),
1442 file_name_gen.clone(),
1443 )
1444 .build()
1445 .await?;
1446 let col0 = Arc::new(
1447 Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1448 .with_data_type(DataType::Decimal128(28, 10)),
1449 ) as ArrayRef;
1450 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1451 pw.write(&to_write).await?;
1452 let res = pw.close().await?;
1453 assert_eq!(res.len(), 1);
1454 let data_file = res
1455 .into_iter()
1456 .next()
1457 .unwrap()
1458 .content(crate::spec::DataContentType::Data)
1459 .partition(Struct::empty())
1460 .partition_spec_id(0)
1461 .build()
1462 .unwrap();
1463 assert_eq!(
1464 data_file.upper_bounds().get(&0),
1465 Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1466 .as_ref()
1467 );
1468 assert_eq!(
1469 data_file.lower_bounds().get(&0),
1470 Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1471 .as_ref()
1472 );
1473
1474 let schema = Arc::new(
1476 Schema::builder()
1477 .with_fields(vec![
1478 NestedField::optional(
1479 0,
1480 "decimal",
1481 Type::Primitive(PrimitiveType::Decimal {
1482 precision: 28,
1483 scale: 10,
1484 }),
1485 )
1486 .into(),
1487 ])
1488 .build()
1489 .unwrap(),
1490 );
1491 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1492 let mut pw = ParquetWriterBuilder::new(
1493 WriterProperties::builder().build(),
1494 schema.clone(),
1495 None,
1496 file_io.clone(),
            location_gen.clone(),
1498 file_name_gen.clone(),
1499 )
1500 .build()
1501 .await?;
1502 let col0 = Arc::new(
1503 Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1504 .with_data_type(DataType::Decimal128(28, 10)),
1505 ) as ArrayRef;
1506 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1507 pw.write(&to_write).await?;
1508 let res = pw.close().await?;
1509 assert_eq!(res.len(), 1);
1510 let data_file = res
1511 .into_iter()
1512 .next()
1513 .unwrap()
1514 .content(crate::spec::DataContentType::Data)
1515 .partition(Struct::empty())
1516 .partition_spec_id(0)
1517 .build()
1518 .unwrap();
1519 assert_eq!(
1520 data_file.upper_bounds().get(&0),
1521 Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1522 .as_ref()
1523 );
1524 assert_eq!(
1525 data_file.lower_bounds().get(&0),
1526 Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1527 .as_ref()
1528 );
1529
1530 let decimal_max = Decimal::MAX;
1532 let decimal_min = Decimal::MIN;
1533 assert_eq!(decimal_max.scale(), decimal_min.scale());
1534 let schema = Arc::new(
1535 Schema::builder()
1536 .with_fields(vec![
1537 NestedField::optional(
1538 0,
1539 "decimal",
1540 Type::Primitive(PrimitiveType::Decimal {
1541 precision: 38,
1542 scale: decimal_max.scale(),
1543 }),
1544 )
1545 .into(),
1546 ])
1547 .build()
1548 .unwrap(),
1549 );
1550 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1551 let mut pw = ParquetWriterBuilder::new(
1552 WriterProperties::builder().build(),
1553 schema,
1554 None,
1555 file_io.clone(),
            location_gen,
1557 file_name_gen,
1558 )
1559 .build()
1560 .await?;
1561 let col0 = Arc::new(
1562 Decimal128Array::from(vec![
1563 Some(decimal_max.mantissa()),
1564 Some(decimal_min.mantissa()),
1565 ])
1566 .with_data_type(DataType::Decimal128(38, 0)),
1567 ) as ArrayRef;
1568 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1569 pw.write(&to_write).await?;
1570 let res = pw.close().await?;
1571 assert_eq!(res.len(), 1);
1572 let data_file = res
1573 .into_iter()
1574 .next()
1575 .unwrap()
1576 .content(crate::spec::DataContentType::Data)
1577 .partition(Struct::empty())
1578 .partition_spec_id(0)
1579 .build()
1580 .unwrap();
1581 assert_eq!(
1582 data_file.upper_bounds().get(&0),
1583 Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1584 );
1585 assert_eq!(
1586 data_file.lower_bounds().get(&0),
1587 Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1588 );
1589
1590 Ok(())
1660 }
1661
1662 #[tokio::test]
1663 async fn test_empty_write() -> Result<()> {
1664 let temp_dir = TempDir::new().unwrap();
1665 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1666 let location_gen = DefaultLocationGenerator::with_data_location(
1667 temp_dir.path().to_str().unwrap().to_string(),
1668 );
1669 let file_name_gen =
1670 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1671
1672 let schema = {
1674 let fields = vec![
1675 arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1676 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1677 ),
1678 ];
1679 Arc::new(arrow_schema::Schema::new(fields))
1680 };
1681 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1682 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1683 let mut pw = ParquetWriterBuilder::new(
1684 WriterProperties::builder().build(),
1685 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1686 None,
1687 file_io.clone(),
1688 location_gen.clone(),
1689 file_name_gen,
1690 )
1691 .build()
1692 .await?;
1693 pw.write(&to_write).await?;
1694 let file_path = pw.out_file.location().to_string();
1695 pw.close().await.unwrap();
1696 assert!(file_io.exists(file_path).await.unwrap());
1697
1698 let file_name_gen =
1700 DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1701 let pw = ParquetWriterBuilder::new(
1702 WriterProperties::builder().build(),
1703 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1704 None,
1705 file_io.clone(),
1706 location_gen,
1707 file_name_gen,
1708 )
1709 .build()
1710 .await?;
1711 let file_path = pw.out_file.location().to_string();
1712 pw.close().await.unwrap();
1713 assert!(!file_io.exists(file_path).await.unwrap());
1714
1715 Ok(())
1716 }
1717
1718 #[tokio::test]
1719 async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1720 let temp_dir = TempDir::new().unwrap();
1721 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1722 let location_gen = DefaultLocationGenerator::with_data_location(
1723 temp_dir.path().to_str().unwrap().to_string(),
1724 );
1725 let file_name_gen =
1726 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1727
1728 let arrow_schema = {
1730 let fields = vec![
1731 Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1732 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1733 ),
1734 Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1735 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1736 ),
1737 ];
1738 Arc::new(arrow_schema::Schema::new(fields))
1739 };
1740
1741 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1742 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1743 None,
1744 )) as ArrayRef;
1745
1746 let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1747 [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1748 None,
1749 )) as ArrayRef;
1750
1751 let to_write =
1752 RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1753
1754 let mut pw = ParquetWriterBuilder::new(
1756 WriterProperties::builder().build(),
1757 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1758 None,
1759 file_io.clone(),
1760 location_gen,
1761 file_name_gen,
1762 )
1763 .build()
1764 .await?;
1765
1766 pw.write(&to_write).await?;
1767 let res = pw.close().await?;
1768 assert_eq!(res.len(), 1);
1769 let data_file = res
1770 .into_iter()
1771 .next()
1772 .unwrap()
1773 .content(crate::spec::DataContentType::Data)
1775 .partition(Struct::empty())
1776 .partition_spec_id(0)
1777 .build()
1778 .unwrap();
1779
1780 assert_eq!(data_file.record_count(), 4);
1782 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1783 assert_eq!(
1784 *data_file.lower_bounds(),
1785 HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1786 );
1787 assert_eq!(
1788 *data_file.upper_bounds(),
1789 HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1790 );
1791 assert_eq!(
1792 *data_file.null_value_counts(),
1793 HashMap::from([(0, 0), (1, 0)])
1794 );
1795 assert_eq!(
1796 *data_file.nan_value_counts(),
1797 HashMap::from([(0, 1), (1, 1)])
1798 );
1799
1800 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1802 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1803
1804 Ok(())
1805 }
1806
1807 #[tokio::test]
1808 async fn test_nan_val_cnts_struct_type() -> Result<()> {
1809 let temp_dir = TempDir::new().unwrap();
1810 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1811 let location_gen = DefaultLocationGenerator::with_data_location(
1812 temp_dir.path().to_str().unwrap().to_string(),
1813 );
1814 let file_name_gen =
1815 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1816
1817 let schema_struct_float_fields = Fields::from(vec![
1818 Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1819 PARQUET_FIELD_ID_META_KEY.to_string(),
1820 "4".to_string(),
1821 )])),
1822 ]);
1823
1824 let schema_struct_nested_float_fields = Fields::from(vec![
1825 Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1826 PARQUET_FIELD_ID_META_KEY.to_string(),
1827 "7".to_string(),
1828 )])),
1829 ]);
1830
1831 let schema_struct_nested_fields = Fields::from(vec![
1832 Field::new(
1833 "col6",
1834 arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1835 false,
1836 )
1837 .with_metadata(HashMap::from([(
1838 PARQUET_FIELD_ID_META_KEY.to_string(),
1839 "6".to_string(),
1840 )])),
1841 ]);
1842
1843 let arrow_schema = {
1845 let fields = vec![
1846 Field::new(
1847 "col3",
1848 arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1849 false,
1850 )
1851 .with_metadata(HashMap::from([(
1852 PARQUET_FIELD_ID_META_KEY.to_string(),
1853 "3".to_string(),
1854 )])),
1855 Field::new(
1856 "col5",
1857 arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1858 false,
1859 )
1860 .with_metadata(HashMap::from([(
1861 PARQUET_FIELD_ID_META_KEY.to_string(),
1862 "5".to_string(),
1863 )])),
1864 ];
1865 Arc::new(arrow_schema::Schema::new(fields))
1866 };
1867
1868 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1869 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1870 None,
1871 )) as ArrayRef;
1872
1873 let struct_float_field_col = Arc::new(StructArray::new(
1874 schema_struct_float_fields,
1875 vec![float_32_col.clone()],
1876 None,
1877 )) as ArrayRef;
1878
1879 let struct_nested_float_field_col = Arc::new(StructArray::new(
1880 schema_struct_nested_fields,
1881 vec![Arc::new(StructArray::new(
1882 schema_struct_nested_float_fields,
1883 vec![float_32_col.clone()],
1884 None,
1885 )) as ArrayRef],
1886 None,
1887 )) as ArrayRef;
1888
1889 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1890 struct_float_field_col,
1891 struct_nested_float_field_col,
1892 ])
1893 .unwrap();
1894
1895 let mut pw = ParquetWriterBuilder::new(
1897 WriterProperties::builder().build(),
1898 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1899 None,
1900 file_io.clone(),
1901 location_gen,
1902 file_name_gen,
1903 )
1904 .build()
1905 .await?;
1906
1907 pw.write(&to_write).await?;
1908 let res = pw.close().await?;
1909 assert_eq!(res.len(), 1);
1910 let data_file = res
1911 .into_iter()
1912 .next()
1913 .unwrap()
1914 .content(crate::spec::DataContentType::Data)
1916 .partition(Struct::empty())
1917 .partition_spec_id(0)
1918 .build()
1919 .unwrap();
1920
1921 assert_eq!(data_file.record_count(), 4);
1923 assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1924 assert_eq!(
1925 *data_file.lower_bounds(),
1926 HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1927 );
1928 assert_eq!(
1929 *data_file.upper_bounds(),
1930 HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1931 );
1932 assert_eq!(
1933 *data_file.null_value_counts(),
1934 HashMap::from([(4, 0), (7, 0)])
1935 );
1936 assert_eq!(
1937 *data_file.nan_value_counts(),
1938 HashMap::from([(4, 1), (7, 1)])
1939 );
1940
1941 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1943 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1944
1945 Ok(())
1946 }
1947
1948 #[tokio::test]
1949 async fn test_nan_val_cnts_list_type() -> Result<()> {
1950 let temp_dir = TempDir::new().unwrap();
1951 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1952 let location_gen = DefaultLocationGenerator::with_data_location(
1953 temp_dir.path().to_str().unwrap().to_string(),
1954 );
1955 let file_name_gen =
1956 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1957
1958 let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1959 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1960 );
1961
1962 let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1963 .with_metadata(HashMap::from([(
1964 PARQUET_FIELD_ID_META_KEY.to_string(),
1965 "4".to_string(),
1966 )]));
1967
1968 let schema_struct_list_field = Fields::from(vec![
1969 Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
1970 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
1971 ),
1972 ]);
1973
1974 let arrow_schema = {
1975 let fields = vec![
1976 Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
1977 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1978 ),
1979 Field::new_struct("col1", schema_struct_list_field.clone(), true)
1980 .with_metadata(HashMap::from([(
1981 PARQUET_FIELD_ID_META_KEY.to_string(),
1982 "2".to_string(),
1983 )]))
1984 .clone(),
1985 ];
1989 Arc::new(arrow_schema::Schema::new(fields))
1990 };
1991
1992 let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
1993 Some(1.0_f32),
1994 Some(f32::NAN),
1995 Some(2.0),
1996 Some(2.0),
1997 ])])
1998 .into_parts();
1999
2000 let list_float_field_col = Arc::new({
2001 let list_parts = list_parts.clone();
2002 ListArray::new(
2003 {
2004 if let DataType::List(field) = arrow_schema.field(0).data_type() {
2005 field.clone()
2006 } else {
2007 unreachable!()
2008 }
2009 },
2010 list_parts.1,
2011 list_parts.2,
2012 list_parts.3,
2013 )
2014 }) as ArrayRef;
2015
2016 let struct_list_fields_schema =
2017 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
2018 fields.clone()
2019 } else {
2020 unreachable!()
2021 };
2022
2023 let struct_list_float_field_col = Arc::new({
2024 ListArray::new(
2025 {
2026 if let DataType::List(field) = struct_list_fields_schema
2027 .first()
2028 .expect("could not find first list field")
2029 .data_type()
2030 {
2031 field.clone()
2032 } else {
2033 unreachable!()
2034 }
2035 },
2036 list_parts.1,
2037 list_parts.2,
2038 list_parts.3,
2039 )
2040 }) as ArrayRef;
2041
2042 let struct_list_float_field_col = Arc::new(StructArray::new(
2043 struct_list_fields_schema,
2044 vec![struct_list_float_field_col.clone()],
2045 None,
2046 )) as ArrayRef;
2047
2048 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2049 list_float_field_col,
2050 struct_list_float_field_col,
2051 ])
2053 .expect("Could not form record batch");
2054
2055 let mut pw = ParquetWriterBuilder::new(
2057 WriterProperties::builder().build(),
2058 Arc::new(
2059 to_write
2060 .schema()
2061 .as_ref()
2062 .try_into()
2063 .expect("Could not convert iceberg schema"),
2064 ),
2065 None,
2066 file_io.clone(),
2067 location_gen,
2068 file_name_gen,
2069 )
2070 .build()
2071 .await?;
2072
2073 pw.write(&to_write).await?;
2074 let res = pw.close().await?;
2075 assert_eq!(res.len(), 1);
2076 let data_file = res
2077 .into_iter()
2078 .next()
2079 .unwrap()
2080 .content(crate::spec::DataContentType::Data)
2081 .partition(Struct::empty())
2082 .partition_spec_id(0)
2083 .build()
2084 .unwrap();
2085
2086 assert_eq!(data_file.record_count(), 1);
2088 assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
2089 assert_eq!(
2090 *data_file.lower_bounds(),
2091 HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
2092 );
2093 assert_eq!(
2094 *data_file.upper_bounds(),
2095 HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
2096 );
2097 assert_eq!(
2098 *data_file.null_value_counts(),
2099 HashMap::from([(1, 0), (4, 0)])
2100 );
2101 assert_eq!(
2102 *data_file.nan_value_counts(),
2103 HashMap::from([(1, 1), (4, 1)])
2104 );
2105
2106 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2108 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2109
2110 Ok(())
2111 }
2112
2113 macro_rules! construct_map_arr {
2114 ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
2115 let int_builder = Int32Builder::new();
2116 let float_builder = Float32Builder::with_capacity(4);
2117 let mut builder = MapBuilder::new(None, int_builder, float_builder);
2118 builder.keys().append_value(1);
2119 builder.values().append_value(1.0_f32);
2120 builder.append(true).unwrap();
2121 builder.keys().append_value(2);
2122 builder.values().append_value(f32::NAN);
2123 builder.append(true).unwrap();
2124 builder.keys().append_value(3);
2125 builder.values().append_value(2.0);
2126 builder.append(true).unwrap();
2127 builder.keys().append_value(4);
2128 builder.values().append_value(2.0);
2129 builder.append(true).unwrap();
2130 let array = builder.finish();
2131
2132 let (_field, offsets, entries, nulls, ordered) = array.into_parts();
2133 let new_struct_fields_schema =
2134 Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
2135
2136 let entries = {
2137 let (_, arrays, nulls) = entries.into_parts();
2138 StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
2139 };
2140
2141 let field = Arc::new(Field::new(
2142 DEFAULT_MAP_FIELD_NAME,
2143 DataType::Struct(new_struct_fields_schema),
2144 false,
2145 ));
2146
2147 Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
2148 }};
2149 }
2150
2151 #[tokio::test]
2152 async fn test_nan_val_cnts_map_type() -> Result<()> {
2153 let temp_dir = TempDir::new().unwrap();
2154 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2155 let location_gen = DefaultLocationGenerator::with_data_location(
2156 temp_dir.path().to_str().unwrap().to_string(),
2157 );
2158 let file_name_gen =
2159 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2160
2161 let map_key_field_schema =
2162 Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2163 (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2164 ]));
2165
2166 let map_value_field_schema =
2167 Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2168 [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2169 ));
2170
2171 let struct_map_key_field_schema =
2172 Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2173 (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2174 ]));
2175
2176 let struct_map_value_field_schema =
2177 Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2178 [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2179 ));
2180
2181 let schema_struct_map_field = Fields::from(vec![
2182 Field::new_map(
2183 "col3",
2184 DEFAULT_MAP_FIELD_NAME,
2185 struct_map_key_field_schema.clone(),
2186 struct_map_value_field_schema.clone(),
2187 false,
2188 false,
2189 )
2190 .with_metadata(HashMap::from([(
2191 PARQUET_FIELD_ID_META_KEY.to_string(),
2192 "5".to_string(),
2193 )])),
2194 ]);
2195
2196 let arrow_schema = {
2197 let fields = vec![
2198 Field::new_map(
2199 "col0",
2200 DEFAULT_MAP_FIELD_NAME,
2201 map_key_field_schema.clone(),
2202 map_value_field_schema.clone(),
2203 false,
2204 false,
2205 )
2206 .with_metadata(HashMap::from([(
2207 PARQUET_FIELD_ID_META_KEY.to_string(),
2208 "0".to_string(),
2209 )])),
2210 Field::new_struct("col1", schema_struct_map_field.clone(), true)
2211 .with_metadata(HashMap::from([(
2212 PARQUET_FIELD_ID_META_KEY.to_string(),
2213 "3".to_string(),
2214 )]))
2215 .clone(),
2216 ];
2217 Arc::new(arrow_schema::Schema::new(fields))
2218 };
2219
2220 let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2221
2222 let struct_map_arr =
2223 construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2224
2225 let struct_list_float_field_col = Arc::new(StructArray::new(
2226 schema_struct_map_field,
2227 vec![struct_map_arr],
2228 None,
2229 )) as ArrayRef;
2230
2231 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2232 map_array,
2233 struct_list_float_field_col,
2234 ])
2235 .expect("Could not form record batch");
2236
2237 let mut pw = ParquetWriterBuilder::new(
2239 WriterProperties::builder().build(),
2240 Arc::new(
2241 to_write
2242 .schema()
2243 .as_ref()
2244 .try_into()
2245 .expect("Could not convert iceberg schema"),
2246 ),
2247 None,
2248 file_io.clone(),
2249 location_gen,
2250 file_name_gen,
2251 )
2252 .build()
2253 .await?;
2254
2255 pw.write(&to_write).await?;
2256 let res = pw.close().await?;
2257 assert_eq!(res.len(), 1);
2258 let data_file = res
2259 .into_iter()
2260 .next()
2261 .unwrap()
2262 .content(crate::spec::DataContentType::Data)
2263 .partition(Struct::empty())
2264 .partition_spec_id(0)
2265 .build()
2266 .unwrap();
2267
2268 assert_eq!(data_file.record_count(), 4);
2270 assert_eq!(
2271 *data_file.value_counts(),
2272 HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2273 );
2274 assert_eq!(
2275 *data_file.lower_bounds(),
2276 HashMap::from([
2277 (1, Datum::int(1)),
2278 (2, Datum::float(1.0)),
2279 (6, Datum::int(1)),
2280 (7, Datum::float(1.0))
2281 ])
2282 );
2283 assert_eq!(
2284 *data_file.upper_bounds(),
2285 HashMap::from([
2286 (1, Datum::int(4)),
2287 (2, Datum::float(2.0)),
2288 (6, Datum::int(4)),
2289 (7, Datum::float(2.0))
2290 ])
2291 );
2292 assert_eq!(
2293 *data_file.null_value_counts(),
2294 HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2295 );
2296 assert_eq!(
2297 *data_file.nan_value_counts(),
2298 HashMap::from([(2, 1), (7, 1)])
2299 );
2300
2301 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2303 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2304
2305 Ok(())
2306 }
2307
2308 #[tokio::test]
2309 async fn test_write_empty_parquet_file() {
2310 let temp_dir = TempDir::new().unwrap();
2311 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2312 let location_gen = DefaultLocationGenerator::with_data_location(
2313 temp_dir.path().to_str().unwrap().to_string(),
2314 );
2315 let file_name_gen =
2316 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2317
2318 let pw = ParquetWriterBuilder::new(
2320 WriterProperties::builder().build(),
2321 Arc::new(
2322 Schema::builder()
2323 .with_schema_id(1)
2324 .with_fields(vec![
2325 NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2326 .with_id(0)
2327 .into(),
2328 ])
2329 .build()
2330 .expect("Failed to create schema"),
2331 ),
2332 None,
2333 file_io.clone(),
2334 location_gen,
2335 file_name_gen,
2336 )
2337 .build()
2338 .await
2339 .unwrap();
2340
2341 let res = pw.close().await.unwrap();
2342 assert_eq!(res.len(), 0);
2343
2344 assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2346 }
2347
2348 #[test]
2349 fn test_min_max_aggregator() {
2350 let schema = Arc::new(
2351 Schema::builder()
2352 .with_schema_id(1)
2353 .with_fields(vec![
2354 NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2355 .with_id(0)
2356 .into(),
2357 ])
2358 .build()
2359 .expect("Failed to create schema"),
2360 );
2361 let mut min_max_agg = MinMaxColAggregator::new(schema);
2362 let create_statistics =
2363 |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
2364 min_max_agg
2365 .update(0, create_statistics(None, Some(42)))
2366 .unwrap();
2367 min_max_agg
2368 .update(0, create_statistics(Some(0), Some(i32::MAX)))
2369 .unwrap();
2370 min_max_agg
2371 .update(0, create_statistics(Some(i32::MIN), None))
2372 .unwrap();
2373 min_max_agg
2374 .update(0, create_statistics(None, None))
2375 .unwrap();
2376
2377 let (lower_bounds, upper_bounds) = min_max_agg.produce();
2378
2379 assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2380 assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2381 }
2382}