// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! File reader API and methods to access file metadata, row group
//! readers to read individual column chunks, or access a record
//! iterator.
2122use bytes::{Buf, Bytes};
23use std::fs::File;
24use std::io::{BufReader, Seek, SeekFrom};
25use std::{io::Read, sync::Arc};
2627use crate::bloom_filter::Sbbf;
28use crate::column::page::PageIterator;
29use crate::column::{page::PageReader, reader::ColumnReader};
30use crate::errors::{ParquetError, Result};
31use crate::file::metadata::*;
32pub use crate::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
33use crate::record::reader::RowIter;
34use crate::schema::types::Type as SchemaType;
3536use crate::basic::Type;
3738use crate::column::reader::ColumnReaderImpl;
/// Reports the total number of bytes available in an input source.
///
/// The size is mainly needed to locate the metadata, which is stored at the
/// end of the source.
// The lint is silenced on purpose: this trait only exposes `len`, it has no
// `is_empty` counterpart.
#[allow(clippy::len_without_is_empty)]
pub trait Length {
    /// Returns the number of bytes held by the inner source.
    fn len(&self) -> u64;
}
4748/// Generates [`Read`]ers to read chunks of a Parquet data source.
49///
50/// The Parquet reader uses [`ChunkReader`] to access Parquet data, allowing
51/// multiple decoders to read concurrently from different locations in the same file.
52///
53/// The trait provides:
54/// * random access (via [`Self::get_bytes`])
55/// * sequential (via [`Self::get_read`])
56///
57/// # Provided Implementations
58/// * [`File`] for reading from local file system
59/// * [`Bytes`] for reading from an in-memory buffer
60///
61/// User provided implementations can implement more sophisticated behaviors
62/// such as on-demand buffering or scan sharing.
63pub trait ChunkReader: Length + Send + Sync {
64/// The concrete type of reader returned by this trait
65type T: Read;
6667/// Get a [`Read`] instance starting at the provided file offset
68 ///
69 /// Returned readers follow the model of [`File::try_clone`] where mutations
70 /// of one reader affect all readers. Thus subsequent or concurrent calls to
71 /// [`Self::get_read`] or [`Self::get_bytes`] may cause side-effects on
72 /// previously returned readers. Callers of `get_read` should take care
73 /// to avoid race conditions.
74fn get_read(&self, start: u64) -> Result<Self::T>;
7576/// Get a range of data in memory as [`Bytes`]
77 ///
78 /// Similarly to [`Self::get_read`], this method may have side-effects on
79 /// previously returned readers.
80fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes>;
81}
8283impl Length for File {
84fn len(&self) -> u64 {
85self.metadata().map(|m| m.len()).unwrap_or(0u64)
86 }
87}
8889impl ChunkReader for File {
90type T = BufReader<File>;
9192fn get_read(&self, start: u64) -> Result<Self::T> {
93let mut reader = self.try_clone()?;
94 reader.seek(SeekFrom::Start(start))?;
95Ok(BufReader::new(self.try_clone()?))
96 }
9798fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
99let mut buffer = Vec::with_capacity(length);
100let mut reader = self.try_clone()?;
101 reader.seek(SeekFrom::Start(start))?;
102let read = reader.take(length as _).read_to_end(&mut buffer)?;
103104if read != length {
105return Err(eof_err!(
106"Expected to read {} bytes, read only {}",
107 length,
108 read
109 ));
110 }
111Ok(buffer.into())
112 }
113}
114115impl Length for Bytes {
116fn len(&self) -> u64 {
117self.len() as u64
118 }
119}
120121impl ChunkReader for Bytes {
122type T = bytes::buf::Reader<Bytes>;
123124fn get_read(&self, start: u64) -> Result<Self::T> {
125let start = start as usize;
126Ok(self.slice(start..).reader())
127 }
128129fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
130let start = start as usize;
131Ok(self.slice(start..start + length))
132 }
133}
// ----------------------------------------------------------------------
// APIs for file & row group readers
137138/// Parquet file reader API. With this, user can get metadata information about the
139/// Parquet file, can get reader for each row group, and access record iterator.
140pub trait FileReader: Send + Sync {
141/// Get metadata information about this file.
142fn metadata(&self) -> &ParquetMetaData;
143144/// Get the total number of row groups for this file.
145fn num_row_groups(&self) -> usize;
146147/// Get the `i`th row group reader. Note this doesn't do bound check.
148fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>>;
149150/// Get an iterator over the row in this file, see [`RowIter`] for caveats.
151 ///
152 /// Iterator will automatically load the next row group to advance.
153 ///
154 /// Projected schema can be a subset of or equal to the file schema, when it is None,
155 /// full file schema is assumed.
156fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
157}
158159/// Parquet row group reader API. With this, user can get metadata information about the
160/// row group, as well as readers for each individual column chunk.
161pub trait RowGroupReader: Send + Sync {
162/// Get metadata information about this row group.
163fn metadata(&self) -> &RowGroupMetaData;
164165/// Get the total number of column chunks in this row group.
166fn num_columns(&self) -> usize;
167168/// Get page reader for the `i`th column chunk.
169fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>>;
170171/// Get value reader for the `i`th column chunk.
172fn get_column_reader(&self, i: usize) -> Result<ColumnReader> {
173let schema_descr = self.metadata().schema_descr();
174let col_descr = schema_descr.column(i);
175let col_page_reader = self.get_column_page_reader(i)?;
176let col_reader = match col_descr.physical_type() {
177 Type::BOOLEAN => {
178 ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
179 }
180 Type::INT32 => {
181 ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
182 }
183 Type::INT64 => {
184 ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
185 }
186 Type::INT96 => {
187 ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
188 }
189 Type::FLOAT => {
190 ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
191 }
192 Type::DOUBLE => {
193 ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
194 }
195 Type::BYTE_ARRAY => ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(
196 col_descr,
197 col_page_reader,
198 )),
199 Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
200 ColumnReaderImpl::new(col_descr, col_page_reader),
201 ),
202 };
203Ok(col_reader)
204 }
205206/// Get bloom filter for the `i`th column chunk, if present and the reader was configured
207 /// to read bloom filters.
208fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
209210/// Get an iterator over the row in this file, see [`RowIter`] for caveats.
211 ///
212 /// Projected schema can be a subset of or equal to the file schema, when it is None,
213 /// full file schema is assumed.
214fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
215}
// ----------------------------------------------------------------------
// Iterator
219220/// Implementation of page iterator for parquet file.
221pub struct FilePageIterator {
222 column_index: usize,
223 row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
224 file_reader: Arc<dyn FileReader>,
225}
226227impl FilePageIterator {
228/// Creates a page iterator for all row groups in file.
229pub fn new(column_index: usize, file_reader: Arc<dyn FileReader>) -> Result<Self> {
230let num_row_groups = file_reader.metadata().num_row_groups();
231232let row_group_indices = Box::new(0..num_row_groups);
233234Self::with_row_groups(column_index, row_group_indices, file_reader)
235 }
236237/// Create page iterator from parquet file reader with only some row groups.
238pub fn with_row_groups(
239 column_index: usize,
240 row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
241 file_reader: Arc<dyn FileReader>,
242 ) -> Result<Self> {
243// Check that column_index is valid
244let num_columns = file_reader
245 .metadata()
246 .file_metadata()
247 .schema_descr()
248 .num_columns();
249250if column_index >= num_columns {
251return Err(ParquetError::IndexOutOfBound(column_index, num_columns));
252 }
253254// We don't check iterators here because iterator may be infinite
255Ok(Self {
256 column_index,
257 row_group_indices,
258 file_reader,
259 })
260 }
261}
262263impl Iterator for FilePageIterator {
264type Item = Result<Box<dyn PageReader>>;
265266fn next(&mut self) -> Option<Result<Box<dyn PageReader>>> {
267self.row_group_indices.next().map(|row_group_index| {
268self.file_reader
269 .get_row_group(row_group_index)
270 .and_then(|r| r.get_column_page_reader(self.column_index))
271 })
272 }
273}
274275impl PageIterator for FilePageIterator {}