use std::convert::TryInto;
use std::{io::Read, sync::Arc};
use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
use crate::compression::Compression;
use crate::error::{Error, Result};
use crate::indexes::Interval;
use crate::metadata::{ColumnChunkMetaData, Descriptor};
use crate::page::{
CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, PageType,
ParquetPageHeader,
};
use crate::parquet_bridge::Encoding;
use super::PageIterator;
/// Column-chunk properties used to construct a [`PageReader`] through
/// `PageReader::new_with_page_meta`, without needing a full [`ColumnChunkMetaData`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageMetaData {
/// Start of the column chunk, taken from `ColumnChunkMetaData::byte_range().0`
/// (presumably a byte offset into the file — confirm against `byte_range`).
pub column_start: u64,
/// Total number of values in the column chunk; the page reader stops once the
/// data pages it has read account for this many values.
pub num_values: i64,
/// Compression codec of the pages in the chunk.
pub compression: Compression,
/// Descriptor of the column the pages belong to.
pub descriptor: Descriptor,
}
impl PageMetaData {
    /// Assembles a [`PageMetaData`] from its individual parts, storing each argument
    /// verbatim in the field of the same name.
    pub fn new(
        column_start: u64,
        num_values: i64,
        compression: Compression,
        descriptor: Descriptor,
    ) -> Self {
        Self {
            descriptor,
            compression,
            num_values,
            column_start,
        }
    }
}
impl From<&ColumnChunkMetaData> for PageMetaData {
    /// Extracts the page-reading metadata from a column chunk: the first element of its
    /// byte range, its value count, its compression and a copy of its column descriptor.
    fn from(column: &ColumnChunkMetaData) -> Self {
        let column_start = column.byte_range().0;
        let descriptor = column.descriptor().descriptor.clone();
        Self::new(
            column_start,
            column.num_values(),
            column.compression(),
            descriptor,
        )
    }
}
/// Predicate consulted by [`PageReader`] for every data page it reads.
///
/// It receives the column descriptor and the page's data header. Returning `true` yields the
/// page and returning `false` skips it. Dictionary pages are never passed to the filter.
pub type PageFilter = Arc<dyn Fn(&Descriptor, &DataPageHeader) -> bool + Send + Sync>;
/// Sequential reader of the compressed pages of a single column chunk.
///
/// Pages are read one after another from `reader` until the data pages seen so far account
/// for `total_num_values` values. Data pages rejected by `pages_filter` are skipped.
pub struct PageReader<R: Read> {
/// Source of the serialized page headers and page bodies.
reader: R,
/// Compression codec recorded on every produced page.
compression: Compression,
/// Sum of `num_values` over the data-page headers read so far.
seen_num_values: i64,
/// Number of values in the column chunk; reading stops once `seen_num_values` reaches it.
total_num_values: i64,
/// Decides which data pages are yielded; see [`PageFilter`].
pages_filter: PageFilter,
/// Descriptor of the column, cloned into each data page.
descriptor: Descriptor,
/// Buffer lent to each page read so that its allocation can be handed back and reused.
pub(crate) scratch: Vec<u8>,
/// Upper bound used both when decoding a page header and for a page's compressed size.
max_page_size: usize,
}
impl<R: Read> PageReader<R> {
pub fn new(
reader: R,
column: &ColumnChunkMetaData,
pages_filter: PageFilter,
scratch: Vec<u8>,
max_page_size: usize,
) -> Self {
Self::new_with_page_meta(reader, column.into(), pages_filter, scratch, max_page_size)
}
pub fn new_with_page_meta(
reader: R,
reader_meta: PageMetaData,
pages_filter: PageFilter,
scratch: Vec<u8>,
max_page_size: usize,
) -> Self {
Self {
reader,
total_num_values: reader_meta.num_values,
compression: reader_meta.compression,
seen_num_values: 0,
descriptor: reader_meta.descriptor,
pages_filter,
scratch,
max_page_size,
}
}
pub fn into_inner(self) -> (R, Vec<u8>) {
(self.reader, self.scratch)
}
}
impl<R: Read> PageIterator for PageReader<R> {
    /// Exchanges the reader's internal scratch buffer with the caller-provided one.
    fn swap_buffer(&mut self, scratch: &mut Vec<u8>) {
        std::mem::swap(scratch, &mut self.scratch);
    }
}
impl<R: Read> Iterator for PageReader<R> {
    type Item = Result<CompressedPage>;

