use std::sync::Arc;
use parquet_format_safe::{ColumnChunk, ColumnMetaData, Encoding};
use super::column_descriptor::ColumnDescriptor;
use crate::compression::Compression;
use crate::error::{Error, Result};
use crate::schema::types::PhysicalType;
use crate::statistics::{deserialize_statistics, Statistics};
#[cfg(feature = "serde_types")]
mod serde_types {
pub use parquet_format_safe::thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol,
};
pub use serde::de::Error as DeserializeError;
pub use serde::ser::Error as SerializeError;
pub use serde::{Deserialize, Deserializer, Serializer};
pub use serde_derive::{Deserialize, Serialize};
pub use std::io::Cursor;
}
#[cfg(feature = "serde_types")]
use serde_types::*;
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
pub struct ColumnChunkMetaData {
#[cfg_attr(
feature = "serde_types",
serde(serialize_with = "serialize_column_chunk")
)]
#[cfg_attr(
feature = "serde_types",
serde(deserialize_with = "deserialize_column_chunk")
)]
column_chunk: ColumnChunk,
column_descr: ColumnDescriptor,
}
#[cfg(feature = "serde_types")]
fn serialize_column_chunk<S>(
column_chunk: &ColumnChunk,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut buf = vec![];
let cursor = Cursor::new(&mut buf[..]);
let mut protocol = TCompactOutputProtocol::new(cursor);
column_chunk
.write_to_out_protocol(&mut protocol)
.map_err(S::Error::custom)?;
serializer.serialize_bytes(&buf)
}
#[cfg(feature = "serde_types")]
fn deserialize_column_chunk<'de, D>(deserializer: D) -> std::result::Result<ColumnChunk, D::Error>
where
D: Deserializer<'de>,
{
let buf = Vec::<u8>::deserialize(deserializer)?;
let mut cursor = Cursor::new(&buf[..]);
let mut protocol = TCompactInputProtocol::new(&mut cursor, usize::MAX);
ColumnChunk::read_from_in_protocol(&mut protocol).map_err(D::Error::custom)
}
impl ColumnChunkMetaData {
pub fn new(column_chunk: ColumnChunk, column_descr: ColumnDescriptor) -> Self {
Self {
column_chunk,
column_descr,
}
}
pub fn file_path(&self) -> &Option<String> {
&self.column_chunk.file_path
}
pub fn file_offset(&self) -> i64 {
self.column_chunk.file_offset
}
pub fn column_chunk(&self) -> &ColumnChunk {
&self.column_chunk
}
pub fn metadata(&self) -> &ColumnMetaData {
self.column_chunk.meta_data.as_ref().unwrap()
}
pub fn descriptor(&self) -> &ColumnDescriptor {
&self.column_descr
}
pub fn physical_type(&self) -> PhysicalType {
self.column_descr.descriptor.primitive_type.physical_type
}
pub fn statistics(&self) -> Option<Result<Arc<dyn Statistics>>> {
self.metadata()
.statistics
.as_ref()
.map(|x| deserialize_statistics(x, self.column_descr.descriptor.primitive_type.clone()))
}
pub fn num_values(&self) -> i64 {
self.metadata().num_values
}
pub fn compression(&self) -> Compression {
self.metadata().codec.try_into().unwrap()
}
pub fn compressed_size(&self) -> i64 {
self.metadata().total_compressed_size
}
pub fn uncompressed_size(&self) -> i64 {
self.metadata().total_uncompressed_size
}
pub fn data_page_offset(&self) -> i64 {
self.metadata().data_page_offset
}
pub fn has_index_page(&self) -> bool {
self.metadata().index_page_offset.is_some()
}
pub fn index_page_offset(&self) -> Option<i64> {
self.metadata().index_page_offset
}
pub fn dictionary_page_offset(&self) -> Option<i64> {
self.metadata().dictionary_page_offset
}
pub fn column_encoding(&self) -> &Vec<Encoding> {
&self.metadata().encodings
}
pub fn byte_range(&self) -> (u64, u64) {
let start = if let Some(dict_page_offset) = self.dictionary_page_offset() {
dict_page_offset as u64
} else {
self.data_page_offset() as u64
};
let length = self.compressed_size() as u64;
(start, length)
}
pub(crate) fn try_from_thrift(
column_descr: ColumnDescriptor,
column_chunk: ColumnChunk,
) -> Result<Self> {
if let Some(meta) = &column_chunk.meta_data {
let _: u64 = meta.total_compressed_size.try_into()?;
if let Some(offset) = meta.dictionary_page_offset {
let _: u64 = offset.try_into()?;
}
let _: u64 = meta.data_page_offset.try_into()?;
let _: Compression = meta.codec.try_into()?;
} else {
return Err(Error::oos("Column chunk requires metdata"));
}
Ok(Self {
column_chunk,
column_descr,
})
}
pub fn into_thrift(self) -> ColumnChunk {
self.column_chunk
}
}