parquet/column/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column reader API.
19
20use bytes::Bytes;
21
22use super::page::{Page, PageReader};
23use crate::basic::*;
24use crate::column::reader::decoder::{
25    ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
26    RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
27};
28use crate::data_type::*;
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::{ceil, num_required_bits, read_num_bytes};
32
33pub(crate) mod decoder;
34
/// Column reader for a Parquet type.
///
/// Each variant wraps a [`ColumnReaderImpl`] specialised for one physical type.
/// [`get_column_reader`] builds the variant matching a column descriptor, and
/// [`get_typed_column_reader`] converts a variant back into its typed reader.
pub enum ColumnReader {
    /// Column reader for boolean type
    BoolColumnReader(ColumnReaderImpl<BoolType>),
    /// Column reader for int32 type
    Int32ColumnReader(ColumnReaderImpl<Int32Type>),
    /// Column reader for int64 type
    Int64ColumnReader(ColumnReaderImpl<Int64Type>),
    /// Column reader for int96 type
    Int96ColumnReader(ColumnReaderImpl<Int96Type>),
    /// Column reader for float type
    FloatColumnReader(ColumnReaderImpl<FloatType>),
    /// Column reader for double type
    DoubleColumnReader(ColumnReaderImpl<DoubleType>),
    /// Column reader for byte array type
    ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
    /// Column reader for fixed length byte array type
    FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
54
55/// Gets a specific column reader corresponding to column descriptor `col_descr`. The
56/// column reader will read from pages in `col_page_reader`.
57pub fn get_column_reader(
58    col_descr: ColumnDescPtr,
59    col_page_reader: Box<dyn PageReader>,
60) -> ColumnReader {
61    match col_descr.physical_type() {
62        Type::BOOLEAN => {
63            ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
64        }
65        Type::INT32 => {
66            ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
67        }
68        Type::INT64 => {
69            ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
70        }
71        Type::INT96 => {
72            ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
73        }
74        Type::FLOAT => {
75            ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
76        }
77        Type::DOUBLE => {
78            ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
79        }
80        Type::BYTE_ARRAY => {
81            ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
82        }
83        Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
84            ColumnReaderImpl::new(col_descr, col_page_reader),
85        ),
86    }
87}
88
89/// Gets a typed column reader for the specific type `T`, by "up-casting" `col_reader` of
90/// non-generic type to a generic column reader type `ColumnReaderImpl`.
91///
92/// Panics if actual enum value for `col_reader` does not match the type `T`.
93pub fn get_typed_column_reader<T: DataType>(col_reader: ColumnReader) -> ColumnReaderImpl<T> {
94    T::get_column_reader(col_reader).unwrap_or_else(|| {
95        panic!(
96            "Failed to convert column reader into a typed column reader for `{}` type",
97            T::get_physical_type()
98        )
99    })
100}
101
/// Typed value reader for a particular primitive column.
///
/// This is a [`GenericColumnReader`] fixed to the default level decoders
/// ([`RepetitionLevelDecoderImpl`], [`DefinitionLevelDecoderImpl`]) and to
/// [`ColumnValueDecoderImpl`] for the data type `T`.
pub type ColumnReaderImpl<T> = GenericColumnReader<
    RepetitionLevelDecoderImpl,
    DefinitionLevelDecoderImpl,
    ColumnValueDecoderImpl<T>,
>;
108
/// Reads data for a given column chunk, using the provided decoders:
///
/// - R: `RepetitionLevelDecoder` used to decode repetition levels
/// - D: `DefinitionLevelDecoder` used to decode definition levels
/// - V: `ColumnValueDecoder` used to decode value data
pub struct GenericColumnReader<R, D, V> {
    /// Descriptor of the column being read; supplies the max definition/repetition levels.
    descr: ColumnDescPtr,

    /// Source of the pages (dictionary and data) for this column chunk.
    page_reader: Box<dyn PageReader>,

    /// The total number of values stored in the current data page.
    num_buffered_values: usize,

    /// The number of values from the current data page that have been decoded (or skipped)
    /// so far.
    num_decoded_values: usize,

    /// True if the end of the current data page denotes the end of a record
    has_record_delimiter: bool,

    /// The decoder for the definition levels if any
    def_level_decoder: Option<D>,

    /// The decoder for the repetition levels if any
    rep_level_decoder: Option<R>,