    /// Returns the next page that passes `pages_filter`.
    ///
    /// Dictionary pages and errors are always yielded. Data pages for which the filter returns
    /// `false` are skipped. Returns `None` once the data pages read so far account for the
    /// column chunk's total number of values.
    fn next(&mut self) -> Option<Self::Item> {
        // Loop rather than recurse: a long run of filtered-out pages must not grow the stack.
        loop {
            let mut buffer = std::mem::take(&mut self.scratch);
            let maybe_page = next_page(self, &mut buffer).transpose();
            // Hand the buffer back so its allocation can be reused. When a page was built,
            // its bytes were moved into the page and `buffer` is already empty.
            self.scratch = buffer;

            if let Some(Ok(CompressedPage::Data(page))) = &maybe_page {
                if !(self.pages_filter)(&self.descriptor, page.header()) {
                    continue;
                }
            }
            return maybe_page;
        }
    }
}
/// Decodes one thrift-compact-encoded page header from `reader`.
///
/// `max_size` is passed to the thrift protocol as its size limit. Decoding errors are converted
/// into the crate's [`Error`].
pub(super) fn read_page_header<R: Read>(
    reader: &mut R,
    max_size: usize,
) -> Result<ParquetPageHeader> {
    let mut protocol = TCompactInputProtocol::new(reader, max_size);
    Ok(ParquetPageHeader::read_from_in_protocol(&mut protocol)?)
}
/// Reads the next page into `buffer`, or returns `Ok(None)` once `seen_num_values` has
/// reached `total_num_values`.
fn next_page<R: Read>(
    reader: &mut PageReader<R>,
    buffer: &mut Vec<u8>,
) -> Result<Option<CompressedPage>> {
    let exhausted = reader.seen_num_values >= reader.total_num_values;
    if exhausted {
        Ok(None)
    } else {
        build_page(reader, buffer)
    }
}
/// Reads one page header plus its compressed body and turns them into a [`CompressedPage`].
///
/// The value count of data pages is added to `reader.seen_num_values`; other page types add
/// nothing.
///
/// # Errors
/// - [`Error::WouldOverAllocate`] if the declared compressed size exceeds `max_page_size`.
/// - An out-of-spec error if fewer bytes than declared could be read.
/// - Any header decoding, validation or conversion failure.
pub(super) fn build_page<R: Read>(
    reader: &mut PageReader<R>,
    buffer: &mut Vec<u8>,
) -> Result<Option<CompressedPage>> {
    let page_header = read_page_header(&mut reader.reader, reader.max_page_size)?;

    let page_values = match get_page_header(&page_header)? {
        Some(data_header) => data_header.num_values() as i64,
        None => 0,
    };
    reader.seen_num_values += page_values;

    let read_size: usize = page_header.compressed_page_size.try_into()?;
    // Refuse to allocate for sizes beyond the configured limit (e.g. corrupted headers).
    if read_size > reader.max_page_size {
        return Err(Error::WouldOverAllocate);
    }

    buffer.clear();
    buffer.try_reserve(read_size)?;
    let mut body = reader.reader.by_ref().take(read_size as u64);
    let bytes_read = body.read_to_end(buffer)?;
    if bytes_read != read_size {
        return Err(Error::oos(
            "The page header reported the wrong page size".to_string(),
        ));
    }

    let page = finish_page(
        page_header,
        buffer,
        reader.compression,
        &reader.descriptor,
        None,
    )?;
    Ok(Some(page))
}
/// Wraps a decoded page header and its compressed body into a [`CompressedPage`].
///
/// The bytes in `data` are moved into the returned page, leaving `data` empty. `selected_rows`
/// is only attached to data pages.
///
/// # Errors
/// Fails if the page type or sizes in the header cannot be converted, or if the header's page
/// type has no matching sub-header.
pub(super) fn finish_page(
    page_header: ParquetPageHeader,
    data: &mut Vec<u8>,
    compression: Compression,
    descriptor: &Descriptor,
    selected_rows: Option<Vec<Interval>>,
) -> Result<CompressedPage> {
    let page_type = page_header.type_.try_into()?;
    let uncompressed_page_size = page_header.uncompressed_page_size.try_into()?;

    let data_header = match page_type {
        PageType::DictionaryPage => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                Error::oos(
                    "The page header type is a dictionary page but the dictionary header is empty",
                )
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            // The body is taken before `num_values` is converted, matching the original order.
            let body = std::mem::take(data);
            let num_values = dict_header.num_values.try_into()?;
            return Ok(CompressedPage::Dict(CompressedDictPage::new(
                body,
                compression,
                uncompressed_page_size,
                num_values,
                is_sorted,
            )));
        }
        PageType::DataPage => DataPageHeader::V1(page_header.data_page_header.ok_or_else(|| {
            Error::oos("The page header type is a v1 data page but the v1 data header is empty")
        })?),
        PageType::DataPageV2 => {
            DataPageHeader::V2(page_header.data_page_header_v2.ok_or_else(|| {
                Error::oos("The page header type is a v2 data page but the v2 data header is empty")
            })?)
        }
    };

    Ok(CompressedPage::Data(CompressedDataPage::new_read(
        data_header,
        std::mem::take(data),
        compression,
        uncompressed_page_size,
        descriptor.clone(),
        selected_rows,
    )))
}
pub(super) fn get_page_header(header: &ParquetPageHeader) -> Result<Option<DataPageHeader>> {
let type_ = header.type_.try_into()?;
Ok(match type_ {
PageType::DataPage => {
let header = header.data_page_header.clone().ok_or_else(|| {
Error::oos("The page header type is a v1 data page but the v1 header is empty")
})?;
let _: Encoding = header.encoding.try_into()?;
let _: Encoding = header.repetition_level_encoding.try_into()?;
let _: Encoding = header.definition_level_encoding.try_into()?;
Some(DataPageHeader::V1(header))
}
PageType::DataPageV2 => {
let header = header.data_page_header_v2.clone().ok_or_else(|| {
Error::oos("The page header type is a v1 data page but the v1 header is empty")
})?;
let _: Encoding = header.encoding.try_into()?;
Some(DataPageHeader::V2(header))
}
_ => None,
})
}