use std::{
collections::VecDeque,
io::{Cursor, Read, Seek, SeekFrom},
};
use crate::{
error::Error,
indexes::{FilteredPage, Interval},
metadata::{ColumnChunkMetaData, Descriptor},
page::{CompressedDictPage, CompressedPage, ParquetPageHeader},
parquet_bridge::Compression,
};
use super::reader::{finish_page, read_page_header, PageMetaData};
/// Reading stage of an [`IndexedPageReader`].
#[derive(Debug, Clone, Copy)]
enum State {
/// Initial stage: the next call to `next` first attempts to read a dictionary page.
MaybeDict,
/// Pages are popped from the queue of filtered pages and read one at a time.
Data,
}
/// A fallible [`Iterator`] of [`CompressedPage`] that reads only the pages described by a
/// list of [`FilteredPage`], skipping those whose `selected_rows` is empty.
pub struct IndexedPageReader<R: Read + Seek> {
// the source the pages are read from
reader: R,
// byte offset of the start of the column; a dictionary page, if any, is read from here
// up to the start of the first page
column_start: u64,
// forwarded to `finish_page` for every page that is read
compression: Compression,
// forwarded to `finish_page` for every page that is read
descriptor: Descriptor,
// scratch buffer that receives the raw bytes of a page ([header][data])
buffer: Vec<u8>,
// buffer taken (and left empty) on every page read; it receives the bytes that follow
// the page header
data_buffer: Vec<u8>,
// pages still to be visited, consumed from the front
pages: VecDeque<FilteredPage>,
// whether the dictionary page has already been attempted
state: State,
}
fn read_page<R: Read + Seek>(
reader: &mut R,
start: u64,
length: usize,
buffer: &mut Vec<u8>,
data: &mut Vec<u8>,
) -> Result<ParquetPageHeader, Error> {
reader.seek(SeekFrom::Start(start))?;
buffer.clear();
buffer.try_reserve(length)?;
reader.by_ref().take(length as u64).read_to_end(buffer)?;
let mut reader = Cursor::new(buffer);
let page_header = read_page_header(&mut reader, 1024 * 1024)?;
let header_size = reader.seek(SeekFrom::Current(0)).unwrap() as usize;
let buffer = reader.into_inner();
data.clear();
data.extend_from_slice(&buffer[header_size..]);
Ok(page_header)
}
fn read_dict_page<R: Read + Seek>(
reader: &mut R,
start: u64,
length: usize,
buffer: &mut Vec<u8>,
data: &mut Vec<u8>,
compression: Compression,
descriptor: &Descriptor,
) -> Result<CompressedDictPage, Error> {
let page_header = read_page(reader, start, length, buffer, data)?;
let page = finish_page(page_header, data, compression, descriptor, None)?;
if let CompressedPage::Dict(page) = page {
Ok(page)
} else {
Err(Error::oos(
"The first page is not a dictionary page but it should",
))
}
}
impl<R: Read + Seek> IndexedPageReader<R> {
pub fn new(
reader: R,
column: &ColumnChunkMetaData,
pages: Vec<FilteredPage>,
buffer: Vec<u8>,
data_buffer: Vec<u8>,
) -> Self {
Self::new_with_page_meta(reader, column.into(), pages, buffer, data_buffer)
}
pub fn new_with_page_meta(
reader: R,
column: PageMetaData,
pages: Vec<FilteredPage>,
buffer: Vec<u8>,
data_buffer: Vec<u8>,
) -> Self {
let pages = pages.into_iter().collect();
Self {
reader,
column_start: column.column_start,
compression: column.compression,
descriptor: column.descriptor,
buffer,
data_buffer,
pages,
state: State::MaybeDict,
}
}
pub fn into_inner(self) -> (R, Vec<u8>, Vec<u8>) {
(self.reader, self.buffer, self.data_buffer)
}
fn read_page(
&mut self,
start: u64,
length: usize,
selected_rows: Vec<Interval>,
) -> Result<CompressedPage, Error> {
let mut data = std::mem::take(&mut self.data_buffer);
let page_header = read_page(&mut self.reader, start, length, &mut self.buffer, &mut data)?;
finish_page(
page_header,
&mut data,
self.compression,
&self.descriptor,
Some(selected_rows),
)
}
fn read_dict(&mut self) -> Option<Result<CompressedPage, Error>> {
let (start, length) = match self.pages.get(0) {
Some(page) => {
let length = (page.start - self.column_start) as usize;
if length > 0 {
(self.column_start, length)
} else {
return None;
}
}
None => return None,
};
let mut data = std::mem::take(&mut self.data_buffer);
let maybe_page = read_dict_page(
&mut self.reader,
start,
length,
&mut self.buffer,
&mut data,
self.compression,
&self.descriptor,
);
Some(maybe_page.map(CompressedPage::Dict))
}
}
impl<R: Read + Seek> Iterator for IndexedPageReader<R> {
    type Item = Result<CompressedPage, Error>;

    /// Returns the next page to be read.
    ///
    /// The first call attempts to read a dictionary page; if there is one, it is
    /// returned. Afterwards, pages are taken from the front of the queue: pages with no
    /// selected rows are skipped without being read, and the first page with selected
    /// rows is read and returned. Returns `None` once the queue is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        if let State::MaybeDict = self.state {
            // the dictionary is attempted exactly once
            self.state = State::Data;
            if let Some(dict) = self.read_dict() {
                return Some(dict);
            }
        }

        // iterate rather than recurse: a long run of skipped pages must not grow the stack
        loop {
            let page = self.pages.pop_front()?;
            if !page.selected_rows.is_empty() {
                return Some(self.read_page(page.start, page.length, page.selected_rows));
            }
        }
    }
}