parquet/file/
serialized_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
19//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
20
21use std::collections::VecDeque;
22use std::iter;
23use std::{fs::File, io::Read, path::Path, sync::Arc};
24
25use crate::basic::{Encoding, Type};
26use crate::bloom_filter::Sbbf;
27use crate::column::page::{Page, PageMetadata, PageReader};
28use crate::compression::{create_codec, Codec};
29use crate::errors::{ParquetError, Result};
30use crate::file::page_index::offset_index::OffsetIndexMetaData;
31use crate::file::{
32    metadata::*,
33    properties::{ReaderProperties, ReaderPropertiesPtr},
34    reader::*,
35    statistics,
36};
37use crate::format::{PageHeader, PageLocation, PageType};
38use crate::record::reader::RowIter;
39use crate::record::Row;
40use crate::schema::types::Type as SchemaType;
41use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
42use bytes::Bytes;
43use thrift::protocol::TCompactInputProtocol;
44
45impl TryFrom<File> for SerializedFileReader<File> {
46    type Error = ParquetError;
47
48    fn try_from(file: File) -> Result<Self> {
49        Self::new(file)
50    }
51}
52
53impl TryFrom<&Path> for SerializedFileReader<File> {
54    type Error = ParquetError;
55
56    fn try_from(path: &Path) -> Result<Self> {
57        let file = File::open(path)?;
58        Self::try_from(file)
59    }
60}
61
62impl TryFrom<String> for SerializedFileReader<File> {
63    type Error = ParquetError;
64
65    fn try_from(path: String) -> Result<Self> {
66        Self::try_from(Path::new(&path))
67    }
68}
69
70impl TryFrom<&str> for SerializedFileReader<File> {
71    type Error = ParquetError;
72
73    fn try_from(path: &str) -> Result<Self> {
74        Self::try_from(Path::new(&path))
75    }
76}
77
78/// Conversion into a [`RowIter`]
79/// using the full file schema over all row groups.
80impl IntoIterator for SerializedFileReader<File> {
81    type Item = Result<Row>;
82    type IntoIter = RowIter<'static>;
83
84    fn into_iter(self) -> Self::IntoIter {
85        RowIter::from_file_into(Box::new(self))
86    }
87}
88
89// ----------------------------------------------------------------------
90// Implementations of file & row group readers
91
/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    /// Underlying byte source (file, in-memory buffer, ...), shared with the
    /// row group readers created from this file reader.
    chunk_reader: Arc<R>,
    /// Parsed footer metadata for the whole file.
    metadata: Arc<ParquetMetaData>,
    /// Reader configuration, forwarded to row group and page readers.
    props: ReaderPropertiesPtr,
}
98
/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
/// Registered via [`ReadOptionsBuilder::with_predicate`].
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
103
/// A builder for [`ReadOptions`].
/// For the predicates that are added to the builder,
/// they will be chained using 'AND' to filter the row groups.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    /// Row group predicates; all must accept a row group for it to be scanned.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether to also read the page index structures (column/offset index).
    enable_page_index: bool,
    /// Optional reader properties; defaults are used when `None`.
    props: Option<ReaderProperties>,
}
113
114impl ReadOptionsBuilder {
115    /// New builder
116    pub fn new() -> Self {
117        Self::default()
118    }
119
120    /// Add a predicate on row group metadata to the reading option,
121    /// Filter only row groups that match the predicate criteria
122    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
123        self.predicates.push(predicate);
124        self
125    }
126
127    /// Add a range predicate on filtering row groups if their midpoints are within
128    /// the Closed-Open range `[start..end) {x | start <= x < end}`
129    pub fn with_range(mut self, start: i64, end: i64) -> Self {
130        assert!(start < end);
131        let predicate = move |rg: &RowGroupMetaData, _: usize| {
132            let mid = get_midpoint_offset(rg);
133            mid >= start && mid < end
134        };
135        self.predicates.push(Box::new(predicate));
136        self
137    }
138
139    /// Enable reading the page index structures described in
140    /// "[Column Index] Layout to Support Page Skipping"
141    ///
142    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
143    pub fn with_page_index(mut self) -> Self {
144        self.enable_page_index = true;
145        self
146    }
147
148    /// Set the [`ReaderProperties`] configuration.
149    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
150        self.props = Some(properties);
151        self
152    }
153
154    /// Seal the builder and return the read options
155    pub fn build(self) -> ReadOptions {
156        let props = self
157            .props
158            .unwrap_or_else(|| ReaderProperties::builder().build());
159        ReadOptions {
160            predicates: self.predicates,
161            enable_page_index: self.enable_page_index,
162            props,
163        }
164    }
165}
166
/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    /// Row group predicates, AND-ed together when selecting row groups.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether to read the page index structures after filtering row groups.
    enable_page_index: bool,
    /// Reader configuration forwarded to the file reader.
    props: ReaderProperties,
}
176
177impl<R: 'static + ChunkReader> SerializedFileReader<R> {
178    /// Creates file reader from a Parquet file.
179    /// Returns error if Parquet file does not exist or is corrupt.
180    pub fn new(chunk_reader: R) -> Result<Self> {
181        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
182        let props = Arc::new(ReaderProperties::builder().build());
183        Ok(Self {
184            chunk_reader: Arc::new(chunk_reader),
185            metadata: Arc::new(metadata),
186            props,
187        })
188    }
189
190    /// Creates file reader from a Parquet file with read options.
191    /// Returns error if Parquet file does not exist or is corrupt.
192    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
193        let mut metadata_builder = ParquetMetaDataReader::new()
194            .parse_and_finish(&chunk_reader)?
195            .into_builder();
196        let mut predicates = options.predicates;
197
198        // Filter row groups based on the predicates
199        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
200            let mut keep = true;
201            for predicate in &mut predicates {
202                if !predicate(&rg_meta, i) {
203                    keep = false;
204                    break;
205                }
206            }
207            if keep {
208                metadata_builder = metadata_builder.add_row_group(rg_meta);
209            }
210        }
211
212        let mut metadata = metadata_builder.build();
213
214        // If page indexes are desired, build them with the filtered set of row groups
215        if options.enable_page_index {
216            let mut reader =
217                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
218            reader.read_page_indexes(&chunk_reader)?;
219            metadata = reader.finish()?;
220        }
221
222        Ok(Self {
223            chunk_reader: Arc::new(chunk_reader),
224            metadata: Arc::new(metadata),
225            props: Arc::new(options.props),
226        })
227    }
228}
229
230/// Get midpoint offset for a row group
231fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
232    let col = meta.column(0);
233    let mut offset = col.data_page_offset();
234    if let Some(dic_offset) = col.dictionary_page_offset() {
235        if offset > dic_offset {
236            offset = dic_offset
237        }
238    };
239    offset + meta.compressed_size() / 2
240}
241
242impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
243    fn metadata(&self) -> &ParquetMetaData {
244        &self.metadata
245    }
246
247    fn num_row_groups(&self) -> usize {
248        self.metadata.num_row_groups()
249    }
250
251    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
252        let row_group_metadata = self.metadata.row_group(i);
253        // Row groups should be processed sequentially.
254        let props = Arc::clone(&self.props);
255        let f = Arc::clone(&self.chunk_reader);
256        Ok(Box::new(SerializedRowGroupReader::new(
257            f,
258            row_group_metadata,
259            self.metadata.offset_index().map(|x| x[i].as_slice()),
260            props,
261        )?))
262    }
263
264    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
265        RowIter::from_file(projection, self)
266    }
267}
268
/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    /// Shared byte source for the whole file.
    chunk_reader: Arc<R>,
    /// Metadata of the row group served by this reader.
    metadata: &'a RowGroupMetaData,
    /// Per-column page locations, if an offset index was read from the file.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Reader configuration.
    props: ReaderPropertiesPtr,
    /// One optional bloom filter per column; only populated when the reader
    /// properties enable bloom filter reading.
    bloom_filters: Vec<Option<Sbbf>>,
}
277
278impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
279    /// Creates new row group reader from a file, row group metadata and custom config.
280    pub fn new(
281        chunk_reader: Arc<R>,
282        metadata: &'a RowGroupMetaData,
283        offset_index: Option<&'a [OffsetIndexMetaData]>,
284        props: ReaderPropertiesPtr,
285    ) -> Result<Self> {
286        let bloom_filters = if props.read_bloom_filter() {
287            metadata
288                .columns()
289                .iter()
290                .map(|col| Sbbf::read_from_column_chunk(col, chunk_reader.clone()))
291                .collect::<Result<Vec<_>>>()?
292        } else {
293            iter::repeat(None).take(metadata.columns().len()).collect()
294        };
295        Ok(Self {
296            chunk_reader,
297            metadata,
298            offset_index,
299            props,
300            bloom_filters,
301        })
302    }
303}
304
305impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
306    fn metadata(&self) -> &RowGroupMetaData {
307        self.metadata
308    }
309
310    fn num_columns(&self) -> usize {
311        self.metadata.num_columns()
312    }
313
314    // TODO: fix PARQUET-816
315    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
316        let col = self.metadata.column(i);
317
318        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
319
320        let props = Arc::clone(&self.props);
321        Ok(Box::new(SerializedPageReader::new_with_properties(
322            Arc::clone(&self.chunk_reader),
323            col,
324            self.metadata.num_rows() as usize,
325            page_locations,
326            props,
327        )?))
328    }
329
330    /// get bloom filter for the `i`th column
331    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
332        self.bloom_filters[i].as_ref()
333    }
334
335    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
336        RowIter::from_row_group(projection, self)
337    }
338}
339
340/// Reads a [`PageHeader`] from the provided [`Read`]
341pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
342    let mut prot = TCompactInputProtocol::new(input);
343    let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
344    Ok(page_header)
345}
346
347/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
348fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
349    /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
350    struct TrackedRead<R> {
351        inner: R,
352        bytes_read: usize,
353    }
354
355    impl<R: Read> Read for TrackedRead<R> {
356        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
357            let v = self.inner.read(buf)?;
358            self.bytes_read += v;
359            Ok(v)
360        }
361    }
362
363    let mut tracked = TrackedRead {
364        inner: input,
365        bytes_read: 0,
366    };
367    let header = read_page_header(&mut tracked)?;
368    Ok((tracked.bytes_read, header))
369}
370
371/// Decodes a [`Page`] from the provided `buffer`
372pub(crate) fn decode_page(
373    page_header: PageHeader,
374    buffer: Bytes,
375    physical_type: Type,
376    decompressor: Option<&mut Box<dyn Codec>>,
377) -> Result<Page> {
378    // Verify the 32-bit CRC checksum of the page
379    #[cfg(feature = "crc")]
380    if let Some(expected_crc) = page_header.crc {
381        let crc = crc32fast::hash(&buffer);
382        if crc != expected_crc as u32 {
383            return Err(general_err!("Page CRC checksum mismatch"));
384        }
385    }
386
387    // When processing data page v2, depending on enabled compression for the
388    // page, we should account for uncompressed data ('offset') of
389    // repetition and definition levels.
390    //
391    // We always use 0 offset for other pages other than v2, `true` flag means
392    // that compression will be applied if decompressor is defined
393    let mut offset: usize = 0;
394    let mut can_decompress = true;
395
396    if let Some(ref header_v2) = page_header.data_page_header_v2 {
397        offset = (header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length)
398            as usize;
399        // When is_compressed flag is missing the page is considered compressed
400        can_decompress = header_v2.is_compressed.unwrap_or(true);
401    }
402
403    // TODO: page header could be huge because of statistics. We should set a
404    // maximum page header size and abort if that is exceeded.
405    let buffer = match decompressor {
406        Some(decompressor) if can_decompress => {
407            let uncompressed_size = page_header.uncompressed_page_size as usize;
408            let mut decompressed = Vec::with_capacity(uncompressed_size);
409            let compressed = &buffer.as_ref()[offset..];
410            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
411            decompressor.decompress(
412                compressed,
413                &mut decompressed,
414                Some(uncompressed_size - offset),
415            )?;
416
417            if decompressed.len() != uncompressed_size {
418                return Err(general_err!(
419                    "Actual decompressed size doesn't match the expected one ({} vs {})",
420                    decompressed.len(),
421                    uncompressed_size
422                ));
423            }
424
425            Bytes::from(decompressed)
426        }
427        _ => buffer,
428    };
429
430    let result = match page_header.type_ {
431        PageType::DICTIONARY_PAGE => {
432            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
433                ParquetError::General("Missing dictionary page header".to_string())
434            })?;
435            let is_sorted = dict_header.is_sorted.unwrap_or(false);
436            Page::DictionaryPage {
437                buf: buffer,
438                num_values: dict_header.num_values as u32,
439                encoding: Encoding::try_from(dict_header.encoding)?,
440                is_sorted,
441            }
442        }
443        PageType::DATA_PAGE => {
444            let header = page_header
445                .data_page_header
446                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
447            Page::DataPage {
448                buf: buffer,
449                num_values: header.num_values as u32,
450                encoding: Encoding::try_from(header.encoding)?,
451                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
452                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
453                statistics: statistics::from_thrift(physical_type, header.statistics)?,
454            }
455        }
456        PageType::DATA_PAGE_V2 => {
457            let header = page_header
458                .data_page_header_v2
459                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
460            let is_compressed = header.is_compressed.unwrap_or(true);
461            Page::DataPageV2 {
462                buf: buffer,
463                num_values: header.num_values as u32,
464                encoding: Encoding::try_from(header.encoding)?,
465                num_nulls: header.num_nulls as u32,
466                num_rows: header.num_rows as u32,
467                def_levels_byte_len: header.definition_levels_byte_length as u32,
468                rep_levels_byte_len: header.repetition_levels_byte_length as u32,
469                is_compressed,
470                statistics: statistics::from_thrift(physical_type, header.statistics)?,
471            }
472        }
473        _ => {
474            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
475            unimplemented!("Page type {:?} is not supported", page_header.type_)
476        }
477    };
478
479    Ok(result)
480}
481
/// Decoding state of a [`SerializedPageReader`].
///
/// `Values` walks the column chunk sequentially, reading each page header to
/// find the next page; `Pages` is used when an offset index is available and
/// pages can be addressed directly by their recorded locations.
enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        offset: usize,

        /// The length of the chunk in bytes
        remaining_bytes: usize,

        /// If the next page header has already been "peeked", we will cache it and its length here
        next_page_header: Option<Box<PageHeader>>,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
    },
}
502
/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. Only set for non-PLAIN codec.
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    /// Current read position within the column chunk.
    state: SerializedPageReaderState,
}
516
517impl<R: ChunkReader> SerializedPageReader<R> {
518    /// Creates a new serialized page reader from a chunk reader and metadata
519    pub fn new(
520        reader: Arc<R>,
521        meta: &ColumnChunkMetaData,
522        total_rows: usize,
523        page_locations: Option<Vec<PageLocation>>,
524    ) -> Result<Self> {
525        let props = Arc::new(ReaderProperties::builder().build());
526        SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props)
527    }
528
529    /// Creates a new serialized page with custom options.
530    pub fn new_with_properties(
531        reader: Arc<R>,
532        meta: &ColumnChunkMetaData,
533        total_rows: usize,
534        page_locations: Option<Vec<PageLocation>>,
535        props: ReaderPropertiesPtr,
536    ) -> Result<Self> {
537        let decompressor = create_codec(meta.compression(), props.codec_options())?;
538        let (start, len) = meta.byte_range();
539
540        let state = match page_locations {
541            Some(locations) => {
542                let dictionary_page = match locations.first() {
543                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
544                        offset: start as i64,
545                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
546                        first_row_index: 0,
547                    }),
548                    _ => None,
549                };
550
551                SerializedPageReaderState::Pages {
552                    page_locations: locations.into(),
553                    dictionary_page,
554                    total_rows,
555                }
556            }
557            None => SerializedPageReaderState::Values {
558                offset: start as usize,
559                remaining_bytes: len as usize,
560                next_page_header: None,
561            },
562        };
563
564        Ok(Self {
565            reader,
566            decompressor,
567            state,
568            physical_type: meta.column_type(),
569        })
570    }
571}
572
573impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
574    type Item = Result<Page>;
575
576    fn next(&mut self) -> Option<Self::Item> {
577        self.get_next_page().transpose()
578    }
579}
580
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        // Loop so that pages which cannot be decoded (e.g. index pages) can
        // be skipped and the next page tried.
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset as u64)?;
                    // Reuse a header cached by `peek_next_page` if present;
                    // `offset`/`remaining` were already advanced past it when
                    // it was peeked.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining -= header_len;
                        header
                    };
                    let data_len = header.compressed_page_size as usize;
                    // Advance past the page body before deciding whether to
                    // decode it, so skipped pages keep the cursor consistent.
                    *offset += data_len;
                    *remaining -= data_len;

                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    ..
                } => {
                    // Serve the (synthesized) dictionary page first, then the
                    // indexed pages in order.
                    let front = match dictionary_page
                        .take()
                        .or_else(|| page_locations.pop_front())
                    {
                        Some(front) => front,
                        None => return Ok(None),
                    };

                    let page_len = front.compressed_page_size as usize;

                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    // The fetched bytes contain the page header followed by
                    // the page body; decode the header in place to find where
                    // the body starts.
                    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
                    let header = PageHeader::read_from_in_protocol(&mut prot)?;
                    let offset = buffer.len() - prot.as_slice().len();

                    let bytes = buffer.slice(offset..);
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                // Loop to skip headers that do not convert to `PageMetadata`
                // (e.g. index pages).
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        // Cache the header so `get_next_page`/`skip_next_page`
                        // do not re-read it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // A page's row count is the distance to the next page's
                    // first row, or to the end of the chunk for the last page.
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as usize;
                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
                } else {
                    // Read (and discard) the header just to learn the page size.
                    let mut read = self.reader.get_read(*offset as u64)?;
                    let (header_len, header) = read_page_header_len(&mut read)?;
                    let data_page_size = header.compressed_page_size as usize;
                    *offset += header_len + data_page_size;
                    *remaining_bytes -= header_len + data_page_size;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages { page_locations, .. } => {
                // With an offset index, skipping is just dropping the location.
                page_locations.pop_front();

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            // Without an offset index only the end of the chunk is a
            // guaranteed record boundary.
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
763
764#[cfg(test)]
765mod tests {
766    use bytes::Buf;
767
768    use crate::file::properties::{EnabledStatistics, WriterProperties};
769    use crate::format::BoundaryOrder;
770
771    use crate::basic::{self, ColumnOrder};
772    use crate::column::reader::ColumnReader;
773    use crate::data_type::private::ParquetValueType;
774    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
775    use crate::file::page_index::index::{Index, NativeIndex};
776    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
777    use crate::file::writer::SerializedFileWriter;
778    use crate::record::RowAccessor;
779    use crate::schema::parser::parse_message_type;
780    use crate::util::test_common::file_util::{get_test_file, get_test_path};
781
782    use super::*;
783
784    #[test]
785    fn test_cursor_and_file_has_the_same_behaviour() {
786        let mut buf: Vec<u8> = Vec::new();
787        get_test_file("alltypes_plain.parquet")
788            .read_to_end(&mut buf)
789            .unwrap();
790        let cursor = Bytes::from(buf);
791        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
792
793        let test_file = get_test_file("alltypes_plain.parquet");
794        let read_from_file = SerializedFileReader::new(test_file).unwrap();
795
796        let file_iter = read_from_file.get_row_iter(None).unwrap();
797        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
798
799        for (a, b) in file_iter.zip(cursor_iter) {
800            assert_eq!(a.unwrap(), b.unwrap())
801        }
802    }
803
804    #[test]
805    fn test_file_reader_try_from() {
806        // Valid file path
807        let test_file = get_test_file("alltypes_plain.parquet");
808        let test_path_buf = get_test_path("alltypes_plain.parquet");
809        let test_path = test_path_buf.as_path();
810        let test_path_str = test_path.to_str().unwrap();
811
812        let reader = SerializedFileReader::try_from(test_file);
813        assert!(reader.is_ok());
814
815        let reader = SerializedFileReader::try_from(test_path);
816        assert!(reader.is_ok());
817
818        let reader = SerializedFileReader::try_from(test_path_str);
819        assert!(reader.is_ok());
820
821        let reader = SerializedFileReader::try_from(test_path_str.to_string());
822        assert!(reader.is_ok());
823
824        // Invalid file path
825        let test_path = Path::new("invalid.parquet");
826        let test_path_str = test_path.to_str().unwrap();
827
828        let reader = SerializedFileReader::try_from(test_path);
829        assert!(reader.is_err());
830
831        let reader = SerializedFileReader::try_from(test_path_str);
832        assert!(reader.is_err());
833
834        let reader = SerializedFileReader::try_from(test_path_str.to_string());
835        assert!(reader.is_err());
836    }
837
838    #[test]
839    fn test_file_reader_into_iter() {
840        let path = get_test_path("alltypes_plain.parquet");
841        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
842        let iter = reader.into_iter();
843        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
844
845        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
846    }
847
848    #[test]
849    fn test_file_reader_into_iter_project() {
850        let path = get_test_path("alltypes_plain.parquet");
851        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
852        let schema = "message schema { OPTIONAL INT32 id; }";
853        let proj = parse_message_type(schema).ok();
854        let iter = reader.into_iter().project(proj).unwrap();
855        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
856
857        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
858    }
859
860    #[test]
861    fn test_reuse_file_chunk() {
862        // This test covers the case of maintaining the correct start position in a file
863        // stream for each column reader after initializing and moving to the next one
864        // (without necessarily reading the entire column).
865        let test_file = get_test_file("alltypes_plain.parquet");
866        let reader = SerializedFileReader::new(test_file).unwrap();
867        let row_group = reader.get_row_group(0).unwrap();
868
869        let mut page_readers = Vec::new();
870        for i in 0..row_group.num_columns() {
871            page_readers.push(row_group.get_column_page_reader(i).unwrap());
872        }
873
874        // Now buffer each col reader, we do not expect any failures like:
875        // General("underlying Thrift error: end of file")
876        for mut page_reader in page_readers {
877            assert!(page_reader.get_next_page().is_ok());
878        }
879    }
880
    #[test]
    fn test_file_reader() {
        // End-to-end sanity check of `SerializedFileReader` against a known
        // fixture: verifies file metadata, row group metadata, and the page
        // stream of column 0 (a dictionary page followed by one data page).
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order
        // (the footer carries no column_orders list, so every column reports UNDEFINED)
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                // First page: dictionary of 8 PLAIN_DICTIONARY-encoded values.
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                // Second page: v1 data page of 8 values, no page statistics.
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    // BIT_PACKED is deprecated in the Parquet spec but is what
                    // this (old, impala-written) fixture uses for rep levels.
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        // Exactly the dictionary page plus one data page.
        assert_eq!(page_count, 2);
    }
972
    #[test]
    fn test_file_reader_datapage_v2() {
        // Same end-to-end check as `test_file_reader`, but on a file whose
        // data pages use the DataPageV2 format with Snappy compression.
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        // This fixture carries exactly one key/value metadata entry.
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        // (no column_orders in the footer, so every column reports UNDEFINED)
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                // First page: dictionary of a single PLAIN-encoded value.
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                // Second page: v2 data page — 5 values (1 null), compressed,
                // with page-level statistics present.
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        // Exactly the dictionary page plus one data page.
        assert_eq!(page_count, 2);
    }
1070
1071    #[test]
1072    fn test_page_iterator() {
1073        let file = get_test_file("alltypes_plain.parquet");
1074        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1075
1076        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1077
1078        // read first page
1079        let page = page_iterator.next();
1080        assert!(page.is_some());
1081        assert!(page.unwrap().is_ok());
1082
1083        // reach end of file
1084        let page = page_iterator.next();
1085        assert!(page.is_none());
1086
1087        let row_group_indices = Box::new(0..1);
1088        let mut page_iterator =
1089            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1090
1091        // read first page
1092        let page = page_iterator.next();
1093        assert!(page.is_some());
1094        assert!(page.unwrap().is_ok());
1095
1096        // reach end of file
1097        let page = page_iterator.next();
1098        assert!(page.is_none());
1099    }
1100
1101    #[test]
1102    fn test_file_reader_key_value_metadata() {
1103        let file = get_test_file("binary.parquet");
1104        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1105
1106        let metadata = file_reader
1107            .metadata
1108            .file_metadata()
1109            .key_value_metadata()
1110            .unwrap();
1111
1112        assert_eq!(metadata.len(), 3);
1113
1114        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1115
1116        assert_eq!(metadata[1].key, "writer.model.name");
1117        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1118
1119        assert_eq!(metadata[2].key, "parquet.proto.class");
1120        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1121    }
1122
1123    #[test]
1124    fn test_file_reader_optional_metadata() {
1125        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1126        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1127        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1128
1129        let row_group_metadata = file_reader.metadata.row_group(0);
1130        let col0_metadata = row_group_metadata.column(0);
1131
1132        // test optional bloom filter offset
1133        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1134
1135        // test page encoding stats
1136        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1137
1138        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1139        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1140        assert_eq!(page_encoding_stats.count, 1);
1141
1142        // test optional column index offset
1143        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1144        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1145
1146        // test optional offset index offset
1147        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1148        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1149    }
1150
1151    #[test]
1152    fn test_file_reader_with_no_filter() -> Result<()> {
1153        let test_file = get_test_file("alltypes_plain.parquet");
1154        let origin_reader = SerializedFileReader::new(test_file)?;
1155        // test initial number of row groups
1156        let metadata = origin_reader.metadata();
1157        assert_eq!(metadata.num_row_groups(), 1);
1158        Ok(())
1159    }
1160
1161    #[test]
1162    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1163        let test_file = get_test_file("alltypes_plain.parquet");
1164        let read_options = ReadOptionsBuilder::new()
1165            .with_predicate(Box::new(|_, _| false))
1166            .build();
1167        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1168        let metadata = reader.metadata();
1169        assert_eq!(metadata.num_row_groups(), 0);
1170        Ok(())
1171    }
1172
1173    #[test]
1174    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1175        let test_file = get_test_file("alltypes_plain.parquet");
1176        let origin_reader = SerializedFileReader::new(test_file)?;
1177        // test initial number of row groups
1178        let metadata = origin_reader.metadata();
1179        assert_eq!(metadata.num_row_groups(), 1);
1180        let mid = get_midpoint_offset(metadata.row_group(0));
1181
1182        let test_file = get_test_file("alltypes_plain.parquet");
1183        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1184        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1185        let metadata = reader.metadata();
1186        assert_eq!(metadata.num_row_groups(), 1);
1187
1188        let test_file = get_test_file("alltypes_plain.parquet");
1189        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1190        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1191        let metadata = reader.metadata();
1192        assert_eq!(metadata.num_row_groups(), 0);
1193        Ok(())
1194    }
1195
1196    #[test]
1197    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1198        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1199        let origin_reader = SerializedFileReader::new(test_file)?;
1200        let metadata = origin_reader.metadata();
1201        let mid = get_midpoint_offset(metadata.row_group(0));
1202
1203        // true, true predicate
1204        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1205        let read_options = ReadOptionsBuilder::new()
1206            .with_page_index()
1207            .with_predicate(Box::new(|_, _| true))
1208            .with_range(mid, mid + 1)
1209            .build();
1210        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1211        let metadata = reader.metadata();
1212        assert_eq!(metadata.num_row_groups(), 1);
1213        assert_eq!(metadata.column_index().unwrap().len(), 1);
1214        assert_eq!(metadata.offset_index().unwrap().len(), 1);
1215
1216        // true, false predicate
1217        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1218        let read_options = ReadOptionsBuilder::new()
1219            .with_page_index()
1220            .with_predicate(Box::new(|_, _| true))
1221            .with_range(0, mid)
1222            .build();
1223        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1224        let metadata = reader.metadata();
1225        assert_eq!(metadata.num_row_groups(), 0);
1226        assert_eq!(metadata.column_index().unwrap().len(), 0);
1227        assert_eq!(metadata.offset_index().unwrap().len(), 0);
1228
1229        // false, true predicate
1230        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1231        let read_options = ReadOptionsBuilder::new()
1232            .with_page_index()
1233            .with_predicate(Box::new(|_, _| false))
1234            .with_range(mid, mid + 1)
1235            .build();
1236        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1237        let metadata = reader.metadata();
1238        assert_eq!(metadata.num_row_groups(), 0);
1239        assert_eq!(metadata.column_index().unwrap().len(), 0);
1240        assert_eq!(metadata.offset_index().unwrap().len(), 0);
1241
1242        // false, false predicate
1243        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1244        let read_options = ReadOptionsBuilder::new()
1245            .with_page_index()
1246            .with_predicate(Box::new(|_, _| false))
1247            .with_range(0, mid)
1248            .build();
1249        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1250        let metadata = reader.metadata();
1251        assert_eq!(metadata.num_row_groups(), 0);
1252        assert_eq!(metadata.column_index().unwrap().len(), 0);
1253        assert_eq!(metadata.offset_index().unwrap().len(), 0);
1254        Ok(())
1255    }
1256
1257    #[test]
1258    fn test_file_reader_invalid_metadata() {
1259        let data = [
1260            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1261            80, 65, 82, 49,
1262        ];
1263        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1264        assert_eq!(
1265            ret.err().unwrap().to_string(),
1266            "Parquet error: Could not parse metadata: bad data"
1267        );
1268    }
1269
1270    #[test]
1271    // Use java parquet-tools get below pageIndex info
1272    // !```
1273    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
1274    // row group 0:
1275    // column index for column String:
1276    // Boundary order: ASCENDING
1277    // page-0  :
1278    // null count                 min                                  max
1279    // 0                          Hello                                today
1280    //
1281    // offset index for column String:
1282    // page-0   :
1283    // offset   compressed size       first row index
1284    // 4               152                     0
1285    ///```
1286    //
1287    fn test_page_index_reader() {
1288        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1289        let builder = ReadOptionsBuilder::new();
1290        //enable read page index
1291        let options = builder.with_page_index().build();
1292        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1293        let reader = reader_result.unwrap();
1294
1295        // Test contents in Parquet metadata
1296        let metadata = reader.metadata();
1297        assert_eq!(metadata.num_row_groups(), 1);
1298
1299        let column_index = metadata.column_index().unwrap();
1300
1301        // only one row group
1302        assert_eq!(column_index.len(), 1);
1303        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1304            index
1305        } else {
1306            unreachable!()
1307        };
1308
1309        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
1310        let index_in_pages = &index.indexes;
1311
1312        //only one page group
1313        assert_eq!(index_in_pages.len(), 1);
1314
1315        let page0 = &index_in_pages[0];
1316        let min = page0.min.as_ref().unwrap();
1317        let max = page0.max.as_ref().unwrap();
1318        assert_eq!(b"Hello", min.as_bytes());
1319        assert_eq!(b"today", max.as_bytes());
1320
1321        let offset_indexes = metadata.offset_index().unwrap();
1322        // only one row group
1323        assert_eq!(offset_indexes.len(), 1);
1324        let offset_index = &offset_indexes[0];
1325        let page_offset = &offset_index[0].page_locations()[0];
1326
1327        assert_eq!(4, page_offset.offset);
1328        assert_eq!(152, page_offset.compressed_page_size);
1329        assert_eq!(0, page_offset.first_row_index);
1330    }
1331
1332    #[test]
1333    fn test_page_index_reader_out_of_order() {
1334        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1335        let options = ReadOptionsBuilder::new().with_page_index().build();
1336        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1337        let metadata = reader.metadata();
1338
1339        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1340        let columns = metadata.row_group(0).columns();
1341        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1342
1343        let a = read_columns_indexes(&test_file, columns).unwrap();
1344        let mut b = read_columns_indexes(&test_file, &reversed).unwrap();
1345        b.reverse();
1346        assert_eq!(a, b);
1347
1348        let a = read_offset_indexes(&test_file, columns).unwrap();
1349        let mut b = read_offset_indexes(&test_file, &reversed).unwrap();
1350        b.reverse();
1351        assert_eq!(a, b);
1352    }
1353
1354    #[test]
1355    fn test_page_index_reader_all_type() {
1356        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1357        let builder = ReadOptionsBuilder::new();
1358        //enable read page index
1359        let options = builder.with_page_index().build();
1360        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1361        let reader = reader_result.unwrap();
1362
1363        // Test contents in Parquet metadata
1364        let metadata = reader.metadata();
1365        assert_eq!(metadata.num_row_groups(), 1);
1366
1367        let column_index = metadata.column_index().unwrap();
1368        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1369
1370        // only one row group
1371        assert_eq!(column_index.len(), 1);
1372        let row_group_metadata = metadata.row_group(0);
1373
1374        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
1375        assert!(!&column_index[0][0].is_sorted());
1376        let boundary_order = &column_index[0][0].get_boundary_order();
1377        assert!(boundary_order.is_some());
1378        matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
1379        if let Index::INT32(index) = &column_index[0][0] {
1380            check_native_page_index(
1381                index,
1382                325,
1383                get_row_group_min_max_bytes(row_group_metadata, 0),
1384                BoundaryOrder::UNORDERED,
1385            );
1386            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
1387        } else {
1388            unreachable!()
1389        };
1390        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
1391        assert!(&column_index[0][1].is_sorted());
1392        if let Index::BOOLEAN(index) = &column_index[0][1] {
1393            assert_eq!(index.indexes.len(), 82);
1394            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
1395        } else {
1396            unreachable!()
1397        };
1398        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1399        assert!(&column_index[0][2].is_sorted());
1400        if let Index::INT32(index) = &column_index[0][2] {
1401            check_native_page_index(
1402                index,
1403                325,
1404                get_row_group_min_max_bytes(row_group_metadata, 2),
1405                BoundaryOrder::ASCENDING,
1406            );
1407            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
1408        } else {
1409            unreachable!()
1410        };
1411        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1412        assert!(&column_index[0][3].is_sorted());
1413        if let Index::INT32(index) = &column_index[0][3] {
1414            check_native_page_index(
1415                index,
1416                325,
1417                get_row_group_min_max_bytes(row_group_metadata, 3),
1418                BoundaryOrder::ASCENDING,
1419            );
1420            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
1421        } else {
1422            unreachable!()
1423        };
1424        //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1425        assert!(&column_index[0][4].is_sorted());
1426        if let Index::INT32(index) = &column_index[0][4] {
1427            check_native_page_index(
1428                index,
1429                325,
1430                get_row_group_min_max_bytes(row_group_metadata, 4),
1431                BoundaryOrder::ASCENDING,
1432            );
1433            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
1434        } else {
1435            unreachable!()
1436        };
1437        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
1438        assert!(!&column_index[0][5].is_sorted());
1439        if let Index::INT64(index) = &column_index[0][5] {
1440            check_native_page_index(
1441                index,
1442                528,
1443                get_row_group_min_max_bytes(row_group_metadata, 5),
1444                BoundaryOrder::UNORDERED,
1445            );
1446            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
1447        } else {
1448            unreachable!()
1449        };
1450        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
1451        assert!(&column_index[0][6].is_sorted());
1452        if let Index::FLOAT(index) = &column_index[0][6] {
1453            check_native_page_index(
1454                index,
1455                325,
1456                get_row_group_min_max_bytes(row_group_metadata, 6),
1457                BoundaryOrder::ASCENDING,
1458            );
1459            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
1460        } else {
1461            unreachable!()
1462        };
1463        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
1464        assert!(!&column_index[0][7].is_sorted());
1465        if let Index::DOUBLE(index) = &column_index[0][7] {
1466            check_native_page_index(
1467                index,
1468                528,
1469                get_row_group_min_max_bytes(row_group_metadata, 7),
1470                BoundaryOrder::UNORDERED,
1471            );
1472            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
1473        } else {
1474            unreachable!()
1475        };
1476        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
1477        assert!(!&column_index[0][8].is_sorted());
1478        if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
1479            check_native_page_index(
1480                index,
1481                974,
1482                get_row_group_min_max_bytes(row_group_metadata, 8),
1483                BoundaryOrder::UNORDERED,
1484            );
1485            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
1486        } else {
1487            unreachable!()
1488        };
1489        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1490        assert!(&column_index[0][9].is_sorted());
1491        if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
1492            check_native_page_index(
1493                index,
1494                352,
1495                get_row_group_min_max_bytes(row_group_metadata, 9),
1496                BoundaryOrder::ASCENDING,
1497            );
1498            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
1499        } else {
1500            unreachable!()
1501        };
1502        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
1503        //Notice: min_max values for each page for this col not exits.
1504        assert!(!&column_index[0][10].is_sorted());
1505        if let Index::NONE = &column_index[0][10] {
1506            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
1507        } else {
1508            unreachable!()
1509        };
1510        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
1511        assert!(&column_index[0][11].is_sorted());
1512        if let Index::INT32(index) = &column_index[0][11] {
1513            check_native_page_index(
1514                index,
1515                325,
1516                get_row_group_min_max_bytes(row_group_metadata, 11),
1517                BoundaryOrder::ASCENDING,
1518            );
1519            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
1520        } else {
1521            unreachable!()
1522        };
1523        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
1524        assert!(!&column_index[0][12].is_sorted());
1525        if let Index::INT32(index) = &column_index[0][12] {
1526            check_native_page_index(
1527                index,
1528                325,
1529                get_row_group_min_max_bytes(row_group_metadata, 12),
1530                BoundaryOrder::UNORDERED,
1531            );
1532            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
1533        } else {
1534            unreachable!()
1535        };
1536    }
1537
1538    fn check_native_page_index<T: ParquetValueType>(
1539        row_group_index: &NativeIndex<T>,
1540        page_size: usize,
1541        min_max: (&[u8], &[u8]),
1542        boundary_order: BoundaryOrder,
1543    ) {
1544        assert_eq!(row_group_index.indexes.len(), page_size);
1545        assert_eq!(row_group_index.boundary_order, boundary_order);
1546        row_group_index.indexes.iter().all(|x| {
1547            x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
1548                && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
1549        });
1550    }
1551
1552    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
1553        let statistics = r.column(col_num).statistics().unwrap();
1554        (
1555            statistics.min_bytes_opt().unwrap_or_default(),
1556            statistics.max_bytes_opt().unwrap_or_default(),
1557        )
1558    }
1559
1560    #[test]
1561    fn test_skip_page_with_offset_index() {
1562        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1563        let builder = ReadOptionsBuilder::new();
1564        //enable read page index
1565        let options = builder.with_page_index().build();
1566        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1567        let reader = reader_result.unwrap();
1568
1569        let row_group_reader = reader.get_row_group(0).unwrap();
1570
1571        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
1572        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1573
1574        let mut vec = vec![];
1575
1576        for i in 0..325 {
1577            if i % 2 == 0 {
1578                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1579            } else {
1580                column_page_reader.skip_next_page().unwrap();
1581            }
1582        }
1583        //check read all pages.
1584        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1585        assert!(column_page_reader.get_next_page().unwrap().is_none());
1586
1587        assert_eq!(vec.len(), 163);
1588    }
1589
1590    #[test]
1591    fn test_skip_page_without_offset_index() {
1592        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1593
1594        // use default SerializedFileReader without read offsetIndex
1595        let reader_result = SerializedFileReader::new(test_file);
1596        let reader = reader_result.unwrap();
1597
1598        let row_group_reader = reader.get_row_group(0).unwrap();
1599
1600        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
1601        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1602
1603        let mut vec = vec![];
1604
1605        for i in 0..325 {
1606            if i % 2 == 0 {
1607                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1608            } else {
1609                column_page_reader.peek_next_page().unwrap().unwrap();
1610                column_page_reader.skip_next_page().unwrap();
1611            }
1612        }
1613        //check read all pages.
1614        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1615        assert!(column_page_reader.get_next_page().unwrap().is_none());
1616
1617        assert_eq!(vec.len(), 163);
1618    }
1619
1620    #[test]
1621    fn test_peek_page_with_dictionary_page() {
1622        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1623        let builder = ReadOptionsBuilder::new();
1624        //enable read page index
1625        let options = builder.with_page_index().build();
1626        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1627        let reader = reader_result.unwrap();
1628        let row_group_reader = reader.get_row_group(0).unwrap();
1629
1630        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
1631        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
1632
1633        let mut vec = vec![];
1634
1635        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1636        assert!(meta.is_dict);
1637        let page = column_page_reader.get_next_page().unwrap().unwrap();
1638        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
1639
1640        for i in 0..352 {
1641            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1642            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
1643            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
1644            if i != 351 {
1645                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
1646            } else {
1647                // last page first row index is 7290, total row count is 7300
1648                // because first row start with zero, last page row count should be 10.
1649                assert_eq!(meta.num_rows, Some(10));
1650            }
1651            assert!(!meta.is_dict);
1652            vec.push(meta);
1653            let page = column_page_reader.get_next_page().unwrap().unwrap();
1654            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
1655        }
1656
1657        //check read all pages.
1658        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1659        assert!(column_page_reader.get_next_page().unwrap().is_none());
1660
1661        assert_eq!(vec.len(), 352);
1662    }
1663
1664    #[test]
1665    fn test_peek_page_with_dictionary_page_without_offset_index() {
1666        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1667
1668        let reader_result = SerializedFileReader::new(test_file);
1669        let reader = reader_result.unwrap();
1670        let row_group_reader = reader.get_row_group(0).unwrap();
1671
1672        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
1673        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
1674
1675        let mut vec = vec![];
1676
1677        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1678        assert!(meta.is_dict);
1679        let page = column_page_reader.get_next_page().unwrap().unwrap();
1680        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
1681
1682        for i in 0..352 {
1683            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1684            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
1685            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
1686            if i != 351 {
1687                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
1688            } else {
1689                // last page first row index is 7290, total row count is 7300
1690                // because first row start with zero, last page row count should be 10.
1691                assert_eq!(meta.num_levels, Some(10));
1692            }
1693            assert!(!meta.is_dict);
1694            vec.push(meta);
1695            let page = column_page_reader.get_next_page().unwrap().unwrap();
1696            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
1697        }
1698
1699        //check read all pages.
1700        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1701        assert!(column_page_reader.get_next_page().unwrap().is_none());
1702
1703        assert_eq!(vec.len(), 352);
1704    }
1705
1706    #[test]
1707    fn test_fixed_length_index() {
1708        let message_type = "
1709        message test_schema {
1710          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
1711        }
1712        ";
1713
1714        let schema = parse_message_type(message_type).unwrap();
1715        let mut out = Vec::with_capacity(1024);
1716        let mut writer =
1717            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
1718
1719        let mut r = writer.next_row_group().unwrap();
1720        let mut c = r.next_column().unwrap().unwrap();
1721        c.typed::<FixedLenByteArrayType>()
1722            .write_batch(
1723                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
1724                Some(&[1, 1, 0, 1]),
1725                None,
1726            )
1727            .unwrap();
1728        c.close().unwrap();
1729        r.close().unwrap();
1730        writer.close().unwrap();
1731
1732        let b = Bytes::from(out);
1733        let options = ReadOptionsBuilder::new().with_page_index().build();
1734        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
1735        let index = reader.metadata().column_index().unwrap();
1736
1737        // 1 row group
1738        assert_eq!(index.len(), 1);
1739        let c = &index[0];
1740        // 1 column
1741        assert_eq!(c.len(), 1);
1742
1743        match &c[0] {
1744            Index::FIXED_LEN_BYTE_ARRAY(v) => {
1745                assert_eq!(v.indexes.len(), 1);
1746                let page_idx = &v.indexes[0];
1747                assert_eq!(page_idx.null_count.unwrap(), 1);
1748                assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
1749                assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
1750            }
1751            _ => unreachable!(),
1752        }
1753    }
1754
1755    #[test]
1756    fn test_multi_gz() {
1757        let file = get_test_file("concatenated_gzip_members.parquet");
1758        let reader = SerializedFileReader::new(file).unwrap();
1759        let row_group_reader = reader.get_row_group(0).unwrap();
1760        match row_group_reader.get_column_reader(0).unwrap() {
1761            ColumnReader::Int64ColumnReader(mut reader) => {
1762                let mut buffer = Vec::with_capacity(1024);
1763                let mut def_levels = Vec::with_capacity(1024);
1764                let (num_records, num_values, num_levels) = reader
1765                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
1766                    .unwrap();
1767
1768                assert_eq!(num_records, 513);
1769                assert_eq!(num_values, 513);
1770                assert_eq!(num_levels, 513);
1771
1772                let expected: Vec<i64> = (1..514).collect();
1773                assert_eq!(&buffer, &expected);
1774            }
1775            _ => unreachable!(),
1776        }
1777    }
1778
1779    #[test]
1780    fn test_byte_stream_split_extended() {
1781        let path = format!(
1782            "{}/byte_stream_split_extended.gzip.parquet",
1783            arrow::util::test_util::parquet_test_data(),
1784        );
1785        let file = File::open(path).unwrap();
1786        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
1787
1788        // Use full schema as projected schema
1789        let mut iter = reader
1790            .get_row_iter(None)
1791            .expect("Failed to create row iterator");
1792
1793        let mut start = 0;
1794        let end = reader.metadata().file_metadata().num_rows();
1795
1796        let check_row = |row: Result<Row, ParquetError>| {
1797            assert!(row.is_ok());
1798            let r = row.unwrap();
1799            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
1800            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
1801            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
1802            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
1803            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
1804            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
1805            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
1806        };
1807
1808        while start < end {
1809            match iter.next() {
1810                Some(row) => check_row(row),
1811                None => break,
1812            };
1813            start += 1;
1814        }
1815    }
1816
1817    #[test]
1818    fn test_filtered_rowgroup_metadata() {
1819        let message_type = "
1820            message test_schema {
1821                REQUIRED INT32 a;
1822            }
1823        ";
1824        let schema = Arc::new(parse_message_type(message_type).unwrap());
1825        let props = Arc::new(
1826            WriterProperties::builder()
1827                .set_statistics_enabled(EnabledStatistics::Page)
1828                .build(),
1829        );
1830        let mut file: File = tempfile::tempfile().unwrap();
1831        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1832        let data = [1, 2, 3, 4, 5];
1833
1834        // write 5 row groups
1835        for idx in 0..5 {
1836            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
1837            let mut row_group_writer = file_writer.next_row_group().unwrap();
1838            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1839                writer
1840                    .typed::<Int32Type>()
1841                    .write_batch(data_i.as_slice(), None, None)
1842                    .unwrap();
1843                writer.close().unwrap();
1844            }
1845            row_group_writer.close().unwrap();
1846            file_writer.flushed_row_groups();
1847        }
1848        let file_metadata = file_writer.close().unwrap();
1849
1850        assert_eq!(file_metadata.num_rows, 25);
1851        assert_eq!(file_metadata.row_groups.len(), 5);
1852
1853        // read only the 3rd row group
1854        let read_options = ReadOptionsBuilder::new()
1855            .with_page_index()
1856            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
1857            .build();
1858        let reader =
1859            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
1860                .unwrap();
1861        let metadata = reader.metadata();
1862
1863        // check we got the expected row group
1864        assert_eq!(metadata.num_row_groups(), 1);
1865        assert_eq!(metadata.row_group(0).ordinal(), Some(2));
1866
1867        // check we only got the relevant page indexes
1868        assert!(metadata.column_index().is_some());
1869        assert!(metadata.offset_index().is_some());
1870        assert_eq!(metadata.column_index().unwrap().len(), 1);
1871        assert_eq!(metadata.offset_index().unwrap().len(), 1);
1872        let col_idx = metadata.column_index().unwrap();
1873        let off_idx = metadata.offset_index().unwrap();
1874        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
1875        let pg_idx = &col_idx[0][0];
1876        let off_idx_i = &off_idx[0][0];
1877
1878        // test that we got the index matching the row group
1879        match pg_idx {
1880            Index::INT32(int_idx) => {
1881                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
1882                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
1883                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
1884                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
1885            }
1886            _ => panic!("wrong stats type"),
1887        }
1888
1889        // check offset index matches too
1890        assert_eq!(
1891            off_idx_i.page_locations[0].offset,
1892            metadata.row_group(0).column(0).data_page_offset()
1893        );
1894
1895        // read non-contiguous row groups
1896        let read_options = ReadOptionsBuilder::new()
1897            .with_page_index()
1898            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
1899            .build();
1900        let reader =
1901            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
1902                .unwrap();
1903        let metadata = reader.metadata();
1904
1905        // check we got the expected row groups
1906        assert_eq!(metadata.num_row_groups(), 2);
1907        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
1908        assert_eq!(metadata.row_group(1).ordinal(), Some(3));
1909
1910        // check we only got the relevant page indexes
1911        assert!(metadata.column_index().is_some());
1912        assert!(metadata.offset_index().is_some());
1913        assert_eq!(metadata.column_index().unwrap().len(), 2);
1914        assert_eq!(metadata.offset_index().unwrap().len(), 2);
1915        let col_idx = metadata.column_index().unwrap();
1916        let off_idx = metadata.offset_index().unwrap();
1917
1918        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
1919            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
1920            let pg_idx = &col_idx_i[0];
1921            let off_idx_i = &off_idx[i][0];
1922
1923            // test that we got the index matching the row group
1924            match pg_idx {
1925                Index::INT32(int_idx) => {
1926                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
1927                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
1928                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
1929                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
1930                }
1931                _ => panic!("wrong stats type"),
1932            }
1933
1934            // check offset index matches too
1935            assert_eq!(
1936                off_idx_i.page_locations[0].offset,
1937                metadata.row_group(i).column(0).data_page_offset()
1938            );
1939        }
1940    }
1941}