use bytes::Bytes;
use num::traits::WrappingAdd;
use num::FromPrimitive;
use std::{cmp, marker::PhantomData, mem};
use super::rle::RleDecoder;
use crate::basic::*;
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::decoding::byte_stream_split_decoder::{
ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::{self, BitReader};
mod byte_stream_split_decoder;
pub(crate) mod private {
    use super::*;

    /// Builds the [`Decoder`] matching an encoding for one physical value type.
    ///
    /// Implemented once per value type. The provided method defers to
    /// `get_decoder_default`; individual types override it to add the
    /// encodings they support on top of the shared fallback.
    pub trait GetDecoder {
        /// Returns a boxed decoder for `encoding`, or an error when the
        /// encoding cannot be constructed for this value type.
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            get_decoder_default(descr, encoding)
        }
    }

    /// Fallback dispatch shared by every value type: only `PLAIN` yields a
    /// decoder, every other encoding is reported as an error.
    fn get_decoder_default<T: DataType>(
        descr: ColumnDescPtr,
        encoding: Encoding,
    ) -> Result<Box<dyn Decoder<T>>> {
        match encoding {
            Encoding::PLAIN => {
                let type_length = descr.type_length();
                Ok(Box::new(PlainDecoder::new(type_length)))
            }
            // Dictionary decoders are built separately (they need a dictionary).
            Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => Err(general_err!(
                "Cannot initialize this encoding through this function"
            )),
            // Known encodings that the calling value type did not claim.
            Encoding::DELTA_LENGTH_BYTE_ARRAY
            | Encoding::DELTA_BYTE_ARRAY
            | Encoding::DELTA_BINARY_PACKED
            | Encoding::RLE => Err(general_err!(
                "Encoding {} is not supported for type",
                encoding
            )),
            unsupported => Err(nyi_err!("Encoding {} is not supported", unsupported)),
        }
    }

    impl GetDecoder for bool {
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            if matches!(encoding, Encoding::RLE) {
                return Ok(Box::new(RleValueDecoder::new()));
            }
            get_decoder_default(descr, encoding)
        }
    }

    impl GetDecoder for i32 {
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            let decoder: Box<dyn Decoder<T>> = match encoding {
                Encoding::BYTE_STREAM_SPLIT => Box::new(ByteStreamSplitDecoder::new()),
                Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
                _ => return get_decoder_default(descr, encoding),
            };
            Ok(decoder)
        }
    }

    impl GetDecoder for i64 {
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            let decoder: Box<dyn Decoder<T>> = match encoding {
                Encoding::BYTE_STREAM_SPLIT => Box::new(ByteStreamSplitDecoder::new()),
                Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
                _ => return get_decoder_default(descr, encoding),
            };
            Ok(decoder)
        }
    }

    impl GetDecoder for f32 {
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            if matches!(encoding, Encoding::BYTE_STREAM_SPLIT) {
                return Ok(Box::new(ByteStreamSplitDecoder::new()));
            }
            get_decoder_default(descr, encoding)
        }
    }

    impl GetDecoder for f64 {
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            if matches!(encoding, Encoding::BYTE_STREAM_SPLIT) {
                return Ok(Box::new(ByteStreamSplitDecoder::new()));
            }
            get_decoder_default(descr, encoding)
        }
    }

    impl GetDecoder for ByteArray {
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            let decoder: Box<dyn Decoder<T>> = match encoding {
                Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
                Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
                _ => return get_decoder_default(descr, encoding),
            };
            Ok(decoder)
        }
    }

    impl GetDecoder for FixedLenByteArray {
        fn get_decoder<T: DataType<T = Self>>(
            descr: ColumnDescPtr,
            encoding: Encoding,
        ) -> Result<Box<dyn Decoder<T>>> {
            let decoder: Box<dyn Decoder<T>> = match encoding {
                Encoding::BYTE_STREAM_SPLIT => Box::new(VariableWidthByteStreamSplitDecoder::new(
                    descr.type_length(),
                )),
                Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
                _ => return get_decoder_default(descr, encoding),
            };
            Ok(decoder)
        }
    }

    // Int96 relies entirely on the shared fallback.
    impl GetDecoder for Int96 {}
}
/// Common interface of the value decoders in this module.
pub trait Decoder<T: DataType>: Send {
    /// Hands the decoder a new buffer of encoded `data` holding `num_values` values.
    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;

    /// Decodes values into `buffer` and returns how many were written.
    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;

    /// Decodes values into `buffer`, leaving gaps at the positions whose bit in
    /// `valid_bits` is unset.
    ///
    /// `buffer.len()` is the total slot count (values plus nulls) and
    /// `null_count` the number of unset positions. Returns `buffer.len()` on
    /// success, or an error when the number of decoded values differs from
    /// `buffer.len() - null_count`.
    ///
    /// # Panics
    ///
    /// Panics if `buffer.len() < null_count`.
    fn get_spaced(
        &mut self,
        buffer: &mut [T::T],
        null_count: usize,
        valid_bits: &[u8],
    ) -> Result<usize> {
        assert!(buffer.len() >= null_count);

        // Nothing to spread out: plain dense decode.
        if null_count == 0 {
            return self.get(buffer);
        }

        let total_slots = buffer.len();
        let expected = total_slots - null_count;
        let decoded = self.get(buffer)?;
        if decoded != expected {
            return Err(general_err!(
                "Number of values read: {}, doesn't match expected: {}",
                decoded,
                expected
            ));
        }

        // Walk the slots back to front, moving the densely packed values at the
        // start of `buffer` to their final (valid) positions.
        let mut next_packed = decoded;
        let valid_slots = (0..total_slots)
            .rev()
            .filter(|&slot| bit_util::get_bit(valid_bits, slot));
        for slot in valid_slots {
            next_packed -= 1;
            buffer.swap(slot, next_packed);
        }

        Ok(total_slots)
    }

    /// Number of values the decoder reports as still available.
    fn values_left(&self) -> usize;

    /// The encoding this decoder handles.
    fn encoding(&self) -> Encoding;

    /// Skips up to `num_values` values and returns how many were skipped.
    fn skip(&mut self, num_values: usize) -> Result<usize>;
}
pub fn get_decoder<T: DataType>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
use self::private::GetDecoder;
T::T::get_decoder(descr, encoding)
}
/// Mutable state behind [`PlainDecoder`].
///
/// `PlainDecoder` passes this struct to the value type's `set_data`, `decode`
/// and `skip` routines, which do the actual work.
#[derive(Default)]
pub struct PlainDecoderDetails {
// Reported by `PlainDecoder::values_left`.
pub(crate) num_values: usize,
// NOTE(review): presumably the read position within `data` - confirm against
// the value-type implementations.
pub(crate) start: usize,
// Value passed to `PlainDecoder::new` (the column's type length).
pub(crate) type_length: i32,
// Encoded bytes; `None` until data has been set.
pub(crate) data: Option<Bytes>,
// NOTE(review): presumably only used by bit-packed value types - confirm.
pub(crate) bit_reader: Option<BitReader>,
}
/// Decoder for `PLAIN` encoded data.
///
/// All state lives in [`PlainDecoderDetails`]; the decoding itself is
/// delegated to the value type of `T`.
pub struct PlainDecoder<T: DataType> {
    // State handed to the value type's decode routines.
    inner: PlainDecoderDetails,
    // Ties the decoder to its data type without storing a value of it.
    _phantom: PhantomData<T>,
}

