use std::sync::Arc;
pub use crate::thrift_format::{
DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, PageHeader as ParquetPageHeader,
};
use crate::indexes::Interval;
pub use crate::parquet_bridge::{DataPageHeaderExt, PageType};
use crate::compression::Compression;
use crate::encoding::{get_length, Encoding};
use crate::error::{Error, Result};
use crate::metadata::Descriptor;
use crate::statistics::{deserialize_statistics, Statistics};
/// A data page (header V1 or V2) together with the compression associated with its buffer.
///
/// The length of `buffer` is what `compressed_size` reports and `uncompressed_page_size`
/// is what `uncompressed_size` reports.
#[derive(Debug)]
pub struct CompressedDataPage {
/// Header of the page, either V1 or V2.
pub(crate) header: DataPageHeader,
/// Bytes of the page; their length is reported as the compressed size.
pub(crate) buffer: Vec<u8>,
/// Compression associated with `buffer`.
pub(crate) compression: Compression,
/// Size of the page once uncompressed (presumably in bytes; confirm against the producer).
uncompressed_page_size: usize,
/// Descriptor whose `primitive_type` is used to deserialize the page statistics.
pub(crate) descriptor: Descriptor,
/// Row intervals selected within this page; `None` when no selection was provided.
pub(crate) selected_rows: Option<Vec<Interval>>,
}
impl CompressedDataPage {
pub fn new(
header: DataPageHeader,
buffer: Vec<u8>,
compression: Compression,
uncompressed_page_size: usize,
descriptor: Descriptor,
rows: Option<usize>,
) -> Self {
Self::new_read(
header,
buffer,
compression,
uncompressed_page_size,
descriptor,
rows.map(|x| vec![Interval::new(0, x)]),
)
}
pub(crate) fn new_read(
header: DataPageHeader,
buffer: Vec<u8>,
compression: Compression,
uncompressed_page_size: usize,
descriptor: Descriptor,
selected_rows: Option<Vec<Interval>>,
) -> Self {
Self {
header,
buffer,
compression,
uncompressed_page_size,
descriptor,
selected_rows,
}
}
pub fn header(&self) -> &DataPageHeader {
&self.header
}
pub fn uncompressed_size(&self) -> usize {
self.uncompressed_page_size
}
pub fn compressed_size(&self) -> usize {
self.buffer.len()
}
pub fn compression(&self) -> Compression {
self.compression
}
pub fn selected_rows(&self) -> Option<&[Interval]> {
self.selected_rows.as_deref()
}
pub fn num_values(&self) -> usize {
self.header.num_values()
}
pub fn statistics(&self) -> Option<Result<Arc<dyn Statistics>>> {
match &self.header {
DataPageHeader::V1(d) => d
.statistics
.as_ref()
.map(|x| deserialize_statistics(x, self.descriptor.primitive_type.clone())),
DataPageHeader::V2(d) => d
.statistics
.as_ref()
.map(|x| deserialize_statistics(x, self.descriptor.primitive_type.clone())),
}
}
}
/// The header of a data page, which exists in two versions.
#[derive(Debug, Clone)]
pub enum DataPageHeader {
/// Header of a version 1 data page.
V1(DataPageHeaderV1),
/// Header of a version 2 data page.
V2(DataPageHeaderV2),
}
impl DataPageHeader {
pub fn num_values(&self) -> usize {
match &self {
DataPageHeader::V1(d) => d.num_values as usize,
DataPageHeader::V2(d) => d.num_values as usize,
}
}
}
/// A data page (header V1 or V2) that carries no compression information.
///
/// The buffer layout is what `split_buffer` separates into repetition levels,
/// definition levels and values.
#[derive(Debug, Clone)]
pub struct DataPage {
/// Header of the page, either V1 or V2.
pub(super) header: DataPageHeader,
/// Bytes of the page.
pub(super) buffer: Vec<u8>,
/// Descriptor providing the primitive type and the maximum repetition/definition levels.
pub descriptor: Descriptor,
/// Row intervals selected within this page; `None` when no selection was provided.
pub selected_rows: Option<Vec<Interval>>,
}
impl DataPage {
    /// Builds a new [`DataPage`].
    ///
    /// When `rows` is `Some(n)`, the selection stored in the page is the single interval
    /// `Interval::new(0, n)`; when it is `None`, no selection is stored.
    pub fn new(
        header: DataPageHeader,
        buffer: Vec<u8>,
        descriptor: Descriptor,
        rows: Option<usize>,
    ) -> Self {
        let selected_rows = rows.map(|length| vec![Interval::new(0, length)]);
        Self::new_read(header, buffer, descriptor, selected_rows)
    }

