parquet/file/page_index/
index_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Support for reading [`Index`] and [`PageLocation`] from parquet metadata.
19
20use crate::basic::Type;
21use crate::data_type::Int96;
22use crate::errors::ParquetError;
23use crate::file::metadata::ColumnChunkMetaData;
24use crate::file::page_index::index::{Index, NativeIndex};
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use crate::file::reader::ChunkReader;
27use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
28use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
29use std::ops::Range;
30
/// Returns the smallest single range that spans both `a` and `b`.
///
/// When only one argument is `Some`, that range is returned unchanged; when
/// both are `None`, the result is `None`.
///
/// For example `acc_range(Some(7..9), Some(1..3)) = Some(1..9)`
pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Option<Range<usize>> {
    let lhs = match a {
        Some(range) => range,
        // Nothing on the left: whatever is on the right (possibly `None`) is the answer.
        None => return b,
    };
    let rhs = match b {
        Some(range) => range,
        None => return Some(lhs),
    };

    let start = usize::min(lhs.start, rhs.start);
    let end = usize::max(lhs.end, rhs.end);
    Some(start..end)
}
40
41/// Reads per-column [`Index`] for all columns of a row group by
42/// decoding [`ColumnIndex`] .
43///
44/// Returns a vector of `index[column_number]`.
45///
46/// Returns an empty vector if this row group does not contain a
47/// [`ColumnIndex`].
48///
49/// See [Page Index Documentation] for more details.
50///
51/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
52pub fn read_columns_indexes<R: ChunkReader>(
53    reader: &R,
54    chunks: &[ColumnChunkMetaData],
55) -> Result<Vec<Index>, ParquetError> {
56    let fetch = chunks
57        .iter()
58        .fold(None, |range, c| acc_range(range, c.column_index_range()));
59
60    let fetch = match fetch {
61        Some(r) => r,
62        None => return Ok(vec![Index::NONE; chunks.len()]),
63    };
64
65    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
66    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
67
68    chunks
69        .iter()
70        .map(|c| match c.column_index_range() {
71            Some(r) => decode_column_index(get(r), c.column_type()),
72            None => Ok(Index::NONE),
73        })
74        .collect()
75}
76
77/// Reads [`OffsetIndex`],  per-page [`PageLocation`] for all columns of a row
78/// group.
79///
80/// Returns a vector of `location[column_number][page_number]`
81///
82/// Return an empty vector if this row group does not contain an
83/// [`OffsetIndex]`.
84///
85/// See [Page Index Documentation] for more details.
86///
87/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
88#[deprecated(since = "53.0.0", note = "Use read_offset_indexes")]
89pub fn read_pages_locations<R: ChunkReader>(
90    reader: &R,
91    chunks: &[ColumnChunkMetaData],
92) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
93    let fetch = chunks
94        .iter()
95        .fold(None, |range, c| acc_range(range, c.offset_index_range()));
96
97    let fetch = match fetch {
98        Some(r) => r,
99        None => return Ok(vec![]),
100    };
101
102    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
103    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
104
105    chunks
106        .iter()
107        .map(|c| match c.offset_index_range() {
108            Some(r) => decode_page_locations(get(r)),
109            None => Err(general_err!("missing offset index")),
110        })
111        .collect()
112}
113
114/// Reads per-column [`OffsetIndexMetaData`] for all columns of a row group by
115/// decoding [`OffsetIndex`] .
116///
117/// Returns a vector of `offset_index[column_number]`.
118///
119/// Returns an empty vector if this row group does not contain an
120/// [`OffsetIndex`].
121///
122/// See [Page Index Documentation] for more details.
123///
124/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
125pub fn read_offset_indexes<R: ChunkReader>(
126    reader: &R,
127    chunks: &[ColumnChunkMetaData],
128) -> Result<Vec<OffsetIndexMetaData>, ParquetError> {
129    let fetch = chunks
130        .iter()
131        .fold(None, |range, c| acc_range(range, c.offset_index_range()));
132
133    let fetch = match fetch {
134        Some(r) => r,
135        None => return Ok(vec![]),
136    };
137
138    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
139    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
140
141    chunks
142        .iter()
143        .map(|c| match c.offset_index_range() {
144            Some(r) => decode_offset_index(get(r)),
145            None => Err(general_err!("missing offset index")),
146        })
147        .collect()
148}
149
150pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
151    let mut prot = TCompactSliceInputProtocol::new(data);
152    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
153    OffsetIndexMetaData::try_new(offset)
154}
155
156pub(crate) fn decode_page_locations(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
157    let mut prot = TCompactSliceInputProtocol::new(data);
158    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
159    Ok(offset.page_locations)
160}
161
162pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result<Index, ParquetError> {
163    let mut prot = TCompactSliceInputProtocol::new(data);
164
165    let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
166
167    let index = match column_type {
168        Type::BOOLEAN => Index::BOOLEAN(NativeIndex::<bool>::try_new(index)?),
169        Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new(index)?),
170        Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new(index)?),
171        Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new(index)?),
172        Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new(index)?),
173        Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new(index)?),
174        Type::BYTE_ARRAY => Index::BYTE_ARRAY(NativeIndex::try_new(index)?),
175        Type::FIXED_LEN_BYTE_ARRAY => Index::FIXED_LEN_BYTE_ARRAY(NativeIndex::try_new(index)?),
176    };
177
178    Ok(index)
179}