use std::io::{Read, Seek};
use futures::{
future::{try_join_all, BoxFuture},
AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
};
use parquet2::{
indexes::FilteredPage,
metadata::ColumnChunkMetaData,
read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader},
};
use crate::{
array::Array, chunk::Chunk, datatypes::Field, error::Result,
io::parquet::read::column_iter_to_arrays,
};
use super::ArrayIter;
use super::RowGroupMetaData;
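/// An [`Iterator`] of [`Chunk`] that (dynamically) adapts a vector of iterators of [`Array`]
/// into an iterator of [`Chunk`].
///
/// This struct advances each of the inner iterators individually and combines their items
/// into a [`Chunk`], stopping once `limit` (when provided) or the row group's rows are exhausted.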
pub struct RowGroupDeserializer {
num_rows: usize,
remaining_rows: usize,
column_chunks: Vec<ArrayIter<'static>>,
}
impl RowGroupDeserializer {
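    /// Returns a new [`RowGroupDeserializer`] that yields up to `limit` rows
    /// (all `num_rows` when `limit` is `None`).
    ///
    /// # Panics
    /// Iterating the result panics if any of the `column_chunks` is exhausted before
    /// the remaining number of rows reaches zero.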
pub fn new(
column_chunks: Vec<ArrayIter<'static>>,
num_rows: usize,
limit: Option<usize>,
) -> Self {
Self {
num_rows,
remaining_rows: limit.unwrap_or(usize::MAX).min(num_rows),
column_chunks,
}
}
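    /// Returns the number of rows in this row group, irrespective of `limit`.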
pub fn num_rows(&self) -> usize {
self.num_rows
}
}
impl Iterator for RowGroupDeserializer {
type Item = Result<Chunk<Box<dyn Array>>>;
fn next(&mut self) -> Option<Self::Item> {
if self.remaining_rows == 0 {
return None;
}
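        // unwrap is safe by contract: each inner iterator must yield an item
        // while `remaining_rows > 0` (see `RowGroupDeserializer::new`)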
let chunk = self
.column_chunks
.iter_mut()
.map(|iter| iter.next().unwrap())
.collect::<Result<Vec<_>>>()
.and_then(Chunk::try_new);
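        // on error, mark all remaining rows as consumed so that iteration stops
        // after the error has been yielded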
self.remaining_rows = self.remaining_rows.saturating_sub(
chunk
.as_ref()
.map(|x| x.len())
.unwrap_or(self.remaining_rows),
);
Some(chunk)
}
}
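/// Returns all [`ColumnChunkMetaData`] associated with the top-level field named `field_name`:
/// a single column for non-nested types, all leaf columns under the field for nested types.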
pub fn get_field_columns<'a>(
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Vec<&'a ColumnChunkMetaData> {
columns
.iter()
.filter(|x| x.descriptor().path_in_schema[0] == field_name)
.collect()
}
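/// Returns all `items`, zipped positionally with `columns`, that are associated with the
/// top-level field named `field_name`. `columns` and `items` must be in the same (leaf column) order.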
pub fn get_field_pages<'a, T>(
columns: &'a [ColumnChunkMetaData],
items: &'a [T],
field_name: &str,
) -> Vec<&'a T> {
columns
.iter()
.zip(items)
.filter(|(metadata, _)| metadata.descriptor().path_in_schema[0] == field_name)
.map(|(_, item)| item)
.collect()
}
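/// Reads all column chunks that are part of the top-level parquet field `field_name` into memory.
///
/// This operation is IO-bounded: `O(C)` where `C` is the number of columns associated with
/// the field (one for non-nested types).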
pub fn read_columns<'a, R: Read + Seek>(
reader: &mut R,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
get_field_columns(columns, field_name)
.into_iter()
.map(|meta| _read_single_column(reader, meta))
.collect()
}
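// seeks to the column chunk's byte range and reads it into an owned buffer, using
// `try_reserve` to surface allocation failures as errors instead of aborting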
fn _read_single_column<'a, R>(
reader: &mut R,
meta: &'a ColumnChunkMetaData,
) -> Result<(&'a ColumnChunkMetaData, Vec<u8>)>
where
R: Read + Seek,
{
let (start, length) = meta.byte_range();
reader.seek(std::io::SeekFrom::Start(start))?;
let mut chunk = vec![];
chunk.try_reserve(length as usize)?;
reader.by_ref().take(length).read_to_end(&mut chunk)?;
Ok((meta, chunk))
}
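// same as `_read_single_column`, but creating a new (async) reader via `reader_factory`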
async fn _read_single_column_async<'b, R, F>(
reader_factory: F,
meta: &ColumnChunkMetaData,
) -> Result<(&ColumnChunkMetaData, Vec<u8>)>
where
R: AsyncRead + AsyncSeek + Send + Unpin,
F: Fn() -> BoxFuture<'b, std::io::Result<R>>,
{
let mut reader = reader_factory().await?;
let (start, length) = meta.byte_range();
reader.seek(std::io::SeekFrom::Start(start)).await?;
let mut chunk = vec![];
chunk.try_reserve(length as usize)?;
reader.take(length).read_to_end(&mut chunk).await?;
    Ok((meta, chunk))
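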
}
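/// Reads all column chunks that are part of the top-level parquet field `field_name`,
/// akin to [`read_columns`] but asynchronously.
///
/// One reader is created per column via `reader_factory`, and the reads are performed
/// concurrently via [`try_join_all`].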
pub async fn read_columns_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
F: Fn() -> BoxFuture<'b, std::io::Result<R>> + Clone,
>(
reader_factory: F,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
let futures = get_field_columns(columns, field_name)
.into_iter()
        .map(|meta| _read_single_column_async(reader_factory.clone(), meta));
try_join_all(futures).await
}
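/// A boxed fallible iterator of compressed parquet pages.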
type Pages = Box<
dyn Iterator<Item = std::result::Result<parquet2::page::CompressedPage, parquet2::error::Error>>
+ Sync
+ Send,
>;
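/// Converts the in-memory column chunks associated with a single parquet field into an
/// [`ArrayIter`] via iterator adapters.
///
/// When `pages` is `Some`, only the given [`FilteredPage`]s of each column chunk are
/// decompressed and deserialized (predicate pushdown via page indexes); otherwise all
/// pages of each chunk are read.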
pub fn to_deserializer<'a>(
columns: Vec<(&ColumnChunkMetaData, Vec<u8>)>,
field: Field,
num_rows: usize,
chunk_size: Option<usize>,
pages: Option<Vec<Vec<FilteredPage>>>,
) -> Result<ArrayIter<'a>> {
let chunk_size = chunk_size.map(|c| c.min(num_rows));
let (columns, types) = if let Some(pages) = pages {
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
        .zip(pages)
.map(|((column_meta, chunk), mut pages)| {
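                // page locations in the index are absolute file offsets; rebase them
                // onto the start of the in-memory column chunk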
let mut meta: PageMetaData = column_meta.into();
pages
.iter_mut()
.for_each(|page| page.start -= meta.column_start);
meta.column_start = 0;
let pages = IndexedPageReader::new_with_page_meta(
std::io::Cursor::new(chunk),
meta,
pages,
vec![],
vec![],
);
let pages = Box::new(pages) as Pages;
(
BasicDecompressor::new(pages, vec![]),
&column_meta.descriptor().descriptor.primitive_type,
)
})
.unzip();
(columns, types)
} else {
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
.map(|(column_meta, chunk)| {
let len = chunk.len();
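                // maximum allowed page size; the chunk is already fully in memory, so this
                // only guards against corrupt page headers causing huge allocations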
let pages = PageReader::new(
std::io::Cursor::new(chunk),
column_meta,
std::sync::Arc::new(|_, _| true),
vec![],
len * 2 + 1024,
);
let pages = Box::new(pages) as Pages;
(
BasicDecompressor::new(pages, vec![]),
&column_meta.descriptor().descriptor.primitive_type,
)
})
.unzip();
(columns, types)
};
column_iter_to_arrays(columns, types, field, chunk_size, num_rows)
}
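/// Returns a vector of iterators of [`Array`] ([`ArrayIter`]) corresponding to the top-level
/// parquet fields whose names match `fields`'s names.
///
/// This operation is IO-bounded: `O(C)` where `C` is the number of columns in the row group.
/// It reads all the columns to memory sequentially; use [`read_columns`] directly to fork
/// this operation to multiple readers, such as files or memory-mapped files.
///
/// A minimal sketch of reading the first row group of a file into [`Chunk`]s; the path and
/// the surrounding `read_metadata`/`infer_schema` wiring are illustrative assumptions:
/// ```ignore
/// use std::fs::File;
/// use arrow2::io::parquet::read;
///
/// fn read_first_row_group(path: &str) -> arrow2::error::Result<()> {
///     let mut reader = File::open(path)?; // `path` is a hypothetical parquet file
///     let metadata = read::read_metadata(&mut reader)?;
///     let schema = read::infer_schema(&metadata)?;
///     let row_group = &metadata.row_groups[0];
///     let columns = read::read_columns_many(
///         &mut reader,
///         row_group,
///         schema.fields.clone(),
///         None, // chunk_size: yield a single chunk per column
///         None, // limit: read all rows
///         None, // pages: no page-level filtering
///     )?;
///     let deserializer = read::RowGroupDeserializer::new(columns, row_group.num_rows(), None);
///     for maybe_chunk in deserializer {
///         let chunk = maybe_chunk?;
///         println!("read {} rows", chunk.len());
///     }
///     Ok(())
/// }
/// ```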
pub fn read_columns_many<'a, R: Read + Seek>(
reader: &mut R,
row_group: &RowGroupMetaData,
fields: Vec<Field>,
chunk_size: Option<usize>,
limit: Option<usize>,
pages: Option<Vec<Vec<Vec<FilteredPage>>>>,
) -> Result<Vec<ArrayIter<'a>>> {
let num_rows = row_group.num_rows();
let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows);
let field_columns = fields
.iter()
.map(|field| read_columns(reader, row_group.columns(), &field.name))
.collect::<Result<Vec<_>>>()?;
if let Some(pages) = pages {
field_columns
.into_iter()
.zip(fields)
.zip(pages)
.map(|((columns, field), pages)| {
to_deserializer(columns, field, num_rows, chunk_size, Some(pages))
})
.collect()
} else {
field_columns
.into_iter()
            .zip(fields)
.map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size, None))
.collect()
}
}
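/// Returns a vector of iterators of [`Array`] corresponding to the top-level parquet fields
/// whose names match `fields`'s names, akin to [`read_columns_many`] but asynchronously.
///
/// All columns of all fields are read concurrently via a single [`try_join_all`];
/// `reader_factory` must therefore be able to produce multiple independent readers.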
pub async fn read_columns_many_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
F: Fn() -> BoxFuture<'b, std::io::Result<R>> + Clone,
>(
reader_factory: F,
row_group: &RowGroupMetaData,
fields: Vec<Field>,
chunk_size: Option<usize>,
limit: Option<usize>,
pages: Option<Vec<Vec<Vec<FilteredPage>>>>,
) -> Result<Vec<ArrayIter<'a>>> {
let num_rows = row_group.num_rows();
let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows);
let futures = fields
.iter()
.map(|field| read_columns_async(reader_factory.clone(), row_group.columns(), &field.name));
let field_columns = try_join_all(futures).await?;
if let Some(pages) = pages {
field_columns
.into_iter()
.zip(fields)
.zip(pages)
.map(|((columns, field), pages)| {
to_deserializer(columns, field, num_rows, chunk_size, Some(pages))
})
.collect()
} else {
field_columns
.into_iter()
            .zip(fields)
.map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size, None))
.collect()
}
}