use crate::basic::Type as PhysicalType;
use crate::column::reader::{get_typed_column_reader, ColumnReader, ColumnReaderImpl};
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
use crate::record::api::Field;
use crate::schema::types::ColumnDescPtr;

/// Forwards a method call to the typed triplet iterator wrapped by each `TripletIter`
/// variant; the trailing tokens (e.g. `ref`, `ref mut`) control how the inner iterator
/// is bound in the match arm.
macro_rules! triplet_enum_func {
    ($self:ident, $func:ident, $( $token:tt ),*) => ({
        match *$self {
            TripletIter::BoolTripletIter($($token)* typed) => typed.$func(),
            TripletIter::Int32TripletIter($($token)* typed) => typed.$func(),
            TripletIter::Int64TripletIter($($token)* typed) => typed.$func(),
            TripletIter::Int96TripletIter($($token)* typed) => typed.$func(),
            TripletIter::FloatTripletIter($($token)* typed) => typed.$func(),
            TripletIter::DoubleTripletIter($($token)* typed) => typed.$func(),
            TripletIter::ByteArrayTripletIter($($token)* typed) => typed.$func(),
            TripletIter::FixedLenByteArrayTripletIter($($token)* typed) => typed.$func(),
        }
    });
}
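
/// High-level wrapper over a single leaf column reader that provides per-element access:
/// each call to `read_next` positions the iterator on the next "triplet" of value,
/// definition level and repetition level.
///
/// A minimal usage sketch (assuming `TripletIter` is in scope, the file is opened with
/// the crate's `SerializedFileReader`, and `"data.parquet"` is a placeholder path):
///
/// ```ignore
/// use parquet::file::reader::{FileReader, SerializedFileReader};
///
/// let file = std::fs::File::open("data.parquet")?;
/// let file_reader = SerializedFileReader::new(file)?;
/// // Column descriptor and reader for the first column of the first row group.
/// let descr = file_reader.metadata().file_metadata().schema_descr().column(0);
/// let column_reader = file_reader.get_row_group(0)?.get_column_reader(0)?;
///
/// let mut iter = TripletIter::new(descr, column_reader, 128);
/// while iter.read_next()? {
///     println!(
///         "def: {}, rep: {}, value: {:?}",
///         iter.current_def_level(),
///         iter.current_rep_level(),
///         iter.current_value()?
///     );
/// }
/// ```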
#[allow(clippy::enum_variant_names)]
pub enum TripletIter {
    BoolTripletIter(TypedTripletIter<BoolType>),
    Int32TripletIter(TypedTripletIter<Int32Type>),
    Int64TripletIter(TypedTripletIter<Int64Type>),
    Int96TripletIter(TypedTripletIter<Int96Type>),
    FloatTripletIter(TypedTripletIter<FloatType>),
    DoubleTripletIter(TypedTripletIter<DoubleType>),
    ByteArrayTripletIter(TypedTripletIter<ByteArrayType>),
    FixedLenByteArrayTripletIter(TypedTripletIter<FixedLenByteArrayType>),
}

impl TripletIter {
    /// Creates a new triplet iterator for the given column, dispatching on the column's
    /// physical type. `batch_size` is the number of values/levels buffered from the
    /// underlying column reader at a time.
    pub fn new(descr: ColumnDescPtr, reader: ColumnReader, batch_size: usize) -> Self {
        match descr.physical_type() {
            PhysicalType::BOOLEAN => {
                TripletIter::BoolTripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::INT32 => {
                TripletIter::Int32TripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::INT64 => {
                TripletIter::Int64TripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::INT96 => {
                TripletIter::Int96TripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::FLOAT => {
                TripletIter::FloatTripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::DOUBLE => {
                TripletIter::DoubleTripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::BYTE_ARRAY => {
                TripletIter::ByteArrayTripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::FIXED_LEN_BYTE_ARRAY => TripletIter::FixedLenByteArrayTripletIter(
                TypedTripletIter::new(descr, batch_size, reader),
            ),
        }
    }

    /// Advances to the next triplet. Returns `Ok(false)` once the column is exhausted.
    #[inline]
    pub fn read_next(&mut self) -> Result<bool> {
        triplet_enum_func!(self, read_next, ref, mut)
    }

    /// Returns `true` if more triplets are available; kept in sync by `read_next`.
    #[inline]
    pub fn has_next(&self) -> bool {
        triplet_enum_func!(self, has_next, ref)
    }

    /// Returns the definition level of the current triplet.
    #[inline]
    pub fn current_def_level(&self) -> i16 {
        triplet_enum_func!(self, current_def_level, ref)
    }

    /// Returns the maximum definition level of the column.
    #[inline]
    pub fn max_def_level(&self) -> i16 {
        triplet_enum_func!(self, max_def_level, ref)
    }

    /// Returns the repetition level of the current triplet.
    #[inline]
    pub fn current_rep_level(&self) -> i16 {
        triplet_enum_func!(self, current_rep_level, ref)
    }

    /// Returns the maximum repetition level of the column.
    #[inline]
    pub fn max_rep_level(&self) -> i16 {
        triplet_enum_func!(self, max_rep_level, ref)
    }

    /// Returns `true` if the current value is null, i.e. its definition level is below
    /// the column's maximum definition level.
    #[inline]
    pub fn is_null(&self) -> bool {
        self.current_def_level() < self.max_def_level()
    }

    /// Converts the current value into a `Field`, returning `Field::Null` for null
    /// triplets.
    pub fn current_value(&self) -> Result<Field> {
        if self.is_null() {
            return Ok(Field::Null);
        }
        let field = match *self {
            TripletIter::BoolTripletIter(ref typed) => {
                Field::convert_bool(typed.column_descr(), *typed.current_value())
            }
            TripletIter::Int32TripletIter(ref typed) => {
                Field::convert_int32(typed.column_descr(), *typed.current_value())
            }
            TripletIter::Int64TripletIter(ref typed) => {
                Field::convert_int64(typed.column_descr(), *typed.current_value())
            }
            TripletIter::Int96TripletIter(ref typed) => {
                Field::convert_int96(typed.column_descr(), *typed.current_value())
            }
            TripletIter::FloatTripletIter(ref typed) => {
                Field::convert_float(typed.column_descr(), *typed.current_value())
            }
            TripletIter::DoubleTripletIter(ref typed) => {
                Field::convert_double(typed.column_descr(), *typed.current_value())
            }
            TripletIter::ByteArrayTripletIter(ref typed) => {
                Field::convert_byte_array(typed.column_descr(), typed.current_value().clone())?
            }
            TripletIter::FixedLenByteArrayTripletIter(ref typed) => Field::convert_byte_array(
                typed.column_descr(),
                typed.current_value().clone().into(),
            )?,
        };
        Ok(field)
    }
}

/// Internal typed triplet iterator: buffers values, definition levels and repetition
/// levels from a typed column reader and exposes them one element at a time.
pub struct TypedTripletIter<T: DataType> {
    reader: ColumnReaderImpl<T>,
    column_descr: ColumnDescPtr,
    batch_size: usize,
    max_def_level: i16,
    max_rep_level: i16,
    // Buffered values; for optional columns they are spaced out so that each value
    // lines up with its definition level (see `read_next`).
    values: Vec<T::T>,
    // Definition levels buffer, `None` when the column is required (max def level == 0).
    def_levels: Option<Vec<i16>>,
    // Repetition levels buffer, `None` when the column is not repeated (max rep level == 0).
    rep_levels: Option<Vec<i16>>,
    // Index of the current triplet within the buffered batch.
    curr_triplet_index: usize,
    // Number of triplets in the buffered batch.
    triplets_left: usize,
    // Result of the last `read_next` call: whether another triplet is available.
    has_next: bool,
}

impl<T: DataType> TypedTripletIter<T> {
    /// Creates a new typed triplet iterator. `batch_size` controls how many values and
    /// levels are buffered from the column reader at a time and must be positive.
    fn new(descr: ColumnDescPtr, batch_size: usize, column_reader: ColumnReader) -> Self {
        assert!(
            batch_size > 0,
            "Expected positive batch size, found: {batch_size}"
        );
        let max_def_level = descr.max_def_level();
        let max_rep_level = descr.max_rep_level();
        // Level buffers are only allocated when the column can actually produce levels.
        let def_levels = if max_def_level == 0 {
            None
        } else {
            Some(vec![0; batch_size])
        };
        let rep_levels = if max_rep_level == 0 {
            None
        } else {
            Some(vec![0; batch_size])
        };
        Self {
            reader: get_typed_column_reader(column_reader),
            column_descr: descr,
            batch_size,
            max_def_level,
            max_rep_level,
            values: vec![T::T::default(); batch_size],
            def_levels,
            rep_levels,
            curr_triplet_index: 0,
            triplets_left: 0,
            has_next: false,
        }
    }

    /// Returns the column descriptor of this iterator.
    #[inline]
    pub fn column_descr(&self) -> &ColumnDescPtr {
        &self.column_descr
    }

    /// Returns the maximum definition level of the column.
    #[inline]
    fn max_def_level(&self) -> i16 {
        self.max_def_level
    }

    /// Returns the maximum repetition level of the column.
    #[inline]
    fn max_rep_level(&self) -> i16 {
        self.max_rep_level
    }

    /// Returns a reference to the current value without advancing the iterator.
    /// Panics if the current triplet is null.
    #[inline]
    fn current_value(&self) -> &T::T {
        assert!(
            self.current_def_level() == self.max_def_level(),
            "Cannot extract value, max definition level: {}, current level: {}",
            self.max_def_level(),
            self.current_def_level()
        );
        &self.values[self.curr_triplet_index]
    }

    /// Returns the definition level of the current triplet; for required columns this
    /// is always the maximum definition level.
    #[inline]
    fn current_def_level(&self) -> i16 {
        match self.def_levels {
            Some(ref vec) => vec[self.curr_triplet_index],
            None => self.max_def_level,
        }
    }

    /// Returns the repetition level of the current triplet; for non-repeated columns
    /// this is always the maximum repetition level.
    #[inline]
    fn current_rep_level(&self) -> i16 {
        match self.rep_levels {
            Some(ref vec) => vec[self.curr_triplet_index],
            None => self.max_rep_level,
        }
    }

    /// Quick check whether more triplets are available; updated by `read_next`.
    #[inline]
    fn has_next(&self) -> bool {
        self.has_next
    }

    /// Advances to the next triplet, refilling the value and level buffers from the
    /// column reader when the current batch is exhausted. Returns `Ok(false)` once the
    /// column has no more data.
    fn read_next(&mut self) -> Result<bool> {
        self.curr_triplet_index += 1;
        while self.curr_triplet_index >= self.triplets_left {
            let (records_read, values_read, levels_read) = {
                self.values.clear();
                if let Some(x) = &mut self.def_levels {
                    x.clear()
                }
                if let Some(x) = &mut self.rep_levels {
                    x.clear()
                }
                self.reader.read_records(
                    self.batch_size,
                    self.def_levels.as_mut(),
                    self.rep_levels.as_mut(),
                    &mut self.values,
                )?
            };
            // Nothing was read at all: the column is exhausted.
            if records_read == 0 && values_read == 0 && levels_read == 0 {
                self.has_next = false;
                return Ok(false);
            }
            if levels_read == 0 || values_read == levels_read {
                // Either a required column (no levels) or a batch without nulls: values
                // already line up with levels.
                self.curr_triplet_index = 0;
                self.triplets_left = values_read;
            } else if values_read < levels_read {
                // The batch contains nulls: space out the densely packed values so that
                // each value sits at the position whose definition level equals the
                // maximum definition level. Walking both buffers from the back, move the
                // last not-yet-placed value into the last value-bearing slot.
                // E.g. for values_read = 2, levels_read = 4, def_levels = [1, 0, 0, 1]
                // and max_def_level = 1, the dense values [a, b, _, _] become [a, _, _, b].
                let mut idx = values_read;
                let def_levels = self.def_levels.as_ref().unwrap();
                self.values.resize(levels_read, T::T::default());
                for i in 0..levels_read {
                    if def_levels[levels_read - i - 1] == self.max_def_level {
                        idx -= 1;
                        self.values.swap(levels_read - i - 1, idx);
                    }
                }
                self.curr_triplet_index = 0;
                self.triplets_left = levels_read;
            } else {
                return Err(general_err!(
                    "Spacing of values/levels is wrong, values_read: {}, levels_read: {}",
                    values_read,
                    levels_read
                ));
            }
        }
        self.has_next = true;
        Ok(true)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::types::ColumnPath;
    use crate::util::test_common::file_util::get_test_file;

    #[test]
    #[should_panic(expected = "Expected positive batch size, found: 0")]
    fn test_triplet_zero_batch_size() {
        let column_path = ColumnPath::from(vec!["b_struct".to_string(), "b_c_int".to_string()]);
        test_column_in_file("nulls.snappy.parquet", 0, &column_path, &[], &[], &[]);
    }

    #[test]
    fn test_triplet_null_column() {
        let path = vec!["b_struct", "b_c_int"];
        let values = vec![];
        let def_levels = vec![1, 1, 1, 1, 1, 1, 1, 1];
        let rep_levels = vec![0, 0, 0, 0, 0, 0, 0, 0];
        test_triplet_iter(
            "nulls.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_required_column() {
        let path = vec!["ID"];
        let values = vec![Field::Long(8)];
        let def_levels = vec![0];
        let rep_levels = vec![0];
        test_triplet_iter(
            "nonnullable.impala.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_column() {
        let path = vec!["nested_struct", "A"];
        let values = vec![Field::Int(1), Field::Int(7)];
        let def_levels = vec![2, 1, 1, 1, 1, 0, 2];
        let rep_levels = vec![0, 0, 0, 0, 0, 0, 0];
        test_triplet_iter(
            "nullable.impala.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_list_column() {
        let path = vec!["a", "list", "element", "list", "element", "list", "element"];
        let values = vec![
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("e".to_string()),
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("e".to_string()),
            Field::Str("f".to_string()),
        ];
        let def_levels = vec![7, 7, 7, 4, 7, 7, 7, 7, 7, 4, 7, 7, 7, 7, 7, 7, 4, 7];
        let rep_levels = vec![0, 3, 2, 1, 2, 0, 3, 2, 3, 1, 2, 0, 3, 2, 3, 2, 1, 2];
        test_triplet_iter(
            "nested_lists.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_map_column() {
        let path = vec!["a", "key_value", "value", "key_value", "key"];
        let values = vec![
            Field::Int(1),
            Field::Int(2),
            Field::Int(1),
            Field::Int(1),
            Field::Int(3),
            Field::Int(4),
            Field::Int(5),
        ];
        let def_levels = vec![4, 4, 4, 2, 3, 4, 4, 4, 4];
        let rep_levels = vec![0, 2, 0, 0, 0, 0, 0, 2, 2];
        test_triplet_iter(
            "nested_maps.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }
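
    // A small additional sanity check (a sketch reusing the same test file and column as
    // `test_triplet_null_column` above): in the all-null `b_struct.b_c_int` column every
    // triplet should report `is_null()` and decode to `Field::Null`.
    #[test]
    fn test_triplet_null_column_is_null() {
        let file = get_test_file("nulls.snappy.parquet");
        let file_reader = SerializedFileReader::new(file).unwrap();
        let schema = file_reader.metadata().file_metadata().schema_descr();
        let column_path = ColumnPath::from(vec!["b_struct".to_string(), "b_c_int".to_string()]);
        let row_group_reader = file_reader.get_row_group(0).unwrap();
        for i in 0..schema.num_columns() {
            if schema.column(i).path() == &column_path {
                let reader = row_group_reader.get_column_reader(i).unwrap();
                let mut iter = TripletIter::new(schema.column(i), reader, 4);
                while iter.read_next().unwrap() {
                    assert!(iter.is_null());
                    assert_eq!(iter.current_value().unwrap(), Field::Null);
                }
            }
        }
    }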

    // Runs `test_column_in_file` for the given file/column over a range of batch sizes.
    fn test_triplet_iter(
        file_name: &str,
        column_path: Vec<&str>,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let path: Vec<String> = column_path.iter().map(|x| x.to_string()).collect();
        let column_path = ColumnPath::from(path);
        let batch_sizes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128, 256];
        for batch_size in batch_sizes {
            test_column_in_file(
                file_name,
                batch_size,
                &column_path,
                expected_values,
                expected_def_levels,
                expected_rep_levels,
            );
        }
    }

    // Locates the column by path in the first row group of the file and checks its
    // triplets against the expected values and levels.
    fn test_column_in_file(
        file_name: &str,
        batch_size: usize,
        column_path: &ColumnPath,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let file = get_test_file(file_name);
        let file_reader = SerializedFileReader::new(file).unwrap();
        let metadata = file_reader.metadata();
        let file_metadata = metadata.file_metadata();
        let schema = file_metadata.schema_descr();
        let row_group_reader = file_reader.get_row_group(0).unwrap();
        for i in 0..schema.num_columns() {
            let descr = schema.column(i);
            if descr.path() == column_path {
                let reader = row_group_reader.get_column_reader(i).unwrap();
                test_triplet_column(
                    descr,
                    reader,
                    batch_size,
                    expected_values,
                    expected_def_levels,
                    expected_rep_levels,
                );
            }
        }
    }

    // Iterates over the whole column and compares the collected non-null values and
    // definition/repetition levels with the expectations.
    fn test_triplet_column(
        descr: ColumnDescPtr,
        reader: ColumnReader,
        batch_size: usize,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let mut iter = TripletIter::new(descr.clone(), reader, batch_size);
        let mut values: Vec<Field> = Vec::new();
        let mut def_levels: Vec<i16> = Vec::new();
        let mut rep_levels: Vec<i16> = Vec::new();
        assert_eq!(iter.max_def_level(), descr.max_def_level());
        assert_eq!(iter.max_rep_level(), descr.max_rep_level());
        while let Ok(true) = iter.read_next() {
            assert!(iter.has_next());
            if !iter.is_null() {
                values.push(iter.current_value().unwrap());
            }
            def_levels.push(iter.current_def_level());
            rep_levels.push(iter.current_rep_level());
        }
        assert_eq!(values, expected_values);
        assert_eq!(def_levels, expected_def_levels);
        assert_eq!(rep_levels, expected_rep_levels);
    }
}