    /// The decoder for the values
    values_decoder: V,
}
138
139impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
140where
141    V: ColumnValueDecoder,
142{
143    /// Creates new column reader based on column descriptor and page reader.
144    pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
145        let values_decoder = V::new(&descr);
146
147        let def_level_decoder = (descr.max_def_level() != 0)
148            .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
149
150        let rep_level_decoder = (descr.max_rep_level() != 0)
151            .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
152
153        Self::new_with_decoders(
154            descr,
155            page_reader,
156            values_decoder,
157            def_level_decoder,
158            rep_level_decoder,
159        )
160    }
161}
162
163impl<R, D, V> GenericColumnReader<R, D, V>
164where
165    R: RepetitionLevelDecoder,
166    D: DefinitionLevelDecoder,
167    V: ColumnValueDecoder,
168{
169    pub(crate) fn new_with_decoders(
170        descr: ColumnDescPtr,
171        page_reader: Box<dyn PageReader>,
172        values_decoder: V,
173        def_level_decoder: Option<D>,
174        rep_level_decoder: Option<R>,
175    ) -> Self {
176        Self {
177            descr,
178            def_level_decoder,
179            rep_level_decoder,
180            page_reader,
181            num_buffered_values: 0,
182            num_decoded_values: 0,
183            values_decoder,
184            has_record_delimiter: false,
185        }
186    }
187
188    /// Reads a batch of values of at most `batch_size`, returning a tuple containing the
189    /// actual number of non-null values read, followed by the corresponding number of levels,
190    /// i.e, the total number of values including nulls, empty lists, etc...
191    ///
192    /// If the max definition level is 0, `def_levels` will be ignored, otherwise it will be
193    /// populated with the number of levels read, with an error returned if it is `None`.
194    ///
195    /// If the max repetition level is 0, `rep_levels` will be ignored, otherwise it will be
196    /// populated with the number of levels read, with an error returned if it is `None`.
197    ///
198    /// `values` will be contiguously populated with the non-null values. Note that if the column
199    /// is not required, this may be less than either `batch_size` or the number of levels read
200    #[deprecated(note = "Use read_records")]
201    pub fn read_batch(
202        &mut self,
203        batch_size: usize,
204        def_levels: Option<&mut D::Buffer>,
205        rep_levels: Option<&mut R::Buffer>,
206        values: &mut V::Buffer,
207    ) -> Result<(usize, usize)> {
208        let (_, values, levels) = self.read_records(batch_size, def_levels, rep_levels, values)?;
209
210        Ok((values, levels))
211    }
212
213    /// Read up to `max_records` whole records, returning the number of complete
214    /// records, non-null values and levels decoded. All levels for a given record
215    /// will be read, i.e. the next repetition level, if any, will be 0
216    ///
217    /// If the max definition level is 0, `def_levels` will be ignored and the number of records,
218    /// non-null values and levels decoded will all be equal, otherwise `def_levels` will be
219    /// populated with the number of levels read, with an error returned if it is `None`.
220    ///
221    /// If the max repetition level is 0, `rep_levels` will be ignored and the number of records
222    /// and levels decoded will both be equal, otherwise `rep_levels` will be populated with
223    /// the number of levels read, with an error returned if it is `None`.
224    ///
225    /// `values` will be contiguously populated with the non-null values. Note that if the column
226    /// is not required, this may be less than either `max_records` or the number of levels read
227    pub fn read_records(
228        &mut self,
229        max_records: usize,
230        mut def_levels: Option<&mut D::Buffer>,
231        mut rep_levels: Option<&mut R::Buffer>,
232        values: &mut V::Buffer,
233    ) -> Result<(usize, usize, usize)> {
234        let mut total_records_read = 0;
235        let mut total_levels_read = 0;
236        let mut total_values_read = 0;
237
238        while total_records_read < max_records && self.has_next()? {
239            let remaining_records = max_records - total_records_read;
240            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
241
242            let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() {
243                Some(reader) => {
244                    let out = rep_levels
245                        .as_mut()
246                        .ok_or_else(|| general_err!("must specify repetition levels"))?;
247
248                    let (mut records_read, levels_read) =
249                        reader.read_rep_levels(out, remaining_records, remaining_levels)?;
250
251                    if records_read == 0 && levels_read == 0 {
252                        // The fact that we're still looping implies there must be some levels to read.
253                        return Err(general_err!(
254                            "Insufficient repetition levels read from column"
255                        ));
256                    }
257                    if levels_read == remaining_levels && self.has_record_delimiter {
258                        // Reached end of page, which implies records_read < remaining_records
259                        // as otherwise would have stopped reading before reaching the end
260                        assert!(records_read < remaining_records); // Sanity check
261                        records_read += reader.flush_partial() as usize;
262                    }
263                    (records_read, levels_read)
264                }
265                None => {
266                    let min = remaining_records.min(remaining_levels);
267                    (min, min)
268                }
269            };
270
271            let values_to_read = match self.def_level_decoder.as_mut() {
272                Some(reader) => {
273                    let out = def_levels
274                        .as_mut()
275                        .ok_or_else(|| general_err!("must specify definition levels"))?;
276
277                    let (values_read, levels_read) = reader.read_def_levels(out, levels_to_read)?;
278
279                    if levels_read != levels_to_read {
280                        return Err(general_err!("insufficient definition levels read from column - expected {rep_levels}, got {read}"));
281                    }
282
283                    values_read
284                }
285                None => levels_to_read,
286            };
287
288            let values_read = self.values_decoder.read(values, values_to_read)?;
289
290            if values_read != values_to_read {
291                return Err(general_err!(
292                    "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
293                ));
294            }
295
296            self.num_decoded_values += levels_to_read;
297            total_records_read += records_read;
298            total_levels_read += levels_to_read;
299            total_values_read += values_read;
300        }
301
302        Ok((total_records_read, total_values_read, total_levels_read))
303    }
304
305    /// Skips over `num_records` records, where records are delimited by repetition levels of 0
306    ///
307    /// # Returns
308    ///
309    /// Returns the number of records skipped
310    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
311        let mut remaining_records = num_records;
312        while remaining_records != 0 {
313            if self.num_buffered_values == self.num_decoded_values {
314                let metadata = match self.page_reader.peek_next_page()? {
315                    None => return Ok(num_records - remaining_records),
316                    Some(metadata) => metadata,
317                };
318
319                // If dictionary, we must read it
320                if metadata.is_dict {
321                    self.read_dictionary_page()?;
322                    continue;
323                }
324
325                // If page has less rows than the remaining records to
326                // be skipped, skip entire page
327                let rows = metadata.num_rows.or_else(|| {
328                    // If no repetition levels, num_levels == num_rows
329                    self.rep_level_decoder
330                        .is_none()
331                        .then_some(metadata.num_levels)?
332                });
333
334                if let Some(rows) = rows {
335                    if rows <= remaining_records {
336                        self.page_reader.skip_next_page()?;
337                        remaining_records -= rows;
338                        continue;
339                    }
340                }
341                // because self.num_buffered_values == self.num_decoded_values means
342                // we need reads a new page and set up the decoders for levels
343                if !self.read_new_page()? {
344                    return Ok(num_records - remaining_records);
345                }
346            }
347
348            // start skip values in page level
349
350            // The number of levels in the current data page
351            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
352
353            let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
354                Some(decoder) => {
355                    let (mut records_read, levels_read) =
356                        decoder.skip_rep_levels(remaining_records, remaining_levels)?;
357
358                    if levels_read == remaining_levels && self.has_record_delimiter {
359                        // Reached end of page, which implies records_read < remaining_records
360                        // as otherwise would have stopped reading before reaching the end
361                        assert!(records_read < remaining_records); // Sanity check
362                        records_read += decoder.flush_partial() as usize;
363                    }
364
365                    (records_read, levels_read)
366                }
367                None => {
368                    // No repetition levels, so each level corresponds to a row
369                    let levels = remaining_levels.min(remaining_records);
370                    (levels, levels)
371                }
372            };
373
374            self.num_decoded_values += rep_levels_read;
375            remaining_records -= records_read;
376
377            if self.num_buffered_values == self.num_decoded_values {
378                // Exhausted buffered page - no need to advance other decoders
379                continue;
380            }
381
382            let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
383                Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
384                None => (rep_levels_read, rep_levels_read),
385            };
386
387            if rep_levels_read != def_levels_read {
388                return Err(general_err!(
389                    "levels mismatch, read {} repetition levels and {} definition levels",
390                    rep_levels_read,
391                    def_levels_read
392                ));
393            }
394
395            let values = self.values_decoder.skip_values(values_read)?;
396            if values != values_read {
397                return Err(general_err!(
398                    "skipped {} values, expected {}",
399                    values,
400                    values_read
401                ));
402            }
403        }
404        Ok(num_records - remaining_records)
405    }
406
407    /// Read the next page as a dictionary page. If the next page is not a dictionary page,
408    /// this will return an error.
409    fn read_dictionary_page(&mut self) -> Result<()> {
410        match self.page_reader.get_next_page()? {
411            Some(Page::DictionaryPage {
412                buf,
413                num_values,
414                encoding,
415                is_sorted,
416            }) => self
417                .values_decoder
418                .set_dict(buf, num_values, encoding, is_sorted),
419            _ => Err(ParquetError::General(
420                "Invalid page. Expecting dictionary page".to_string(),
421            )),
422        }
423    }
424
425    /// Reads a new page and set up the decoders for levels, values or dictionary.
426    /// Returns false if there's no page left.
427    fn read_new_page(&mut self) -> Result<bool> {
428        loop {
429            match self.page_reader.get_next_page()? {
430                // No more page to read
431                None => return Ok(false),
432                Some(current_page) => {
433                    match current_page {
434                        // 1. Dictionary page: configure dictionary for this page.
435                        Page::DictionaryPage {
436                            buf,
437                            num_values,
438                            encoding,
439                            is_sorted,
440                        } => {
441                            self.values_decoder
442                                .set_dict(buf, num_values, encoding, is_sorted)?;
443                            continue;
444                        }
445                        // 2. Data page v1
446                        Page::DataPage {
447                            buf,
448                            num_values,
449                            encoding,
450                            def_level_encoding,
451                            rep_level_encoding,
452                            statistics: _,
453                        } => {
454                            self.num_buffered_values = num_values as _;
455                            self.num_decoded_values = 0;
456
457                            let max_rep_level = self.descr.max_rep_level();
458                            let max_def_level = self.descr.max_def_level();
459
460                            let mut offset = 0;
461
462                            if max_rep_level > 0 {
463                                let (bytes_read, level_data) = parse_v1_level(
464                                    max_rep_level,
465                                    num_values,
466                                    rep_level_encoding,
467                                    buf.slice(offset..),
468                                )?;
469                                offset += bytes_read;
470
471                                self.has_record_delimiter =
472                                    self.page_reader.at_record_boundary()?;
473
474                                self.rep_level_decoder
475                                    .as_mut()
476                                    .unwrap()
477                                    .set_data(rep_level_encoding, level_data);
478                            }
479
480                            if max_def_level > 0 {
481                                let (bytes_read, level_data) = parse_v1_level(
482                                    max_def_level,
483                                    num_values,
484                                    def_level_encoding,
485                                    buf.slice(offset..),
486                                )?;
487                                offset += bytes_read;
488
489                                self.def_level_decoder
490                                    .as_mut()
491                                    .unwrap()
492                                    .set_data(def_level_encoding, level_data);
493                            }
494
495                            self.values_decoder.set_data(
496                                encoding,
497                                buf.slice(offset..),
498                                num_values as usize,
499                                None,
500                            )?;
501                            return Ok(true);
502                        }
503                        // 3. Data page v2
504                        Page::DataPageV2 {
505                            buf,
506                            num_values,
507                            encoding,
508                            num_nulls,
509                            num_rows: _,
510                            def_levels_byte_len,
511                            rep_levels_byte_len,
512                            is_compressed: _,
513                            statistics: _,
514                        } => {
515                            if num_nulls > num_values {
516                                return Err(general_err!("more nulls than values in page, contained {} values and {} nulls", num_values, num_nulls));
517                            }
518
519                            self.num_buffered_values = num_values as _;
520                            self.num_decoded_values = 0;
521
522                            // DataPage v2 only supports RLE encoding for repetition
523                            // levels
524                            if self.descr.max_rep_level() > 0 {
525                                // Technically a DataPage v2 should not write a record
526                                // across multiple pages, however, the parquet writer
527                                // used to do this so we preserve backwards compatibility
528                                self.has_record_delimiter =
529                                    self.page_reader.at_record_boundary()?;
530
531                                self.rep_level_decoder.as_mut().unwrap().set_data(
532                                    Encoding::RLE,
533                                    buf.slice(..rep_levels_byte_len as usize),
534                                );
535                            }
536
537                            // DataPage v2 only supports RLE encoding for definition
538                            // levels
539                            if self.descr.max_def_level() > 0 {
540                                self.def_level_decoder.as_mut().unwrap().set_data(
541                                    Encoding::RLE,
542                                    buf.slice(
543                                        rep_levels_byte_len as usize
544                                            ..(rep_levels_byte_len + def_levels_byte_len) as usize,
545                                    ),
546                                );
547                            }
548
549                            self.values_decoder.set_data(
550                                encoding,
551                                buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..),
552                                num_values as usize,
553                                Some((num_values - num_nulls) as usize),
554                            )?;
555                            return Ok(true);
556                        }
557                    };
558                }
559            }
560        }
561    }
562
563    /// Check whether there is more data to read from this column,
564    /// If the current page is fully decoded, this will load the next page
565    /// (if it exists) into the buffer
566    #[inline]
567    pub(crate) fn has_next(&mut self) -> Result<bool> {
568        if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
569            // TODO: should we return false if read_new_page() = true and
570            // num_buffered_values = 0?
571            if !self.read_new_page()? {
572                Ok(false)
573            } else {
574                Ok(self.num_buffered_values != 0)
575            }
576        } else {
577            Ok(true)
578        }
579    }
580}
581
582fn parse_v1_level(
583    max_level: i16,
584    num_buffered_values: u32,
585    encoding: Encoding,
586    buf: Bytes,
587) -> Result<(usize, Bytes)> {
588    match encoding {
589        Encoding::RLE => {
590            let i32_size = std::mem::size_of::<i32>();
591            let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
592            Ok((
593                i32_size + data_size,
594                buf.slice(i32_size..i32_size + data_size),
595            ))
596        }
597        #[allow(deprecated)]
598        Encoding::BIT_PACKED => {
599            let bit_width = num_required_bits(max_level as u64);
600            let num_bytes = ceil(num_buffered_values as usize * bit_width as usize, 8);
601            Ok((num_bytes, buf.slice(..num_bytes)))
602        }
603        _ => Err(general_err!("invalid level encoding: {}", encoding)),
604    }
605}
606
607#[cfg(test)]
608mod tests {
609    use super::*;
610
611    use rand::distributions::uniform::SampleUniform;
612    use std::{collections::VecDeque, sync::Arc};
613
614    use crate::basic::Type as PhysicalType;
615    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
616    use crate::util::test_common::page_util::InMemoryPageReader;
617    use crate::util::test_common::rand_gen::make_pages;
618
    // Level count passed to the tester methods as `$num_levels` by most tests below.
    const NUM_LEVELS: usize = 128;
    // Page count passed to the tester methods as `$num_pages` by most tests below.
    const NUM_PAGES: usize = 2;
    // Non-zero definition level handed to `ColumnDescriptor::new` by the tests that use it.
    const MAX_DEF_LEVEL: i16 = 5;
    // Non-zero repetition level handed to `ColumnDescriptor::new` by the tests that use it.
    const MAX_REP_LEVEL: i16 = 5;
623
    // Macro to generate test cases.
    //
    // The second argument (`i32` or `i64`) selects the data type and the schema-type
    // constructor that are forwarded to `test_internal!`; every other argument is
    // passed through unchanged.
    macro_rules! test {
        // branch for generating i32 cases
        ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int32Type,
                get_test_int32_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
        // branch for generating i64 cases
        ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int64Type,
                get_test_int64_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
    }
661
    // Expands to a `#[test]` function named `$test_func`.
    //
    // The generated test builds a `ColumnDescriptor` from the schema type returned by
    // `$pty()`, passing `$def_level` and `$rep_level` plus an empty column path, and then
    // calls the `ColumnReaderTester::<$ty>` method `$func` with the remaining arguments.
    macro_rules! test_internal {
        ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
     $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
     $min:expr, $max:expr) => {
            #[test]
            fn $test_func() {
                let desc = Arc::new(ColumnDescriptor::new(
                    Arc::new($pty()),
                    $def_level,
                    $rep_level,
                    ColumnPath::new(Vec::new()),
                ));
                let mut tester = ColumnReaderTester::<$ty>::new();
                tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
            }
        };
    }