impl<T: DataType> PlainDecoder<T> {
    /// Creates a decoder with no data attached, remembering `type_length`.
    pub fn new(type_length: i32) -> Self {
        let inner = PlainDecoderDetails {
            type_length,
            // Remaining fields start out zeroed / `None`.
            ..Default::default()
        };
        Self {
            inner,
            _phantom: PhantomData,
        }
    }
}
/// Every operation forwards to the value type of `T`, operating on the shared
/// [`PlainDecoderDetails`] state.
impl<T: DataType> Decoder<T> for PlainDecoder<T> {
    #[inline]
    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
        let details = &mut self.inner;
        T::T::set_data(details, data, num_values);
        Ok(())
    }

    #[inline]
    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
        let details = &mut self.inner;
        T::T::decode(buffer, details)
    }

    #[inline]
    fn skip(&mut self, num_values: usize) -> Result<usize> {
        let details = &mut self.inner;
        T::T::skip(details, num_values)
    }

    #[inline]
    fn values_left(&self) -> usize {
        self.inner.num_values
    }

    #[inline]
    fn encoding(&self) -> Encoding {
        Encoding::PLAIN
    }
}
/// Decoder for dictionary-encoded data: RLE-encoded indices resolved against
/// a dictionary installed with [`DictDecoder::set_dict`].
pub struct DictDecoder<T: DataType> {
    // Dictionary values the decoded indices refer to.
    dictionary: Vec<T::T>,
    // Set once `set_dict` has completed.
    has_dictionary: bool,
    // Index decoder; `None` until `set_data` is called.
    rle_decoder: Option<RleDecoder>,
    // Value count passed to the most recent `set_data`.
    num_values: usize,
}

impl<T: DataType> Default for DictDecoder<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: DataType> DictDecoder<T> {
    /// Creates a decoder with neither dictionary nor data.
    pub fn new() -> Self {
        Self {
            num_values: 0,
            rle_decoder: None,
            has_dictionary: false,
            dictionary: Vec::new(),
        }
    }

    /// Reads every value `decoder` has left and installs them as the dictionary.
    pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
        let entries = decoder.values_left();
        self.dictionary.resize(entries, T::T::default());
        decoder.get(&mut self.dictionary)?;
        self.has_dictionary = true;
        Ok(())
    }
}
impl<T: DataType> Decoder<T> for DictDecoder<T> {
fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
let bit_width = data.as_ref()[0];
let mut rle_decoder = RleDecoder::new(bit_width);
rle_decoder.set_data(data.slice(1..));
self.num_values = num_values;
self.rle_decoder = Some(rle_decoder);
Ok(())
}
fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
assert!(self.rle_decoder.is_some());
assert!(self.has_dictionary, "Must call set_dict() first!");
let rle = self.rle_decoder.as_mut().unwrap();
let num_values = cmp::min(buffer.len(), self.num_values);
rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
}
fn values_left(&self) -> usize {
self.num_values
}
fn encoding(&self) -> Encoding {
Encoding::RLE_DICTIONARY
}
fn skip(&mut self, num_values: usize) -> Result<usize> {
assert!(self.rle_decoder.is_some());
assert!(self.has_dictionary, "Must call set_dict() first!");
let rle = self.rle_decoder.as_mut().unwrap();
let num_values = cmp::min(num_values, self.num_values);
rle.skip(num_values)
}
}
/// Decoder for values stored with `RLE` encoding using a bit width of 1.
pub struct RleValueDecoder<T: DataType> {
    // Values not yet returned by `get` or passed over by `skip`.
    values_left: usize,
    // Underlying run-length decoder.
    decoder: RleDecoder,
    _phantom: PhantomData<T>,
}

