parquet/file/page_index/
index_reader.rs
1use crate::basic::Type;
21use crate::data_type::Int96;
22use crate::errors::ParquetError;
23use crate::file::metadata::ColumnChunkMetaData;
24use crate::file::page_index::index::{Index, NativeIndex};
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use crate::file::reader::ChunkReader;
27use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
28use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
29use std::ops::Range;
30
/// Combines two optional ranges into the smallest range that covers both.
///
/// When only one side is `Some`, that range is returned unchanged; when both
/// are `None`, the result is `None`. The covering range spans from the
/// smaller `start` to the larger `end`, so any gap between the inputs is
/// included in the result.
pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Option<Range<usize>> {
    let (lhs, rhs) = match (a, b) {
        (Some(lhs), Some(rhs)) => (lhs, rhs),
        // At most one side is present: pass it (or `None`) straight through.
        (only, None) | (None, only) => return only,
    };

    let start = std::cmp::min(lhs.start, rhs.start);
    let end = std::cmp::max(lhs.end, rhs.end);
    Some(start..end)
}
40
41pub fn read_columns_indexes<R: ChunkReader>(
53 reader: &R,
54 chunks: &[ColumnChunkMetaData],
55) -> Result<Vec<Index>, ParquetError> {
56 let fetch = chunks
57 .iter()
58 .fold(None, |range, c| acc_range(range, c.column_index_range()));
59
60 let fetch = match fetch {
61 Some(r) => r,
62 None => return Ok(vec![Index::NONE; chunks.len()]),
63 };
64
65 let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
66 let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
67
68 chunks
69 .iter()
70 .map(|c| match c.column_index_range() {
71 Some(r) => decode_column_index(get(r), c.column_type()),
72 None => Ok(Index::NONE),
73 })
74 .collect()
75}
76
77#[deprecated(since = "53.0.0", note = "Use read_offset_indexes")]
89pub fn read_pages_locations<R: ChunkReader>(
90 reader: &R,
91 chunks: &[ColumnChunkMetaData],
92) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
93 let fetch = chunks
94 .iter()
95 .fold(None, |range, c| acc_range(range, c.offset_index_range()));
96
97 let fetch = match fetch {
98 Some(r) => r,
99 None => return Ok(vec![]),
100 };
101
102 let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
103 let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
104
105 chunks
106 .iter()
107 .map(|c| match c.offset_index_range() {
108 Some(r) => decode_page_locations(get(r)),
109 None => Err(general_err!("missing offset index")),
110 })
111 .collect()
112}
113
114pub fn read_offset_indexes<R: ChunkReader>(
126 reader: &R,
127 chunks: &[ColumnChunkMetaData],
128) -> Result<Vec<OffsetIndexMetaData>, ParquetError> {
129 let fetch = chunks
130 .iter()
131 .fold(None, |range, c| acc_range(range, c.offset_index_range()));
132
133 let fetch = match fetch {
134 Some(r) => r,
135 None => return Ok(vec![]),
136 };
137
138 let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
139 let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
140
141 chunks
142 .iter()
143 .map(|c| match c.offset_index_range() {
144 Some(r) => decode_offset_index(get(r)),
145 None => Err(general_err!("missing offset index")),
146 })
147 .collect()
148}
149
150pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
151 let mut prot = TCompactSliceInputProtocol::new(data);
152 let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
153 OffsetIndexMetaData::try_new(offset)
154}
155
156pub(crate) fn decode_page_locations(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
157 let mut prot = TCompactSliceInputProtocol::new(data);
158 let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
159 Ok(offset.page_locations)
160}
161
162pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result<Index, ParquetError> {
163 let mut prot = TCompactSliceInputProtocol::new(data);
164
165 let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
166
167 let index = match column_type {
168 Type::BOOLEAN => Index::BOOLEAN(NativeIndex::<bool>::try_new(index)?),
169 Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new(index)?),
170 Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new(index)?),
171 Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new(index)?),
172 Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new(index)?),
173 Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new(index)?),
174 Type::BYTE_ARRAY => Index::BYTE_ARRAY(NativeIndex::try_new(index)?),
175 Type::FIXED_LEN_BYTE_ARRAY => Index::FIXED_LEN_BYTE_ARRAY(NativeIndex::try_new(index)?),
176 };
177
178 Ok(index)
179}