parquet/arrow/array_reader/
primitive_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::Decimal256Array;
27use arrow_array::{
28    builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, Decimal128Array,
29    Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, UInt32Array,
30    UInt64Array,
31};
32use arrow_buffer::{i256, BooleanBuffer, Buffer};
33use arrow_data::ArrayDataBuilder;
34use arrow_schema::{DataType as ArrowType, TimeUnit};
35use std::any::Any;
36use std::sync::Arc;
37
38/// Provides conversion from `Vec<T>` to `Buffer`
39pub trait IntoBuffer {
40    fn into_buffer(self) -> Buffer;
41}
42
43macro_rules! native_buffer {
44    ($($t:ty),*) => {
45        $(impl IntoBuffer for Vec<$t> {
46            fn into_buffer(self) -> Buffer {
47                Buffer::from_vec(self)
48            }
49        })*
50    };
51}
52native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
53
54impl IntoBuffer for Vec<bool> {
55    fn into_buffer(self) -> Buffer {
56        BooleanBuffer::from_iter(self).into_inner()
57    }
58}
59
60impl IntoBuffer for Vec<Int96> {
61    fn into_buffer(self) -> Buffer {
62        let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
63        for v in self {
64            builder.append(v.to_nanos())
65        }
66        builder.finish()
67    }
68}
69
70/// Primitive array readers are leaves of array reader tree. They accept page iterator
71/// and read them into primitive arrays.
72pub struct PrimitiveArrayReader<T>
73where
74    T: DataType,
75    T::T: Copy + Default,
76    Vec<T::T>: IntoBuffer,
77{
78    data_type: ArrowType,
79    pages: Box<dyn PageIterator>,
80    def_levels_buffer: Option<Vec<i16>>,
81    rep_levels_buffer: Option<Vec<i16>>,
82    record_reader: RecordReader<T>,
83}
84
85impl<T> PrimitiveArrayReader<T>
86where
87    T: DataType,
88    T::T: Copy + Default,
89    Vec<T::T>: IntoBuffer,
90{
91    /// Construct primitive array reader.
92    pub fn new(
93        pages: Box<dyn PageIterator>,
94        column_desc: ColumnDescPtr,
95        arrow_type: Option<ArrowType>,
96    ) -> Result<Self> {
97        // Check if Arrow type is specified, else create it from Parquet type
98        let data_type = match arrow_type {
99            Some(t) => t,
100            None => parquet_to_arrow_field(column_desc.as_ref())?
101                .data_type()
102                .clone(),
103        };
104
105        let record_reader = RecordReader::<T>::new(column_desc);
106
107        Ok(Self {
108            data_type,
109            pages,
110            def_levels_buffer: None,
111            rep_levels_buffer: None,
112            record_reader,
113        })
114    }
115}
116
117/// Implementation of primitive array reader.
118impl<T> ArrayReader for PrimitiveArrayReader<T>
119where
120    T: DataType,
121    T::T: Copy + Default,
122    Vec<T::T>: IntoBuffer,
123{
124    fn as_any(&self) -> &dyn Any {
125        self
126    }
127
128    /// Returns data type of primitive array.
129    fn get_data_type(&self) -> &ArrowType {
130        &self.data_type
131    }
132
133    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
134        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
135    }
136
137    fn consume_batch(&mut self) -> Result<ArrayRef> {
138        let target_type = &self.data_type;
139        let arrow_data_type = match T::get_physical_type() {
140            PhysicalType::BOOLEAN => ArrowType::Boolean,
141            PhysicalType::INT32 => {
142                match target_type {
143                    ArrowType::UInt32 => {
144                        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
145                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
146                        ArrowType::UInt32
147                    }
148                    _ => ArrowType::Int32,
149                }
150            }
151            PhysicalType::INT64 => {
152                match target_type {
153                    ArrowType::UInt64 => {
154                        // follow C++ implementation and use overflow/reinterpret cast from  i64 to u64 which will map
155                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
156                        ArrowType::UInt64
157                    }
158                    _ => ArrowType::Int64,
159                }
160            }
161            PhysicalType::FLOAT => ArrowType::Float32,
162            PhysicalType::DOUBLE => ArrowType::Float64,
163            PhysicalType::INT96 => match target_type {
164                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
165                _ => unreachable!("INT96 must be timestamp nanosecond"),
166            },
167            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
168                unreachable!("PrimitiveArrayReaders don't support complex physical types");
169            }
170        };
171
172        // Convert to arrays by using the Parquet physical type.
173        // The physical types are then cast to Arrow types if necessary
174
175        let record_data = self.record_reader.consume_record_data().into_buffer();
176
177        let array_data = ArrayDataBuilder::new(arrow_data_type)
178            .len(self.record_reader.num_values())
179            .add_buffer(record_data)
180            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
181
182        let array_data = unsafe { array_data.build_unchecked() };
183        let array: ArrayRef = match T::get_physical_type() {
184            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
185            PhysicalType::INT32 => match array_data.data_type() {
186                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
187                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
188                _ => unreachable!(),
189            },
190            PhysicalType::INT64 => match array_data.data_type() {
191                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
192                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
193                _ => unreachable!(),
194            },
195            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
196            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
197            PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)),
198            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
199                unreachable!("PrimitiveArrayReaders don't support complex physical types");
200            }
201        };
202
203        // cast to Arrow type
204        // We make a strong assumption here that the casts should be infallible.
205        // If the cast fails because of incompatible datatypes, then there might
206        // be a bigger problem with how Arrow schemas are converted to Parquet.
207        //
208        // As there is not always a 1:1 mapping between Arrow and Parquet, there
209        // are datatypes which we must convert explicitly.
210        // These are:
211        // - date64: we should cast int32 to date32, then date32 to date64.
212        // - decimal: cast in32 to decimal, int64 to decimal
213        let array = match target_type {
214            ArrowType::Date64 => {
215                // this is cheap as it internally reinterprets the data
216                let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
217                arrow_cast::cast(&a, target_type)?
218            }
219            ArrowType::Decimal128(p, s) => {
220                // Apply conversion to all elements regardless of null slots as the conversion
221                // to `i128` is infallible. This improves performance by avoiding a branch in
222                // the inner loop (see docs for `PrimitiveArray::unary`).
223                let array = match array.data_type() {
224                    ArrowType::Int32 => array
225                        .as_any()
226                        .downcast_ref::<Int32Array>()
227                        .unwrap()
228                        .unary(|i| i as i128)
229                        as Decimal128Array,
230                    ArrowType::Int64 => array
231                        .as_any()
232                        .downcast_ref::<Int64Array>()
233                        .unwrap()
234                        .unary(|i| i as i128)
235                        as Decimal128Array,
236                    _ => {
237                        return Err(arrow_err!(
238                            "Cannot convert {:?} to decimal",
239                            array.data_type()
240                        ));
241                    }
242                }
243                .with_precision_and_scale(*p, *s)?;
244
245                Arc::new(array) as ArrayRef
246            }
247            ArrowType::Decimal256(p, s) => {
248                // See above comment. Conversion to `i256` is likewise infallible.
249                let array = match array.data_type() {
250                    ArrowType::Int32 => array
251                        .as_any()
252                        .downcast_ref::<Int32Array>()
253                        .unwrap()
254                        .unary(|i| i256::from_i128(i as i128))
255                        as Decimal256Array,
256                    ArrowType::Int64 => array
257                        .as_any()
258                        .downcast_ref::<Int64Array>()
259                        .unwrap()
260                        .unary(|i| i256::from_i128(i as i128))
261                        as Decimal256Array,
262                    _ => {
263                        return Err(arrow_err!(
264                            "Cannot convert {:?} to decimal",
265                            array.data_type()
266                        ));
267                    }
268                }
269                .with_precision_and_scale(*p, *s)?;
270
271                Arc::new(array) as ArrayRef
272            }
273            _ => arrow_cast::cast(&array, target_type)?,
274        };
275
276        // save definition and repetition buffers
277        self.def_levels_buffer = self.record_reader.consume_def_levels();
278        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
279        self.record_reader.reset();
280        Ok(array)
281    }
282
283    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
284        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
285    }
286
287    fn get_def_levels(&self) -> Option<&[i16]> {
288        self.def_levels_buffer.as_deref()
289    }
290
291    fn get_rep_levels(&self) -> Option<&[i16]> {
292        self.rep_levels_buffer.as_deref()
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::EmptyPageIterator;
    use crate::basic::Encoding;
    use crate::column::page::Page;
    use crate::data_type::{Int32Type, Int64Type};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use crate::util::test_common::rand_gen::make_pages;
    use crate::util::InMemoryPageIterator;
    use arrow::datatypes::ArrowPrimitiveType;
    use arrow_array::{Array, PrimitiveArray};

    use arrow::datatypes::DataType::Decimal128;
    use rand::distributions::uniform::SampleUniform;
    use std::collections::VecDeque;

    /// Builds `num_chunks` column chunks by calling `make_pages` once per
    /// chunk, appending the generated levels and values to the output vectors
    /// and pushing each chunk's pages onto `page_lists`.
    // The helper mirrors the long parameter list of `make_pages`, hence the allow.
    #[allow(clippy::too_many_arguments)]
    fn make_column_chunks<T: DataType>(
        column_desc: ColumnDescPtr,
        encoding: Encoding,
        num_levels: usize,
        min_value: T::T,
        max_value: T::T,
        def_levels: &mut Vec<i16>,
        rep_levels: &mut Vec<i16>,
        values: &mut Vec<T::T>,
        page_lists: &mut Vec<Vec<Page>>,
        use_v2: bool,
        num_chunks: usize,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        for _ in 0..num_chunks {
            let mut pages = VecDeque::new();
            let mut data = Vec::new();
            let mut page_def_levels = Vec::new();
            let mut page_rep_levels = Vec::new();

            make_pages::<T>(
                column_desc.clone(),
                encoding,
                1,
                num_levels,
                min_value,
                max_value,
                &mut page_def_levels,
                &mut page_rep_levels,
                &mut data,
                &mut pages,
                use_v2,
            );

            def_levels.append(&mut page_def_levels);
            rep_levels.append(&mut page_rep_levels);
            values.append(&mut data);
            page_lists.push(Vec::from(pages));
        }
    }

    #[test]
    fn test_primitive_array_reader_empty_pages() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
            Box::<EmptyPageIterator>::default(),
            schema.column(0),
            None,
        )
        .unwrap();

        // expect no values to be read
        let array = array_reader.next_batch(50).unwrap();
        assert!(array.is_empty());
    }

    #[test]
    fn test_primitive_array_reader_data() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Construct page iterator
        let mut data = Vec::new();
        let mut page_lists = Vec::new();
        make_column_chunks::<Int32Type>(
            column_desc.clone(),
            Encoding::PLAIN,
            100,
            1,
            200,
            &mut Vec::new(),
            &mut Vec::new(),
            &mut data,
            &mut page_lists,
            true,
            2,
        );
        let page_iterator = InMemoryPageIterator::new(page_lists);

        let mut array_reader =
            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                .unwrap();

        // (requested batch size, slice of `data` the batch must equal):
        // - first 50 values, all from the first column chunk
        // - next 100 values, 50 from the first and 50 from the second column chunk
        // - 100 requested, but only the last 50 values remain
        let expected_batches: [(usize, std::ops::Range<usize>); 3] =
            [(50, 0..50), (100, 50..150), (100, 150..200)];

        for (batch_size, range) in expected_batches {
            let array = array_reader.next_batch(batch_size).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[range].to_vec()), array);
        }
    }

    macro_rules! test_primitive_array_reader_one_type {
        (
            $arrow_parquet_type:ty,
            $physical_type:expr,
            $converted_type_str:expr,
            $result_arrow_type:ty,
            $result_arrow_cast_type:ty,
            $result_primitive_type:ty
            $(, $timezone:expr)?
        ) => {{
            let message_type = format!(
                "
            message test_schema {{
              REQUIRED {:?} leaf ({});
          }}
            ",
                $physical_type, $converted_type_str
            );
            let schema = parse_message_type(&message_type)
                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
                .unwrap();

            let column_desc = schema.column(0);

            // Construct page iterator
            {
                let mut data = Vec::new();
                let mut page_lists = Vec::new();
                make_column_chunks::<$arrow_parquet_type>(
                    column_desc.clone(),
                    Encoding::PLAIN,
                    100,
                    1,
                    200,
                    &mut Vec::new(),
                    &mut Vec::new(),
                    &mut data,
                    &mut page_lists,
                    true,
                    2,
                );
                let page_iterator = InMemoryPageIterator::new(page_lists);
                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
                    Box::new(page_iterator),
                    column_desc.clone(),
                    None,
                )
                .expect("Unable to get array reader");

                let array = array_reader
                    .next_batch(50)
                    .expect("Unable to get batch from reader");

                let result_data_type = <$result_arrow_type>::DATA_TYPE;
                // The panic message is only formatted when the downcast fails.
                let array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast {:?} to {:?}",
                            array.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;

                // create expected array as primitive, and cast to result type
                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
                    data[0..50]
                        .iter()
                        .map(|x| *x as $result_primitive_type)
                        .collect::<Vec<$result_primitive_type>>(),
                );
                let expected = Arc::new(expected) as ArrayRef;
                let expected = arrow::compute::cast(&expected, &result_data_type)
                    .expect("Unable to cast expected array");
                assert_eq!(expected.data_type(), &result_data_type);
                let expected = expected
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast expected {:?} to {:?}",
                            expected.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;
                assert_eq!(expected, array);
            }
        }};
    }

    #[test]
    fn test_primitive_array_reader_temporal_types() {
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "DATE",
            arrow::datatypes::Date32Type,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "TIME_MILLIS",
            arrow::datatypes::Time32MillisecondType,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIME_MICROS",
            arrow::datatypes::Time64MicrosecondType,
            arrow::datatypes::Int64Type,
            i64
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MILLIS",
            arrow::datatypes::TimestampMillisecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MICROS",
            arrow::datatypes::TimestampMicrosecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
    }

    #[test]
    fn test_primitive_array_reader_def_and_rep_levels() {
        // Construct column schema
        let message_type = "
        message test_schema {
            REPEATED Group test_mid {
                OPTIONAL INT32 leaf;
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Construct page iterator
        let mut def_levels = Vec::new();
        let mut rep_levels = Vec::new();
        let mut page_lists = Vec::new();
        make_column_chunks::<Int32Type>(
            column_desc.clone(),
            Encoding::PLAIN,
            100,
            1,
            200,
            &mut def_levels,
            &mut rep_levels,
            &mut Vec::new(),
            &mut page_lists,
            true,
            2,
        );

        let page_iterator = InMemoryPageIterator::new(page_lists);

        let mut array_reader =
            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                .unwrap();

        // Start of the not-yet-verified part of the expected level vectors.
        let mut accu_len: usize = 0;

        // Batches requested in turn:
        // - 50, all from the first column chunk
        // - 100, 50 from the first and 50 from the second column chunk
        // - 100, although only 50 remain
        for batch_size in [50, 100, 100] {
            let array = array_reader.next_batch(batch_size).unwrap();
            let end = accu_len + array.len();
            assert_eq!(
                Some(&def_levels[accu_len..end]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..end]),
                array_reader.get_rep_levels()
            );
            accu_len = end;
        }
    }

    #[test]
    fn test_primitive_array_reader_decimal_types() {
        // parquet `INT32` to decimal
        let message_type = "
            message test_schema {
                REQUIRED INT32 decimal1 (DECIMAL(8,2));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        // create the array reader
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -99999999,
                99999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            // read data from the reader
            // the data type is decimal(8,2)
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(8, 2));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(8, 2)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // not equal with different data type(precision and scale)
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(9, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array);
        }

        // parquet `INT64` to decimal
        let message_type = "
            message test_schema {
                REQUIRED INT64 decimal1 (DECIMAL(18,4));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        // create the array reader
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int64Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -999999999999999999,
                999999999999999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            // read data from the reader
            // the data type is decimal(18,4)
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(18, 4));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(18, 4)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // not equal with different data type(precision and scale)
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(34, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array);
        }
    }
}
786}