impl<T: DataType> Default for RleValueDecoder<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: DataType> RleValueDecoder<T> {
    /// Creates a decoder with no data attached.
    pub fn new() -> Self {
        let decoder = RleDecoder::new(1);
        Self {
            decoder,
            values_left: 0,
            _phantom: PhantomData,
        }
    }
}
impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
#[inline]
fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
const I32_SIZE: usize = mem::size_of::<i32>();
let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
self.decoder = RleDecoder::new(1);
self.decoder
.set_data(data.slice(I32_SIZE..I32_SIZE + data_size));
self.values_left = num_values;
Ok(())
}
#[inline]
fn values_left(&self) -> usize {
self.values_left
}
#[inline]
fn encoding(&self) -> Encoding {
Encoding::RLE
}
#[inline]
fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
let num_values = cmp::min(buffer.len(), self.values_left);
let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
self.values_left -= values_read;
Ok(values_read)
}
#[inline]
fn skip(&mut self, num_values: usize) -> Result<usize> {
let num_values = cmp::min(num_values, self.values_left);
let values_skipped = self.decoder.skip(num_values)?;
self.values_left -= values_skipped;
Ok(values_skipped)
}
}
/// Decoder for `DELTA_BINARY_PACKED` data.
///
/// The page header (block size, mini blocks per block, value count and first
/// value) is read by `set_data`; the blocks that follow are loaded lazily via
/// `next_block` / `next_mini_block`.
pub struct DeltaBitPackDecoder<T: DataType> {
bit_reader: BitReader,
// Set by `set_data`; `get_offset` and `get` assert on it.
initialized: bool,
// Header fields read by `set_data`.
block_size: usize,
// Values still to be produced; starts at the count stored in the header.
values_left: usize,
mini_blocks_per_block: usize,
// `block_size / mini_blocks_per_block`.
values_per_mini_block: usize,
// Per-block state, refreshed by `next_block`.
min_delta: T::T,
// Byte offset at which the current block ends, as computed in `next_block`.
block_end_offset: usize,
// Per-mini-block state.
mini_block_idx: usize,
mini_block_bit_widths: Vec<u8>,
mini_block_remaining: usize,
// First value from the header; taken on the first `get` / `skip`.
first_value: Option<T::T>,
// Last value produced; deltas are added to it.
last_value: T::T,
}
impl<T: DataType> Default for DeltaBitPackDecoder<T>
where
T::T: Default + FromPrimitive + WrappingAdd + Copy,
{
fn default() -> Self {
Self::new()
}
}
impl<T: DataType> DeltaBitPackDecoder<T>
where
T::T: Default + FromPrimitive + WrappingAdd + Copy,
{
/// Creates an uninitialized decoder; `set_data` must be called before use.
pub fn new() -> Self {
Self {
bit_reader: BitReader::from(vec![]),
initialized: false,
block_size: 0,
values_left: 0,
mini_blocks_per_block: 0,
values_per_mini_block: 0,
min_delta: Default::default(),
mini_block_idx: 0,
mini_block_bit_widths: vec![],
mini_block_remaining: 0,
block_end_offset: 0,
first_value: None,
last_value: Default::default(),
}
}
/// Returns the current byte offset into the data passed to `set_data`.
///
/// # Panics
///
/// Panics if `set_data` has not been called.
pub fn get_offset(&self) -> usize {
assert!(self.initialized, "Bit reader is not initialized");
match self.values_left {
// Once every value is consumed, report at least the end of the current
// block; the reader may sit before it (see the width zeroing in `next_block`).
// The `max` also covers pages for which no block was ever loaded.
0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
_ => self.bit_reader.get_byte_offset(),
}
}
/// Reads the next block header (min delta plus one bit width per mini block)
/// and resets the mini block state.
#[inline]
fn next_block(&mut self) -> Result<()> {
let min_delta = self
.bit_reader
.get_zigzag_vlq_int()
.ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
self.min_delta =
T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
self.mini_block_bit_widths.clear();
self.bit_reader
.get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
let mut offset = self.bit_reader.get_byte_offset();
let mut remaining = self.values_left;
// Compute where this block ends.
for b in &mut self.mini_block_bit_widths {
if remaining == 0 {
// Mini blocks past the last value contribute no bytes; their stored
// width is ignored.
*b = 0;
}
remaining = remaining.saturating_sub(self.values_per_mini_block);
offset += *b as usize * self.values_per_mini_block / 8;
}
self.block_end_offset = offset;
// A short read of the width bytes means the data ended early.
if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
return Err(eof_err!("insufficient mini block bit widths"));
}
self.mini_block_remaining = self.values_per_mini_block;
self.mini_block_idx = 0;
Ok(())
}
/// Advances to the next mini block, loading a new block when the current
/// one is exhausted.
#[inline]
fn next_mini_block(&mut self) -> Result<()> {
if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
self.mini_block_idx += 1;
self.mini_block_remaining = self.values_per_mini_block;
Ok(())
} else {
self.next_block()
}
}
}
impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
where
    T::T: Default + FromPrimitive + WrappingAdd + Copy,
{
    /// Attaches a new page and parses its header.
    ///
    /// The value count is taken from the header, so the second argument is
    /// ignored.
    ///
    /// # Errors
    ///
    /// Returns an error when the header is truncated or violates the layout
    /// checks below (`block_size` multiple of 128, non-zero
    /// `mini_blocks_per_block` dividing `block_size`, mini blocks holding a
    /// multiple of 32 values).
    #[inline]
    fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
        self.bit_reader = BitReader::new(data);
        self.initialized = true;

        // Header: block size, mini blocks per block, value count, first value.
        self.block_size = self
            .bit_reader
            .get_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
            .try_into()
            .map_err(|_| general_err!("invalid 'block_size'"))?;
        self.mini_blocks_per_block = self
            .bit_reader
            .get_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
            .try_into()
            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
        self.values_left = self
            .bit_reader
            .get_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
            .try_into()
            .map_err(|_| general_err!("invalid 'values_left'"))?;
        let first_value = self
            .bit_reader
            .get_zigzag_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
        self.first_value =
            Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);

        if self.block_size % 128 != 0 {
            return Err(general_err!(
                "'block_size' must be a multiple of 128, got {}",
                self.block_size
            ));
        }
        // A zero here would otherwise panic in the remainder / division below.
        if self.mini_blocks_per_block == 0 {
            return Err(general_err!(
                "'mini_blocks_per_block' must be greater than 0"
            ));
        }
        if self.block_size % self.mini_blocks_per_block != 0 {
            return Err(general_err!(
                "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
                self.block_size,
                self.mini_blocks_per_block
            ));
        }

        // Reset per-page state so nothing leaks from a previously decoded page.
        self.mini_block_idx = 0;
        self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
        self.mini_block_remaining = 0;
        self.mini_block_bit_widths.clear();
        self.block_end_offset = 0;

        if self.values_per_mini_block % 32 != 0 {
            return Err(general_err!(
                "'values_per_mini_block' must be a multiple of 32 got {}",
                self.values_per_mini_block
            ));
        }
        Ok(())
    }

    /// Decodes up to `buffer.len()` values and returns the number written.
    ///
    /// # Panics
    ///
    /// Panics if `set_data` has not been called.
    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
        assert!(self.initialized, "Bit reader is not initialized");

        let to_read = buffer.len().min(self.values_left);
        // Covers an empty buffer and a page with no values left. The latter
        // previously underflowed `values_left` when the header declared zero
        // values but still carried a first value.
        if to_read == 0 {
            return Ok(0);
        }

        let mut read = 0;
        // The first value lives in the page header rather than in a block.
        if let Some(value) = self.first_value.take() {
            self.last_value = value;
            buffer[0] = value;
            read += 1;
            self.values_left -= 1;
        }

        while read != to_read {
            if self.mini_block_remaining == 0 {
                self.next_mini_block()?;
            }
            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
            let batch_to_read = self.mini_block_remaining.min(to_read - read);
            let batch_read = self
                .bit_reader
                .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
            if batch_read != batch_to_read {
                return Err(general_err!(
                    "Expected to read {} values from miniblock got {}",
                    batch_to_read,
                    batch_read
                ));
            }
            // Turn the unpacked deltas into values; wrapping arithmetic is intentional.
            for v in &mut buffer[read..read + batch_read] {
                *v = v
                    .wrapping_add(&self.min_delta)
                    .wrapping_add(&self.last_value);
                self.last_value = *v;
            }
            read += batch_read;
            self.mini_block_remaining -= batch_read;
            self.values_left -= batch_read;
        }
        Ok(to_read)
    }

    fn values_left(&self) -> usize {
        self.values_left
    }

    fn encoding(&self) -> Encoding {
        Encoding::DELTA_BINARY_PACKED
    }

    /// Skips up to `num_values` values and returns the number skipped.
    ///
    /// Skipped values are still decoded into a scratch buffer because
    /// `last_value` must be kept current.
    fn skip(&mut self, num_values: usize) -> Result<usize> {
        let mut skip = 0;
        let to_skip = num_values.min(self.values_left);
        if to_skip == 0 {
            return Ok(0);
        }

        // The first value lives in the page header rather than in a block.
        if let Some(value) = self.first_value.take() {
            self.last_value = value;
            skip += 1;
            self.values_left -= 1;
        }

        let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
            Type::INT32 => 32,
            Type::INT64 => 64,
            _ => unreachable!(),
        };
        let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];

        while skip < to_skip {
            if self.mini_block_remaining == 0 {
                self.next_mini_block()?;
            }
            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
            // Never take more than the scratch buffer holds: a mini block may
            // contain more values than `mini_block_batch_size`, which used to
            // slice out of bounds. The remainder is handled by the next iteration.
            let mini_block_to_skip = self
                .mini_block_remaining
                .min(to_skip - skip)
                .min(skip_buffer.len());
            let skip_count = self
                .bit_reader
                .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
            if skip_count != mini_block_to_skip {
                return Err(general_err!(
                    "Expected to skip {} values from mini block got {}.",
                    mini_block_to_skip,
                    skip_count
                ));
            }
            for v in &mut skip_buffer[0..skip_count] {
                *v = v
                    .wrapping_add(&self.min_delta)
                    .wrapping_add(&self.last_value);
                self.last_value = *v;
            }
            skip += mini_block_to_skip;
            self.mini_block_remaining -= mini_block_to_skip;
            self.values_left -= mini_block_to_skip;
        }
        Ok(to_skip)
    }
}
/// Decoder for `DELTA_LENGTH_BYTE_ARRAY` data: all value lengths come first
/// (delta bit-packed), followed by the concatenated value bytes.
pub struct DeltaLengthByteArrayDecoder<T: DataType> {
    // Length of each value, decoded up front by `set_data`.
    lengths: Vec<i32>,
    // Index into `lengths` of the next value.
    current_idx: usize,
    // Concatenated value bytes; `None` until `set_data` is called.
    data: Option<Bytes>,
    // Byte offset into `data` of the next value.
    offset: usize,
    // Values not yet returned or skipped.
    num_values: usize,
    _phantom: PhantomData<T>,
}

impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
    /// Creates a decoder with no data attached.
    pub fn new() -> Self {
        Self {
            data: None,
            lengths: Vec::new(),
            num_values: 0,
            current_idx: 0,
            offset: 0,
            _phantom: PhantomData,
        }
    }
}
impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
match T::get_physical_type() {
Type::BYTE_ARRAY => {
let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
len_decoder.set_data(data.clone(), num_values)?;
let num_lengths = len_decoder.values_left();
self.lengths.resize(num_lengths, 0);
len_decoder.get(&mut self.lengths[..])?;
self.data = Some(data.slice(len_decoder.get_offset()..));
self.offset = 0;
self.current_idx = 0;
self.num_values = num_lengths;
Ok(())
}
_ => Err(general_err!(
"DeltaLengthByteArrayDecoder only support ByteArrayType"
)),
}
}
fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
match T::get_physical_type() {
Type::BYTE_ARRAY => {
assert!(self.data.is_some());
let data = self.data.as_ref().unwrap();
let num_values = cmp::min(buffer.len(), self.num_values);
for item in buffer.iter_mut().take(num_values) {
let len = self.lengths[self.current_idx] as usize;
item.set_from_bytes(data.slice(self.offset..self.offset + len));
self.offset += len;
self.current_idx += 1;
}
self.num_values -= num_values;
Ok(num_values)
}
_ => Err(general_err!(
"DeltaLengthByteArrayDecoder only support ByteArrayType"
)),
}
}
fn values_left(&self) -> usize {
self.num_values
}
fn encoding(&self) -> Encoding {
Encoding::DELTA_LENGTH_BYTE_ARRAY
}
fn skip(&mut self, num_values: usize) -> Result<usize> {
match T::get_physical_type() {
Type::BYTE_ARRAY => {
let num_values = cmp::min(num_values, self.num_values);
let next_offset: i32 = self.lengths
[self.current_idx..self.current_idx + num_values]
.iter()
.sum();
self.current_idx += num_values;
self.offset += next_offset as usize;
self.num_values -= num_values;
Ok(num_values)
}
other_type => Err(general_err!(
"DeltaLengthByteArrayDecoder not support {}, only support byte array",
other_type
)),
}
}
}
/// Decoder for `DELTA_BYTE_ARRAY` data: each value is stored as the length of
/// the prefix it shares with the previous value plus a suffix.
pub struct DeltaByteArrayDecoder<T: DataType> {
    // Prefix length of each value, decoded up front by `set_data`.
    prefix_lengths: Vec<i32>,
    // Index into `prefix_lengths` of the next value.
    current_idx: usize,
    // Decoder for the suffixes; `None` until `set_data` is called.
    suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,
    // Bytes of the most recently decoded value.
    previous_value: Vec<u8>,
    // Values not yet returned or skipped.
    num_values: usize,
    _phantom: PhantomData<T>,
}

impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: DataType> DeltaByteArrayDecoder<T> {
    /// Creates a decoder with no data attached.
    pub fn new() -> Self {
        Self {
            suffix_decoder: None,
            prefix_lengths: Vec::new(),
            previous_value: Vec::new(),
            num_values: 0,
            current_idx: 0,
            _phantom: PhantomData,
        }
    }
}
impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
match T::get_physical_type() {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
prefix_len_decoder.set_data(data.clone(), num_values)?;
let num_prefixes = prefix_len_decoder.values_left();
self.prefix_lengths.resize(num_prefixes, 0);
prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
suffix_decoder
.set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
self.suffix_decoder = Some(suffix_decoder);
self.num_values = num_prefixes;
self.current_idx = 0;
self.previous_value.clear();
Ok(())
}
_ => Err(general_err!(
"DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
)),
}
}
fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
match T::get_physical_type() {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
let num_values = cmp::min(buffer.len(), self.num_values);
let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
for item in buffer.iter_mut().take(num_values) {
let suffix_decoder = self
.suffix_decoder
.as_mut()
.expect("decoder not initialized");
suffix_decoder.get(&mut v[..])?;
let suffix = v[0].data();
let prefix_len = self.prefix_lengths[self.current_idx] as usize;
let mut result = Vec::new();
result.extend_from_slice(&self.previous_value[0..prefix_len]);
result.extend_from_slice(suffix);
let data = Bytes::from(result.clone());
item.set_from_bytes(data);
self.previous_value = result;
self.current_idx += 1;
}
self.num_values -= num_values;
Ok(num_values)
}
_ => Err(general_err!(
"DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
)),
}
}
fn values_left(&self) -> usize {
self.num_values
}
fn encoding(&self) -> Encoding {
Encoding::DELTA_BYTE_ARRAY
}
fn skip(&mut self, num_values: usize) -> Result<usize> {
let mut buffer = vec![T::T::default(); num_values];
self.get(&mut buffer)
}
}
#[cfg(test)]
mod tests {
use super::{super::encoding::*, *};
use std::f32::consts::PI as PI_f32;
use std::f64::consts::PI as PI_f64;
use std::sync::Arc;
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
use crate::util::test_common::rand_gen::RandGen;
#[test]
fn test_get_decoders() {
    // Encoding / type combinations that must yield a decoder.
    create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
    create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
    create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
    create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
    create_and_check_decoder::<BoolType>(Encoding::RLE, None);

    // Combinations that must be rejected, with the expected error.
    let rejected = vec![
        (
            Encoding::RLE_DICTIONARY,
            general_err!("Cannot initialize this encoding through this function"),
        ),
        (
            Encoding::PLAIN_DICTIONARY,
            general_err!("Cannot initialize this encoding through this function"),
        ),
        (
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            general_err!("Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"),
        ),
        (
            Encoding::DELTA_BYTE_ARRAY,
            general_err!("Encoding DELTA_BYTE_ARRAY is not supported for type"),
        ),
    ];
    for (encoding, expected_err) in rejected {
        create_and_check_decoder::<Int32Type>(encoding, Some(expected_err));
    }

    #[allow(deprecated)]
    create_and_check_decoder::<Int32Type>(
        Encoding::BIT_PACKED,
        Some(nyi_err!("Encoding BIT_PACKED is not supported")),
    );
}
#[test]
fn test_plain_decode_int32() {
    // Round-trips three i32 values through the plain encoding.
    let values = [42, 18, 52];
    let encoded = Int32Type::to_byte_array(&values[..]);
    let mut decoded = [0; 3];
    test_plain_decode::<Int32Type>(Bytes::from(encoded), 3, -1, &mut decoded[..], &values[..]);
}
#[test]
fn test_plain_skip_int32() {
    // Skips one value; the remaining two must decode unchanged.
    let values = [42, 18, 52];
    let encoded = Int32Type::to_byte_array(&values[..]);
    test_plain_skip::<Int32Type>(Bytes::from(encoded), 3, 1, -1, &values[1..]);
}
#[test]
fn test_plain_skip_all_int32() {
    // Asks to skip more values than exist; nothing may be left afterwards.
    let values = [42, 18, 52];
    let encoded = Int32Type::to_byte_array(&values[..]);
    test_plain_skip::<Int32Type>(Bytes::from(encoded), 3, 5, -1, &[]);
}
#[test]
fn test_plain_decode_int32_spaced() {
    // Three values spread over eight slots; bits 1, 3 and 6 mark the valid ones.
    let values = [42, 18, 52];
    let spaced = [0, 42, 0, 18, 0, 0, 52, 0];
    let encoded = Int32Type::to_byte_array(&values[..]);
    let mut decoded = [0; 8];
    let null_count = 5;
    let validity = [0b01001010];
    test_plain_decode_spaced::<Int32Type>(
        Bytes::from(encoded),
        3,
        -1,
        &mut decoded[..],
        null_count,
        &validity,
        &spaced[..],
    );
}
#[test]
fn test_plain_decode_int64() {
    // Round-trips three i64 values through the plain encoding.
    let values = [42, 18, 52];
    let encoded = Int64Type::to_byte_array(&values[..]);
    let mut decoded = [0; 3];
    test_plain_decode::<Int64Type>(Bytes::from(encoded), 3, -1, &mut decoded[..], &values[..]);
}
#[test]
fn test_plain_skip_int64() {
    // Skips two values; the last one must decode unchanged.
    let values = [42, 18, 52];
    let encoded = Int64Type::to_byte_array(&values[..]);
    test_plain_skip::<Int64Type>(Bytes::from(encoded), 3, 2, -1, &values[2..]);
}
#[test]
fn test_plain_skip_all_int64() {
    // Skips exactly as many values as exist.
    let values = [42, 18, 52];
    let encoded = Int64Type::to_byte_array(&values[..]);
    test_plain_skip::<Int64Type>(Bytes::from(encoded), 3, 3, -1, &[]);
}
#[test]
fn test_plain_decode_float() {
    // Round-trips three f32 values through the plain encoding.
    let values = [PI_f32, 2.414, 12.51];
    let encoded = FloatType::to_byte_array(&values[..]);
    let mut decoded = [0.0; 3];
    test_plain_decode::<FloatType>(Bytes::from(encoded), 3, -1, &mut decoded[..], &values[..]);
}
#[test]
fn test_plain_skip_float() {
    // Skips one value; the remaining two must decode unchanged.
    let values = [PI_f32, 2.414, 12.51];
    let encoded = FloatType::to_byte_array(&values[..]);
    test_plain_skip::<FloatType>(Bytes::from(encoded), 3, 1, -1, &values[1..]);
}
#[test]
fn test_plain_skip_all_float() {
    // Asks to skip more values than exist.
    let values = [PI_f32, 2.414, 12.51];
    let encoded = FloatType::to_byte_array(&values[..]);
    test_plain_skip::<FloatType>(Bytes::from(encoded), 3, 4, -1, &[]);
}
#[test]
fn test_plain_skip_double() {
    // Skips one value; the remaining two must decode unchanged.
    let values = [PI_f64, 2.414f64, 12.51f64];
    let encoded = DoubleType::to_byte_array(&values[..]);
    test_plain_skip::<DoubleType>(Bytes::from(encoded), 3, 1, -1, &values[1..]);
}
#[test]
fn test_plain_skip_all_double() {
    // Asks to skip more values than exist.
    let values = [PI_f64, 2.414f64, 12.51f64];
    let encoded = DoubleType::to_byte_array(&values[..]);
    test_plain_skip::<DoubleType>(Bytes::from(encoded), 3, 5, -1, &[]);
}
#[test]
fn test_plain_decode_double() {
    // Round-trips three f64 values through the plain encoding.
    let values = [PI_f64, 2.414f64, 12.51f64];
    let encoded = DoubleType::to_byte_array(&values[..]);
    let mut decoded = [0.0f64; 3];
    test_plain_decode::<DoubleType>(Bytes::from(encoded), 3, -1, &mut decoded[..], &values[..]);
}
#[test]
fn test_plain_decode_int96() {
    // Round-trips four Int96 values through the plain encoding.
    let mut values = [Int96::new(); 4];
    let parts = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
    for (value, (a, b, c)) in values.iter_mut().zip(parts) {
        value.set_data(a, b, c);
    }
    let encoded = Int96Type::to_byte_array(&values[..]);
    let mut decoded = [Int96::new(); 4];
    test_plain_decode::<Int96Type>(Bytes::from(encoded), 4, -1, &mut decoded[..], &values[..]);
}
#[test]
fn test_plain_skip_int96() {
    // Skips two of four Int96 values; the last two must decode unchanged.
    let mut values = [Int96::new(); 4];
    let parts = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
    for (value, (a, b, c)) in values.iter_mut().zip(parts) {
        value.set_data(a, b, c);
    }
    let encoded = Int96Type::to_byte_array(&values[..]);
    test_plain_skip::<Int96Type>(Bytes::from(encoded), 4, 2, -1, &values[2..]);
}
#[test]
fn test_plain_skip_all_int96() {
    // Asks to skip more Int96 values than exist.
    let mut values = [Int96::new(); 4];
    let parts = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
    for (value, (a, b, c)) in values.iter_mut().zip(parts) {
        value.set_data(a, b, c);
    }
    let encoded = Int96Type::to_byte_array(&values[..]);
    test_plain_skip::<Int96Type>(Bytes::from(encoded), 4, 8, -1, &[]);
}
#[test]
fn test_plain_decode_bool() {
    // Round-trips ten booleans through the plain encoding.
    let values = [
        false, true, false, false, true, false, true, true, false, true,
    ];
    let encoded = BoolType::to_byte_array(&values[..]);
    let mut decoded = [false; 10];
    test_plain_decode::<BoolType>(Bytes::from(encoded), 10, -1, &mut decoded[..], &values[..]);
}
#[test]
fn test_plain_skip_bool() {
    // Skips five booleans; the remaining five must decode unchanged.
    let values = [
        false, true, false, false, true, false, true, true, false, true,
    ];
    let encoded = BoolType::to_byte_array(&values[..]);
    test_plain_skip::<BoolType>(Bytes::from(encoded), 10, 5, -1, &values[5..]);
}
#[test]
fn test_plain_skip_all_bool() {
    // Asks to skip more booleans than exist.
    let values = [
        false, true, false, false, true, false, true, true, false, true,
    ];
    let encoded = BoolType::to_byte_array(&values[..]);
    test_plain_skip::<BoolType>(Bytes::from(encoded), 10, 20, -1, &[]);
}
#[test]
fn test_plain_decode_byte_array() {
    // Round-trips two byte arrays through the plain encoding.
    let mut values = vec![ByteArray::new(); 2];
    values[0].set_data(Bytes::from(String::from("hello")));
    values[1].set_data(Bytes::from(String::from("parquet")));
    let encoded = ByteArrayType::to_byte_array(&values[..]);
    let mut decoded = vec![ByteArray::new(); 2];
    test_plain_decode::<ByteArrayType>(
        Bytes::from(encoded),
        2,
        -1,
        &mut decoded[..],
        &values[..],
    );
}
#[test]
fn test_plain_skip_byte_array() {
    // Skips the first byte array; the second must decode unchanged.
    let mut values = vec![ByteArray::new(); 2];
    values[0].set_data(Bytes::from(String::from("hello")));
    values[1].set_data(Bytes::from(String::from("parquet")));
    let encoded = ByteArrayType::to_byte_array(&values[..]);
    test_plain_skip::<ByteArrayType>(Bytes::from(encoded), 2, 1, -1, &values[1..]);
}
#[test]
fn test_plain_skip_all_byte_array() {
    // Skips both byte arrays.
    let mut values = vec![ByteArray::new(); 2];
    values[0].set_data(Bytes::from(String::from("hello")));
    values[1].set_data(Bytes::from(String::from("parquet")));
    let encoded = ByteArrayType::to_byte_array(&values[..]);
    test_plain_skip::<ByteArrayType>(Bytes::from(encoded), 2, 2, -1, &[]);
}
#[test]
fn test_plain_decode_fixed_len_byte_array() {
    // Round-trips three 4-byte values through the plain encoding.
    let mut values = vec![FixedLenByteArray::default(); 3];
    values[0].set_data(Bytes::from(String::from("bird")));
    values[1].set_data(Bytes::from(String::from("come")));
    values[2].set_data(Bytes::from(String::from("flow")));
    let encoded = FixedLenByteArrayType::to_byte_array(&values[..]);
    let mut decoded = vec![FixedLenByteArray::default(); 3];
    test_plain_decode::<FixedLenByteArrayType>(
        Bytes::from(encoded),
        3,
        4,
        &mut decoded[..],
        &values[..],
    );
}
#[test]
fn test_plain_skip_fixed_len_byte_array() {
    // Skips one 4-byte value; the remaining two must decode unchanged.
    let mut values = vec![FixedLenByteArray::default(); 3];
    values[0].set_data(Bytes::from(String::from("bird")));
    values[1].set_data(Bytes::from(String::from("come")));
    values[2].set_data(Bytes::from(String::from("flow")));
    let encoded = FixedLenByteArrayType::to_byte_array(&values[..]);
    test_plain_skip::<FixedLenByteArrayType>(Bytes::from(encoded), 3, 1, 4, &values[1..]);
}
#[test]
fn test_plain_skip_all_fixed_len_byte_array() {
    // Asks to skip more 4-byte values than exist.
    let mut values = vec![FixedLenByteArray::default(); 3];
    values[0].set_data(Bytes::from(String::from("bird")));
    values[1].set_data(Bytes::from(String::from("come")));
    values[2].set_data(Bytes::from(String::from("flow")));
    let encoded = FixedLenByteArrayType::to_byte_array(&values[..]);
    test_plain_skip::<FixedLenByteArrayType>(Bytes::from(encoded), 3, 6, 4, &[]);
}
/// Decodes `data` with a fresh `PlainDecoder` into `buffer` and checks that the
/// decoder is drained and that `buffer` equals `expected`.
fn test_plain_decode<T: DataType>(
    data: Bytes,
    num_values: usize,
    type_length: i32,
    buffer: &mut [T::T],
    expected: &[T::T],
) {
    let mut decoder = PlainDecoder::<T>::new(type_length);

    let result = decoder.set_data(data, num_values);
    assert!(result.is_ok());

    let result = decoder.get(buffer);
    assert!(result.is_ok());

    assert_eq!(decoder.values_left(), 0);
    assert_eq!(buffer, expected);
}
/// Skips `skip` values with a fresh `PlainDecoder`, then checks what is left:
/// the tail must equal `expected`, or nothing may remain when the skip covers
/// the whole input.
fn test_plain_skip<T: DataType>(
    data: Bytes,
    num_values: usize,
    skip: usize,
    type_length: i32,
    expected: &[T::T],
) {
    let mut decoder = PlainDecoder::<T>::new(type_length);
    let result = decoder.set_data(data, num_values);
    assert!(result.is_ok());

    let skipped = decoder.skip(skip).expect("skipping values");

    if skip < num_values {
        // Partial skip: the tail must still decode correctly.
        assert_eq!(skipped, skip);
        let mut buffer = vec![T::T::default(); num_values - skip];
        let remaining = decoder.get(&mut buffer).expect("getting remaining values");
        assert_eq!(remaining, num_values - skip);
        assert_eq!(decoder.values_left(), 0);
        assert_eq!(buffer, expected);
    } else {
        // Skip covers everything: only `num_values` can be skipped, none remain.
        assert_eq!(skipped, num_values);
        let mut buffer = vec![T::T::default(); 1];
        let remaining = decoder.get(&mut buffer).expect("getting remaining values");
        assert_eq!(remaining, 0);
    }
}
/// Decodes `data` with `get_spaced` and checks the reported slot count, that
/// the decoder is drained, and that `buffer` equals `expected`.
fn test_plain_decode_spaced<T: DataType>(
    data: Bytes,
    num_values: usize,
    type_length: i32,
    buffer: &mut [T::T],
    num_nulls: usize,
    valid_bits: &[u8],
    expected: &[T::T],
) {
    let mut decoder = PlainDecoder::<T>::new(type_length);

    let result = decoder.set_data(data, num_values);
    assert!(result.is_ok());

    let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
    assert!(result.is_ok());
    // `get_spaced` reports values plus nulls.
    assert_eq!(num_values + num_nulls, result.unwrap());

    assert_eq!(decoder.values_left(), 0);
    assert_eq!(buffer, expected);
}
#[test]
#[should_panic(expected = "RleValueEncoder only supports BoolType")]
fn test_rle_value_encode_int32_not_supported() {
    // The RLE value encoder must reject non-boolean types.
    let mut rle_encoder = RleValueEncoder::<Int32Type>::new();
    let values = [1, 2, 3, 4];
    rle_encoder.put(&values).unwrap();
}
#[test]
#[should_panic(expected = "RleValueDecoder only supports BoolType")]
fn test_rle_value_decode_int32_not_supported() {
    // The RLE value decoder must reject non-boolean types.
    let mut rle_decoder = RleValueDecoder::<Int32Type>::new();
    let page = Bytes::from(vec![5, 0, 0, 0]);
    rle_decoder.set_data(page, 1).unwrap();
}
#[test]
fn test_rle_value_decode_bool_decode() {
    // Three generated batches of 256, 257 and 126 booleans.
    let batches: Vec<_> = [256, 257, 126]
        .iter()
        .map(|&total| BoolType::gen_vec(-1, total))
        .collect();
    test_rle_value_decode::<BoolType>(batches);
}
#[test]
#[should_panic(expected = "Bit reader is not initialized")]
fn test_delta_bit_packed_not_initialized_offset() {
    // `get_offset` must panic before `set_data` has been called.
    let uninitialized = DeltaBitPackDecoder::<Int32Type>::new();
    uninitialized.get_offset();
}
#[test]
#[should_panic(expected = "Bit reader is not initialized")]
fn test_delta_bit_packed_not_initialized_get() {
    // `get` must panic before `set_data` has been called, even for an empty buffer.
    let mut uninitialized = DeltaBitPackDecoder::<Int32Type>::new();
    let mut out = vec![];
    uninitialized.get(&mut out).unwrap();
}
#[test]
fn test_delta_bit_packed_int32_empty() {
    // A single batch holding no values.
    let batches = vec![vec![0; 0]];
    test_delta_bit_packed_decode::<Int32Type>(batches);
}
/// Round-trips the sequence `1..=8` repeated four times (32 values).
#[test]
fn test_delta_bit_packed_int32_repeat() {
    let block_data: Vec<i32> = [1, 2, 3, 4, 5, 6, 7, 8].repeat(4);
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
/// Skips 10 and then 100 values over the sequence `1..=8` repeated four times.
#[test]
fn test_skip_delta_bit_packed_int32_repeat() {
    let block_data: Vec<i32> = [1, 2, 3, 4, 5, 6, 7, 8].repeat(4);
    for &skip in &[10_usize, 100] {
        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
/// Round-trips eleven values of mixed sign.
#[test]
fn test_delta_bit_packed_int32_uneven() {
    let block_data: Vec<i32> = [1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11].to_vec();
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
/// Skips 5 and then 100 values over eleven values of mixed sign.
#[test]
fn test_skip_delta_bit_packed_int32_uneven() {
    let block_data: Vec<i32> = [1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11].to_vec();
    for &skip in &[5_usize, 100] {
        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
/// Round-trips sixteen copies of 127, then sixteen copies of -127.
#[test]
fn test_delta_bit_packed_int32_same_values() {
    for &value in &[127_i32, -127] {
        let block_data = vec![value; 16];
        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
    }
}
/// Skips 5 and then 100 values over sixteen copies of 127, then of -127.
#[test]
fn test_skip_delta_bit_packed_int32_same_values() {
    for &value in &[127_i32, -127] {
        let block_data = vec![value; 16];
        for &skip in &[5_usize, 100] {
            test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
        }
    }
}
/// Round-trips a mix of `i32::MIN` and `i32::MAX`.
#[test]
fn test_delta_bit_packed_int32_min_max() {
    let (lo, hi) = (i32::MIN, i32::MAX);
    let block_data = vec![lo, lo, lo, hi, lo, hi, lo, hi];
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
/// Skips 5 and then 100 values over a mix of `i32::MIN` and `i32::MAX`.
#[test]
fn test_skip_delta_bit_packed_int32_min_max() {
    let (lo, hi) = (i32::MIN, i32::MAX);
    let block_data = vec![lo, lo, lo, hi, lo, hi, lo, hi];
    for &skip in &[5_usize, 100] {
        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
/// Round-trips three generated `i32` batches of 64, 128 and 64 values.
#[test]
fn test_delta_bit_packed_int32_multiple_blocks() {
    let batch_sizes = [64_usize, 128, 64];
    let data: Vec<Vec<i32>> = batch_sizes
        .iter()
        .map(|&size| Int32Type::gen_vec(-1, size))
        .collect();
    test_delta_bit_packed_decode::<Int32Type>(data);
}
/// Round-trips two generated `i32` batches of 256 and 257 values.
#[test]
fn test_delta_bit_packed_int32_data_across_blocks() {
    let first = Int32Type::gen_vec(-1, 256);
    let second = Int32Type::gen_vec(-1, 257);
    test_delta_bit_packed_decode::<Int32Type>(vec![first, second]);
}
/// Round-trips generated `i32` batches with an empty batch in between.
#[test]
fn test_delta_bit_packed_int32_with_empty_blocks() {
    let leading = Int32Type::gen_vec(-1, 128);
    let empty: Vec<i32> = Vec::new();
    let trailing = Int32Type::gen_vec(-1, 64);
    test_delta_bit_packed_decode::<Int32Type>(vec![leading, empty, trailing]);
}
/// Round-trips a single empty `i64` batch.
#[test]
fn test_delta_bit_packed_int64_empty() {
    let empty_batch: Vec<i64> = Vec::new();
    test_delta_bit_packed_decode::<Int64Type>(vec![empty_batch]);
}
/// Round-trips `i64::MIN` / `i64::MAX` alternating four times.
#[test]
fn test_delta_bit_packed_int64_min_max() {
    let block_data: Vec<i64> = [i64::MIN, i64::MAX].repeat(4);
    test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
}
/// Round-trips three generated `i64` batches of 64, 128 and 64 values.
#[test]
fn test_delta_bit_packed_int64_multiple_blocks() {
    let batch_sizes = [64_usize, 128, 64];
    let data: Vec<Vec<i64>> = batch_sizes
        .iter()
        .map(|&size| Int64Type::gen_vec(-1, size))
        .collect();
    test_delta_bit_packed_decode::<Int64Type>(data);
}
/// Decodes a fixed 34-byte sample page and checks both the reported offsets
/// (5 after `set_data`, 34 after reading) and the three decoded values.
#[test]
fn test_delta_bit_packed_decoder_sample() {
    let data_bytes: Vec<u8> = vec![
        128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0,
    ];
    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    decoder.set_data(Bytes::from(data_bytes), 3).unwrap();
    assert_eq!(decoder.get_offset(), 5);

    let mut result = [0_i32; 3];
    decoder.get(&mut result[..]).unwrap();
    assert_eq!(decoder.get_offset(), 34);
    assert_eq!(result, [29, 43, 89]);
}
/// Decodes a hand-assembled DELTA_BINARY_PACKED page, then checks that a
/// truncated copy of the same page is reported as an error.
#[test]
fn test_delta_bit_packed_padding() {
    // Page header: four VLQ integers that the reader below decodes as
    // 256, 4, 419 and 7.
    // NOTE(review): per the Parquet spec these should be block size,
    // miniblocks per block, total value count and first value - confirm.
    let mut data: Vec<u8> = vec![128, 2, 4, 128 + 35, 3, 7];

    // First block: 5 header bytes followed by 8 bytes of 0xFF payload.
    // NOTE(review): header presumably is min delta + four bit widths - confirm.
    data.extend_from_slice(&[0, 0, 1, 0, 0]);
    data.extend_from_slice(&[0xFF; 8]);

    // Second block: 5 header bytes (the last one is 0xFF) followed by
    // 24 bytes of 0xFF payload.
    data.extend_from_slice(&[0, 0, 1, 2, 0xFF]);
    data.extend_from_slice(&[0xFF; 24]);

    let length = data.len();
    let ptr = Bytes::from(data);

    // Sanity-check the header with a raw bit reader first.
    let mut reader = BitReader::new(ptr.clone());
    assert_eq!(reader.get_vlq_int().unwrap(), 256);
    assert_eq!(reader.get_vlq_int().unwrap(), 4);
    assert_eq!(reader.get_vlq_int().unwrap(), 419);
    assert_eq!(reader.get_vlq_int().unwrap(), 7);

    // The output buffer is one slot larger than the 419 values expected.
    let mut output = vec![0_i32; 420];
    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();

    // Full page: every value is read and the whole input is consumed.
    decoder.set_data(ptr.clone(), 0).unwrap();
    let decoded = decoder.get(&mut output).unwrap();
    assert_eq!(decoded, 419);
    assert_eq!(decoder.get_offset(), length);

    // Only the first 12 bytes: decoding must fail with the miniblock error.
    decoder.set_data(ptr.slice(..12), 0).unwrap();
    let err = decoder.get(&mut output).unwrap_err().to_string();
    assert!(
        err.contains("Expected to read 64 values from miniblock got 8"),
        "{}",
        err
    );
}
/// Round-trips batches of 1, 2 and 2 byte arrays that all hold the same six bytes.
#[test]
fn test_delta_byte_array_same_arrays() {
    let sample = || ByteArray::from(vec![1, 2, 3, 4, 5, 6]);
    let data = vec![
        vec![sample()],
        vec![sample(), sample()],
        vec![sample(), sample()],
    ];
    test_delta_byte_array_decode(data);
}
/// Round-trips batches of byte arrays whose contents all differ.
#[test]
fn test_delta_byte_array_unique_arrays() {
    let first = vec![ByteArray::from(vec![1])];
    let second = vec![ByteArray::from(vec![2, 3]), ByteArray::from(vec![4, 5, 6])];
    let third = vec![
        ByteArray::from(vec![7, 8]),
        ByteArray::from(vec![9, 0, 1, 2]),
    ];
    test_delta_byte_array_decode(vec![first, second, third]);
}
/// Round-trips one batch holding one six-byte array.
#[test]
fn test_delta_byte_array_single_array() {
    let only = ByteArray::from(vec![1, 2, 3, 4, 5, 6]);
    test_delta_byte_array_decode(vec![vec![only]]);
}
/// Round-trips two `f32` batches built from explicit little-endian byte patterns.
#[test]
fn test_byte_stream_split_multiple_f32() {
    let first = vec![
        f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
        f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
    ];
    let second = vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])];
    test_byte_stream_split_decode::<FloatType>(vec![first, second], -1);
}
/// Round-trips one `f64` batch built from explicit little-endian byte patterns.
#[test]
fn test_byte_stream_split_f64() {
    let low = f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]);
    let high = f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]);
    test_byte_stream_split_decode::<DoubleType>(vec![vec![low, high]], -1);
}
/// Round-trips two `i32` batches built from explicit little-endian byte patterns.
#[test]
fn test_byte_stream_split_multiple_i32() {
    let first = vec![
        i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
        i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
    ];
    let second = vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])];
    test_byte_stream_split_decode::<Int32Type>(vec![first, second], -1);
}
/// Round-trips one `i64` batch built from explicit little-endian byte patterns.
#[test]
fn test_byte_stream_split_i64() {
    let low = i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]);
    let high = i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]);
    test_byte_stream_split_decode::<Int64Type>(vec![vec![low, high]], -1);
}
/// Round-trips three generated fixed-length byte arrays of `type_width`
/// bytes, split into batches of two and one.
fn test_byte_stream_split_flba(type_width: usize) {
    let width = type_width as i32;
    let generate = || FixedLenByteArrayType::gen(width);
    let first_batch = vec![generate(), generate()];
    let second_batch = vec![generate()];
    test_byte_stream_split_decode::<FixedLenByteArrayType>(vec![first_batch, second_batch], width);
}
/// Fixed-length byte arrays of width 5.
#[test]
fn test_byte_stream_split_flba5() {
    const WIDTH: usize = 5;
    test_byte_stream_split_flba(WIDTH);
}
/// Fixed-length byte arrays of width 16.
#[test]
fn test_byte_stream_split_flba16() {
    const WIDTH: usize = 16;
    test_byte_stream_split_flba(WIDTH);
}
/// Fixed-length byte arrays of width 19.
#[test]
fn test_byte_stream_split_flba19() {
    const WIDTH: usize = 19;
    test_byte_stream_split_flba(WIDTH);
}
/// A 4-byte value in a column declared with width 5 must trigger the
/// size-mismatch panic.
#[test]
#[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
fn test_byte_stream_split_flba_mismatch() {
    let five_wide = vec![
        FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
        FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
    ];
    let four_wide = vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])];
    test_byte_stream_split_decode::<FixedLenByteArrayType>(vec![five_wide, four_wide], 5);
}
/// Five input bytes cannot be split into 4-byte values, so `set_data` must fail.
#[test]
#[should_panic(expected = "Input data length is not a multiple of type width 4")]
fn test_byte_stream_split_flba_bad_input() {
    let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
    let five_bytes = Bytes::from(vec![1, 2, 3, 4, 5]);
    decoder.set_data(five_bytes, 1).unwrap();
}
/// Skips 2 values of an `f32` column, then 100 values of the same data
/// widened to `f64`.
#[test]
fn test_skip_byte_stream_split() {
    let floats: Vec<f32> = vec![0.3, 0.4, 0.1, 4.10];
    let doubles: Vec<f64> = floats.iter().map(|&x| x as f64).collect();
    test_skip::<FloatType>(floats, Encoding::BYTE_STREAM_SPLIT, 2);
    test_skip::<DoubleType>(doubles, Encoding::BYTE_STREAM_SPLIT, 100);
}
/// Skips 2 values of an `i32` column, then 100 values of the same data
/// widened to `i64`.
#[test]
fn test_skip_byte_stream_split_ints() {
    let narrow: Vec<i32> = vec![3, 4, 1, 5];
    let wide: Vec<i64> = narrow.iter().map(|&x| x as i64).collect();
    test_skip::<Int32Type>(narrow, Encoding::BYTE_STREAM_SPLIT, 2);
    test_skip::<Int64Type>(wide, Encoding::BYTE_STREAM_SPLIT, 100);
}
/// Runs `test_encode_decode` on `data` with `Encoding::RLE` and type width -1.
fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
    let encoding = Encoding::RLE;
    test_encode_decode::<T>(data, encoding, -1)
}
/// Runs `test_encode_decode` on `data` with `Encoding::DELTA_BINARY_PACKED`
/// and type width -1.
fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
    let encoding = Encoding::DELTA_BINARY_PACKED;
    test_encode_decode::<T>(data, encoding, -1)
}
/// Runs `test_encode_decode` on `data` with `Encoding::BYTE_STREAM_SPLIT`
/// and the caller-supplied `type_width`.
fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
    let encoding = Encoding::BYTE_STREAM_SPLIT;
    test_encode_decode::<T>(data, encoding, type_width)
}
/// Runs `test_encode_decode` on `data` with `Encoding::DELTA_BYTE_ARRAY`
/// and type width -1.
fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
    let encoding = Encoding::DELTA_BYTE_ARRAY;
    test_encode_decode::<ByteArrayType>(data, encoding, -1)
}
/// Encodes every batch in `data` with `encoding`, decodes the flushed bytes
/// and asserts that the output equals the batches concatenated in order.
///
/// `type_width` is forwarded to the test column descriptor.
///
/// # Panics
///
/// Panics if any encoder/decoder step fails, if the decoded values differ
/// from the input, or if the decoder claims to have values left but a call to
/// `get` yields none (which would otherwise hang the test forever).
fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
    let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());

    // Encode all batches into a single buffer.
    let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
    for batch in &data {
        encoder.put(batch.as_slice()).expect("ok to encode");
    }
    let bytes = encoder.flush_buffer().expect("ok to flush buffer");

    let expected: Vec<T::T> = data.iter().flatten().cloned().collect();

    let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
    let mut result = vec![T::T::default(); expected.len()];
    decoder
        .set_data(bytes, expected.len())
        .expect("ok to set data");

    // Drain the decoder; it may hand back the values over several calls.
    let mut result_num_values = 0;
    while decoder.values_left() > 0 {
        let decoded = decoder
            .get(&mut result[result_num_values..])
            .expect("ok to decode");
        // Fail fast instead of looping forever when no progress is made.
        assert!(
            decoded > 0,
            "decoder reports {} values left but decoded none",
            decoder.values_left()
        );
        result_num_values += decoded;
    }

    assert_eq!(result_num_values, expected.len());
    assert_eq!(result, expected);
}
/// Encodes `data`, then exercises `Decoder::skip` with `skip` values.
///
/// When `skip` covers the whole input, the first skip must report
/// `data.len()` and a second one must report 0. Otherwise the skip must
/// report `skip` and the remaining values must decode to `data[skip..]`.
fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
    let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
    let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
    encoder.put(&data).expect("ok to encode");
    let bytes = encoder.flush_buffer().expect("ok to flush buffer");

    let total = data.len();
    let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
    decoder.set_data(bytes, total).expect("ok to set data");

    let skipped = decoder.skip(skip).expect("ok to skip");
    if skip >= total {
        // Everything was consumed; a further skip has nothing to do.
        assert_eq!(skipped, total);
        let skipped_again = decoder.skip(skip).expect("ok to skip again");
        assert_eq!(skipped_again, 0);
        return;
    }

    assert_eq!(skipped, skip);
    let remaining = total - skip;
    let mut buffer = vec![T::T::default(); remaining];
    let fetched = decoder.get(&mut buffer).expect("ok to decode");
    assert_eq!(remaining, fetched);
    assert_eq!(buffer.as_slice(), &data[skip..]);
}
/// Requests a decoder for `encoding` and checks the outcome.
///
/// With `err == Some(e)`, creation must fail with an error whose message
/// equals that of `e`. With `None`, creation must succeed and the decoder
/// must report `encoding` as its encoding.
fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
    let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
    let decoder = get_decoder::<T>(descr, encoding);
    if let Some(parquet_error) = err {
        let actual = decoder.err().unwrap();
        assert_eq!(actual.to_string(), parquet_error.to_string());
    } else {
        assert_eq!(decoder.unwrap().encoding(), encoding);
    }
}
/// Builds a column descriptor for a primitive column named `"t"` of physical
/// type `t` and length `type_len`, with both level arguments set to 0 and an
/// empty column path.
fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
    let builder = SchemaType::primitive_type_builder("t", t).with_length(type_len);
    let schema = Arc::new(builder.build().unwrap());
    let path = ColumnPath::new(vec![]);
    Arc::new(ColumnDescriptor::new(schema, 0, 0, path))
}
/// Encodes `v` as a 4-byte little-endian length prefix.
///
/// Parquet's PLAIN encoding stores a byte-array length as four little-endian
/// bytes, so the byte order is fixed here rather than taken from the host.
///
/// # Panics
///
/// Panics if `v` does not fit in a `u32`; truncating silently would produce a
/// prefix that no longer matches the payload.
fn usize_to_bytes(v: usize) -> [u8; 4] {
    assert!(
        v as u64 <= u64::from(u32::MAX),
        "length {} does not fit in a 4-byte prefix",
        v
    );
    (v as u32).to_le_bytes()
}
/// Test helper: turns a slice of a data type's values into raw bytes.
trait ToByteArray<T>
where
    T: DataType,
{
    /// Returns the byte representation of `data`.
    #[allow(clippy::wrong_self_convention)]
    fn to_byte_array(data: &[T::T]) -> Vec<u8>;
}
/// Implements `ToByteArray` for a data type by copying the bytes that
/// `slice_as_bytes` exposes for its value slice.
macro_rules! to_byte_array_impl {
    ($physical: ty) => {
        impl ToByteArray<$physical> for $physical {
            #[allow(clippy::wrong_self_convention)]
            fn to_byte_array(data: &[<$physical as DataType>::T]) -> Vec<u8> {
                let raw = <$physical as DataType>::T::slice_as_bytes(data);
                raw.to_vec()
            }
        }
    };
}