    /// Builds a [`DataPage`] from its parts, taking the row selection verbatim.
    pub(crate) fn new_read(
        header: DataPageHeader,
        buffer: Vec<u8>,
        descriptor: Descriptor,
        selected_rows: Option<Vec<Interval>>,
    ) -> Self {
        DataPage {
            header,
            buffer,
            descriptor,
            selected_rows,
        }
    }

    /// The header of this page.
    pub fn header(&self) -> &DataPageHeader {
        &self.header
    }

    /// The bytes of this page.
    pub fn buffer(&self) -> &[u8] {
        &self.buffer
    }

    /// The row intervals selected within this page, if any.
    pub fn selected_rows(&self) -> Option<&[Interval]> {
        self.selected_rows.as_deref()
    }

    /// Mutable access to the underlying buffer.
    pub fn buffer_mut(&mut self) -> &mut Vec<u8> {
        &mut self.buffer
    }

    /// The number of values declared in the page header.
    pub fn num_values(&self) -> usize {
        self.header.num_values()
    }

    /// The encoding of the values, as reported by the header.
    pub fn encoding(&self) -> Encoding {
        match &self.header {
            DataPageHeader::V1(v1) => v1.encoding(),
            DataPageHeader::V2(v2) => v2.encoding(),
        }
    }

    /// The encoding of the definition levels.
    ///
    /// V1 headers report it themselves; V2 pages are always reported as [`Encoding::Rle`].
    pub fn definition_level_encoding(&self) -> Encoding {
        match &self.header {
            DataPageHeader::V2(_) => Encoding::Rle,
            DataPageHeader::V1(v1) => v1.definition_level_encoding(),
        }
    }

    /// The encoding of the repetition levels.
    ///
    /// V1 headers report it themselves; V2 pages are always reported as [`Encoding::Rle`].
    pub fn repetition_level_encoding(&self) -> Encoding {
        match &self.header {
            DataPageHeader::V2(_) => Encoding::Rle,
            DataPageHeader::V1(v1) => v1.repetition_level_encoding(),
        }
    }

    /// Deserializes the statistics stored in the header.
    ///
    /// Returns `None` when the header carries no statistics, otherwise the result of
    /// `deserialize_statistics` for the descriptor's primitive type.
    pub fn statistics(&self) -> Option<Result<Arc<dyn Statistics>>> {
        // Both header versions expose their statistics the same way; pick the field first
        // and deserialize once.
        let raw = match &self.header {
            DataPageHeader::V1(v1) => v1.statistics.as_ref(),
            DataPageHeader::V2(v2) => v2.statistics.as_ref(),
        };
        raw.map(|stats| deserialize_statistics(stats, self.descriptor.primitive_type.clone()))
    }
}
/// A page that is either a [`DataPage`] or a [`DictPage`].
#[derive(Debug)]
// Silences clippy's large-variant lint instead of boxing one of the variants.
#[allow(clippy::large_enum_variant)]
pub enum Page {
/// A data page.
Data(DataPage),
/// A dictionary page.
Dict(DictPage),
}
impl Page {
    /// Mutable access to the buffer of whichever page variant this is.
    pub(crate) fn buffer(&mut self) -> &mut Vec<u8> {
        match self {
            Page::Data(data) => data.buffer_mut(),
            Page::Dict(dict) => &mut dict.buffer,
        }
    }
}
/// A page that is either a [`CompressedDataPage`] or a [`CompressedDictPage`].
#[derive(Debug)]
// Silences clippy's large-variant lint instead of boxing one of the variants.
#[allow(clippy::large_enum_variant)]
pub enum CompressedPage {
/// A data page with its compression information.
Data(CompressedDataPage),
/// A dictionary page with its compression information.
Dict(CompressedDictPage),
}
impl CompressedPage {
    /// Mutable access to the buffer of whichever page variant this is.
    pub(crate) fn buffer(&mut self) -> &mut Vec<u8> {
        match self {
            Self::Data(data) => &mut data.buffer,
            Self::Dict(dict) => &mut dict.buffer,
        }
    }

