parquet/file/page_index/
index_reader.rs1use crate::basic::Type;
21use crate::data_type::Int96;
22use crate::errors::ParquetError;
23use crate::file::metadata::ColumnChunkMetaData;
24use crate::file::page_index::index::{Index, NativeIndex};
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use crate::file::reader::ChunkReader;
27use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
28use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
29use std::ops::Range;
30
/// Merges two optional byte ranges into the smallest range covering both.
///
/// If only one side is `Some`, it is returned unchanged; if both are `None`
/// the result is `None`. When both are present the result starts at the
/// smaller `start` and ends at the larger `end`, so any gap between the two
/// inputs is included in the result.
pub(crate) fn acc_range(a: Option<Range<u64>>, b: Option<Range<u64>>) -> Option<Range<u64>> {
    let Some(lhs) = a else {
        return b;
    };
    let Some(rhs) = b else {
        return Some(lhs);
    };

    let start = lhs.start.min(rhs.start);
    let end = lhs.end.max(rhs.end);
    Some(start..end)
}
40
41#[deprecated(
52 since = "55.2.0",
53 note = "Use ParquetMetaDataReader instead; will be removed in 58.0.0"
54)]
55pub fn read_columns_indexes<R: ChunkReader>(
56 reader: &R,
57 chunks: &[ColumnChunkMetaData],
58) -> Result<Option<Vec<Index>>, ParquetError> {
59 let fetch = chunks
60 .iter()
61 .fold(None, |range, c| acc_range(range, c.column_index_range()));
62
63 let fetch = match fetch {
64 Some(r) => r,
65 None => return Ok(None),
66 };
67
68 let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
69
70 Some(
71 chunks
72 .iter()
73 .map(|c| match c.column_index_range() {
74 Some(r) => decode_column_index(
75 &bytes[usize::try_from(r.start - fetch.start)?
76 ..usize::try_from(r.end - fetch.start)?],
77 c.column_type(),
78 ),
79 None => Ok(Index::NONE),
80 })
81 .collect(),
82 )
83 .transpose()
84}
85
86#[deprecated(since = "53.0.0", note = "Use read_offset_indexes")]
98pub fn read_pages_locations<R: ChunkReader>(
99 reader: &R,
100 chunks: &[ColumnChunkMetaData],
101) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
102 let fetch = chunks
103 .iter()
104 .fold(None, |range, c| acc_range(range, c.offset_index_range()));
105
106 let fetch = match fetch {
107 Some(r) => r,
108 None => return Ok(vec![]),
109 };
110
111 let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
112
113 chunks
114 .iter()
115 .map(|c| match c.offset_index_range() {
116 Some(r) => decode_page_locations(
117 &bytes[usize::try_from(r.start - fetch.start)?
118 ..usize::try_from(r.end - fetch.start)?],
119 ),
120 None => Err(general_err!("missing offset index")),
121 })
122 .collect()
123}
124
125#[deprecated(
136 since = "55.2.0",
137 note = "Use ParquetMetaDataReader instead; will be removed in 58.0.0"
138)]
139pub fn read_offset_indexes<R: ChunkReader>(
140 reader: &R,
141 chunks: &[ColumnChunkMetaData],
142) -> Result<Option<Vec<OffsetIndexMetaData>>, ParquetError> {
143 let fetch = chunks
144 .iter()
145 .fold(None, |range, c| acc_range(range, c.offset_index_range()));
146
147 let fetch = match fetch {
148 Some(r) => r,
149 None => return Ok(None),
150 };
151
152 let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
153
154 Some(
155 chunks
156 .iter()
157 .map(|c| match c.offset_index_range() {
158 Some(r) => decode_offset_index(
159 &bytes[usize::try_from(r.start - fetch.start)?
160 ..usize::try_from(r.end - fetch.start)?],
161 ),
162 None => Err(general_err!("missing offset index")),
163 })
164 .collect(),
165 )
166 .transpose()
167}
168
169pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
170 let mut prot = TCompactSliceInputProtocol::new(data);
171 let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
172 OffsetIndexMetaData::try_new(offset)
173}
174
175pub(crate) fn decode_page_locations(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
176 let mut prot = TCompactSliceInputProtocol::new(data);
177 let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
178 Ok(offset.page_locations)
179}
180
181pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result<Index, ParquetError> {
182 let mut prot = TCompactSliceInputProtocol::new(data);
183
184 let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
185
186 let index = match column_type {
187 Type::BOOLEAN => Index::BOOLEAN(NativeIndex::<bool>::try_new(index)?),
188 Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new(index)?),
189 Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new(index)?),
190 Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new(index)?),
191 Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new(index)?),
192 Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new(index)?),
193 Type::BYTE_ARRAY => Index::BYTE_ARRAY(NativeIndex::try_new(index)?),
194 Type::FIXED_LEN_BYTE_ARRAY => Index::FIXED_LEN_BYTE_ARRAY(NativeIndex::try_new(index)?),
195 };
196
197 Ok(index)
198}