679
    // int32, tester methods `plain_v1` / `plain_v2`, non-zero def/rep levels,
    // batch size 16.
    test!(
        test_read_plain_v1_int32,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );

    // Same as above with batch size 17, which does not evenly divide NUM_LEVELS (128).
    test!(
        test_read_plain_v1_int32_uneven,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_uneven,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );

    // Same as above with batch size 512, which is larger than NUM_LEVELS (128).
    test!(
        test_read_plain_v1_int32_multi_page,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_multi_page,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );
754
    // Test cases where the column descriptor is built with a definition level of 0 and a
    // repetition level of 0 (the MAX_DEF_LEVEL / MAX_REP_LEVEL constants are not used here).
    test!(
        test_read_plain_v1_int32_required_non_repeated,
        i32,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_required_non_repeated,
        i32,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
780
781    test!(
782        test_read_plain_v1_int64,
783        i64,
784        plain_v1,
785        1,
786        1,
787        NUM_PAGES,
788        NUM_LEVELS,
789        16,
790        i64::MIN,
791        i64::MAX
792    );
793    test!(
794        test_read_plain_v2_int64,
795        i64,
796        plain_v2,
797        1,
798        1,
799        NUM_PAGES,
800        NUM_LEVELS,
801        16,
802        i64::MIN,
803        i64::MAX
804    );
805
    // INT64 column, `plain_v1` / `plain_v2`, with 17 in the eighth argument slot.
    // NOTE(review): presumably a batch size that does not divide the level count
    // evenly (per the `_uneven` suffix) — confirm against the `test!` macro definition.
    test!(
        test_read_plain_v1_int64_uneven,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_uneven,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );
830
    // INT64 column, `plain_v1` / `plain_v2`, with 512 in the eighth argument slot.
    // NOTE(review): presumably a batch size large enough to span several pages (per the
    // `_multi_page` suffix) — confirm against the `test!` macro definition.
    test!(
        test_read_plain_v1_int64_multi_page,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_multi_page,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );
855
    // Test cases where the column descriptor has a max definition level of 0 and a max
    // repetition level of 0 (INT64 counterparts of the INT32 required/non-repeated
    // tests).
    test!(
        test_read_plain_v1_int64_required_non_repeated,
        i64,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_required_non_repeated,
        i64,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
881
    // INT32 column through the `dict_v1` / `dict_v2` helpers. The literals 2, 2 sit
    // where the other tests pass NUM_PAGES, NUM_LEVELS, and the last two arguments are
    // 0 and 3 instead of i32::MIN / i32::MAX. NOTE(review): presumably a deliberately
    // tiny data set with a narrow value range — confirm against the `test!` macro.
    test!(
        test_read_dict_v1_int32_small,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_small,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
906
    // INT32 column through the `dict_v1` / `dict_v2` helpers with the shared
    // NUM_PAGES / NUM_LEVELS constants; the last two arguments (0 and 3) line up with
    // the `min` / `max` parameters of the helper signatures.
    test!(
        test_read_dict_v1_int32,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
931
    // INT32 column, `dict_v1` / `dict_v2`, with 17 in the eighth argument slot.
    // NOTE(review): presumably a batch size that does not divide the level count
    // evenly (per the `_uneven` suffix) — confirm against the `test!` macro definition.
    test!(
        test_read_dict_v1_int32_uneven,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_uneven,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
956
    // INT32 column, `dict_v1` / `dict_v2`, with 512 in the eighth argument slot.
    // NOTE(review): presumably a batch size large enough to span several pages (per the
    // `_multi_page` suffix) — confirm against the `test!` macro definition.
    test!(
        test_read_dict_v1_int32_multi_page,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_multi_page,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
981
    // INT64 counterparts of the basic dictionary-helper tests; unlike the plain INT64
    // tests, these pass MAX_DEF_LEVEL / MAX_REP_LEVEL rather than the literal 1.
    test!(
        test_read_dict_v1_int64,
        i64,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int64,
        i64,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
1006
1007    #[test]
1008    fn test_read_batch_values_only() {
1009        test_read_batch_int32(16, 0, 0);
1010    }
1011
1012    #[test]
1013    fn test_read_batch_values_def_levels() {
1014        test_read_batch_int32(16, MAX_DEF_LEVEL, 0);
1015    }
1016
1017    #[test]
1018    fn test_read_batch_values_rep_levels() {
1019        test_read_batch_int32(16, 0, MAX_REP_LEVEL);
1020    }
1021
1022    #[test]
1023    fn test_read_batch_values_def_rep_levels() {
1024        test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL);
1025    }
1026
1027    #[test]
1028    fn test_read_batch_adjust_after_buffering_page() {
1029        // This test covers scenario when buffering new page results in setting number
1030        // of decoded values to 0, resulting on reading `batch_size` of values, but it is
1031        // larger than we can insert into slice (affects values and levels).
1032        //
1033        // Note: values are chosen to reproduce the issue.
1034        //
1035        let primitive_type = get_test_int32_type();
1036        let desc = Arc::new(ColumnDescriptor::new(
1037            Arc::new(primitive_type),
1038            1,
1039            1,
1040            ColumnPath::new(Vec::new()),
1041        ));
1042
1043        let num_pages = 2;
1044        let num_levels = 4;
1045        let batch_size = 5;
1046
1047        let mut tester = ColumnReaderTester::<Int32Type>::new();
1048        tester.test_read_batch(
1049            desc,
1050            Encoding::RLE_DICTIONARY,
1051            num_pages,
1052            num_levels,
1053            batch_size,
1054            i32::MIN,
1055            i32::MAX,
1056            false,
1057        );
1058    }
1059
1060    // ----------------------------------------------------------------------
1061    // Helper methods to make pages and test
1062    //
1063    // # Overview
1064    //
1065    // Most of the test functionality is implemented in `ColumnReaderTester`, which
1066    // provides some general data page test methods:
1067    // - `test_read_batch_general`
1068    // - `test_read_batch`
1069    //
1070    // There are also some high level wrappers that are part of `ColumnReaderTester`:
1071    // - `plain_v1` -> call `test_read_batch_general` with data page v1 and plain encoding
1072    // - `plain_v2` -> call `test_read_batch_general` with data page v2 and plain encoding
1073    // - `dict_v1` -> call `test_read_batch_general` with data page v1 + dictionary page
1074    // - `dict_v2` -> call `test_read_batch_general` with data page v2 + dictionary page
1075    //
1076    // And even higher level wrappers that simplify testing of almost the same test cases:
1077    // - `get_test_int32_type`, provides dummy schema type
1078    // - `get_test_int64_type`, provides dummy schema type
1079    // - `test_read_batch_int32`, wrapper for `read_batch` tests, since they are basically
1080    //   the same, just different def/rep levels and batch size.
1081    //
1082    // # Page assembly
1083    //
    // Page construction and the generation of values, definition levels and repetition
    // levels happen in the `make_pages` function.
    // All values are randomly generated based on the provided min/max; levels are
    // calculated based on the provided max level for the column descriptor (which is
    // basically either int32 or int64 type in tests) and the `levels_per_page` variable.
1089    //
1090    // We use `DataPageBuilder` and its implementation `DataPageBuilderImpl` to actually
1091    // turn values, definition and repetition levels into data pages (either v1 or v2).
1092    //
    // Those data pages are then stored as part of `InMemoryPageReader` (we just pass the
    // generated pages directly), which implements the `PageReader` interface.
1095    //
1096    // # Comparison
1097    //
    // This allows us to pass the test page reader into a column reader, so we can test
    // the functionality of the column reader - see `test_read_batch`, where we create a
    // column reader -> typed column reader, call `read_records` repeatedly until nothing
    // more is returned, and compare the output with the generated data.
1102
1103    // Returns dummy Parquet `Type` for primitive field, because most of our tests use
1104    // INT32 physical type.
1105    fn get_test_int32_type() -> SchemaType {
1106        SchemaType::primitive_type_builder("a", PhysicalType::INT32)
1107            .with_repetition(Repetition::REQUIRED)
1108            .with_converted_type(ConvertedType::INT_32)
1109            .with_length(-1)
1110            .build()
1111            .expect("build() should be OK")
1112    }
1113
1114    // Returns dummy Parquet `Type` for INT64 physical type.
1115    fn get_test_int64_type() -> SchemaType {
1116        SchemaType::primitive_type_builder("a", PhysicalType::INT64)
1117            .with_repetition(Repetition::REQUIRED)
1118            .with_converted_type(ConvertedType::INT_64)
1119            .with_length(-1)
1120            .build()
1121            .expect("build() should be OK")
1122    }
1123
1124    // Tests `read_batch()` functionality for INT32.
1125    //
1126    // This is a high level wrapper on `ColumnReaderTester` that allows us to specify some
1127    // boilerplate code for setting up definition/repetition levels and column descriptor.
1128    fn test_read_batch_int32(batch_size: usize, max_def_level: i16, max_rep_level: i16) {
1129        let primitive_type = get_test_int32_type();
1130
1131        let desc = Arc::new(ColumnDescriptor::new(
1132            Arc::new(primitive_type),
1133            max_def_level,
1134            max_rep_level,
1135            ColumnPath::new(Vec::new()),
1136        ));
1137
1138        let mut tester = ColumnReaderTester::<Int32Type>::new();
1139        tester.test_read_batch(
1140            desc,
1141            Encoding::RLE_DICTIONARY,
1142            NUM_PAGES,
1143            NUM_LEVELS,
1144            batch_size,
1145            i32::MIN,
1146            i32::MAX,
1147            false,
1148        );
1149    }
1150
    // Test harness that holds the reference data for one column of Parquet type `T`.
    //
    // All three buffers are passed mutably to `make_pages` in `test_read_batch`
    // (which presumably records the generated data in them — confirm against
    // `make_pages`) and are afterwards compared with what the column reader returned.
    struct ColumnReaderTester<T: DataType>
    where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        // Reference repetition levels; only checked when the max repetition level > 0.
        rep_levels: Vec<i16>,
        // Reference definition levels; only checked when the max definition level > 0.
        def_levels: Vec<i16>,
        // Reference values; always checked against the values read back.
        values: Vec<T::T>,
    }
1159
1160    impl<T: DataType> ColumnReaderTester<T>
1161    where
1162        T::T: PartialOrd + SampleUniform + Copy,
1163    {
1164        pub fn new() -> Self {
1165            Self {
1166                rep_levels: Vec::new(),
1167                def_levels: Vec::new(),
1168                values: Vec::new(),
1169            }
1170        }
1171
1172        // Method to generate and test data pages v1
1173        fn plain_v1(
1174            &mut self,
1175            desc: ColumnDescPtr,
1176            num_pages: usize,
1177            num_levels: usize,
1178            batch_size: usize,
1179            min: T::T,
1180            max: T::T,
1181        ) {
1182            self.test_read_batch_general(
1183                desc,
1184                Encoding::PLAIN,
1185                num_pages,
1186                num_levels,
1187                batch_size,
1188                min,
1189                max,
1190                false,
1191            );
1192        }
1193
1194        // Method to generate and test data pages v2
1195        fn plain_v2(
1196            &mut self,
1197            desc: ColumnDescPtr,
1198            num_pages: usize,
1199            num_levels: usize,
1200            batch_size: usize,
1201            min: T::T,
1202            max: T::T,
1203        ) {
1204            self.test_read_batch_general(
1205                desc,
1206                Encoding::PLAIN,
1207                num_pages,
1208                num_levels,
1209                batch_size,
1210                min,
1211                max,
1212                true,
1213            );
1214        }
1215
1216        // Method to generate and test dictionary page + data pages v1
1217        fn dict_v1(
1218            &mut self,
1219            desc: ColumnDescPtr,
1220            num_pages: usize,
1221            num_levels: usize,
1222            batch_size: usize,
1223            min: T::T,
1224            max: T::T,
1225        ) {
1226            self.test_read_batch_general(
1227                desc,
1228                Encoding::RLE_DICTIONARY,
1229                num_pages,
1230                num_levels,
1231                batch_size,
1232                min,
1233                max,
1234                false,
1235            );
1236        }
1237
1238        // Method to generate and test dictionary page + data pages v2
1239        fn dict_v2(
1240            &mut self,
1241            desc: ColumnDescPtr,
1242            num_pages: usize,
1243            num_levels: usize,
1244            batch_size: usize,
1245            min: T::T,
1246            max: T::T,
1247        ) {
1248            self.test_read_batch_general(
1249                desc,
1250                Encoding::RLE_DICTIONARY,
1251                num_pages,
1252                num_levels,
1253                batch_size,
1254                min,
1255                max,
1256                true,
1257            );
1258        }
1259
1260        // Helper function for the general case of `read_batch()` where `values`,
1261        // `def_levels` and `rep_levels` are always provided with enough space.
1262        #[allow(clippy::too_many_arguments)]
1263        fn test_read_batch_general(
1264            &mut self,
1265            desc: ColumnDescPtr,
1266            encoding: Encoding,
1267            num_pages: usize,
1268            num_levels: usize,
1269            batch_size: usize,
1270            min: T::T,
1271            max: T::T,
1272            use_v2: bool,
1273        ) {
1274            self.test_read_batch(
1275                desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
1276            );
1277        }
1278
1279        // Helper function to test `read_batch()` method with custom buffers for values,
1280        // definition and repetition levels.
1281        #[allow(clippy::too_many_arguments)]
1282        fn test_read_batch(
1283            &mut self,
1284            desc: ColumnDescPtr,
1285            encoding: Encoding,
1286            num_pages: usize,
1287            num_levels: usize,
1288            batch_size: usize,
1289            min: T::T,
1290            max: T::T,
1291            use_v2: bool,
1292        ) {
1293            let mut pages = VecDeque::new();
1294            make_pages::<T>(
1295                desc.clone(),
1296                encoding,
1297                num_pages,
1298                num_levels,
1299                min,
1300                max,
1301                &mut self.def_levels,
1302                &mut self.rep_levels,
1303                &mut self.values,
1304                &mut pages,
1305                use_v2,
1306            );
1307            let max_def_level = desc.max_def_level();
1308            let max_rep_level = desc.max_rep_level();
1309            let page_reader = InMemoryPageReader::new(pages);
1310            let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
1311            let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);
1312
1313            let mut values = Vec::new();
1314            let mut def_levels = Vec::new();
1315            let mut rep_levels = Vec::new();
1316
1317            let mut curr_values_read = 0;
1318            let mut curr_levels_read = 0;
1319            loop {
1320                let (_, values_read, levels_read) = typed_column_reader
1321                    .read_records(
1322                        batch_size,
1323                        Some(&mut def_levels),
1324                        Some(&mut rep_levels),
1325                        &mut values,
1326                    )
1327                    .expect("read_batch() should be OK");
1328
1329                curr_values_read += values_read;
1330                curr_levels_read += levels_read;
1331
1332                if values_read == 0 && levels_read == 0 {
1333                    break;
1334                }
1335            }
1336
1337            assert_eq!(values, self.values, "values content doesn't match");
1338
1339            if max_def_level > 0 {
1340                assert_eq!(
1341                    def_levels, self.def_levels,
1342                    "definition levels content doesn't match"
1343                );
1344            }
1345
1346            if max_rep_level > 0 {
1347                assert_eq!(
1348                    rep_levels, self.rep_levels,
1349                    "repetition levels content doesn't match"
1350                );
1351            }
1352
1353            assert!(
1354                curr_levels_read >= curr_values_read,
1355                "expected levels read to be greater than values read"
1356            );
1357        }
1358    }
1359}