    /// The compression associated with the page.
    pub(crate) fn compression(&self) -> Compression {
        match self {
            Self::Data(data) => data.compression(),
            Self::Dict(dict) => dict.compression(),
        }
    }

    /// The number of values of a data page; dictionary pages always report `0` here.
    pub(crate) fn num_values(&self) -> usize {
        match self {
            Self::Dict(_) => 0,
            Self::Data(data) => data.num_values(),
        }
    }

    /// The selected row intervals of a data page; dictionary pages never have a selection.
    pub(crate) fn selected_rows(&self) -> Option<&[Interval]> {
        match self {
            Self::Dict(_) => None,
            Self::Data(data) => data.selected_rows(),
        }
    }

    /// The uncompressed size recorded for the page.
    pub(crate) fn uncompressed_size(&self) -> usize {
        match self {
            Self::Data(data) => data.uncompressed_size(),
            Self::Dict(dict) => dict.uncompressed_page_size,
        }
    }
}
/// A dictionary page: a byte buffer plus the number of values and the sortedness flag
/// it was created with.
#[derive(Debug)]
pub struct DictPage {
    /// Bytes of the dictionary page.
    pub buffer: Vec<u8>,
    /// Number of values supplied at construction.
    pub num_values: usize,
    /// Sortedness flag supplied at construction.
    pub is_sorted: bool,
}

impl DictPage {
    /// Builds a [`DictPage`] from its three parts, stored verbatim.
    pub fn new(buffer: Vec<u8>, num_values: usize, is_sorted: bool) -> Self {
        DictPage {
            buffer,
            num_values,
            is_sorted,
        }
    }
}
/// A dictionary page together with the compression associated with its buffer.
#[derive(Debug)]
pub struct CompressedDictPage {
/// Bytes of the dictionary page.
pub(crate) buffer: Vec<u8>,
/// Compression associated with `buffer`.
compression: Compression,
/// Number of values supplied at construction.
pub(crate) num_values: usize,
/// Size of the page once uncompressed (presumably in bytes; confirm against the producer).
pub(crate) uncompressed_page_size: usize,
/// Sortedness flag supplied at construction.
pub is_sorted: bool,
}
impl CompressedDictPage {
    /// Builds a [`CompressedDictPage`] from its parts, stored verbatim.
    ///
    /// Note the argument order: `uncompressed_page_size` comes before `num_values`.
    pub fn new(
        buffer: Vec<u8>,
        compression: Compression,
        uncompressed_page_size: usize,
        num_values: usize,
        is_sorted: bool,
    ) -> Self {
        CompressedDictPage {
            buffer,
            compression,
            num_values,
            uncompressed_page_size,
            is_sorted,
        }
    }