// Data types that get the `slice_as_bytes`-based implementation.
to_byte_array_impl!(Int32Type);
to_byte_array_impl!(Int64Type);
to_byte_array_impl!(FloatType);
to_byte_array_impl!(DoubleType);
impl ToByteArray<BoolType> for BoolType {
    /// Packs booleans eight to a byte, with the first value of each group in
    /// the least significant bit. A trailing partial group keeps its unused
    /// high bits at zero.
    #[allow(clippy::wrong_self_convention)]
    fn to_byte_array(data: &[bool]) -> Vec<u8> {
        data.chunks(8)
            .map(|group| {
                group
                    .iter()
                    .enumerate()
                    .fold(0_u8, |byte, (bit, &is_set)| {
                        if is_set {
                            byte | (1 << bit)
                        } else {
                            byte
                        }
                    })
            })
            .collect()
    }
}
impl ToByteArray<Int96Type> for Int96Type {
    /// Concatenates the `as_bytes` output of every value, in order.
    #[allow(clippy::wrong_self_convention)]
    fn to_byte_array(data: &[Int96]) -> Vec<u8> {
        let mut bytes = Vec::new();
        data.iter()
            .for_each(|value| bytes.extend_from_slice(value.as_bytes()));
        bytes
    }
}
impl ToByteArray<ByteArrayType> for ByteArrayType {
    /// Writes each value as a 4-byte length prefix (from `usize_to_bytes`)
    /// followed by the value's bytes.
    #[allow(clippy::wrong_self_convention)]
    fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
        let mut bytes = Vec::new();
        for value in data {
            let payload = value.data();
            let prefix = usize_to_bytes(payload.len());
            bytes.extend_from_slice(&prefix);
            bytes.extend_from_slice(payload);
        }
        bytes
    }
}
impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
    /// Concatenates the bytes of every value, in order, with no length prefix.
    #[allow(clippy::wrong_self_convention)]
    fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
        let mut bytes = Vec::new();
        data.iter()
            .for_each(|value| bytes.extend_from_slice(value.data()));
        bytes
    }
}
}