use std::collections::VecDeque;
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

mod filter;
mod selection;
pub mod statistics;

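/// A generic builder for constructing an arrow [`RecordBatch`] reader from a
/// Parquet data source.
///
/// The input `T` abstracts over the data source, allowing the same builder
/// logic to back the synchronous [`ParquetRecordBatchReaderBuilder`] (and,
/// presumably, an asynchronous equivalent elsewhere in the crate).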
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Clamp to the number of rows in the file to avoid allocating an
        // unnecessarily large buffer
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

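    /// Provide a limit to the number of rows to be read.
    ///
    /// As `apply_range` below shows, the limit is applied after any row
    /// selection, row filter and offset, so it counts only rows that survive
    /// those stages.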
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

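    /// Provide an offset to skip over the first `offset` rows.
    ///
    /// Like the limit, the offset is applied after any row selection and row
    /// filter (see `apply_range`).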
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }
}

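/// Options that control how a Parquet file's metadata is interpreted when
/// building a reader, such as whether to honour the embedded arrow schema,
/// substitute a caller-supplied schema, or read the page index.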
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,
    pub(crate) page_index: bool,
}

impl ArrowReaderOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    pub fn with_page_index(self, page_index: bool) -> Self {
        Self { page_index, ..self }
    }
}

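/// The metadata necessary to construct an [`ArrowReaderBuilder`].
///
/// This is cheap to clone (the decoded [`ParquetMetaData`] sits behind an
/// [`Arc`]), so loading it once and sharing it avoids repeating a potentially
/// expensive metadata decode across readers.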
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
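    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`],
    /// reading the page index only when `options.page_index` requests it.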
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(options.page_index)
            .parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        if inferred_len != supplied_len {
            Err(arrow_err!(format!(
                "incompatible arrow schema, expected {} columns received {}",
                inferred_len, supplied_len
            )))
        } else {
            let diff_fields: Vec<_> = supplied_schema
                .fields()
                .iter()
                .zip(fields.iter())
                .filter_map(|(field1, field2)| {
                    if field1 != field2 {
                        Some(field1.name().clone())
                    } else {
                        None
                    }
                })
                .collect();

            if !diff_fields.is_empty() {
                Err(ParquetError::ArrowError(format!(
                    "incompatible arrow schema, the following fields could not be cast: [{}]",
                    diff_fields.join(", ")
                )))
            } else {
                Ok(Self {
                    metadata,
                    schema: supplied_schema,
                    fields: field_levels.levels.map(Arc::new),
                })
            }
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

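/// A synchronous builder for [`ParquetRecordBatchReader`].
///
/// A minimal usage sketch (the file path is illustrative):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let file = File::open("data.parquet").unwrap();
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
///     .unwrap()
///     .with_batch_size(1024)
///     .build()
///     .unwrap();
/// for batch in reader {
///     println!("read {} rows", batch.unwrap().num_rows());
/// }
/// ```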
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);

        let row_groups = self
            .row_groups
            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(self.input.0),
            metadata: self.metadata,
            row_groups,
        };

        let mut filter = self.filter;
        let mut selection = self.selection;

        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !selects_any(selection.as_ref()) {
                    break;
                }

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;

        if !selects_any(selection.as_ref()) {
            selection = Some(RowSelection::from(vec![]));
        }

        Ok(ParquetRecordBatchReader::new(
            batch_size,
            array_reader,
            apply_range(selection, reader.num_rows(), self.offset, self.limit),
        ))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let rg = self.metadata.row_group(rg_idx);
        let meta = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // The offset index may be missing or empty for this row group, in
        // which case fall back to reading pages without page locations
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
        Some(ret.map(|x| Box::new(x) as _))
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

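/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that decodes
/// [`RecordBatch`]es of up to `batch_size` rows from a Parquet source,
/// honouring any configured [`RowSelection`].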
pub struct ParquetRecordBatchReader {
    batch_size: usize,
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    selection: Option<VecDeque<RowSelector>>,
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut read_records = 0;
        match self.selection.as_mut() {
            Some(selection) => {
                while read_records < self.batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = match self.array_reader.skip_records(front.row_count) {
                            Ok(skipped) => skipped,
                            Err(e) => return Some(Err(e.into())),
                        };

                        if skipped != front.row_count {
                            return Some(Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            )
                            .into()));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    // Read the remainder of the batch from this selector,
                    // pushing any leftover rows back onto the selection
                    let need_read = self.batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read) {
                        Ok(0) => break,
                        Ok(rec) => read_records += rec,
                        Err(error) => return Some(Err(error.into())),
                    }
                }
            }
            None => {
                if let Err(error) = self.array_reader.read_records(self.batch_size) {
                    return Some(Err(error.into()));
                }
            }
        };

        match self.array_reader.consume_batch() {
            Err(error) => Some(Err(error.into())),
            Ok(array) => {
                let struct_array = array.as_struct_opt().ok_or_else(|| {
                    ArrowError::ParquetError(
                        "Struct array reader should return struct array".to_string(),
                    )
                });

                match struct_array {
                    Err(err) => Some(Err(err)),
                    Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
                }
            }
        }
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let array_reader =
            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;

        Ok(Self {
            batch_size,
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            selection: selection.map(|s| s.trim().into()),
        })
    }

    pub(crate) fn new(
        batch_size: usize,
        array_reader: Box<dyn ArrayReader>,
        selection: Option<RowSelection>,
    ) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            batch_size,
            array_reader,
            schema: Arc::new(schema),
            selection: selection.map(|s| s.trim().into()),
        }
    }
}

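/// Returns `true` if `selection` selects at least one row, treating `None`
/// as "select everything".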
pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
    selection.map(|x| x.selects_any()).unwrap_or(true)
}

pub(crate) fn apply_range(
    mut selection: Option<RowSelection>,
    row_count: usize,
    offset: Option<usize>,
    limit: Option<usize>,
) -> Option<RowSelection> {
    // If an offset is defined, apply it to the `selection`
    if let Some(offset) = offset {
        selection = Some(match row_count.checked_sub(offset) {
            None => RowSelection::from(vec![]),
            Some(remaining) => selection
                .map(|selection| selection.offset(offset))
                .unwrap_or_else(|| {
                    RowSelection::from(vec![
                        RowSelector::skip(offset),
                        RowSelector::select(remaining),
                    ])
                }),
        });
    }

    // If a limit is defined, apply it to the final `selection`
    if let Some(limit) = limit {
        selection = Some(
            selection
                .map(|selection| selection.limit(limit))
                .unwrap_or_else(|| {
                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
                }),
        );
    }
    selection
}

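/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
/// which rows to scan next. If an `input_selection` is provided, the returned
/// selection is its conjunction with the rows the predicate keeps.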
pub(crate) fn evaluate_predicate(
    batch_size: usize,
    array_reader: Box<dyn ArrayReader>,
    input_selection: Option<RowSelection>,
    predicate: &mut dyn ArrowPredicate,
) -> Result<RowSelection> {
    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
    let mut filters = vec![];
    for maybe_batch in reader {
        let maybe_batch = maybe_batch?;
        let input_rows = maybe_batch.num_rows();
        let filter = predicate.evaluate(maybe_batch)?;
        // The predicate is user supplied, so verify it returned one boolean
        // per input row before using it
        if filter.len() != input_rows {
            return Err(arrow_err!(
                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
                filter.len()
            ));
        }
        match filter.null_count() {
            0 => filters.push(filter),
            _ => filters.push(prep_null_mask_filter(&filter)),
        };
    }

    let raw = RowSelection::from_filters(&filters);
    Ok(match input_selection {
        Some(selection) => selection.and_then(&raw),
        None => raw,
    })
}

#[cfg(test)]
mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use bytes::Bytes;
    use half::f16;
    use num::PrimInt;
    use rand::{thread_rng, Rng, RngCore};
    use tempfile::tempfile;

    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
        Time32MillisecondType, Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
    use arrow_data::ArrayDataBuilder;
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        // Verify the columns decoded to the expected concrete array types
        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            thread_rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
        run_single_column_reader_tests::<Int96Type, _, Int96Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as _
            },
            encodings,
        );
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn gen(len: i32) -> ByteArray {
            Int32Type::gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    #[ignore]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new(
            "item",
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(Fields::from(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]));
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

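    /// Options for a single-column reader test, controlling the written
    /// file's shape (row groups, rows, page sizes, encoding, statistics) and
    /// the read-side pushdowns to exercise (selection, filter, limit,
    /// offset).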
    #[derive(Clone)]
    struct TestOptions {
        num_row_groups: usize,
        num_rows: usize,
        record_batch_size: usize,
        null_percent: Option<usize>,
        write_batch_size: usize,
        max_data_page_size: usize,
        max_dict_page_size: usize,
        writer_version: WriterVersion,
        enabled_statistics: EnabledStatistics,
        encoding: Encoding,
        row_selections: Option<(RowSelection, usize)>,
        row_filter: Option<Vec<bool>>,
        limit: Option<usize>,
        offset: Option<usize>,
    }

    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

    impl TestOptions {
        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
            Self {
                num_row_groups,
                num_rows,
                record_batch_size,
                ..Default::default()
            }
        }

        fn with_null_percent(self, null_percent: usize) -> Self {
            Self {
                null_percent: Some(null_percent),
                ..self
            }
        }

        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
            Self {
                max_data_page_size,
                ..self
            }
        }

        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
            Self {
                max_dict_page_size,
                ..self
            }
        }

        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
            Self {
                enabled_statistics,
                ..self
            }
        }

        fn with_row_selections(self) -> Self {
            assert!(self.row_filter.is_none(), "Must set row selection first");

            let mut rng = thread_rng();
            let step = rng.gen_range(self.record_batch_size..self.num_rows);
            let row_selections =
                create_test_selection(step, self.num_row_groups * self.num_rows, rng.gen::<bool>());
            Self {
                row_selections: Some(row_selections),
                ..self
            }
        }

        fn with_row_filter(self) -> Self {
            let row_count = match &self.row_selections {
                Some((_, count)) => *count,
                None => self.num_row_groups * self.num_rows,
            };

            let mut rng = thread_rng();
            Self {
                row_filter: Some((0..row_count).map(|_| rng.gen_bool(0.9)).collect()),
                ..self
            }
        }

        fn with_limit(self, limit: usize) -> Self {
            Self {
                limit: Some(limit),
                ..self
            }
        }

        fn with_offset(self, offset: usize) -> Self {
            Self {
                offset: Some(offset),
                ..self
            }
        }

        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_page_size_limit(self.max_data_page_size)
                .set_write_batch_size(self.write_batch_size)
                .set_writer_version(self.writer_version)
                .set_statistics_enabled(self.enabled_statistics);

            let builder = match self.encoding {
                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                    .set_dictionary_enabled(true)
                    .set_dictionary_page_size_limit(self.max_dict_page_size),
                _ => builder
                    .set_dictionary_enabled(false)
                    .set_encoding(self.encoding),
            };

            builder.build()
        }
    }

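    /// Writes and reads back a single column under every combination of the
    /// test option matrix below, both writer versions, and each of
    /// `encodings`, comparing the decoded batches against the arrow array
    /// produced by `converter`.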
1993 fn run_single_column_reader_tests<T, F, G>(
2000 rand_max: i32,
2001 converted_type: ConvertedType,
2002 arrow_type: Option<ArrowDataType>,
2003 converter: F,
2004 encodings: &[Encoding],
2005 ) where
2006 T: DataType,
2007 G: RandGen<T>,
2008 F: Fn(&[Option<T::T>]) -> ArrayRef,
2009 {
2010 let all_options = vec![
2011 TestOptions::new(2, 100, 15),
2014 TestOptions::new(3, 25, 5),
2019 TestOptions::new(4, 100, 25),
2023 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2025 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2027 TestOptions::new(2, 256, 127).with_null_percent(0),
2029 TestOptions::new(2, 256, 93).with_null_percent(25),
2031 TestOptions::new(4, 100, 25).with_limit(0),
2033 TestOptions::new(4, 100, 25).with_limit(50),
2035 TestOptions::new(4, 100, 25).with_limit(10),
2037 TestOptions::new(4, 100, 25).with_limit(101),
2039 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2041 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2043 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2045 TestOptions::new(2, 256, 91)
2047 .with_null_percent(25)
2048 .with_enabled_statistics(EnabledStatistics::Chunk),
2049 TestOptions::new(2, 256, 91)
2051 .with_null_percent(25)
2052 .with_enabled_statistics(EnabledStatistics::None),
2053 TestOptions::new(2, 128, 91)
2055 .with_null_percent(100)
2056 .with_enabled_statistics(EnabledStatistics::None),
2057 TestOptions::new(2, 100, 15).with_row_selections(),
2062 TestOptions::new(3, 25, 5).with_row_selections(),
2067 TestOptions::new(4, 100, 25).with_row_selections(),
2071 TestOptions::new(3, 256, 73)
2073 .with_max_data_page_size(128)
2074 .with_row_selections(),
2075 TestOptions::new(3, 256, 57)
2077 .with_max_dict_page_size(128)
2078 .with_row_selections(),
2079 TestOptions::new(2, 256, 127)
2081 .with_null_percent(0)
2082 .with_row_selections(),
2083 TestOptions::new(2, 256, 93)
2085 .with_null_percent(25)
2086 .with_row_selections(),
2087 TestOptions::new(2, 256, 93)
2089 .with_null_percent(25)
2090 .with_row_selections()
2091 .with_limit(10),
2092 TestOptions::new(2, 256, 93)
2094 .with_null_percent(25)
2095 .with_row_selections()
2096 .with_offset(20)
2097 .with_limit(10),
2098 TestOptions::new(4, 100, 25).with_row_filter(),
2102 TestOptions::new(4, 100, 25)
2104 .with_row_selections()
2105 .with_row_filter(),
2106 TestOptions::new(2, 256, 93)
2108 .with_null_percent(25)
2109 .with_max_data_page_size(10)
2110 .with_row_filter(),
2111 TestOptions::new(2, 256, 93)
2113 .with_null_percent(25)
2114 .with_max_data_page_size(10)
2115 .with_row_selections()
2116 .with_row_filter(),
2117 TestOptions::new(2, 256, 93)
2119 .with_enabled_statistics(EnabledStatistics::None)
2120 .with_max_data_page_size(10)
2121 .with_row_selections(),
2122 ];
2123
2124 all_options.into_iter().for_each(|opts| {
2125 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2126 for encoding in encodings {
2127 let opts = TestOptions {
2128 writer_version,
2129 encoding: *encoding,
2130 ..opts.clone()
2131 };
2132
2133 single_column_reader_test::<T, _, G>(
2134 opts,
2135 rand_max,
2136 converted_type,
2137 arrow_type.clone(),
2138 &converter,
2139 )
2140 }
2141 }
2142 });
2143 }
2144
2145 fn single_column_reader_test<T, F, G>(
2149 opts: TestOptions,
2150 rand_max: i32,
2151 converted_type: ConvertedType,
2152 arrow_type: Option<ArrowDataType>,
2153 converter: F,
2154 ) where
2155 T: DataType,
2156 G: RandGen<T>,
2157 F: Fn(&[Option<T::T>]) -> ArrayRef,
2158 {
2159 println!(
2161 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2162 T::get_physical_type(), converted_type, arrow_type, opts
2163 );
2164
2165 let (repetition, def_levels) = match opts.null_percent.as_ref() {
2167 Some(null_percent) => {
2168 let mut rng = thread_rng();
2169
2170 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2171 .map(|_| {
2172 std::iter::from_fn(|| {
2173 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2174 })
2175 .take(opts.num_rows)
2176 .collect()
2177 })
2178 .collect();
2179 (Repetition::OPTIONAL, Some(def_levels))
2180 }
2181 None => (Repetition::REQUIRED, None),
2182 };
2183
2184 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2186 .map(|idx| {
2187 let null_count = match def_levels.as_ref() {
2188 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2189 None => 0,
2190 };
2191 G::gen_vec(rand_max, opts.num_rows - null_count)
2192 })
2193 .collect();
2194
2195 let len = match T::get_physical_type() {
2196 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2197 crate::basic::Type::INT96 => 12,
2198 _ => -1,
2199 };
2200
2201 let fields = vec![Arc::new(
2202 Type::primitive_type_builder("leaf", T::get_physical_type())
2203 .with_repetition(repetition)
2204 .with_converted_type(converted_type)
2205 .with_length(len)
2206 .build()
2207 .unwrap(),
2208 )];
2209
2210 let schema = Arc::new(
2211 Type::group_type_builder("test_schema")
2212 .with_fields(fields)
2213 .build()
2214 .unwrap(),
2215 );
2216
2217 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2218
2219 let mut file = tempfile::tempfile().unwrap();
2220
2221 generate_single_column_file_with_data::<T>(
2222 &values,
2223 def_levels.as_ref(),
2224 file.try_clone().unwrap(), schema,
2226 arrow_field,
2227 &opts,
2228 )
2229 .unwrap();
2230
2231 file.rewind().unwrap();
2232
2233 let options = ArrowReaderOptions::new()
2234 .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2235
2236 let mut builder =
2237 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2238
2239 let expected_data = match opts.row_selections {
2240 Some((selections, row_count)) => {
2241 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2242
2243 let mut skip_data: Vec<Option<T::T>> = vec![];
2244 let dequeue: VecDeque<RowSelector> = selections.clone().into();
2245 for select in dequeue {
2246 if select.skip {
2247 without_skip_data.drain(0..select.row_count);
2248 } else {
2249 skip_data.extend(without_skip_data.drain(0..select.row_count));
2250 }
2251 }
2252 builder = builder.with_row_selection(selections);
2253
2254 assert_eq!(skip_data.len(), row_count);
2255 skip_data
2256 }
2257 None => {
2258 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2260 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2261 expected_data
2262 }
2263 };
2264
2265 let mut expected_data = match opts.row_filter {
2266 Some(filter) => {
2267 let expected_data = expected_data
2268 .into_iter()
2269 .zip(filter.iter())
2270 .filter_map(|(d, f)| f.then(|| d))
2271 .collect();
2272
2273 let mut filter_offset = 0;
2274 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2275 ProjectionMask::all(),
2276 move |b| {
2277 let array = BooleanArray::from_iter(
2278 filter
2279 .iter()
2280 .skip(filter_offset)
2281 .take(b.num_rows())
2282 .map(|x| Some(*x)),
2283 );
2284 filter_offset += b.num_rows();
2285 Ok(array)
2286 },
2287 ))]);
2288
2289 builder = builder.with_row_filter(filter);
2290 expected_data
2291 }
2292 None => expected_data,
2293 };
2294
2295 if let Some(offset) = opts.offset {
2296 builder = builder.with_offset(offset);
2297 expected_data = expected_data.into_iter().skip(offset).collect();
2298 }
2299
2300 if let Some(limit) = opts.limit {
2301 builder = builder.with_limit(limit);
2302 expected_data = expected_data.into_iter().take(limit).collect();
2303 }
2304
2305 let mut record_reader = builder
2306 .with_batch_size(opts.record_batch_size)
2307 .build()
2308 .unwrap();
2309
2310 let mut total_read = 0;
2311 loop {
2312 let maybe_batch = record_reader.next();
2313 if total_read < expected_data.len() {
2314 let end = min(total_read + opts.record_batch_size, expected_data.len());
2315 let batch = maybe_batch.unwrap().unwrap();
2316 assert_eq!(end - total_read, batch.num_rows());
2317
2318 let a = converter(&expected_data[total_read..end]);
2319 let b = Arc::clone(batch.column(0));
2320
2321 assert_eq!(a.data_type(), b.data_type());
2322 assert_eq!(a.to_data(), b.to_data());
2323 assert_eq!(
2324 a.as_any().type_id(),
2325 b.as_any().type_id(),
2326 "incorrect type ids"
2327 );
2328
2329 total_read = end;
2330 } else {
2331 assert!(maybe_batch.is_none());
2332 break;
2333 }
2334 }
2335 }
2336
2337 fn gen_expected_data<T: DataType>(
2338 def_levels: Option<&Vec<Vec<i16>>>,
2339 values: &[Vec<T::T>],
2340 ) -> Vec<Option<T::T>> {
2341 let data: Vec<Option<T::T>> = match def_levels {
2342 Some(levels) => {
2343 let mut values_iter = values.iter().flatten();
2344 levels
2345 .iter()
2346 .flatten()
2347 .map(|d| match d {
2348 1 => Some(values_iter.next().cloned().unwrap()),
2349 0 => None,
2350 _ => unreachable!(),
2351 })
2352 .collect()
2353 }
2354 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2355 };
2356 data
2357 }
2358
2359 fn generate_single_column_file_with_data<T: DataType>(
2360 values: &[Vec<T::T>],
2361 def_levels: Option<&Vec<Vec<i16>>>,
2362 file: File,
2363 schema: TypePtr,
2364 field: Option<Field>,
2365 opts: &TestOptions,
2366 ) -> Result<crate::format::FileMetaData> {
2367 let mut writer_props = opts.writer_props();
2368 if let Some(field) = field {
2369 let arrow_schema = Schema::new(vec![field]);
2370 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2371 }
2372
2373 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2374
2375 for (idx, v) in values.iter().enumerate() {
2376 let def_levels = def_levels.map(|d| d[idx].as_slice());
2377 let mut row_group_writer = writer.next_row_group()?;
2378 {
2379 let mut column_writer = row_group_writer
2380 .next_column()?
2381 .expect("Column writer is none!");
2382
2383 column_writer
2384 .typed::<T>()
2385 .write_batch(v, def_levels, None)?;
2386
2387 column_writer.close()?;
2388 }
2389 row_group_writer.close()?;
2390 }
2391
2392 writer.close()
2393 }
2394
2395 fn get_test_file(file_name: &str) -> File {
2396 let mut path = PathBuf::new();
2397 path.push(arrow::util::test_util::arrow_test_data());
2398 path.push(file_name);
2399
2400 File::open(path.as_path()).expect("File not found!")
2401 }
2402
    #[test]
    fn test_read_structs() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }

    #[test]
    fn test_read_maps() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_maps.snappy.parquet");
        let file = File::open(path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }
    }

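    /// An optional group around a required leaf should surface as a nullable
    /// Arrow struct; two of the four rows written here are null.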
    #[test]
    fn test_nested_nullability() {
        let message_type = "message nested {
            OPTIONAL Group group {
                REQUIRED INT32 leaf;
            }
        }";

        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        {
            let mut writer =
                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                    .unwrap();

            {
                let mut row_group_writer = writer.next_row_group().unwrap();
                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

                column_writer
                    .typed::<Int32Type>()
                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                    .unwrap();

                column_writer.close().unwrap();
                row_group_writer.close().unwrap();
            }

            writer.close().unwrap();
        }

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

        let reader = builder.with_projection(mask).build().unwrap();

        let expected_schema = Schema::new(Fields::from(vec![Field::new(
            "group",
            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
            true,
        )]));

        let batch = reader.into_iter().next().unwrap().unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.column(0).null_count(), 2);
    }

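    /// Decoding a string column whose raw bytes contain invalid UTF-8 must
    /// surface an error rather than yielding a corrupt `StringArray`. The
    /// byte vector below is a complete Parquet file with one such column.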
    #[test]
    fn test_invalid_utf8() {
        let data = vec![
            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
            82, 49,
        ];

        let file = Bytes::from(data);
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();

        let error = record_batch_reader.next().unwrap().unwrap_err();

        assert!(
            error.to_string().contains("invalid utf-8 sequence"),
            "{}",
            error
        );
    }

    #[test]
    fn test_invalid_utf8_string_array() {
        test_invalid_utf8_string_array_inner::<i32>();
    }

    #[test]
    fn test_invalid_utf8_large_string_array() {
        test_invalid_utf8_string_array_inner::<i64>();
    }

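    /// Writes binary arrays containing invalid UTF-8 as string columns with
    /// each candidate encoding and asserts the reader rejects the data.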
    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
        let cases = [
            invalid_utf8_first_char::<O>(),
            invalid_utf8_first_char_long_strings::<O>(),
            invalid_utf8_later_char::<O>(),
            invalid_utf8_later_char_long_strings::<O>(),
            invalid_utf8_later_char_really_long_strings::<O>(),
            invalid_utf8_later_char_really_long_strings2::<O>(),
        ];
        for array in &cases {
            for encoding in STRING_ENCODINGS {
                // Bypass UTF-8 validation so the invalid bytes reach the writer
                let array = unsafe {
                    GenericStringArray::<O>::new_unchecked(
                        array.offsets().clone(),
                        array.values().clone(),
                        array.nulls().cloned(),
                    )
                };
                let data_type = array.data_type().clone();
                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
                let err = read_from_parquet(data).unwrap_err();
                let expected_err =
                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
                assert!(
                    err.to_string().contains(expected_err),
                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
                );
            }
        }
    }

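    /// Same as above, but exercises `StringViewArray` built over invalid
    /// UTF-8 view buffers.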
    #[test]
    fn test_invalid_utf8_string_view_array() {
        let cases = [
            invalid_utf8_first_char::<i32>(),
            invalid_utf8_first_char_long_strings::<i32>(),
            invalid_utf8_later_char::<i32>(),
            invalid_utf8_later_char_long_strings::<i32>(),
            invalid_utf8_later_char_really_long_strings::<i32>(),
            invalid_utf8_later_char_really_long_strings2::<i32>(),
        ];

        for encoding in STRING_ENCODINGS {
            for array in &cases {
                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
                let array = array.as_binary_view();

                // Bypass UTF-8 validation so the invalid bytes reach the writer
                let array = unsafe {
                    StringViewArray::new_unchecked(
                        array.views().clone(),
                        array.data_buffers().to_vec(),
                        array.nulls().cloned(),
                    )
                };

                let data_type = array.data_type().clone();
                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
                let err = read_from_parquet(data).unwrap_err();
                let expected_err =
                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
                assert!(
                    err.to_string().contains(expected_err),
                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
                );
            }
        }
    }

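    /// Encodings to exercise for string/binary columns; `None` leaves the
    /// writer's default encoding in place.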
    const STRING_ENCODINGS: &[Option<Encoding>] = &[
        None,
        Some(Encoding::PLAIN),
        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        Some(Encoding::DELTA_BYTE_ARRAY),
    ];

    /// Invalid UTF-8 in the first character of the string
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

    /// Invalid UTF-8 in a character other than the first
    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];

    /// Invalid UTF-8 in the first character
    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let invalid = INVALID_UTF8_FIRST_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
    }

    /// Invalid UTF-8 in the first character of a string longer than 12 bytes
    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    /// Invalid UTF-8 in a character other than the first
    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
    }

    /// Invalid UTF-8 after the first 12 bytes of a long string
    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    /// Invalid UTF-8 at the end of a much longer string
    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        for _ in 0..10 {
            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        }
        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    /// A short invalid string alongside a very long valid one
    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut valid_long = vec![];
        for _ in 0..10 {
            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        }
        let invalid = INVALID_UTF8_LATER_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![
            None,
            Some(valid),
            Some(invalid),
            None,
            Some(&valid_long),
            Some(valid),
        ])
    }

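    /// Round-trips `array` through an in-memory Parquet file, forcing the
    /// given encoding (with dictionary encoding disabled) when one is
    /// supplied.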
    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
        let mut data = vec![];
        let schema = batch.schema();
        let props = encoding.map(|encoding| {
            WriterProperties::builder()
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .build()
        });

        {
            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
            writer.write(&batch).unwrap();
            writer.flush().unwrap();
            writer.close().unwrap();
        };
        data
    }

    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
            .unwrap()
            .build()
            .unwrap();

        reader.collect()
    }

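    /// Checks that batches decoded from a single row group reuse that row
    /// group's dictionary, while a batch spanning two row groups does not.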
    #[test]
    fn test_dictionary_preservation() {
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
                .with_repetition(Repetition::OPTIONAL)
                .with_converted_type(ConvertedType::UTF8)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let dict_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        );

        let arrow_field = Field::new("leaf", dict_type, true);

        let mut file = tempfile::tempfile().unwrap();

        let values = vec![
            vec![
                ByteArray::from("hello"),
                ByteArray::from("a"),
                ByteArray::from("b"),
                ByteArray::from("d"),
            ],
            vec![
                ByteArray::from("c"),
                ByteArray::from("a"),
                ByteArray::from("b"),
            ],
        ];

        let def_levels = vec![
            vec![1, 0, 0, 1, 0, 0, 1, 1],
            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
        ];

        let opts = TestOptions {
            encoding: Encoding::RLE_DICTIONARY,
            ..Default::default()
        };

        generate_single_column_file_with_data::<ByteArrayType>(
            &values,
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(arrow_field),
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

        let batches = record_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();

        assert_eq!(batches.len(), 6);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let row_counts = batches
            .iter()
            .map(|x| (x.num_rows(), x.column(0).null_count()))
            .collect::<Vec<_>>();

        assert_eq!(
            row_counts,
            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
        );

        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

        // First and second batch come from the same row group -> same dictionary
        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        // Third batch spans the row group boundary -> its dictionary differs
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        // Remaining batches come from the second row group -> same dictionary
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
    }

    #[test]
    fn test_read_null_list() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        let batch = record_batch_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.column(0).len(), 1);

        let list = batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(list.len(), 1);
        assert!(list.is_valid(0));

        let val = list.value(0);
        assert_eq!(val.len(), 0);
    }

    #[test]
    fn test_null_schema_inference() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_field = Field::new(
            "emptylist",
            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Null, true))),
            true,
        );

        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
        let schema = builder.schema();
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0), &arrow_field);
    }

    #[test]
    fn test_skip_metadata() {
        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
        let field = Field::new("col", col.data_type().clone(), true);

        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

        let metadata = [("key".to_string(), "value".to_string())]
            .into_iter()
            .collect();

        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

        assert_ne!(schema_with_metadata, schema_without_metadata);

        let batch =
            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

        let file = |version: WriterVersion| {
            let props = WriterProperties::builder()
                .set_writer_version(version)
                .build();

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                    .unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
            file
        };

        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

        let v1_reader = file(WriterVersion::PARQUET_1_0);
        let v2_reader = file(WriterVersion::PARQUET_2_0);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);
    }

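    /// Writes the given (name, array) pairs to a temporary Parquet file and
    /// returns the file handle.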
    fn write_parquet_from_iter<I, F>(value: I) -> File
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let batch = RecordBatch::try_from_iter(value).unwrap();
        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    }

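    /// Writes `value` to a Parquet file, then asserts that opening a reader
    /// with the supplied `schema` fails with `expected_error`.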
    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let file = write_parquet_from_iter(value);
        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options_with_schema,
        );
        assert_eq!(builder.err().unwrap().to_string(), expected_error);
    }

    #[test]
    fn test_schema_too_few_columns() {
        run_schema_test_with_error(
            vec![
                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![Field::new(
                "int64",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
        );
    }

    #[test]
    fn test_schema_too_many_columns() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![
                Field::new("int64", ArrowDataType::Int64, false),
                Field::new("int32", ArrowDataType::Int32, false),
            ])),
            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
        );
    }

    #[test]
    fn test_schema_mismatched_column_names() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![Field::new(
                "other",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected field named int64 got other",
        );
    }

    #[test]
    fn test_schema_incompatible_columns() {
        run_schema_test_with_error(
            vec![
                (
                    "col1_invalid",
                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col2_valid",
                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col3_invalid",
                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
                ),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1_invalid", ArrowDataType::Int32, false),
                Field::new("col2_valid", ArrowDataType::Int32, false),
                Field::new("col3_invalid", ArrowDataType::Int32, false),
            ])),
            "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
        );
    }

    #[test]
    fn test_one_incompatible_nested_column() {
        let nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int64, false),
        ]);
        let nested = StructArray::try_new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ],
            None,
        )
        .expect("struct array");
        let supplied_nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int32, false),
        ]);
        run_schema_test_with_error(
            vec![
                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
                ("nested", Arc::new(nested) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1", ArrowDataType::Int64, false),
                Field::new("col2", ArrowDataType::Int32, false),
                Field::new(
                    "nested",
                    ArrowDataType::Struct(supplied_nested_fields),
                    false,
                ),
            ])),
            "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
        );
    }

    #[test]
    fn test_read_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![
            (
                "binary_to_utf8",
                Arc::new(BinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "large_binary_to_large_utf8",
                Arc::new(LargeBinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "binary_view_to_utf8_view",
                Arc::new(BinaryViewArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
        ]);
        let supplied_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
            Field::new(
                "large_binary_to_large_utf8",
                ArrowDataType::LargeUtf8,
                false,
            ),
            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
        ]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_string::<i32>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch
                .column(1)
                .as_string::<i64>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );
    }

    #[test]
    #[should_panic(expected = "Invalid UTF8 sequence at")]
    fn test_read_non_utf8_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![(
            "non_utf8_binary",
            Arc::new(BinaryArray::from(vec![
                b"\xDE\x00\xFF".as_ref(),
                b"\xDE\x01\xAA".as_ref(),
                b"\xDE\x02\xFF".as_ref(),
            ])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new(
            "non_utf8_binary",
            ArrowDataType::Utf8,
            false,
        )]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");
        arrow_reader.next().unwrap().unwrap_err();
    }

    #[test]
    fn test_with_schema() {
        let nested_fields = Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]);

        let nested_arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ];

        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

        let file = write_parquet_from_iter(vec![
            (
                "int32_to_ts_second",
                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            (
                "date32_to_date64",
                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            ("nested", Arc::new(nested) as ArrayRef),
        ]);

        let supplied_nested_fields = Fields::from(vec![
            Field::new(
                "utf8_to_dict",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::Int32),
                    Box::new(ArrowDataType::Utf8),
                ),
                false,
            ),
            Field::new(
                "int64_to_ts_nano",
                ArrowDataType::Timestamp(
                    arrow::datatypes::TimeUnit::Nanosecond,
                    Some("+10:00".into()),
                ),
                false,
            ),
        ]);

        let supplied_schema = Arc::new(Schema::new(vec![
            Field::new(
                "int32_to_ts_second",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
                false,
            ),
            Field::new("date32_to_date64", ArrowDataType::Date64, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ]));

        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        assert_eq!(arrow_reader.schema(), supplied_schema);
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 01:00:00 +01:00"
        );
        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<Date64Array>()
                .expect("downcast to date64")
                .value_as_date(0)
                .map(|v| v.to_string())
                .expect("value as date"),
            "1970-01-01"
        );

        let nested = batch
            .column(2)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("downcast to struct");

        let nested_dict = nested
            .column(0)
            .as_any()
            .downcast_ref::<Int32DictionaryArray>()
            .expect("downcast to dictionary");

        assert_eq!(
            nested_dict
                .values()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("a"), Some("b")]
        );

        assert_eq!(
            nested_dict.keys().iter().collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1)]
        );

        assert_eq!(
            nested
                .column(1)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("downcast to timestamp nanosecond")
                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 10:00:00.000000001 +10:00"
        );
    }

    #[test]
    fn test_empty_projection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let file_metadata = builder.metadata().file_metadata();
        let expected_rows = file_metadata.num_rows() as usize;

        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
        let batch_reader = builder
            .with_projection(mask)
            .with_batch_size(2)
            .build()
            .unwrap();

        let mut total_rows = 0;
        for maybe_batch in batch_reader {
            let batch = maybe_batch.unwrap();
            total_rows += batch.num_rows();
            assert_eq!(batch.num_columns(), 0);
            assert!(batch.num_rows() <= 2);
        }

        assert_eq!(total_rows, expected_rows);
    }

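    /// Writes two batches of `batch_size` list rows into row groups of
    /// `row_group_size` rows and asserts the reader yields two full batches
    /// across the row-group boundary.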
    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Int32, true))),
            true,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(
            &mut buf,
            schema.clone(),
            Some(
                WriterProperties::builder()
                    .set_max_row_group_size(row_group_size)
                    .build(),
            ),
        )
        .unwrap();
        for _ in 0..2 {
            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
            for _ in 0..(batch_size) {
                list_builder.append(true);
            }
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
                .unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        let mut record_reader =
            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
    }

    #[test]
    fn test_row_group_exact_multiple() {
        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
        test_row_group_batch(8, 8);
        test_row_group_batch(10, 8);
        test_row_group_batch(8, 10);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
    }

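    /// Given a `RecordBatch` containing all the rows, computes the batches a
    /// reader honouring `selection` and `batch_size` is expected to emit.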
    fn get_expected_batches(
        column: &RecordBatch,
        selection: &RowSelection,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let mut expected_batches = vec![];

        let mut selection: VecDeque<_> = selection.clone().into();
        let mut row_offset = 0;
        let mut last_start = None;
        while row_offset < column.num_rows() && !selection.is_empty() {
            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
            while batch_remaining > 0 && !selection.is_empty() {
                let (to_read, skip) = match selection.front_mut() {
                    Some(selection) if selection.row_count > batch_remaining => {
                        selection.row_count -= batch_remaining;
                        (batch_remaining, selection.skip)
                    }
                    Some(_) => {
                        let select = selection.pop_front().unwrap();
                        (select.row_count, select.skip)
                    }
                    None => break,
                };

                batch_remaining -= to_read;

                match skip {
                    true => {
                        if let Some(last_start) = last_start.take() {
                            expected_batches.push(column.slice(last_start, row_offset - last_start))
                        }
                        row_offset += to_read
                    }
                    false => {
                        last_start.get_or_insert(row_offset);
                        row_offset += to_read
                    }
                }
            }
        }

        if let Some(last_start) = last_start.take() {
            expected_batches.push(column.slice(last_start, row_offset - last_start))
        }

        // Sanity check: all batches except the final one should be full
        for batch in &expected_batches[..expected_batches.len() - 1] {
            assert_eq!(batch.num_rows(), batch_size);
        }

        expected_batches
    }

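    /// Builds an alternating select/skip `RowSelection` covering `total_len`
    /// rows in steps of `step_len`, returning it with the selected row count.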
    fn create_test_selection(
        step_len: usize,
        total_len: usize,
        skip_first: bool,
    ) -> (RowSelection, usize) {
        let mut remaining = total_len;
        let mut skip = skip_first;
        let mut vec = vec![];
        let mut selected_count = 0;
        while remaining != 0 {
            let step = if remaining > step_len {
                step_len
            } else {
                remaining
            };
            vec.push(RowSelector {
                row_count: step,
                skip,
            });
            remaining -= step;
            if !skip {
                selected_count += step;
            }
            skip = !skip;
        }
        (vec.into(), selected_count)
    }

    #[test]
    fn test_scan_row_with_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        do_test(1000, 1000);

        do_test(20, 20);

        do_test(20, 5);

        do_test(20, 5);

        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options = ArrowReaderOptions::new().with_page_index(true);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }

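    /// A batch size larger than the file's row count should be clamped to
    /// the number of rows rather than over-allocating.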
    #[test]
    fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let num_rows = builder.metadata.file_metadata().num_rows();
        let reader = builder
            .with_batch_size(1024)
            .with_projection(ProjectionMask::all())
            .build()
            .unwrap();
        assert_ne!(1024, num_rows);
        assert_eq!(reader.batch_size, num_rows as usize);
    }

    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            // File with a page index
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index(true),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            // File without a page index: the offset index exists but is empty
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index(true),
            )
            .unwrap();
            assert!(builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }

    #[test]
    fn test_raw_repetition() {
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // eventType: one defined value
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // category: two values in a single row (repetition levels [0, 1])
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // filter.error
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }

    #[test]
    fn test_read_lz4_raw() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/lz4_raw_compressed.parquet");
        let file = File::open(path).unwrap();

        let batches = ParquetRecordBatchReader::try_new(file, 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
    }

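    /// Reads both Hadoop-framed and raw LZ4 files: decoding LZ4_HADOOP data
    /// falls back to plain LZ4 when the Hadoop frame is absent.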
3689 #[test]
3699 fn test_read_lz4_hadoop_fallback() {
3700 for file in [
3701 "hadoop_lz4_compressed.parquet",
3702 "non_hadoop_lz4_compressed.parquet",
3703 ] {
3704 let testdata = arrow::util::test_util::parquet_test_data();
3705 let path = format!("{testdata}/{file}");
3706 let file = File::open(path).unwrap();
3707 let expected_rows = 4;
3708
3709 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
3710 .unwrap()
3711 .collect::<Result<Vec<_>, _>>()
3712 .unwrap();
3713 assert_eq!(batches.len(), 1);
3714 let batch = &batches[0];
3715
3716 assert_eq!(batch.num_columns(), 3);
3717 assert_eq!(batch.num_rows(), expected_rows);
3718
3719 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
3720 assert_eq!(
3721 a.values(),
3722 &[1593604800, 1593604800, 1593604801, 1593604801]
3723 );
3724
3725 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
3726 let b: Vec<_> = b.iter().flatten().collect();
3727 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
3728
3729 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
3730 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
3731 }
3732 }
3733
3734 #[test]
3735 fn test_read_lz4_hadoop_large() {
3736 let testdata = arrow::util::test_util::parquet_test_data();
3737 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
3738 let file = File::open(path).unwrap();
3739 let expected_rows = 10000;
3740
3741 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
3742 .unwrap()
3743 .collect::<Result<Vec<_>, _>>()
3744 .unwrap();
3745 assert_eq!(batches.len(), 1);
3746 let batch = &batches[0];
3747
3748 assert_eq!(batch.num_columns(), 1);
3749 assert_eq!(batch.num_rows(), expected_rows);
3750
3751 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
3752 let a: Vec<_> = a.iter().flatten().collect();
3753 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
3754 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
3755 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
3756 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
3757 }
3758
    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // Force small pages so the skip straddles page boundaries
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

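    /// Decimal columns should round-trip through the narrowest physical type
    /// for their precision: INT32 up to 9 digits, INT64 up to 18, and
    /// FIXED_LEN_BYTE_ARRAY beyond that.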
    fn test_decimal_roundtrip<T: DecimalType>() {
        // Builds a decimal array of precision `p` (scale 2) from `values`
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new("item", ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

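    /// Randomised check that arbitrary skip/select patterns over nested
    /// lists reproduce the corresponding slices of the original batch.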
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = thread_rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list("item", Field::new("item", ArrowDataType::Int32, true), true),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.gen_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.gen_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.gen_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.gen_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.gen_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.gen()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }
}