    /// The compression associated with this page.
    pub fn compression(&self) -> Compression {
        self.compression
    }
}
/// Splits the buffer of a V1 data page into `(repetition levels, definition levels, values)`.
///
/// For each of `has_rep` / `has_def` that is `true`, the corresponding section is expected
/// at the front of the remaining buffer: a 4-byte prefix whose length is decoded by
/// `get_length`, followed by that many bytes of levels. A section whose flag is `false`
/// is returned as an empty slice and consumes nothing.
///
/// # Errors
/// Returns an out-of-spec error when a length prefix cannot be read or declares more bytes
/// than remain in the buffer. The message names the section (rep or def) that failed.
#[inline]
pub fn split_buffer_v1(
    buffer: &[u8],
    has_rep: bool,
    has_def: bool,
) -> Result<(&[u8], &[u8], &[u8])> {
    let (rep, buffer) = if has_rep {
        split_v1_levels(
            buffer,
            "The number of bytes declared in v1 rep levels is higher than the page size",
        )?
    } else {
        (&[] as &[u8], buffer)
    };
    let (def, buffer) = if has_def {
        // The original reported "rep levels" when the def length prefix was unreadable;
        // the def message is now used for every failure of this section.
        split_v1_levels(
            buffer,
            "The number of bytes declared in v1 def levels is higher than the page size",
        )?
    } else {
        (&[] as &[u8], buffer)
    };
    Ok((rep, def, buffer))
}

/// Splits one length-prefixed level section off the front of `buffer`, returning
/// `(levels, remainder)` or an out-of-spec error carrying `message`.
fn split_v1_levels<'a>(buffer: &'a [u8], message: &'static str) -> Result<(&'a [u8], &'a [u8])> {
    let length = get_length(buffer).ok_or_else(|| Error::oos(message))?;
    // Skip the 4-byte prefix first, then compare lengths; this avoids computing
    // `4 + length`, which could overflow on targets with a 32-bit `usize`.
    let payload = buffer.get(4..).ok_or_else(|| Error::oos(message))?;
    if length > payload.len() {
        return Err(Error::oos(message));
    }
    Ok(payload.split_at(length))
}
/// Splits the buffer of a V2 data page into `(repetition levels, definition levels, values)`.
///
/// The first `rep_level_buffer_length` bytes are the repetition levels, the next
/// `def_level_buffer_length` bytes are the definition levels, and the rest are the values.
///
/// # Errors
/// Returns an out-of-spec error when the declared level lengths exceed the buffer. The
/// original indexed the slice directly and would panic in that case.
pub fn split_buffer_v2(
    buffer: &[u8],
    rep_level_buffer_length: usize,
    def_level_buffer_length: usize,
) -> Result<(&[u8], &[u8], &[u8])> {
    if rep_level_buffer_length > buffer.len() {
        return Err(Error::oos(
            "The number of bytes declared in v2 rep levels is higher than the page size",
        ));
    }
    let (rep, remaining) = buffer.split_at(rep_level_buffer_length);

    // Checking against the remainder avoids adding the two lengths, which could overflow.
    if def_level_buffer_length > remaining.len() {
        return Err(Error::oos(
            "The number of bytes declared in v2 def levels is higher than the page size",
        ));
    }
    let (def, values) = remaining.split_at(def_level_buffer_length);

    Ok((rep, def, values))
}
/// Splits the buffer of `page` into `(repetition levels, definition levels, values)`.
///
/// V1 pages are split with `split_buffer_v1`, where a level section is considered present
/// when the descriptor's corresponding maximum level is greater than zero. V2 pages are
/// split with `split_buffer_v2` using the byte lengths declared in the header.
///
/// # Errors
/// Propagates the errors of the version-specific split, and fails when a V2 header length
/// cannot be converted to `usize`.
pub fn split_buffer(page: &DataPage) -> Result<(&[u8], &[u8], &[u8])> {
    let bytes = page.buffer();
    match page.header() {
        DataPageHeader::V1(_) => {
            let has_rep = page.descriptor.max_rep_level > 0;
            let has_def = page.descriptor.max_def_level > 0;
            split_buffer_v1(bytes, has_rep, has_def)
        }
        DataPageHeader::V2(v2) => {
            let def_len: usize = v2.definition_levels_byte_length.try_into()?;
            let rep_len: usize = v2.repetition_levels_byte_length.try_into()?;
            split_buffer_v2(bytes, rep_len, def_len)
        }
    }
}