use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
mod read_plan;
mod selection;
pub mod statistics;

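/// Generic builder for reading Parquet data into Arrow arrays.
///
/// Holds the decoded [`ParquetMetaData`], the resolved Arrow [`SchemaRef`] and the
/// reader configuration (batch size, projection, row group selection, row selection,
/// filter, limit and offset) that the concrete readers consume.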
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

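    /// Set the maximum number of rows in each returned [`RecordBatch`].
    ///
    /// The value is clamped to the total number of rows in the file.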
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

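    /// Only read data from the row groups at the provided indexes.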
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

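    /// Only read data from the columns selected by the provided [`ProjectionMask`].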
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

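    /// Only decode the rows described by the provided [`RowSelection`]; other rows
    /// are skipped.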
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

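    /// Apply the provided [`RowFilter`] when reading; rows for which its predicates
    /// evaluate to `false` are excluded from the output.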
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

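    /// Limit the number of rows returned; applied after any row selection and row filter.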
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

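    /// Skip the first `offset` rows that would otherwise be returned; applied after any
    /// row selection and row filter, before the limit.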
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }
}

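/// Options that control how a Parquet file's metadata is interpreted when constructing
/// a reader, such as whether to use the embedded Arrow schema, supply an explicit
/// schema, read the page index, or (with the `encryption` feature) decrypt the file.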
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,
    pub(crate) page_index: bool,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}

impl ArrowReaderOptions {
    pub fn new() -> Self {
        Self::default()
    }

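    /// If `true`, ignore any Arrow schema embedded in the file's key-value metadata
    /// and instead derive the Arrow schema from the Parquet schema alone.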
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

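    /// Use the supplied Arrow schema instead of inferring one from the file.
    ///
    /// The schema must be compatible with the file's Parquet schema (same number of
    /// columns with matching data types, nullability and field metadata), otherwise
    /// constructing the reader fails. Supplying a schema also implies
    /// `skip_arrow_metadata`.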
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    pub fn with_page_index(self, page_index: bool) -> Self {
        Self { page_index, ..self }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: FileDecryptionProperties,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    pub fn page_index(&self) -> bool {
        self.page_index
    }

    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
        self.file_decryption_properties.as_ref()
    }
}

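/// The decoded [`ParquetMetaData`] together with the resolved Arrow schema and
/// Parquet field information.
///
/// Loading this once via [`ArrowReaderMetadata::load`] and passing it to
/// [`ParquetRecordBatchReaderBuilder::new_with_metadata`] allows the same metadata
/// to be reused across multiple readers without re-parsing the footer.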
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
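    /// Load [`ArrowReaderMetadata`] from the provided [`ChunkReader`], decoding the
    /// footer and, if requested via [`ArrowReaderOptions::with_page_index`], the
    /// page index.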
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
        #[cfg(feature = "encryption")]
        let metadata =
            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

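    /// Create [`ArrowReaderMetadata`] from already-decoded [`ParquetMetaData`].
    ///
    /// This performs no I/O; if a page index is required it must already be present
    /// in the provided metadata.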
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {:?} but found {:?}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

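/// A synchronous builder for [`ParquetRecordBatchReader`] backed by any [`ChunkReader`],
/// such as a [`std::fs::File`] or `bytes::Bytes`.
///
/// A minimal usage sketch (the file name is illustrative):
///
/// ```no_run
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let file = std::fs::File::open("data.parquet").unwrap();
/// // Read the footer and decode the metadata
/// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
/// // Configure and build the reader
/// let reader = builder.with_batch_size(8192).build().unwrap();
/// for batch in reader {
///     let batch = batch.unwrap();
///     println!("read {} rows", batch.num_rows());
/// }
/// ```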
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

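    /// Consume the builder and construct a [`ParquetRecordBatchReader`].
    ///
    /// Any [`RowFilter`] predicates are evaluated up front against the selected row
    /// groups to refine the read plan before the projected columns are read.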
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);

        let row_groups = self
            .row_groups
            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(self.input.0),
            metadata: self.metadata,
            row_groups,
        };

        let mut filter = self.filter;
        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);

        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    break;
                }

                let array_reader = ArrayReaderBuilder::new(&reader)
                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader)
            .build_array_reader(self.fields.as_deref(), &self.projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(self.offset)
            .with_limit(self.limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

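/// An [`Iterator`] of [`RecordBatch`] read from a Parquet data source, yielding
/// `Result<RecordBatch, ArrowError>` and driven by a [`ReadPlan`].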
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        match self.read_plan.selection_mut() {
            Some(selection) => {
                while read_records < batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            None => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
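    /// Create a new [`ParquetRecordBatchReader`] from the provided [`ChunkReader`]
    /// with the given batch size and otherwise default options.
    ///
    /// See [`ParquetRecordBatchReaderBuilder`] for more fine-grained control.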
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

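    /// Create a new [`ParquetRecordBatchReader`] directly from a [`RowGroups`]
    /// implementation and pre-computed [`FieldLevels`], bypassing the builder.
    ///
    /// This is a lower-level entry point for callers that supply their own page data.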
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let array_reader = ArrayReaderBuilder::new(row_groups)
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

980#[cfg(test)]
981mod tests {
982    use std::cmp::min;
983    use std::collections::{HashMap, VecDeque};
984    use std::fmt::Formatter;
985    use std::fs::File;
986    use std::io::Seek;
987    use std::path::PathBuf;
988    use std::sync::Arc;
989
990    use arrow_array::builder::*;
991    use arrow_array::cast::AsArray;
992    use arrow_array::types::{
993        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
994        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
995    };
996    use arrow_array::*;
997    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
998    use arrow_data::{ArrayData, ArrayDataBuilder};
999    use arrow_schema::{
1000        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1001    };
1002    use arrow_select::concat::concat_batches;
1003    use bytes::Bytes;
1004    use half::f16;
1005    use num::PrimInt;
1006    use rand::{rng, Rng, RngCore};
1007    use tempfile::tempfile;
1008
1009    use crate::arrow::arrow_reader::{
1010        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1011        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1012    };
1013    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1014    use crate::arrow::{ArrowWriter, ProjectionMask};
1015    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1016    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1017    use crate::data_type::{
1018        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1019        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1020    };
1021    use crate::errors::Result;
1022    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1023    use crate::file::writer::SerializedFileWriter;
1024    use crate::schema::parser::parse_message_type;
1025    use crate::schema::types::{Type, TypePtr};
1026    use crate::util::test_common::rand_gen::RandGen;
1027
1028    #[test]
1029    fn test_arrow_reader_all_columns() {
1030        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1031
1032        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1033        let original_schema = Arc::clone(builder.schema());
1034        let reader = builder.build().unwrap();
1035
1036        assert_eq!(original_schema.fields(), reader.schema().fields());
1038    }
1039
1040    #[test]
1041    fn test_arrow_reader_single_column() {
1042        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1043
1044        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1045        let original_schema = Arc::clone(builder.schema());
1046
1047        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1048        let reader = builder.with_projection(mask).build().unwrap();
1049
1050        assert_eq!(1, reader.schema().fields().len());
1052        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1053    }
1054
1055    #[test]
1056    fn test_arrow_reader_single_column_by_name() {
1057        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1058
1059        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1060        let original_schema = Arc::clone(builder.schema());
1061
1062        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1063        let reader = builder.with_projection(mask).build().unwrap();
1064
1065        assert_eq!(1, reader.schema().fields().len());
1067        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1068    }
1069
1070    #[test]
1071    fn test_null_column_reader_test() {
1072        let mut file = tempfile::tempfile().unwrap();
1073
1074        let schema = "
1075            message message {
1076                OPTIONAL INT32 int32;
1077            }
1078        ";
1079        let schema = Arc::new(parse_message_type(schema).unwrap());
1080
1081        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();
1091
1092        file.rewind().unwrap();
1093
1094        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1095        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1096
1097        assert_eq!(batches.len(), 4);
1098        for batch in &batches[0..3] {
1099            assert_eq!(batch.num_rows(), 2);
1100            assert_eq!(batch.num_columns(), 1);
1101            assert_eq!(batch.column(0).null_count(), 2);
1102        }
1103
1104        assert_eq!(batches[3].num_rows(), 1);
1105        assert_eq!(batches[3].num_columns(), 1);
1106        assert_eq!(batches[3].column(0).null_count(), 1);
1107    }
1108
1109    #[test]
1110    fn test_primitive_single_column_reader_test() {
1111        run_single_column_reader_tests::<BoolType, _, BoolType>(
1112            2,
1113            ConvertedType::NONE,
1114            None,
1115            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1116            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1117        );
1118        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1119            2,
1120            ConvertedType::NONE,
1121            None,
1122            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1123            &[
1124                Encoding::PLAIN,
1125                Encoding::RLE_DICTIONARY,
1126                Encoding::DELTA_BINARY_PACKED,
1127                Encoding::BYTE_STREAM_SPLIT,
1128            ],
1129        );
1130        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1131            2,
1132            ConvertedType::NONE,
1133            None,
1134            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1135            &[
1136                Encoding::PLAIN,
1137                Encoding::RLE_DICTIONARY,
1138                Encoding::DELTA_BINARY_PACKED,
1139                Encoding::BYTE_STREAM_SPLIT,
1140            ],
1141        );
1142        run_single_column_reader_tests::<FloatType, _, FloatType>(
1143            2,
1144            ConvertedType::NONE,
1145            None,
1146            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1147            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1148        );
1149    }
1150
1151    #[test]
1152    fn test_unsigned_primitive_single_column_reader_test() {
1153        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1154            2,
1155            ConvertedType::UINT_32,
1156            Some(ArrowDataType::UInt32),
1157            |vals| {
1158                Arc::new(UInt32Array::from_iter(
1159                    vals.iter().map(|x| x.map(|x| x as u32)),
1160                ))
1161            },
1162            &[
1163                Encoding::PLAIN,
1164                Encoding::RLE_DICTIONARY,
1165                Encoding::DELTA_BINARY_PACKED,
1166            ],
1167        );
1168        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1169            2,
1170            ConvertedType::UINT_64,
1171            Some(ArrowDataType::UInt64),
1172            |vals| {
1173                Arc::new(UInt64Array::from_iter(
1174                    vals.iter().map(|x| x.map(|x| x as u64)),
1175                ))
1176            },
1177            &[
1178                Encoding::PLAIN,
1179                Encoding::RLE_DICTIONARY,
1180                Encoding::DELTA_BINARY_PACKED,
1181            ],
1182        );
1183    }
1184
1185    #[test]
1186    fn test_unsigned_roundtrip() {
1187        let schema = Arc::new(Schema::new(vec![
1188            Field::new("uint32", ArrowDataType::UInt32, true),
1189            Field::new("uint64", ArrowDataType::UInt64, true),
1190        ]));
1191
1192        let mut buf = Vec::with_capacity(1024);
1193        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1194
1195        let original = RecordBatch::try_new(
1196            schema,
1197            vec![
1198                Arc::new(UInt32Array::from_iter_values([
1199                    0,
1200                    i32::MAX as u32,
1201                    u32::MAX,
1202                ])),
1203                Arc::new(UInt64Array::from_iter_values([
1204                    0,
1205                    i64::MAX as u64,
1206                    u64::MAX,
1207                ])),
1208            ],
1209        )
1210        .unwrap();
1211
1212        writer.write(&original).unwrap();
1213        writer.close().unwrap();
1214
1215        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1216        let ret = reader.next().unwrap().unwrap();
1217        assert_eq!(ret, original);
1218
1219        ret.column(0)
1221            .as_any()
1222            .downcast_ref::<UInt32Array>()
1223            .unwrap();
1224
1225        ret.column(1)
1226            .as_any()
1227            .downcast_ref::<UInt64Array>()
1228            .unwrap();
1229    }
1230
1231    #[test]
1232    fn test_float16_roundtrip() -> Result<()> {
1233        let schema = Arc::new(Schema::new(vec![
1234            Field::new("float16", ArrowDataType::Float16, false),
1235            Field::new("float16-nullable", ArrowDataType::Float16, true),
1236        ]));
1237
1238        let mut buf = Vec::with_capacity(1024);
1239        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1240
1241        let original = RecordBatch::try_new(
1242            schema,
1243            vec![
1244                Arc::new(Float16Array::from_iter_values([
1245                    f16::EPSILON,
1246                    f16::MIN,
1247                    f16::MAX,
1248                    f16::NAN,
1249                    f16::INFINITY,
1250                    f16::NEG_INFINITY,
1251                    f16::ONE,
1252                    f16::NEG_ONE,
1253                    f16::ZERO,
1254                    f16::NEG_ZERO,
1255                    f16::E,
1256                    f16::PI,
1257                    f16::FRAC_1_PI,
1258                ])),
1259                Arc::new(Float16Array::from(vec![
1260                    None,
1261                    None,
1262                    None,
1263                    Some(f16::NAN),
1264                    Some(f16::INFINITY),
1265                    Some(f16::NEG_INFINITY),
1266                    None,
1267                    None,
1268                    None,
1269                    None,
1270                    None,
1271                    None,
1272                    Some(f16::FRAC_1_PI),
1273                ])),
1274            ],
1275        )?;
1276
1277        writer.write(&original)?;
1278        writer.close()?;
1279
1280        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1281        let ret = reader.next().unwrap()?;
1282        assert_eq!(ret, original);
1283
1284        ret.column(0).as_primitive::<Float16Type>();
1286        ret.column(1).as_primitive::<Float16Type>();
1287
1288        Ok(())
1289    }
1290
1291    #[test]
1292    fn test_time_utc_roundtrip() -> Result<()> {
1293        let schema = Arc::new(Schema::new(vec![
1294            Field::new(
1295                "time_millis",
1296                ArrowDataType::Time32(TimeUnit::Millisecond),
1297                true,
1298            )
1299            .with_metadata(HashMap::from_iter(vec![(
1300                "adjusted_to_utc".to_string(),
1301                "".to_string(),
1302            )])),
1303            Field::new(
1304                "time_micros",
1305                ArrowDataType::Time64(TimeUnit::Microsecond),
1306                true,
1307            )
1308            .with_metadata(HashMap::from_iter(vec![(
1309                "adjusted_to_utc".to_string(),
1310                "".to_string(),
1311            )])),
1312        ]));
1313
1314        let mut buf = Vec::with_capacity(1024);
1315        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1316
1317        let original = RecordBatch::try_new(
1318            schema,
1319            vec![
1320                Arc::new(Time32MillisecondArray::from(vec![
1321                    Some(-1),
1322                    Some(0),
1323                    Some(86_399_000),
1324                    Some(86_400_000),
1325                    Some(86_401_000),
1326                    None,
1327                ])),
1328                Arc::new(Time64MicrosecondArray::from(vec![
1329                    Some(-1),
1330                    Some(0),
1331                    Some(86_399 * 1_000_000),
1332                    Some(86_400 * 1_000_000),
1333                    Some(86_401 * 1_000_000),
1334                    None,
1335                ])),
1336            ],
1337        )?;
1338
1339        writer.write(&original)?;
1340        writer.close()?;
1341
1342        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1343        let ret = reader.next().unwrap()?;
1344        assert_eq!(ret, original);
1345
1346        ret.column(0).as_primitive::<Time32MillisecondType>();
1348        ret.column(1).as_primitive::<Time64MicrosecondType>();
1349
1350        Ok(())
1351    }
1352
1353    #[test]
1354    fn test_date32_roundtrip() -> Result<()> {
1355        use arrow_array::Date32Array;
1356
1357        let schema = Arc::new(Schema::new(vec![Field::new(
1358            "date32",
1359            ArrowDataType::Date32,
1360            false,
1361        )]));
1362
1363        let mut buf = Vec::with_capacity(1024);
1364
1365        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1366
1367        let original = RecordBatch::try_new(
1368            schema,
1369            vec![Arc::new(Date32Array::from(vec![
1370                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1371            ]))],
1372        )?;
1373
1374        writer.write(&original)?;
1375        writer.close()?;
1376
1377        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1378        let ret = reader.next().unwrap()?;
1379        assert_eq!(ret, original);
1380
1381        ret.column(0).as_primitive::<Date32Type>();
1383
1384        Ok(())
1385    }
1386
1387    #[test]
1388    fn test_date64_roundtrip() -> Result<()> {
1389        use arrow_array::Date64Array;
1390
1391        let schema = Arc::new(Schema::new(vec![
1392            Field::new("small-date64", ArrowDataType::Date64, false),
1393            Field::new("big-date64", ArrowDataType::Date64, false),
1394            Field::new("invalid-date64", ArrowDataType::Date64, false),
1395        ]));
1396
1397        let mut default_buf = Vec::with_capacity(1024);
1398        let mut coerce_buf = Vec::with_capacity(1024);
1399
1400        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1401
1402        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1403        let mut coerce_writer =
1404            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1405
1406        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1407
1408        let original = RecordBatch::try_new(
1409            schema,
1410            vec![
1411                Arc::new(Date64Array::from(vec![
1413                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1414                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1415                    0,
1416                    1_000 * NUM_MILLISECONDS_IN_DAY,
1417                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1418                ])),
1419                Arc::new(Date64Array::from(vec![
1421                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1422                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1423                    0,
1424                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1425                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1426                ])),
1427                Arc::new(Date64Array::from(vec![
1429                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1430                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1431                    1,
1432                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1433                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1434                ])),
1435            ],
1436        )?;
1437
1438        default_writer.write(&original)?;
1439        coerce_writer.write(&original)?;
1440
1441        default_writer.close()?;
1442        coerce_writer.close()?;
1443
1444        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1445        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1446
1447        let default_ret = default_reader.next().unwrap()?;
1448        let coerce_ret = coerce_reader.next().unwrap()?;
1449
1450        assert_eq!(default_ret, original);
1452
1453        assert_eq!(coerce_ret.column(0), original.column(0));
1455        assert_ne!(coerce_ret.column(1), original.column(1));
1456        assert_ne!(coerce_ret.column(2), original.column(2));
1457
1458        default_ret.column(0).as_primitive::<Date64Type>();
1460        coerce_ret.column(0).as_primitive::<Date64Type>();
1461
1462        Ok(())
1463    }
1464    struct RandFixedLenGen {}
1465
1466    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1467        fn gen(len: i32) -> FixedLenByteArray {
1468            let mut v = vec![0u8; len as usize];
1469            rng().fill_bytes(&mut v);
1470            ByteArray::from(v).into()
1471        }
1472    }
1473
1474    #[test]
1475    fn test_fixed_length_binary_column_reader() {
1476        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1477            20,
1478            ConvertedType::NONE,
1479            None,
1480            |vals| {
1481                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1482                for val in vals {
1483                    match val {
1484                        Some(b) => builder.append_value(b).unwrap(),
1485                        None => builder.append_null(),
1486                    }
1487                }
1488                Arc::new(builder.finish())
1489            },
1490            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1491        );
1492    }
1493
1494    #[test]
1495    fn test_interval_day_time_column_reader() {
1496        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1497            12,
1498            ConvertedType::INTERVAL,
1499            None,
1500            |vals| {
1501                Arc::new(
1502                    vals.iter()
1503                        .map(|x| {
1504                            x.as_ref().map(|b| IntervalDayTime {
1505                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1506                                milliseconds: i32::from_le_bytes(
1507                                    b.as_ref()[8..12].try_into().unwrap(),
1508                                ),
1509                            })
1510                        })
1511                        .collect::<IntervalDayTimeArray>(),
1512                )
1513            },
1514            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1515        );
1516    }
1517
1518    #[test]
1519    fn test_int96_single_column_reader_test() {
1520        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1521
1522        type TypeHintAndConversionFunction =
1523            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1524
1525        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1526            (None, |vals: &[Option<Int96>]| {
1528                Arc::new(TimestampNanosecondArray::from_iter(
1529                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1530                )) as ArrayRef
1531            }),
1532            (
1534                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1535                |vals: &[Option<Int96>]| {
1536                    Arc::new(TimestampSecondArray::from_iter(
1537                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1538                    )) as ArrayRef
1539                },
1540            ),
1541            (
1542                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1543                |vals: &[Option<Int96>]| {
1544                    Arc::new(TimestampMillisecondArray::from_iter(
1545                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1546                    )) as ArrayRef
1547                },
1548            ),
1549            (
1550                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1551                |vals: &[Option<Int96>]| {
1552                    Arc::new(TimestampMicrosecondArray::from_iter(
1553                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1554                    )) as ArrayRef
1555                },
1556            ),
1557            (
1558                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1559                |vals: &[Option<Int96>]| {
1560                    Arc::new(TimestampNanosecondArray::from_iter(
1561                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1562                    )) as ArrayRef
1563                },
1564            ),
1565            (
1567                Some(ArrowDataType::Timestamp(
1568                    TimeUnit::Second,
1569                    Some(Arc::from("-05:00")),
1570                )),
1571                |vals: &[Option<Int96>]| {
1572                    Arc::new(
1573                        TimestampSecondArray::from_iter(
1574                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
1575                        )
1576                        .with_timezone("-05:00"),
1577                    ) as ArrayRef
1578                },
1579            ),
1580        ];
1581
1582        resolutions.iter().for_each(|(arrow_type, converter)| {
1583            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1584                2,
1585                ConvertedType::NONE,
1586                arrow_type.clone(),
1587                converter,
1588                encodings,
1589            );
1590        })
1591    }
1592
1593    #[test]
1594    fn test_int96_from_spark_file_with_provided_schema() {
1595        use arrow_schema::DataType::Timestamp;
1599        let test_data = arrow::util::test_util::parquet_test_data();
1600        let path = format!("{test_data}/int96_from_spark.parquet");
1601        let file = File::open(path).unwrap();
1602
1603        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1604            "a",
1605            Timestamp(TimeUnit::Microsecond, None),
1606            true,
1607        )]));
1608        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1609
1610        let mut record_reader =
1611            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1612                .unwrap()
1613                .build()
1614                .unwrap();
1615
1616        let batch = record_reader.next().unwrap().unwrap();
1617        assert_eq!(batch.num_columns(), 1);
1618        let column = batch.column(0);
1619        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1620
1621        let expected = Arc::new(Int64Array::from(vec![
1622            Some(1704141296123456),
1623            Some(1704070800000000),
1624            Some(253402225200000000),
1625            Some(1735599600000000),
1626            None,
1627            Some(9089380393200000000),
1628        ]));
1629
1630        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1635        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1636
1637        assert_eq!(casted_timestamps.len(), expected.len());
1638
1639        casted_timestamps
1640            .iter()
1641            .zip(expected.iter())
1642            .for_each(|(lhs, rhs)| {
1643                assert_eq!(lhs, rhs);
1644            });
1645    }
1646
1647    #[test]
1648    fn test_int96_from_spark_file_without_provided_schema() {
1649        use arrow_schema::DataType::Timestamp;
1653        let test_data = arrow::util::test_util::parquet_test_data();
1654        let path = format!("{test_data}/int96_from_spark.parquet");
1655        let file = File::open(path).unwrap();
1656
1657        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1658            .unwrap()
1659            .build()
1660            .unwrap();
1661
1662        let batch = record_reader.next().unwrap().unwrap();
1663        assert_eq!(batch.num_columns(), 1);
1664        let column = batch.column(0);
1665        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1666
        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));
1675
1676        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1681        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1682
1683        assert_eq!(casted_timestamps.len(), expected.len());
1684
1685        casted_timestamps
1686            .iter()
1687            .zip(expected.iter())
1688            .for_each(|(lhs, rhs)| {
1689                assert_eq!(lhs, rhs);
1690            });
1691    }
1692
1693    struct RandUtf8Gen {}
1694
1695    impl RandGen<ByteArrayType> for RandUtf8Gen {
1696        fn gen(len: i32) -> ByteArray {
1697            Int32Type::gen(len).to_string().as_str().into()
1698        }
1699    }
1700
1701    #[test]
1702    fn test_utf8_single_column_reader_test() {
1703        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1704            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1705                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1706            })))
1707        }
1708
1709        let encodings = &[
1710            Encoding::PLAIN,
1711            Encoding::RLE_DICTIONARY,
1712            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1713            Encoding::DELTA_BYTE_ARRAY,
1714        ];
1715
1716        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1717            2,
1718            ConvertedType::NONE,
1719            None,
1720            |vals| {
1721                Arc::new(BinaryArray::from_iter(
1722                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1723                ))
1724            },
1725            encodings,
1726        );
1727
1728        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1729            2,
1730            ConvertedType::UTF8,
1731            None,
1732            string_converter::<i32>,
1733            encodings,
1734        );
1735
1736        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1737            2,
1738            ConvertedType::UTF8,
1739            Some(ArrowDataType::Utf8),
1740            string_converter::<i32>,
1741            encodings,
1742        );
1743
1744        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1745            2,
1746            ConvertedType::UTF8,
1747            Some(ArrowDataType::LargeUtf8),
1748            string_converter::<i64>,
1749            encodings,
1750        );
1751
1752        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1753        for key in &small_key_types {
1754            for encoding in encodings {
1755                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1756                opts.encoding = *encoding;
1757
1758                let data_type =
1759                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1760
1761                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1763                    opts,
1764                    2,
1765                    ConvertedType::UTF8,
1766                    Some(data_type.clone()),
1767                    move |vals| {
1768                        let vals = string_converter::<i32>(vals);
1769                        arrow::compute::cast(&vals, &data_type).unwrap()
1770                    },
1771                );
1772            }
1773        }
1774
1775        let key_types = [
1776            ArrowDataType::Int16,
1777            ArrowDataType::UInt16,
1778            ArrowDataType::Int32,
1779            ArrowDataType::UInt32,
1780            ArrowDataType::Int64,
1781            ArrowDataType::UInt64,
1782        ];
1783
1784        for key in &key_types {
1785            let data_type =
1786                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1787
1788            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1789                2,
1790                ConvertedType::UTF8,
1791                Some(data_type.clone()),
1792                move |vals| {
1793                    let vals = string_converter::<i32>(vals);
1794                    arrow::compute::cast(&vals, &data_type).unwrap()
1795                },
1796                encodings,
1797            );
1798
1799            }
1816    }
1817
1818    #[test]
1819    fn test_decimal_nullable_struct() {
1820        let decimals = Decimal256Array::from_iter_values(
1821            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1822        );
1823
1824        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1825            "decimals",
1826            decimals.data_type().clone(),
1827            false,
1828        )])))
1829        .len(8)
1830        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1831        .child_data(vec![decimals.into_data()])
1832        .build()
1833        .unwrap();
1834
1835        let written =
1836            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1837                .unwrap();
1838
1839        let mut buffer = Vec::with_capacity(1024);
1840        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1841        writer.write(&written).unwrap();
1842        writer.close().unwrap();
1843
1844        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1845            .unwrap()
1846            .collect::<Result<Vec<_>, _>>()
1847            .unwrap();
1848
1849        assert_eq!(&written.slice(0, 3), &read[0]);
1850        assert_eq!(&written.slice(3, 3), &read[1]);
1851        assert_eq!(&written.slice(6, 2), &read[2]);
1852    }
1853
1854    #[test]
1855    fn test_int32_nullable_struct() {
1856        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1857        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1858            "int32",
1859            int32.data_type().clone(),
1860            false,
1861        )])))
1862        .len(8)
1863        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1864        .child_data(vec![int32.into_data()])
1865        .build()
1866        .unwrap();
1867
1868        let written =
1869            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1870                .unwrap();
1871
1872        let mut buffer = Vec::with_capacity(1024);
1873        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1874        writer.write(&written).unwrap();
1875        writer.close().unwrap();
1876
1877        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1878            .unwrap()
1879            .collect::<Result<Vec<_>, _>>()
1880            .unwrap();
1881
1882        assert_eq!(&written.slice(0, 3), &read[0]);
1883        assert_eq!(&written.slice(3, 3), &read[1]);
1884        assert_eq!(&written.slice(6, 2), &read[2]);
1885    }
1886
1887    #[test]
1888    fn test_decimal_list() {
1889        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1890
1891        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
1893            decimals.data_type().clone(),
1894            false,
1895        ))))
1896        .len(7)
1897        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
1898        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
1899        .child_data(vec![decimals.into_data()])
1900        .build()
1901        .unwrap();
1902
1903        let written =
1904            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
1905                .unwrap();
1906
1907        let mut buffer = Vec::with_capacity(1024);
1908        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1909        writer.write(&written).unwrap();
1910        writer.close().unwrap();
1911
1912        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1913            .unwrap()
1914            .collect::<Result<Vec<_>, _>>()
1915            .unwrap();
1916
1917        assert_eq!(&written.slice(0, 3), &read[0]);
1918        assert_eq!(&written.slice(3, 3), &read[1]);
1919        assert_eq!(&written.slice(6, 1), &read[2]);
1920    }
1921
1922    #[test]
1923    fn test_read_decimal_file() {
1924        use arrow_array::Decimal128Array;
1925        let testdata = arrow::util::test_util::parquet_test_data();
1926        let file_variants = vec![
1927            ("byte_array", 4),
1928            ("fixed_length", 25),
1929            ("int32", 4),
1930            ("int64", 10),
1931        ];
1932        for (prefix, target_precision) in file_variants {
1933            let path = format!("{testdata}/{prefix}_decimal.parquet");
1934            let file = File::open(path).unwrap();
1935            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1936
1937            let batch = record_reader.next().unwrap().unwrap();
1938            assert_eq!(batch.num_rows(), 24);
1939            let col = batch
1940                .column(0)
1941                .as_any()
1942                .downcast_ref::<Decimal128Array>()
1943                .unwrap();
1944
1945            let expected = 1..25;
1946
1947            assert_eq!(col.precision(), target_precision);
1948            assert_eq!(col.scale(), 2);
1949
1950            for (i, v) in expected.enumerate() {
1951                assert_eq!(col.value(i), v * 100_i128);
1952            }
1953        }
1954    }
1955
1956    #[test]
1957    fn test_read_float16_nonzeros_file() {
1958        use arrow_array::Float16Array;
1959        let testdata = arrow::util::test_util::parquet_test_data();
1960        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
1962        let file = File::open(path).unwrap();
1963        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1964
1965        let batch = record_reader.next().unwrap().unwrap();
1966        assert_eq!(batch.num_rows(), 8);
1967        let col = batch
1968            .column(0)
1969            .as_any()
1970            .downcast_ref::<Float16Array>()
1971            .unwrap();
1972
1973        let f16_two = f16::ONE + f16::ONE;
1974
1975        assert_eq!(col.null_count(), 1);
1976        assert!(col.is_null(0));
1977        assert_eq!(col.value(1), f16::ONE);
1978        assert_eq!(col.value(2), -f16_two);
1979        assert!(col.value(3).is_nan());
1980        assert_eq!(col.value(4), f16::ZERO);
1981        assert!(col.value(4).is_sign_positive());
1982        assert_eq!(col.value(5), f16::NEG_ONE);
1983        assert_eq!(col.value(6), f16::NEG_ZERO);
1984        assert!(col.value(6).is_sign_negative());
1985        assert_eq!(col.value(7), f16_two);
1986    }
1987
1988    #[test]
1989    fn test_read_float16_zeros_file() {
1990        use arrow_array::Float16Array;
1991        let testdata = arrow::util::test_util::parquet_test_data();
1992        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
1994        let file = File::open(path).unwrap();
1995        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1996
1997        let batch = record_reader.next().unwrap().unwrap();
1998        assert_eq!(batch.num_rows(), 3);
1999        let col = batch
2000            .column(0)
2001            .as_any()
2002            .downcast_ref::<Float16Array>()
2003            .unwrap();
2004
2005        assert_eq!(col.null_count(), 1);
2006        assert!(col.is_null(0));
2007        assert_eq!(col.value(1), f16::ZERO);
2008        assert!(col.value(1).is_sign_positive());
2009        assert!(col.value(2).is_nan());
2010    }
2011
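    // Reads byte_stream_split.zstd.parquet (BYTE_STREAM_SPLIT-encoded f32 and f64 columns)
    // and sanity-checks that all 300 values fall within (-10, 10).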
2012    #[test]
2013    fn test_read_float32_float64_byte_stream_split() {
2014        let path = format!(
2015            "{}/byte_stream_split.zstd.parquet",
2016            arrow::util::test_util::parquet_test_data(),
2017        );
2018        let file = File::open(path).unwrap();
2019        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2020
2021        let mut row_count = 0;
2022        for batch in record_reader {
2023            let batch = batch.unwrap();
2024            row_count += batch.num_rows();
2025            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2026            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2027
2028            for &x in f32_col.values() {
2030                assert!(x > -10.0);
2031                assert!(x < 10.0);
2032            }
2033            for &x in f64_col.values() {
2034                assert!(x > -10.0);
2035                assert!(x < 10.0);
2036            }
2037        }
2038        assert_eq!(row_count, 300);
2039    }
2040
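    // byte_stream_split_extended.gzip.parquet contains pairs of columns holding the same
    // values (the second of each pair written with BYTE_STREAM_SPLIT, per the file name);
    // verify each pair decodes identically, 200 rows in total.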
2041    #[test]
2042    fn test_read_extended_byte_stream_split() {
2043        let path = format!(
2044            "{}/byte_stream_split_extended.gzip.parquet",
2045            arrow::util::test_util::parquet_test_data(),
2046        );
2047        let file = File::open(path).unwrap();
2048        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2049
2050        let mut row_count = 0;
2051        for batch in record_reader {
2052            let batch = batch.unwrap();
2053            row_count += batch.num_rows();
2054
2055            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2057            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2058            assert_eq!(f16_col.len(), f16_bss.len());
2059            f16_col
2060                .iter()
2061                .zip(f16_bss.iter())
2062                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2063
2064            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2066            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2067            assert_eq!(f32_col.len(), f32_bss.len());
2068            f32_col
2069                .iter()
2070                .zip(f32_bss.iter())
2071                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2072
2073            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2075            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2076            assert_eq!(f64_col.len(), f64_bss.len());
2077            f64_col
2078                .iter()
2079                .zip(f64_bss.iter())
2080                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2081
2082            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2084            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2085            assert_eq!(i32_col.len(), i32_bss.len());
2086            i32_col
2087                .iter()
2088                .zip(i32_bss.iter())
2089                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2090
2091            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2093            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2094            assert_eq!(i64_col.len(), i64_bss.len());
2095            i64_col
2096                .iter()
2097                .zip(i64_bss.iter())
2098                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2099
2100            let flba_col = batch.column(10).as_fixed_size_binary();
2102            let flba_bss = batch.column(11).as_fixed_size_binary();
2103            assert_eq!(flba_col.len(), flba_bss.len());
2104            flba_col
2105                .iter()
2106                .zip(flba_bss.iter())
2107                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2108
2109            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2111            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2112            assert_eq!(dec_col.len(), dec_bss.len());
2113            dec_col
2114                .iter()
2115                .zip(dec_bss.iter())
2116                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2117        }
2118        assert_eq!(row_count, 200);
2119    }
2120
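    // incorrect_map_schema.parquet stores a map whose schema deviates from the standard
    // layout (per the file name); check that it is still read back as a Map with the
    // expected string keys and values.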
2121    #[test]
2122    fn test_read_incorrect_map_schema_file() {
2123        let testdata = arrow::util::test_util::parquet_test_data();
2124        let path = format!("{testdata}/incorrect_map_schema.parquet");
2126        let file = File::open(path).unwrap();
2127        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2128
2129        let batch = record_reader.next().unwrap().unwrap();
2130        assert_eq!(batch.num_rows(), 1);
2131
2132        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2133            "my_map",
2134            ArrowDataType::Map(
2135                Arc::new(Field::new(
2136                    "key_value",
2137                    ArrowDataType::Struct(Fields::from(vec![
2138                        Field::new("key", ArrowDataType::Utf8, false),
2139                        Field::new("value", ArrowDataType::Utf8, true),
2140                    ])),
2141                    false,
2142                )),
2143                false,
2144            ),
2145            true,
2146        )]));
2147        assert_eq!(batch.schema().as_ref(), &expected_schema);
2148
2149        assert_eq!(batch.num_rows(), 1);
2150        assert_eq!(batch.column(0).null_count(), 0);
2151        assert_eq!(
2152            batch.column(0).as_map().keys().as_ref(),
2153            &StringArray::from(vec!["parent", "name"])
2154        );
2155        assert_eq!(
2156            batch.column(0).as_map().values().as_ref(),
2157            &StringArray::from(vec!["another", "report"])
2158        );
2159    }
2160
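    // Round-trips a Dictionary(UInt8, FixedSizeBinary(8)) column through the Arrow writer
    // and reader.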
2161    #[test]
2162    fn test_read_dict_fixed_size_binary() {
2163        let schema = Arc::new(Schema::new(vec![Field::new(
2164            "a",
2165            ArrowDataType::Dictionary(
2166                Box::new(ArrowDataType::UInt8),
2167                Box::new(ArrowDataType::FixedSizeBinary(8)),
2168            ),
2169            true,
2170        )]));
2171        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2172        let values = FixedSizeBinaryArray::try_from_iter(
2173            vec![
2174                (0u8..8u8).collect::<Vec<u8>>(),
2175                (24u8..32u8).collect::<Vec<u8>>(),
2176            ]
2177            .into_iter(),
2178        )
2179        .unwrap();
2180        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2181        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2182
2183        let mut buffer = Vec::with_capacity(1024);
2184        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2185        writer.write(&batch).unwrap();
2186        writer.close().unwrap();
2187        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2188            .unwrap()
2189            .collect::<Result<Vec<_>, _>>()
2190            .unwrap();
2191
2192        assert_eq!(read.len(), 1);
2193        assert_eq!(&batch, &read[0])
2194    }
2195
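    /// Options controlling the randomized single-column round-trip tests below: data shape
    /// (row groups, rows, batch sizes, null density), writer settings (page size limits,
    /// version, statistics, encoding), and read-side row selections, filters, limit and offset.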
2196    #[derive(Clone)]
2198    struct TestOptions {
2199        num_row_groups: usize,
2202        num_rows: usize,
2204        record_batch_size: usize,
2206        null_percent: Option<usize>,
2208        write_batch_size: usize,
2213        max_data_page_size: usize,
2215        max_dict_page_size: usize,
2217        writer_version: WriterVersion,
2219        enabled_statistics: EnabledStatistics,
2221        encoding: Encoding,
2223        row_selections: Option<(RowSelection, usize)>,
2225        row_filter: Option<Vec<bool>>,
2227        limit: Option<usize>,
2229        offset: Option<usize>,
2231    }
2232
2233    impl std::fmt::Debug for TestOptions {
2235        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2236            f.debug_struct("TestOptions")
2237                .field("num_row_groups", &self.num_row_groups)
2238                .field("num_rows", &self.num_rows)
2239                .field("record_batch_size", &self.record_batch_size)
2240                .field("null_percent", &self.null_percent)
2241                .field("write_batch_size", &self.write_batch_size)
2242                .field("max_data_page_size", &self.max_data_page_size)
2243                .field("max_dict_page_size", &self.max_dict_page_size)
2244                .field("writer_version", &self.writer_version)
2245                .field("enabled_statistics", &self.enabled_statistics)
2246                .field("encoding", &self.encoding)
2247                .field("row_selections", &self.row_selections.is_some())
2248                .field("row_filter", &self.row_filter.is_some())
2249                .field("limit", &self.limit)
2250                .field("offset", &self.offset)
2251                .finish()
2252        }
2253    }
2254
2255    impl Default for TestOptions {
2256        fn default() -> Self {
2257            Self {
2258                num_row_groups: 2,
2259                num_rows: 100,
2260                record_batch_size: 15,
2261                null_percent: None,
2262                write_batch_size: 64,
2263                max_data_page_size: 1024 * 1024,
2264                max_dict_page_size: 1024 * 1024,
2265                writer_version: WriterVersion::PARQUET_1_0,
2266                enabled_statistics: EnabledStatistics::Page,
2267                encoding: Encoding::PLAIN,
2268                row_selections: None,
2269                row_filter: None,
2270                limit: None,
2271                offset: None,
2272            }
2273        }
2274    }
2275
2276    impl TestOptions {
2277        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2278            Self {
2279                num_row_groups,
2280                num_rows,
2281                record_batch_size,
2282                ..Default::default()
2283            }
2284        }
2285
2286        fn with_null_percent(self, null_percent: usize) -> Self {
2287            Self {
2288                null_percent: Some(null_percent),
2289                ..self
2290            }
2291        }
2292
2293        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2294            Self {
2295                max_data_page_size,
2296                ..self
2297            }
2298        }
2299
2300        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2301            Self {
2302                max_dict_page_size,
2303                ..self
2304            }
2305        }
2306
2307        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2308            Self {
2309                enabled_statistics,
2310                ..self
2311            }
2312        }
2313
2314        fn with_row_selections(self) -> Self {
2315            assert!(self.row_filter.is_none(), "Must set row selection first");
2316
2317            let mut rng = rng();
2318            let step = rng.random_range(self.record_batch_size..self.num_rows);
2319            let row_selections = create_test_selection(
2320                step,
2321                self.num_row_groups * self.num_rows,
2322                rng.random::<bool>(),
2323            );
2324            Self {
2325                row_selections: Some(row_selections),
2326                ..self
2327            }
2328        }
2329
2330        fn with_row_filter(self) -> Self {
2331            let row_count = match &self.row_selections {
2332                Some((_, count)) => *count,
2333                None => self.num_row_groups * self.num_rows,
2334            };
2335
2336            let mut rng = rng();
2337            Self {
2338                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2339                ..self
2340            }
2341        }
2342
2343        fn with_limit(self, limit: usize) -> Self {
2344            Self {
2345                limit: Some(limit),
2346                ..self
2347            }
2348        }
2349
2350        fn with_offset(self, offset: usize) -> Self {
2351            Self {
2352                offset: Some(offset),
2353                ..self
2354            }
2355        }
2356
2357        fn writer_props(&self) -> WriterProperties {
2358            let builder = WriterProperties::builder()
2359                .set_data_page_size_limit(self.max_data_page_size)
2360                .set_write_batch_size(self.write_batch_size)
2361                .set_writer_version(self.writer_version)
2362                .set_statistics_enabled(self.enabled_statistics);
2363
2364            let builder = match self.encoding {
2365                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2366                    .set_dictionary_enabled(true)
2367                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2368                _ => builder
2369                    .set_dictionary_enabled(false)
2370                    .set_encoding(self.encoding),
2371            };
2372
2373            builder.build()
2374        }
2375    }
2376
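    /// Exercises `single_column_reader_test` for a single leaf column of Parquet type `T`
    /// across a matrix of `TestOptions`, both writer versions, and each of `encodings`.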
2377    fn run_single_column_reader_tests<T, F, G>(
2384        rand_max: i32,
2385        converted_type: ConvertedType,
2386        arrow_type: Option<ArrowDataType>,
2387        converter: F,
2388        encodings: &[Encoding],
2389    ) where
2390        T: DataType,
2391        G: RandGen<T>,
2392        F: Fn(&[Option<T::T>]) -> ArrayRef,
2393    {
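        // Reader/writer configurations to exercise: varying batch and page sizes, null
        // densities, statistics levels, row selections, row filters, limits and offsets.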
2394        let all_options = vec![
2395            TestOptions::new(2, 100, 15),
2398            TestOptions::new(3, 25, 5),
2403            TestOptions::new(4, 100, 25),
2407            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2409            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2411            TestOptions::new(2, 256, 127).with_null_percent(0),
2413            TestOptions::new(2, 256, 93).with_null_percent(25),
2415            TestOptions::new(4, 100, 25).with_limit(0),
2417            TestOptions::new(4, 100, 25).with_limit(50),
2419            TestOptions::new(4, 100, 25).with_limit(10),
2421            TestOptions::new(4, 100, 25).with_limit(101),
2423            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2425            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2427            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2429            TestOptions::new(2, 256, 91)
2431                .with_null_percent(25)
2432                .with_enabled_statistics(EnabledStatistics::Chunk),
2433            TestOptions::new(2, 256, 91)
2435                .with_null_percent(25)
2436                .with_enabled_statistics(EnabledStatistics::None),
2437            TestOptions::new(2, 128, 91)
2439                .with_null_percent(100)
2440                .with_enabled_statistics(EnabledStatistics::None),
2441            TestOptions::new(2, 100, 15).with_row_selections(),
2446            TestOptions::new(3, 25, 5).with_row_selections(),
2451            TestOptions::new(4, 100, 25).with_row_selections(),
2455            TestOptions::new(3, 256, 73)
2457                .with_max_data_page_size(128)
2458                .with_row_selections(),
2459            TestOptions::new(3, 256, 57)
2461                .with_max_dict_page_size(128)
2462                .with_row_selections(),
2463            TestOptions::new(2, 256, 127)
2465                .with_null_percent(0)
2466                .with_row_selections(),
2467            TestOptions::new(2, 256, 93)
2469                .with_null_percent(25)
2470                .with_row_selections(),
2471            TestOptions::new(2, 256, 93)
2473                .with_null_percent(25)
2474                .with_row_selections()
2475                .with_limit(10),
2476            TestOptions::new(2, 256, 93)
2478                .with_null_percent(25)
2479                .with_row_selections()
2480                .with_offset(20)
2481                .with_limit(10),
2482            TestOptions::new(4, 100, 25).with_row_filter(),
2486            TestOptions::new(4, 100, 25)
2488                .with_row_selections()
2489                .with_row_filter(),
2490            TestOptions::new(2, 256, 93)
2492                .with_null_percent(25)
2493                .with_max_data_page_size(10)
2494                .with_row_filter(),
2495            TestOptions::new(2, 256, 93)
2497                .with_null_percent(25)
2498                .with_max_data_page_size(10)
2499                .with_row_selections()
2500                .with_row_filter(),
2501            TestOptions::new(2, 256, 93)
2503                .with_enabled_statistics(EnabledStatistics::None)
2504                .with_max_data_page_size(10)
2505                .with_row_selections(),
2506        ];
2507
2508        all_options.into_iter().for_each(|opts| {
2509            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2510                for encoding in encodings {
2511                    let opts = TestOptions {
2512                        writer_version,
2513                        encoding: *encoding,
2514                        ..opts.clone()
2515                    };
2516
2517                    single_column_reader_test::<T, _, G>(
2518                        opts,
2519                        rand_max,
2520                        converted_type,
2521                        arrow_type.clone(),
2522                        &converter,
2523                    )
2524                }
2525            }
2526        });
2527    }
2528
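    /// Writes randomly generated values for a single leaf column according to `opts`, reads
    /// them back through `ParquetRecordBatchReaderBuilder` (applying any row selection,
    /// row filter, offset and limit), and compares the result against `converter`'s output.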
2529    fn single_column_reader_test<T, F, G>(
2533        opts: TestOptions,
2534        rand_max: i32,
2535        converted_type: ConvertedType,
2536        arrow_type: Option<ArrowDataType>,
2537        converter: F,
2538    ) where
2539        T: DataType,
2540        G: RandGen<T>,
2541        F: Fn(&[Option<T::T>]) -> ArrayRef,
2542    {
2543        println!(
2545            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2546            T::get_physical_type(), converted_type, arrow_type, opts
2547        );
2548
2549        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2551            Some(null_percent) => {
2552                let mut rng = rng();
2553
2554                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2555                    .map(|_| {
2556                        std::iter::from_fn(|| {
2557                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2558                        })
2559                        .take(opts.num_rows)
2560                        .collect()
2561                    })
2562                    .collect();
2563                (Repetition::OPTIONAL, Some(def_levels))
2564            }
2565            None => (Repetition::REQUIRED, None),
2566        };
2567
2568        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2570            .map(|idx| {
2571                let null_count = match def_levels.as_ref() {
2572                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2573                    None => 0,
2574                };
2575                G::gen_vec(rand_max, opts.num_rows - null_count)
2576            })
2577            .collect();
2578
2579        let len = match T::get_physical_type() {
2580            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2581            crate::basic::Type::INT96 => 12,
2582            _ => -1,
2583        };
2584
2585        let fields = vec![Arc::new(
2586            Type::primitive_type_builder("leaf", T::get_physical_type())
2587                .with_repetition(repetition)
2588                .with_converted_type(converted_type)
2589                .with_length(len)
2590                .build()
2591                .unwrap(),
2592        )];
2593
2594        let schema = Arc::new(
2595            Type::group_type_builder("test_schema")
2596                .with_fields(fields)
2597                .build()
2598                .unwrap(),
2599        );
2600
2601        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2602
2603        let mut file = tempfile::tempfile().unwrap();
2604
2605        generate_single_column_file_with_data::<T>(
2606            &values,
2607            def_levels.as_ref(),
            file.try_clone().unwrap(),
            schema,
2610            arrow_field,
2611            &opts,
2612        )
2613        .unwrap();
2614
2615        file.rewind().unwrap();
2616
2617        let options = ArrowReaderOptions::new()
2618            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2619
2620        let mut builder =
2621            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2622
2623        let expected_data = match opts.row_selections {
2624            Some((selections, row_count)) => {
2625                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2626
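                // Note: despite its name, `skip_data` accumulates the rows *kept* by the
                // selection; rows in skip ranges are drained from `without_skip_data` and
                // discarded.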
2627                let mut skip_data: Vec<Option<T::T>> = vec![];
2628                let dequeue: VecDeque<RowSelector> = selections.clone().into();
2629                for select in dequeue {
2630                    if select.skip {
2631                        without_skip_data.drain(0..select.row_count);
2632                    } else {
2633                        skip_data.extend(without_skip_data.drain(0..select.row_count));
2634                    }
2635                }
2636                builder = builder.with_row_selection(selections);
2637
2638                assert_eq!(skip_data.len(), row_count);
2639                skip_data
2640            }
2641            None => {
2642                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2644                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2645                expected_data
2646            }
2647        };
2648
2649        let mut expected_data = match opts.row_filter {
2650            Some(filter) => {
2651                let expected_data = expected_data
2652                    .into_iter()
2653                    .zip(filter.iter())
2654                    .filter_map(|(d, f)| f.then(|| d))
2655                    .collect();
2656
2657                let mut filter_offset = 0;
2658                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2659                    ProjectionMask::all(),
2660                    move |b| {
2661                        let array = BooleanArray::from_iter(
2662                            filter
2663                                .iter()
2664                                .skip(filter_offset)
2665                                .take(b.num_rows())
2666                                .map(|x| Some(*x)),
2667                        );
2668                        filter_offset += b.num_rows();
2669                        Ok(array)
2670                    },
2671                ))]);
2672
2673                builder = builder.with_row_filter(filter);
2674                expected_data
2675            }
2676            None => expected_data,
2677        };
2678
2679        if let Some(offset) = opts.offset {
2680            builder = builder.with_offset(offset);
2681            expected_data = expected_data.into_iter().skip(offset).collect();
2682        }
2683
2684        if let Some(limit) = opts.limit {
2685            builder = builder.with_limit(limit);
2686            expected_data = expected_data.into_iter().take(limit).collect();
2687        }
2688
2689        let mut record_reader = builder
2690            .with_batch_size(opts.record_batch_size)
2691            .build()
2692            .unwrap();
2693
2694        let mut total_read = 0;
2695        loop {
2696            let maybe_batch = record_reader.next();
2697            if total_read < expected_data.len() {
2698                let end = min(total_read + opts.record_batch_size, expected_data.len());
2699                let batch = maybe_batch.unwrap().unwrap();
2700                assert_eq!(end - total_read, batch.num_rows());
2701
2702                let a = converter(&expected_data[total_read..end]);
2703                let b = Arc::clone(batch.column(0));
2704
2705                assert_eq!(a.data_type(), b.data_type());
2706                assert_eq!(a.to_data(), b.to_data());
2707                assert_eq!(
2708                    a.as_any().type_id(),
2709                    b.as_any().type_id(),
2710                    "incorrect type ids"
2711                );
2712
2713                total_read = end;
2714            } else {
2715                assert!(maybe_batch.is_none());
2716                break;
2717            }
2718        }
2719    }
2720
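    /// Flattens the per-row-group values into a single `Vec<Option<T::T>>`, interleaving
    /// `None` wherever the definition level is 0.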
2721    fn gen_expected_data<T: DataType>(
2722        def_levels: Option<&Vec<Vec<i16>>>,
2723        values: &[Vec<T::T>],
2724    ) -> Vec<Option<T::T>> {
2725        let data: Vec<Option<T::T>> = match def_levels {
2726            Some(levels) => {
2727                let mut values_iter = values.iter().flatten();
2728                levels
2729                    .iter()
2730                    .flatten()
2731                    .map(|d| match d {
2732                        1 => Some(values_iter.next().cloned().unwrap()),
2733                        0 => None,
2734                        _ => unreachable!(),
2735                    })
2736                    .collect()
2737            }
2738            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2739        };
2740        data
2741    }
2742
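    /// Writes one row group per entry of `values` to `file` with `SerializedFileWriter`,
    /// optionally embedding the Arrow schema for `field` in the file metadata.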
2743    fn generate_single_column_file_with_data<T: DataType>(
2744        values: &[Vec<T::T>],
2745        def_levels: Option<&Vec<Vec<i16>>>,
2746        file: File,
2747        schema: TypePtr,
2748        field: Option<Field>,
2749        opts: &TestOptions,
2750    ) -> Result<crate::format::FileMetaData> {
2751        let mut writer_props = opts.writer_props();
2752        if let Some(field) = field {
2753            let arrow_schema = Schema::new(vec![field]);
2754            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2755        }
2756
2757        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2758
2759        for (idx, v) in values.iter().enumerate() {
2760            let def_levels = def_levels.map(|d| d[idx].as_slice());
2761            let mut row_group_writer = writer.next_row_group()?;
2762            {
2763                let mut column_writer = row_group_writer
2764                    .next_column()?
2765                    .expect("Column writer is none!");
2766
2767                column_writer
2768                    .typed::<T>()
2769                    .write_batch(v, def_levels, None)?;
2770
2771                column_writer.close()?;
2772            }
2773            row_group_writer.close()?;
2774        }
2775
2776        writer.close()
2777    }
2778
2779    fn get_test_file(file_name: &str) -> File {
2780        let mut path = PathBuf::new();
2781        path.push(arrow::util::test_util::arrow_test_data());
2782        path.push(file_name);
2783
2784        File::open(path.as_path()).expect("File not found!")
2785    }
2786
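    // Reads nested_structs.rust.parquet in full, then again with a leaf projection, checking
    // that the projected schema contains only the selected struct fields.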
2787    #[test]
2788    fn test_read_structs() {
2789        let testdata = arrow::util::test_util::parquet_test_data();
2793        let path = format!("{testdata}/nested_structs.rust.parquet");
2794        let file = File::open(&path).unwrap();
2795        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2796
2797        for batch in record_batch_reader {
2798            batch.unwrap();
2799        }
2800
2801        let file = File::open(&path).unwrap();
2802        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2803
2804        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2805        let projected_reader = builder
2806            .with_projection(mask)
2807            .with_batch_size(60)
2808            .build()
2809            .unwrap();
2810
2811        let expected_schema = Schema::new(vec![
2812            Field::new(
2813                "roll_num",
2814                ArrowDataType::Struct(Fields::from(vec![Field::new(
2815                    "count",
2816                    ArrowDataType::UInt64,
2817                    false,
2818                )])),
2819                false,
2820            ),
2821            Field::new(
2822                "PC_CUR",
2823                ArrowDataType::Struct(Fields::from(vec![
2824                    Field::new("mean", ArrowDataType::Int64, false),
2825                    Field::new("sum", ArrowDataType::Int64, false),
2826                ])),
2827                false,
2828            ),
2829        ]);
2830
2831        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2833
2834        for batch in projected_reader {
2835            let batch = batch.unwrap();
2836            assert_eq!(batch.schema().as_ref(), &expected_schema);
2837        }
2838    }
2839
2840    #[test]
2841    fn test_read_structs_by_name() {
2843        let testdata = arrow::util::test_util::parquet_test_data();
2844        let path = format!("{testdata}/nested_structs.rust.parquet");
2845        let file = File::open(&path).unwrap();
2846        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2847
2848        for batch in record_batch_reader {
2849            batch.unwrap();
2850        }
2851
2852        let file = File::open(&path).unwrap();
2853        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2854
2855        let mask = ProjectionMask::columns(
2856            builder.parquet_schema(),
2857            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
2858        );
2859        let projected_reader = builder
2860            .with_projection(mask)
2861            .with_batch_size(60)
2862            .build()
2863            .unwrap();
2864
2865        let expected_schema = Schema::new(vec![
2866            Field::new(
2867                "roll_num",
2868                ArrowDataType::Struct(Fields::from(vec![Field::new(
2869                    "count",
2870                    ArrowDataType::UInt64,
2871                    false,
2872                )])),
2873                false,
2874            ),
2875            Field::new(
2876                "PC_CUR",
2877                ArrowDataType::Struct(Fields::from(vec![
2878                    Field::new("mean", ArrowDataType::Int64, false),
2879                    Field::new("sum", ArrowDataType::Int64, false),
2880                ])),
2881                false,
2882            ),
2883        ]);
2884
2885        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2886
2887        for batch in projected_reader {
2888            let batch = batch.unwrap();
2889            assert_eq!(batch.schema().as_ref(), &expected_schema);
2890        }
2891    }
2892
2893    #[test]
2894    fn test_read_maps() {
2895        let testdata = arrow::util::test_util::parquet_test_data();
2896        let path = format!("{testdata}/nested_maps.snappy.parquet");
2897        let file = File::open(path).unwrap();
2898        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2899
2900        for batch in record_batch_reader {
2901            batch.unwrap();
2902        }
2903    }
2904
2905    #[test]
2906    fn test_nested_nullability() {
2907        let message_type = "message nested {
2908          OPTIONAL Group group {
2909            REQUIRED INT32 leaf;
2910          }
2911        }";
2912
2913        let file = tempfile::tempfile().unwrap();
2914        let schema = Arc::new(parse_message_type(message_type).unwrap());
2915
2916        {
2917            let mut writer =
2919                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2920                    .unwrap();
2921
2922            {
2923                let mut row_group_writer = writer.next_row_group().unwrap();
2924                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2925
2926                column_writer
2927                    .typed::<Int32Type>()
2928                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2929                    .unwrap();
2930
2931                column_writer.close().unwrap();
2932                row_group_writer.close().unwrap();
2933            }
2934
2935            writer.close().unwrap();
2936        }
2937
2938        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2939        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2940
2941        let reader = builder.with_projection(mask).build().unwrap();
2942
2943        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2944            "group",
2945            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2946            true,
2947        )]));
2948
2949        let batch = reader.into_iter().next().unwrap().unwrap();
2950        assert_eq!(batch.schema().as_ref(), &expected_schema);
2951        assert_eq!(batch.num_rows(), 4);
2952        assert_eq!(batch.column(0).null_count(), 2);
2953    }
2954
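    // A hand-crafted Parquet file whose string column contains invalid UTF-8 bytes; decoding
    // it must fail with an "invalid utf-8 sequence" error.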
2955    #[test]
2956    fn test_invalid_utf8() {
2957        let data = vec![
2959            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
2960            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
2961            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
2962            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
2963            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
2964            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
2965            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
2966            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
2967            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
2968            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
2969            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
2970            82, 49,
2971        ];
2972
2973        let file = Bytes::from(data);
2974        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
2975
2976        let error = record_batch_reader.next().unwrap().unwrap_err();
2977
2978        assert!(
2979            error.to_string().contains("invalid utf-8 sequence"),
2980            "{}",
2981            error
2982        );
2983    }
2984
2985    #[test]
2986    fn test_invalid_utf8_string_array() {
2987        test_invalid_utf8_string_array_inner::<i32>();
2988    }
2989
2990    #[test]
2991    fn test_invalid_utf8_large_string_array() {
2992        test_invalid_utf8_string_array_inner::<i64>();
2993    }
2994
2995    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
2996        let cases = [
2997            invalid_utf8_first_char::<O>(),
2998            invalid_utf8_first_char_long_strings::<O>(),
2999            invalid_utf8_later_char::<O>(),
3000            invalid_utf8_later_char_long_strings::<O>(),
3001            invalid_utf8_later_char_really_long_strings::<O>(),
3002            invalid_utf8_later_char_really_long_strings2::<O>(),
3003        ];
3004        for array in &cases {
3005            for encoding in STRING_ENCODINGS {
3006                let array = unsafe {
3009                    GenericStringArray::<O>::new_unchecked(
3010                        array.offsets().clone(),
3011                        array.values().clone(),
3012                        array.nulls().cloned(),
3013                    )
3014                };
3015                let data_type = array.data_type().clone();
3016                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3017                let err = read_from_parquet(data).unwrap_err();
3018                let expected_err =
3019                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3020                assert!(
3021                    err.to_string().contains(expected_err),
3022                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3023                );
3024            }
3025        }
3026    }
3027
3028    #[test]
3029    fn test_invalid_utf8_string_view_array() {
3030        let cases = [
3031            invalid_utf8_first_char::<i32>(),
3032            invalid_utf8_first_char_long_strings::<i32>(),
3033            invalid_utf8_later_char::<i32>(),
3034            invalid_utf8_later_char_long_strings::<i32>(),
3035            invalid_utf8_later_char_really_long_strings::<i32>(),
3036            invalid_utf8_later_char_really_long_strings2::<i32>(),
3037        ];
3038
3039        for encoding in STRING_ENCODINGS {
3040            for array in &cases {
3041                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3042                let array = array.as_binary_view();
3043
3044                let array = unsafe {
3047                    StringViewArray::new_unchecked(
3048                        array.views().clone(),
3049                        array.data_buffers().to_vec(),
3050                        array.nulls().cloned(),
3051                    )
3052                };
3053
3054                let data_type = array.data_type().clone();
3055                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3056                let err = read_from_parquet(data).unwrap_err();
3057                let expected_err =
3058                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3059                assert!(
3060                    err.to_string().contains(expected_err),
3061                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3062                );
3063            }
3064        }
3065    }
3066
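    /// Encodings exercised by the invalid UTF-8 tests; `None` keeps the default writer
    /// properties (dictionary encoding enabled).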
3067    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3069        None,
3070        Some(Encoding::PLAIN),
3071        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3072        Some(Encoding::DELTA_BYTE_ARRAY),
3073    ];
3074
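    /// Bytes that are invalid UTF-8 from the very first byte (0xa0 cannot start a sequence).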
3075    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3078
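    /// Bytes that start with valid ASCII but become invalid UTF-8 partway through.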
3079    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3082
3083    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3085        let valid: &[u8] = b"   ";
3086        let invalid = INVALID_UTF8_FIRST_CHAR;
3087        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3088    }
3089
3090    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3094        let valid: &[u8] = b"   ";
3095        let mut invalid = vec![];
3096        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3097        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3098        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3099    }
3100
3101    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3104        let valid: &[u8] = b"   ";
3105        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3106        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3107    }
3108
3109    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3113        let valid: &[u8] = b"   ";
3114        let mut invalid = vec![];
3115        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3116        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3117        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3118    }
3119
3120    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3124        let valid: &[u8] = b"   ";
3125        let mut invalid = vec![];
3126        for _ in 0..10 {
3127            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3129        }
3130        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3131        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3132    }
3133
3134    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3137        let valid: &[u8] = b"   ";
3138        let mut valid_long = vec![];
3139        for _ in 0..10 {
3140            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3142        }
3143        let invalid = INVALID_UTF8_LATER_CHAR;
3144        GenericBinaryArray::<O>::from_iter(vec![
3145            None,
3146            Some(valid),
3147            Some(invalid),
3148            None,
3149            Some(&valid_long),
3150            Some(valid),
3151        ])
3152    }
3153
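    /// Writes `array` as a single column named "c", disabling dictionary encoding whenever an
    /// explicit `encoding` is supplied, and returns the resulting Parquet bytes.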
3154    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3159        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3160        let mut data = vec![];
3161        let schema = batch.schema();
3162        let props = encoding.map(|encoding| {
3163            WriterProperties::builder()
3164                .set_dictionary_enabled(false)
3166                .set_encoding(encoding)
3167                .build()
3168        });
3169
3170        {
3171            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3172            writer.write(&batch).unwrap();
3173            writer.flush().unwrap();
3174            writer.close().unwrap();
3175        };
3176        data
3177    }
3178
3179    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3181        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3182            .unwrap()
3183            .build()
3184            .unwrap();
3185
3186        reader.collect()
3187    }
3188
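    // Writes a nullable dictionary-encoded string column across two row groups and checks
    // which of the resulting batches share the same underlying dictionary values.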
3189    #[test]
3190    fn test_dictionary_preservation() {
3191        let fields = vec![Arc::new(
3192            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3193                .with_repetition(Repetition::OPTIONAL)
3194                .with_converted_type(ConvertedType::UTF8)
3195                .build()
3196                .unwrap(),
3197        )];
3198
3199        let schema = Arc::new(
3200            Type::group_type_builder("test_schema")
3201                .with_fields(fields)
3202                .build()
3203                .unwrap(),
3204        );
3205
3206        let dict_type = ArrowDataType::Dictionary(
3207            Box::new(ArrowDataType::Int32),
3208            Box::new(ArrowDataType::Utf8),
3209        );
3210
3211        let arrow_field = Field::new("leaf", dict_type, true);
3212
3213        let mut file = tempfile::tempfile().unwrap();
3214
3215        let values = vec![
3216            vec![
3217                ByteArray::from("hello"),
3218                ByteArray::from("a"),
3219                ByteArray::from("b"),
3220                ByteArray::from("d"),
3221            ],
3222            vec![
3223                ByteArray::from("c"),
3224                ByteArray::from("a"),
3225                ByteArray::from("b"),
3226            ],
3227        ];
3228
3229        let def_levels = vec![
3230            vec![1, 0, 0, 1, 0, 0, 1, 1],
3231            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3232        ];
3233
3234        let opts = TestOptions {
3235            encoding: Encoding::RLE_DICTIONARY,
3236            ..Default::default()
3237        };
3238
3239        generate_single_column_file_with_data::<ByteArrayType>(
3240            &values,
3241            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
3244            Some(arrow_field),
3245            &opts,
3246        )
3247        .unwrap();
3248
3249        file.rewind().unwrap();
3250
3251        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3252
3253        let batches = record_reader
3254            .collect::<Result<Vec<RecordBatch>, _>>()
3255            .unwrap();
3256
3257        assert_eq!(batches.len(), 6);
3258        assert!(batches.iter().all(|x| x.num_columns() == 1));
3259
3260        let row_counts = batches
3261            .iter()
3262            .map(|x| (x.num_rows(), x.column(0).null_count()))
3263            .collect::<Vec<_>>();
3264
3265        assert_eq!(
3266            row_counts,
3267            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3268        );
3269
3270        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3271
3272        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3274        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3276        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3277        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3279        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3280    }
3281
3282    #[test]
3283    fn test_read_null_list() {
3284        let testdata = arrow::util::test_util::parquet_test_data();
3285        let path = format!("{testdata}/null_list.parquet");
3286        let file = File::open(path).unwrap();
3287        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3288
3289        let batch = record_batch_reader.next().unwrap().unwrap();
3290        assert_eq!(batch.num_rows(), 1);
3291        assert_eq!(batch.num_columns(), 1);
3292        assert_eq!(batch.column(0).len(), 1);
3293
3294        let list = batch
3295            .column(0)
3296            .as_any()
3297            .downcast_ref::<ListArray>()
3298            .unwrap();
3299        assert_eq!(list.len(), 1);
3300        assert!(list.is_valid(0));
3301
3302        let val = list.value(0);
3303        assert_eq!(val.len(), 0);
3304    }
3305
3306    #[test]
3307    fn test_null_schema_inference() {
3308        let testdata = arrow::util::test_util::parquet_test_data();
3309        let path = format!("{testdata}/null_list.parquet");
3310        let file = File::open(path).unwrap();
3311
3312        let arrow_field = Field::new(
3313            "emptylist",
3314            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3315            true,
3316        );
3317
3318        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3319        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3320        let schema = builder.schema();
3321        assert_eq!(schema.fields().len(), 1);
3322        assert_eq!(schema.field(0), &arrow_field);
3323    }
3324
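    // Verifies that `with_skip_arrow_metadata(true)` ignores the Arrow schema embedded in the
    // file, so field-level metadata is dropped, for both writer versions.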
3325    #[test]
3326    fn test_skip_metadata() {
3327        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3328        let field = Field::new("col", col.data_type().clone(), true);
3329
3330        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3331
3332        let metadata = [("key".to_string(), "value".to_string())]
3333            .into_iter()
3334            .collect();
3335
3336        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3337
3338        assert_ne!(schema_with_metadata, schema_without_metadata);
3339
3340        let batch =
3341            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3342
3343        let file = |version: WriterVersion| {
3344            let props = WriterProperties::builder()
3345                .set_writer_version(version)
3346                .build();
3347
3348            let file = tempfile().unwrap();
3349            let mut writer =
3350                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3351                    .unwrap();
3352            writer.write(&batch).unwrap();
3353            writer.close().unwrap();
3354            file
3355        };
3356
3357        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3358
3359        let v1_reader = file(WriterVersion::PARQUET_1_0);
3360        let v2_reader = file(WriterVersion::PARQUET_2_0);
3361
3362        let arrow_reader =
3363            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3364        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3365
3366        let reader =
3367            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3368                .unwrap()
3369                .build()
3370                .unwrap();
3371        assert_eq!(reader.schema(), schema_without_metadata);
3372
3373        let arrow_reader =
3374            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3375        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3376
3377        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3378            .unwrap()
3379            .build()
3380            .unwrap();
3381        assert_eq!(reader.schema(), schema_without_metadata);
3382    }
3383
3384    fn write_parquet_from_iter<I, F>(value: I) -> File
3385    where
3386        I: IntoIterator<Item = (F, ArrayRef)>,
3387        F: AsRef<str>,
3388    {
3389        let batch = RecordBatch::try_from_iter(value).unwrap();
3390        let file = tempfile().unwrap();
3391        let mut writer =
3392            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3393        writer.write(&batch).unwrap();
3394        writer.close().unwrap();
3395        file
3396    }
3397
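    /// Writes `value` to a temporary Parquet file and asserts that opening it with the
    /// supplied `schema` override fails with `expected_error`.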
3398    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3399    where
3400        I: IntoIterator<Item = (F, ArrayRef)>,
3401        F: AsRef<str>,
3402    {
3403        let file = write_parquet_from_iter(value);
3404        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3405        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3406            file.try_clone().unwrap(),
3407            options_with_schema,
3408        );
3409        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3410    }
3411
3412    #[test]
3413    fn test_schema_too_few_columns() {
3414        run_schema_test_with_error(
3415            vec![
3416                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3417                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3418            ],
3419            Arc::new(Schema::new(vec![Field::new(
3420                "int64",
3421                ArrowDataType::Int64,
3422                false,
3423            )])),
3424            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3425        );
3426    }
3427
3428    #[test]
3429    fn test_schema_too_many_columns() {
3430        run_schema_test_with_error(
3431            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3432            Arc::new(Schema::new(vec![
3433                Field::new("int64", ArrowDataType::Int64, false),
3434                Field::new("int32", ArrowDataType::Int32, false),
3435            ])),
3436            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3437        );
3438    }
3439
3440    #[test]
3441    fn test_schema_mismatched_column_names() {
3442        run_schema_test_with_error(
3443            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3444            Arc::new(Schema::new(vec![Field::new(
3445                "other",
3446                ArrowDataType::Int64,
3447                false,
3448            )])),
3449            "Arrow: incompatible arrow schema, expected field named int64 got other",
3450        );
3451    }
3452
3453    #[test]
3454    fn test_schema_incompatible_columns() {
3455        run_schema_test_with_error(
3456            vec![
3457                (
3458                    "col1_invalid",
3459                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3460                ),
3461                (
3462                    "col2_valid",
3463                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3464                ),
3465                (
3466                    "col3_invalid",
3467                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3468                ),
3469            ],
3470            Arc::new(Schema::new(vec![
3471                Field::new("col1_invalid", ArrowDataType::Int32, false),
3472                Field::new("col2_valid", ArrowDataType::Int32, false),
3473                Field::new("col3_invalid", ArrowDataType::Int32, false),
3474            ])),
3475            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3476        );
3477    }
3478
3479    #[test]
3480    fn test_one_incompatible_nested_column() {
3481        let nested_fields = Fields::from(vec![
3482            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3483            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3484        ]);
3485        let nested = StructArray::try_new(
3486            nested_fields,
3487            vec![
3488                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3489                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3490            ],
3491            None,
3492        )
3493        .expect("struct array");
3494        let supplied_nested_fields = Fields::from(vec![
3495            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3496            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3497        ]);
3498        run_schema_test_with_error(
3499            vec![
3500                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3501                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3502                ("nested", Arc::new(nested) as ArrayRef),
3503            ],
3504            Arc::new(Schema::new(vec![
3505                Field::new("col1", ArrowDataType::Int64, false),
3506                Field::new("col2", ArrowDataType::Int32, false),
3507                Field::new(
3508                    "nested",
3509                    ArrowDataType::Struct(supplied_nested_fields),
3510                    false,
3511                ),
3512            ])),
3513            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3514            requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \
3515            but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
3516        );
3517    }
3518
3519    fn utf8_parquet() -> Bytes {
3521        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3522        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3523        let props = None;
3524        let mut parquet_data = vec![];
3526        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3527        writer.write(&batch).unwrap();
3528        writer.close().unwrap();
3529        Bytes::from(parquet_data)
3530    }
3531
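    // Asking for Utf8 file data back as Int32 is rejected with a data type mismatch.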
3532    #[test]
3533    fn test_schema_error_bad_types() {
3534        let parquet_data = utf8_parquet();
3536
3537        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3539            "column1",
3540            arrow::datatypes::DataType::Int32,
3541            false,
3542        )]));
3543
3544        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3546        let err =
3547            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3548                .unwrap_err();
3549        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8")
3550    }
3551
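    // The supplied schema asks for a nullable column while the file stores it as required,
    // so the override is rejected with a nullability mismatch.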
3552    #[test]
3553    fn test_schema_error_bad_nullability() {
3554        let parquet_data = utf8_parquet();
3556
3557        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3559            "column1",
3560            arrow::datatypes::DataType::Utf8,
3561            true,
3562        )]));
3563
3564        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3566        let err =
3567            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3568                .unwrap_err();
3569        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false")
3570    }
3571
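    // Binary, LargeBinary and BinaryView columns can be read back as Utf8, LargeUtf8 and
    // Utf8View when the supplied schema asks for it.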
3572    #[test]
3573    fn test_read_binary_as_utf8() {
3574        let file = write_parquet_from_iter(vec![
3575            (
3576                "binary_to_utf8",
3577                Arc::new(BinaryArray::from(vec![
3578                    b"one".as_ref(),
3579                    b"two".as_ref(),
3580                    b"three".as_ref(),
3581                ])) as ArrayRef,
3582            ),
3583            (
3584                "large_binary_to_large_utf8",
3585                Arc::new(LargeBinaryArray::from(vec![
3586                    b"one".as_ref(),
3587                    b"two".as_ref(),
3588                    b"three".as_ref(),
3589                ])) as ArrayRef,
3590            ),
3591            (
3592                "binary_view_to_utf8_view",
3593                Arc::new(BinaryViewArray::from(vec![
3594                    b"one".as_ref(),
3595                    b"two".as_ref(),
3596                    b"three".as_ref(),
3597                ])) as ArrayRef,
3598            ),
3599        ]);
3600        let supplied_fields = Fields::from(vec![
3601            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3602            Field::new(
3603                "large_binary_to_large_utf8",
3604                ArrowDataType::LargeUtf8,
3605                false,
3606            ),
3607            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3608        ]);
3609
3610        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3611        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3612            file.try_clone().unwrap(),
3613            options,
3614        )
3615        .expect("reader builder with schema")
3616        .build()
3617        .expect("reader with schema");
3618
3619        let batch = arrow_reader.next().unwrap().unwrap();
3620        assert_eq!(batch.num_columns(), 3);
3621        assert_eq!(batch.num_rows(), 3);
3622        assert_eq!(
3623            batch
3624                .column(0)
3625                .as_string::<i32>()
3626                .iter()
3627                .collect::<Vec<_>>(),
3628            vec![Some("one"), Some("two"), Some("three")]
3629        );
3630
3631        assert_eq!(
3632            batch
3633                .column(1)
3634                .as_string::<i64>()
3635                .iter()
3636                .collect::<Vec<_>>(),
3637            vec![Some("one"), Some("two"), Some("three")]
3638        );
3639
3640        assert_eq!(
3641            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3642            vec![Some("one"), Some("two"), Some("three")]
3643        );
3644    }
3645
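    // Non-UTF-8 binary data read with a supplied Utf8 schema is expected to panic with an
    // invalid UTF-8 error while decoding the first batch.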
3646    #[test]
3647    #[should_panic(expected = "Invalid UTF8 sequence at")]
3648    fn test_read_non_utf8_binary_as_utf8() {
3649        let file = write_parquet_from_iter(vec![(
3650            "non_utf8_binary",
3651            Arc::new(BinaryArray::from(vec![
3652                b"\xDE\x00\xFF".as_ref(),
3653                b"\xDE\x01\xAA".as_ref(),
3654                b"\xDE\x02\xFF".as_ref(),
3655            ])) as ArrayRef,
3656        )]);
3657        let supplied_fields = Fields::from(vec![Field::new(
3658            "non_utf8_binary",
3659            ArrowDataType::Utf8,
3660            false,
3661        )]);
3662
3663        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3664        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3665            file.try_clone().unwrap(),
3666            options,
3667        )
3668        .expect("reader builder with schema")
3669        .build()
3670        .expect("reader with schema");
3671        arrow_reader.next().unwrap().unwrap_err();
3672    }
3673
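    // A supplied schema can override several types at once: int32 -> timestamp(second),
    // date32 -> date64, and, inside a struct, utf8 -> dictionary and int64 -> timestamp(nanosecond).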
3674    #[test]
3675    fn test_with_schema() {
3676        let nested_fields = Fields::from(vec![
3677            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3678            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3679        ]);
3680
3681        let nested_arrays: Vec<ArrayRef> = vec![
3682            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3683            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3684        ];
3685
3686        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3687
3688        let file = write_parquet_from_iter(vec![
3689            (
3690                "int32_to_ts_second",
3691                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3692            ),
3693            (
3694                "date32_to_date64",
3695                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3696            ),
3697            ("nested", Arc::new(nested) as ArrayRef),
3698        ]);
3699
3700        let supplied_nested_fields = Fields::from(vec![
3701            Field::new(
3702                "utf8_to_dict",
3703                ArrowDataType::Dictionary(
3704                    Box::new(ArrowDataType::Int32),
3705                    Box::new(ArrowDataType::Utf8),
3706                ),
3707                false,
3708            ),
3709            Field::new(
3710                "int64_to_ts_nano",
3711                ArrowDataType::Timestamp(
3712                    arrow::datatypes::TimeUnit::Nanosecond,
3713                    Some("+10:00".into()),
3714                ),
3715                false,
3716            ),
3717        ]);
3718
3719        let supplied_schema = Arc::new(Schema::new(vec![
3720            Field::new(
3721                "int32_to_ts_second",
3722                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3723                false,
3724            ),
3725            Field::new("date32_to_date64", ArrowDataType::Date64, false),
3726            Field::new(
3727                "nested",
3728                ArrowDataType::Struct(supplied_nested_fields),
3729                false,
3730            ),
3731        ]));
3732
3733        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3734        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3735            file.try_clone().unwrap(),
3736            options,
3737        )
3738        .expect("reader builder with schema")
3739        .build()
3740        .expect("reader with schema");
3741
3742        assert_eq!(arrow_reader.schema(), supplied_schema);
3743        let batch = arrow_reader.next().unwrap().unwrap();
3744        assert_eq!(batch.num_columns(), 3);
3745        assert_eq!(batch.num_rows(), 4);
3746        assert_eq!(
3747            batch
3748                .column(0)
3749                .as_any()
3750                .downcast_ref::<TimestampSecondArray>()
3751                .expect("downcast to timestamp second")
3752                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3753                .map(|v| v.to_string())
3754                .expect("value as datetime"),
3755            "1970-01-01 01:00:00 +01:00"
3756        );
3757        assert_eq!(
3758            batch
3759                .column(1)
3760                .as_any()
3761                .downcast_ref::<Date64Array>()
3762                .expect("downcast to date64")
3763                .value_as_date(0)
3764                .map(|v| v.to_string())
3765                .expect("value as date"),
3766            "1970-01-01"
3767        );
3768
3769        let nested = batch
3770            .column(2)
3771            .as_any()
3772            .downcast_ref::<StructArray>()
3773            .expect("downcast to struct");
3774
3775        let nested_dict = nested
3776            .column(0)
3777            .as_any()
3778            .downcast_ref::<Int32DictionaryArray>()
3779            .expect("downcast to dictionary");
3780
3781        assert_eq!(
3782            nested_dict
3783                .values()
3784                .as_any()
3785                .downcast_ref::<StringArray>()
3786                .expect("downcast to string")
3787                .iter()
3788                .collect::<Vec<_>>(),
3789            vec![Some("a"), Some("b")]
3790        );
3791
3792        assert_eq!(
3793            nested_dict.keys().iter().collect::<Vec<_>>(),
3794            vec![Some(0), Some(0), Some(0), Some(1)]
3795        );
3796
3797        assert_eq!(
3798            nested
3799                .column(1)
3800                .as_any()
3801                .downcast_ref::<TimestampNanosecondArray>()
3802                .expect("downcast to timestamp nanosecond")
3803                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3804                .map(|v| v.to_string())
3805                .expect("value as datetime"),
3806            "1970-01-01 10:00:00.000000001 +10:00"
3807        );
3808    }
3809
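    // Projecting zero columns still yields batches with the expected row counts but no columns.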
3810    #[test]
3811    fn test_empty_projection() {
3812        let testdata = arrow::util::test_util::parquet_test_data();
3813        let path = format!("{testdata}/alltypes_plain.parquet");
3814        let file = File::open(path).unwrap();
3815
3816        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3817        let file_metadata = builder.metadata().file_metadata();
3818        let expected_rows = file_metadata.num_rows() as usize;
3819
3820        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3821        let batch_reader = builder
3822            .with_projection(mask)
3823            .with_batch_size(2)
3824            .build()
3825            .unwrap();
3826
3827        let mut total_rows = 0;
3828        for maybe_batch in batch_reader {
3829            let batch = maybe_batch.unwrap();
3830            total_rows += batch.num_rows();
3831            assert_eq!(batch.num_columns(), 0);
3832            assert!(batch.num_rows() <= 2);
3833        }
3834
3835        assert_eq!(total_rows, expected_rows);
3836    }
3837
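    /// Writes two `batch_size`-row list batches with the given maximum row group size and
    /// asserts the reader returns two batches of exactly `batch_size` rows each.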
3838    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3839        let schema = Arc::new(Schema::new(vec![Field::new(
3840            "list",
3841            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3842            true,
3843        )]));
3844
3845        let mut buf = Vec::with_capacity(1024);
3846
3847        let mut writer = ArrowWriter::try_new(
3848            &mut buf,
3849            schema.clone(),
3850            Some(
3851                WriterProperties::builder()
3852                    .set_max_row_group_size(row_group_size)
3853                    .build(),
3854            ),
3855        )
3856        .unwrap();
3857        for _ in 0..2 {
3858            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3859            for _ in 0..(batch_size) {
3860                list_builder.append(true);
3861            }
3862            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3863                .unwrap();
3864            writer.write(&batch).unwrap();
3865        }
3866        writer.close().unwrap();
3867
3868        let mut record_reader =
3869            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3870        assert_eq!(
3871            batch_size,
3872            record_reader.next().unwrap().unwrap().num_rows()
3873        );
3874        assert_eq!(
3875            batch_size,
3876            record_reader.next().unwrap().unwrap().num_rows()
3877        );
3878    }
3879
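    // Mixes row group sizes and batch sizes, including values straddling
    // REPETITION_LEVELS_BATCH_SIZE.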
3880    #[test]
3881    fn test_row_group_exact_multiple() {
3882        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
3883        test_row_group_batch(8, 8);
3884        test_row_group_batch(10, 8);
3885        test_row_group_batch(8, 10);
3886        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
3887        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
3888        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
3889        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
3890        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
3891    }
3892
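    /// Computes the record batches a reader with the given batch size should produce for
    /// `column` under `selection`, by slicing the full data along the selected ranges.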
3893    fn get_expected_batches(
3896        column: &RecordBatch,
3897        selection: &RowSelection,
3898        batch_size: usize,
3899    ) -> Vec<RecordBatch> {
3900        let mut expected_batches = vec![];
3901
3902        let mut selection: VecDeque<_> = selection.clone().into();
3903        let mut row_offset = 0;
3904        let mut last_start = None;
3905        while row_offset < column.num_rows() && !selection.is_empty() {
3906            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3907            while batch_remaining > 0 && !selection.is_empty() {
3908                let (to_read, skip) = match selection.front_mut() {
3909                    Some(selection) if selection.row_count > batch_remaining => {
3910                        selection.row_count -= batch_remaining;
3911                        (batch_remaining, selection.skip)
3912                    }
3913                    Some(_) => {
3914                        let select = selection.pop_front().unwrap();
3915                        (select.row_count, select.skip)
3916                    }
3917                    None => break,
3918                };
3919
3920                batch_remaining -= to_read;
3921
3922                match skip {
3923                    true => {
3924                        if let Some(last_start) = last_start.take() {
3925                            expected_batches.push(column.slice(last_start, row_offset - last_start))
3926                        }
3927                        row_offset += to_read
3928                    }
3929                    false => {
3930                        last_start.get_or_insert(row_offset);
3931                        row_offset += to_read
3932                    }
3933                }
3934            }
3935        }
3936
3937        if let Some(last_start) = last_start.take() {
3938            expected_batches.push(column.slice(last_start, row_offset - last_start))
3939        }
3940
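        // Sanity check: every batch except possibly the last must contain exactly batch_size rows.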
3941        for batch in &expected_batches[..expected_batches.len() - 1] {
3943            assert_eq!(batch.num_rows(), batch_size);
3944        }
3945
3946        expected_batches
3947    }
3948
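    /// Builds a `RowSelection` that alternates skip/select runs of `step_len` rows over
    /// `total_len` rows, returning the selection and the number of selected rows.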
3949    fn create_test_selection(
3950        step_len: usize,
3951        total_len: usize,
3952        skip_first: bool,
3953    ) -> (RowSelection, usize) {
3954        let mut remaining = total_len;
3955        let mut skip = skip_first;
3956        let mut vec = vec![];
3957        let mut selected_count = 0;
3958        while remaining != 0 {
3959            let step = if remaining > step_len {
3960                step_len
3961            } else {
3962                remaining
3963            };
3964            vec.push(RowSelector {
3965                row_count: step,
3966                skip,
3967            });
3968            remaining -= step;
3969            if !skip {
3970                selected_count += step;
3971            }
3972            skip = !skip;
3973        }
3974        (vec.into(), selected_count)
3975    }
3976
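    // Reads alltypes_tiny_pages_plain.parquet once with a large batch size, then re-reads it
    // under alternating skip/select runs and compares against slices of the full data.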
3977    #[test]
3978    fn test_scan_row_with_selection() {
3979        let testdata = arrow::util::test_util::parquet_test_data();
3980        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
3981        let test_file = File::open(&path).unwrap();
3982
3983        let mut serial_reader =
3984            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
3985        let data = serial_reader.next().unwrap().unwrap();
3986
3987        let do_test = |batch_size: usize, selection_len: usize| {
3988            for skip_first in [false, true] {
3989                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
3990
3991                let expected = get_expected_batches(&data, &selections, batch_size);
3992                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
3993                assert_eq!(
3994                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
3995                    expected,
3996                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
3997                );
3998            }
3999        };
4000
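        // Exercise a few batch sizes; create_test_selection alternates skip/select runs of
        // batch_size rows over the data read above, both with and without skipping first.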
4001        do_test(1000, 1000);
4004
4005        do_test(20, 20);
4007
4008        do_test(20, 5);
4010
4011        do_test(20, 5);
4014
4015        fn create_skip_reader(
4016            test_file: &File,
4017            batch_size: usize,
4018            selections: RowSelection,
4019        ) -> ParquetRecordBatchReader {
4020            let options = ArrowReaderOptions::new().with_page_index(true);
4021            let file = test_file.try_clone().unwrap();
4022            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4023                .unwrap()
4024                .with_batch_size(batch_size)
4025                .with_row_selection(selections)
4026                .build()
4027                .unwrap()
4028        }
4029    }
4030
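    // A batch size larger than the file's row count should be clamped down so the reader
    // does not over-allocate.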
4031    #[test]
4032    fn test_batch_size_overallocate() {
4033        let testdata = arrow::util::test_util::parquet_test_data();
4034        let path = format!("{testdata}/alltypes_plain.parquet");
4036        let test_file = File::open(path).unwrap();
4037
4038        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4039        let num_rows = builder.metadata.file_metadata().num_rows();
4040        let reader = builder
4041            .with_batch_size(1024)
4042            .with_projection(ProjectionMask::all())
4043            .build()
4044            .unwrap();
4045        assert_ne!(1024, num_rows);
4046        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4047    }
4048
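    // With the page index option enabled, the offset index is loaded for files that have
    // one and left absent for files that do not.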
4049    #[test]
4050    fn test_read_with_page_index_enabled() {
4051        let testdata = arrow::util::test_util::parquet_test_data();
4052
4053        {
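            // alltypes_tiny_pages.parquet ships with a page index, so the offset index should be populated.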
4054            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4056            let test_file = File::open(path).unwrap();
4057            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4058                test_file,
4059                ArrowReaderOptions::new().with_page_index(true),
4060            )
4061            .unwrap();
4062            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4063            let reader = builder.build().unwrap();
4064            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4065            assert_eq!(batches.len(), 8);
4066        }
4067
4068        {
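            // alltypes_plain.parquet has no page index, so none is loaded even though it was requested.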
4069            let path = format!("{testdata}/alltypes_plain.parquet");
4071            let test_file = File::open(path).unwrap();
4072            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4073                test_file,
4074                ArrowReaderOptions::new().with_page_index(true),
4075            )
4076            .unwrap();
4077            assert!(builder.metadata().offset_index().is_none());
4080            let reader = builder.build().unwrap();
4081            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4082            assert_eq!(batches.len(), 1);
4083        }
4084    }
4085
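    // Writes optional and repeated INT32 columns with the low-level writer, then checks that
    // projecting each leaf column individually matches the corresponding column of a full read.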
4086    #[test]
4087    fn test_raw_repetition() {
4088        const MESSAGE_TYPE: &str = "
4089            message Log {
4090              OPTIONAL INT32 eventType;
4091              REPEATED INT32 category;
4092              REPEATED group filter {
4093                OPTIONAL INT32 error;
4094              }
4095            }
4096        ";
4097        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4098        let props = Default::default();
4099
4100        let mut buf = Vec::with_capacity(1024);
4101        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4102        let mut row_group_writer = writer.next_row_group().unwrap();
4103
4104        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
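        // eventType: one defined optional value, no repetition levels.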
4106        col_writer
4107            .typed::<Int32Type>()
4108            .write_batch(&[1], Some(&[1]), None)
4109            .unwrap();
4110        col_writer.close().unwrap();
4111        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
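        // category: two repeated values belonging to the same row (rep levels [0, 1]).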
4113        col_writer
4114            .typed::<Int32Type>()
4115            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4116            .unwrap();
4117        col_writer.close().unwrap();
4118        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
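        // filter.error: a single value in one repeated group.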
4120        col_writer
4121            .typed::<Int32Type>()
4122            .write_batch(&[1], Some(&[1]), Some(&[0]))
4123            .unwrap();
4124        col_writer.close().unwrap();
4125
4126        let rg_md = row_group_writer.close().unwrap();
4127        assert_eq!(rg_md.num_rows(), 1);
4128        writer.close().unwrap();
4129
4130        let bytes = Bytes::from(buf);
4131
4132        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4133        let full = no_mask.next().unwrap().unwrap();
4134
4135        assert_eq!(full.num_columns(), 3);
4136
4137        for idx in 0..3 {
4138            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4139            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4140            let mut reader = b.with_projection(mask).build().unwrap();
4141            let projected = reader.next().unwrap().unwrap();
4142
4143            assert_eq!(projected.num_columns(), 1);
4144            assert_eq!(full.column(idx), projected.column(0));
4145        }
4146    }
4147
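    // Reads the reference lz4_raw_compressed.parquet file and checks all three columns.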
4148    #[test]
4149    fn test_read_lz4_raw() {
4150        let testdata = arrow::util::test_util::parquet_test_data();
4151        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4152        let file = File::open(path).unwrap();
4153
4154        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4155            .unwrap()
4156            .collect::<Result<Vec<_>, _>>()
4157            .unwrap();
4158        assert_eq!(batches.len(), 1);
4159        let batch = &batches[0];
4160
4161        assert_eq!(batch.num_columns(), 3);
4162        assert_eq!(batch.num_rows(), 4);
4163
4164        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4166        assert_eq!(
4167            a.values(),
4168            &[1593604800, 1593604800, 1593604801, 1593604801]
4169        );
4170
4171        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4172        let a: Vec<_> = a.iter().flatten().collect();
4173        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4174
4175        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4176        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4177    }
4178
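    // Both reference files declare the LZ4 codec but appear to use different framings
    // (Hadoop-style vs. raw); either way the reader should produce the same four rows checked below.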
4179    #[test]
4189    fn test_read_lz4_hadoop_fallback() {
4190        for file in [
4191            "hadoop_lz4_compressed.parquet",
4192            "non_hadoop_lz4_compressed.parquet",
4193        ] {
4194            let testdata = arrow::util::test_util::parquet_test_data();
4195            let path = format!("{testdata}/{file}");
4196            let file = File::open(path).unwrap();
4197            let expected_rows = 4;
4198
4199            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4200                .unwrap()
4201                .collect::<Result<Vec<_>, _>>()
4202                .unwrap();
4203            assert_eq!(batches.len(), 1);
4204            let batch = &batches[0];
4205
4206            assert_eq!(batch.num_columns(), 3);
4207            assert_eq!(batch.num_rows(), expected_rows);
4208
4209            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4210            assert_eq!(
4211                a.values(),
4212                &[1593604800, 1593604800, 1593604801, 1593604801]
4213            );
4214
4215            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4216            let b: Vec<_> = b.iter().flatten().collect();
4217            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4218
4219            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4220            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4221        }
4222    }
4223
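    // Larger (10,000 row) Hadoop LZ4 reference file; spot-check values at both ends.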
4224    #[test]
4225    fn test_read_lz4_hadoop_large() {
4226        let testdata = arrow::util::test_util::parquet_test_data();
4227        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4228        let file = File::open(path).unwrap();
4229        let expected_rows = 10000;
4230
4231        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4232            .unwrap()
4233            .collect::<Result<Vec<_>, _>>()
4234            .unwrap();
4235        assert_eq!(batches.len(), 1);
4236        let batch = &batches[0];
4237
4238        assert_eq!(batch.num_columns(), 1);
4239        assert_eq!(batch.num_rows(), expected_rows);
4240
4241        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4242        let a: Vec<_> = a.iter().flatten().collect();
4243        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4244        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4245        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4246        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4247    }
4248
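    // Row selection on a file with nested lists: skip the first and last of three rows and
    // check the remaining row matches the middle slice of a full read.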
4249    #[test]
4250    #[cfg(feature = "snap")]
4251    fn test_read_nested_lists() {
4252        let testdata = arrow::util::test_util::parquet_test_data();
4253        let path = format!("{testdata}/nested_lists.snappy.parquet");
4254        let file = File::open(path).unwrap();
4255
4256        let f = file.try_clone().unwrap();
4257        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4258        let expected = reader.next().unwrap().unwrap();
4259        assert_eq!(expected.num_rows(), 3);
4260
4261        let selection = RowSelection::from(vec![
4262            RowSelector::skip(1),
4263            RowSelector::select(1),
4264            RowSelector::skip(1),
4265        ]);
4266        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4267            .unwrap()
4268            .with_row_selection(selection)
4269            .build()
4270            .unwrap();
4271
4272        let actual = reader.next().unwrap().unwrap();
4273        assert_eq!(actual.num_rows(), 1);
4274        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4275    }
4276
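    // Decimal128 columns with assorted precisions and scales survive a write/read round trip.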
4277    #[test]
4278    fn test_arbitrary_decimal() {
4279        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4280        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4281            .with_precision_and_scale(19, 0)
4282            .unwrap();
4283        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4284            .with_precision_and_scale(12, 0)
4285            .unwrap();
4286        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4287            .with_precision_and_scale(17, 10)
4288            .unwrap();
4289
4290        let written = RecordBatch::try_from_iter([
4291            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4292            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4293            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4294        ])
4295        .unwrap();
4296
4297        let mut buffer = Vec::with_capacity(1024);
4298        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4299        writer.write(&written).unwrap();
4300        writer.close().unwrap();
4301
4302        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4303            .unwrap()
4304            .collect::<Result<Vec<_>, _>>()
4305            .unwrap();
4306
4307        assert_eq!(&written.slice(0, 8), &read[0]);
4308    }
4309
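    // Skipping list rows that were written across multiple data pages.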
4310    #[test]
4311    fn test_list_skip() {
4312        let mut list = ListBuilder::new(Int32Builder::new());
4313        list.append_value([Some(1), Some(2)]);
4314        list.append_value([Some(3)]);
4315        list.append_value([Some(4)]);
4316        let list = list.finish();
4317        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4318
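        // Limit each data page to one row so the skip below has to cross page boundaries.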
4319        let props = WriterProperties::builder()
4321            .set_data_page_row_count_limit(1)
4322            .set_write_batch_size(2)
4323            .build();
4324
4325        let mut buffer = Vec::with_capacity(1024);
4326        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4327        writer.write(&batch).unwrap();
4328        writer.close().unwrap();
4329
4330        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4331        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4332            .unwrap()
4333            .with_row_selection(selection.into())
4334            .build()
4335            .unwrap();
4336        let out = reader.next().unwrap().unwrap();
4337        assert_eq!(out.num_rows(), 1);
4338        assert_eq!(out, batch.slice(2, 1));
4339    }
4340
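    /// Round-trips decimal arrays of increasing precision and checks the physical type the
    /// writer picked: precision <= 9 maps to INT32, <= 18 to INT64, and larger precisions
    /// to FIXED_LEN_BYTE_ARRAY.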
4341    fn test_decimal_roundtrip<T: DecimalType>() {
4342        let d = |values: Vec<usize>, p: u8| {
4347            let iter = values.into_iter().map(T::Native::usize_as);
4348            PrimitiveArray::<T>::from_iter_values(iter)
4349                .with_precision_and_scale(p, 2)
4350                .unwrap()
4351        };
4352
4353        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4354        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4355        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4356        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4357
4358        let batch = RecordBatch::try_from_iter([
4359            ("d1", Arc::new(d1) as ArrayRef),
4360            ("d2", Arc::new(d2) as ArrayRef),
4361            ("d3", Arc::new(d3) as ArrayRef),
4362            ("d4", Arc::new(d4) as ArrayRef),
4363        ])
4364        .unwrap();
4365
4366        let mut buffer = Vec::with_capacity(1024);
4367        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4368        writer.write(&batch).unwrap();
4369        writer.close().unwrap();
4370
4371        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4372        let t1 = builder.parquet_schema().columns()[0].physical_type();
4373        assert_eq!(t1, PhysicalType::INT32);
4374        let t2 = builder.parquet_schema().columns()[1].physical_type();
4375        assert_eq!(t2, PhysicalType::INT64);
4376        let t3 = builder.parquet_schema().columns()[2].physical_type();
4377        assert_eq!(t3, PhysicalType::INT64);
4378        let t4 = builder.parquet_schema().columns()[3].physical_type();
4379        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4380
4381        let mut reader = builder.build().unwrap();
4382        assert_eq!(batch.schema(), reader.schema());
4383
4384        let out = reader.next().unwrap().unwrap();
4385        assert_eq!(batch, out);
4386    }
4387
4388    #[test]
4389    fn test_decimal() {
4390        test_decimal_roundtrip::<Decimal128Type>();
4391        test_decimal_roundtrip::<Decimal256Type>();
4392    }
4393
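    // Row selection over a string list column written as two 1024-row batches; every
    // selected list should keep exactly one element.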
4394    #[test]
4395    fn test_list_selection() {
4396        let schema = Arc::new(Schema::new(vec![Field::new_list(
4397            "list",
4398            Field::new_list_field(ArrowDataType::Utf8, true),
4399            false,
4400        )]));
4401        let mut buf = Vec::with_capacity(1024);
4402
4403        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4404
4405        for i in 0..2 {
4406            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4407            for j in 0..1024 {
4408                list_a_builder.values().append_value(format!("{i} {j}"));
4409                list_a_builder.append(true);
4410            }
4411            let batch =
4412                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4413                    .unwrap();
4414            writer.write(&batch).unwrap();
4415        }
4416        let _metadata = writer.close().unwrap();
4417
4418        let buf = Bytes::from(buf);
4419        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4420            .unwrap()
4421            .with_row_selection(RowSelection::from(vec![
4422                RowSelector::skip(100),
4423                RowSelector::select(924),
4424                RowSelector::skip(100),
4425                RowSelector::select(924),
4426            ]))
4427            .build()
4428            .unwrap();
4429
4430        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4431        let batch = concat_batches(&schema, &batches).unwrap();
4432
4433        assert_eq!(batch.num_rows(), 924 * 2);
4434        let list = batch.column(0).as_list::<i32>();
4435
4436        for w in list.value_offsets().windows(2) {
4437            assert_eq!(w[0] + 1, w[1])
4438        }
4439        let mut values = list.values().as_string::<i32>().iter();
4440
4441        for i in 0..2 {
4442            for j in 100..1024 {
4443                let expected = format!("{i} {j}");
4444                assert_eq!(values.next().unwrap().unwrap(), &expected);
4445            }
4446        }
4447    }
4448
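    // Randomized nested-list data read back under several fixed selections and batch sizes;
    // each selected run must equal the corresponding slice of the written batch.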
4449    #[test]
4450    fn test_list_selection_fuzz() {
4451        let mut rng = rng();
4452        let schema = Arc::new(Schema::new(vec![Field::new_list(
4453            "list",
4454            Field::new_list(
4455                Field::LIST_FIELD_DEFAULT_NAME,
4456                Field::new_list_field(ArrowDataType::Int32, true),
4457                true,
4458            ),
4459            true,
4460        )]));
4461        let mut buf = Vec::with_capacity(1024);
4462        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4463
4464        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4465
4466        for _ in 0..2048 {
4467            if rng.random_bool(0.2) {
4468                list_a_builder.append(false);
4469                continue;
4470            }
4471
4472            let list_a_len = rng.random_range(0..10);
4473            let list_b_builder = list_a_builder.values();
4474
4475            for _ in 0..list_a_len {
4476                if rng.random_bool(0.2) {
4477                    list_b_builder.append(false);
4478                    continue;
4479                }
4480
4481                let list_b_len = rng.random_range(0..10);
4482                let int_builder = list_b_builder.values();
4483                for _ in 0..list_b_len {
4484                    match rng.random_bool(0.2) {
4485                        true => int_builder.append_null(),
4486                        false => int_builder.append_value(rng.random()),
4487                    }
4488                }
4489                list_b_builder.append(true)
4490            }
4491            list_a_builder.append(true);
4492        }
4493
4494        let array = Arc::new(list_a_builder.finish());
4495        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4496
4497        writer.write(&batch).unwrap();
4498        let _metadata = writer.close().unwrap();
4499
4500        let buf = Bytes::from(buf);
4501
4502        let cases = [
4503            vec![
4504                RowSelector::skip(100),
4505                RowSelector::select(924),
4506                RowSelector::skip(100),
4507                RowSelector::select(924),
4508            ],
4509            vec![
4510                RowSelector::select(924),
4511                RowSelector::skip(100),
4512                RowSelector::select(924),
4513                RowSelector::skip(100),
4514            ],
4515            vec![
4516                RowSelector::skip(1023),
4517                RowSelector::select(1),
4518                RowSelector::skip(1023),
4519                RowSelector::select(1),
4520            ],
4521            vec![
4522                RowSelector::select(1),
4523                RowSelector::skip(1023),
4524                RowSelector::select(1),
4525                RowSelector::skip(1023),
4526            ],
4527        ];
4528
4529        for batch_size in [100, 1024, 2048] {
4530            for selection in &cases {
4531                let selection = RowSelection::from(selection.clone());
4532                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4533                    .unwrap()
4534                    .with_row_selection(selection.clone())
4535                    .with_batch_size(batch_size)
4536                    .build()
4537                    .unwrap();
4538
4539                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4540                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4541                assert_eq!(actual.num_rows(), selection.row_count());
4542
4543                let mut batch_offset = 0;
4544                let mut actual_offset = 0;
4545                for selector in selection.iter() {
4546                    if selector.skip {
4547                        batch_offset += selector.row_count;
4548                        continue;
4549                    }
4550
4551                    assert_eq!(
4552                        batch.slice(batch_offset, selector.row_count),
4553                        actual.slice(actual_offset, selector.row_count)
4554                    );
4555
4556                    batch_offset += selector.row_count;
4557                    actual_offset += selector.row_count;
4558                }
4559            }
4560        }
4561    }
4562
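    // old_list_structure.parquet uses the legacy nested list layout; its single row should
    // decode to the nested value [[1, 2], [3, 4]].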
4563    #[test]
4564    fn test_read_old_nested_list() {
4565        use arrow::datatypes::DataType;
4566        use arrow::datatypes::ToByteSlice;
4567
4568        let testdata = arrow::util::test_util::parquet_test_data();
4569        let path = format!("{testdata}/old_list_structure.parquet");
4578        let test_file = File::open(path).unwrap();
4579
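        // Expected inner value for the single row: values [1, 2, 3, 4] split at offsets
        // [0, 2, 4], i.e. [[1, 2], [3, 4]].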
4580        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4582
4583        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4585
4586        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4588            "array",
4589            DataType::Int32,
4590            false,
4591        ))))
4592        .len(2)
4593        .add_buffer(a_value_offsets)
4594        .add_child_data(a_values.into_data())
4595        .build()
4596        .unwrap();
4597        let a = ListArray::from(a_list_data);
4598
4599        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4600        let mut reader = builder.build().unwrap();
4601        let out = reader.next().unwrap().unwrap();
4602        assert_eq!(out.num_rows(), 1);
4603        assert_eq!(out.num_columns(), 1);
4604        let c0 = out.column(0);
4606        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4607        let r0 = c0arr.value(0);
4609        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4610        assert_eq!(r0arr, &a);
4611    }
4612
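    // map_no_value.parquet presumably stores map entries without a value field; columns 1
    // and 2 of the output should decode to identical list arrays.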
4613    #[test]
4614    fn test_map_no_value() {
4615        let testdata = arrow::util::test_util::parquet_test_data();
4635        let path = format!("{testdata}/map_no_value.parquet");
4636        let file = File::open(path).unwrap();
4637
4638        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4639            .unwrap()
4640            .build()
4641            .unwrap();
4642        let out = reader.next().unwrap().unwrap();
4643        assert_eq!(out.num_rows(), 3);
4644        assert_eq!(out.num_columns(), 3);
4645        let c0 = out.column(1).as_list::<i32>();
4647        let c1 = out.column(2).as_list::<i32>();
4648        assert_eq!(c0.len(), c1.len());
4649        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4650    }
4651}