use std::{cmp, marker::PhantomData};
use crate::basic::*;
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::{num_required_bits, BitWriter};
use byte_stream_split_encoder::{ByteStreamSplitEncoder, VariableWidthByteStreamSplitEncoder};
use bytes::Bytes;
pub use dict_encoder::DictEncoder;
mod byte_stream_split_encoder;
mod dict_encoder;
/// An encoder for values of the parquet data type `T`.
///
/// Values are fed in with [`Encoder::put`] and the encoded bytes are taken out
/// with [`Encoder::flush_buffer`].
pub trait Encoder<T: DataType>: Send {
    /// Encodes `values`, adding them to whatever this encoder has buffered.
    fn put(&mut self, values: &[T::T]) -> Result<()>;

    /// Encodes only those entries of `values` whose bit is set in
    /// `valid_bits` (bit `i` corresponds to `values[i]`).
    ///
    /// Returns how many values were actually passed on to [`Encoder::put`].
    #[cfg(test)]
    fn put_spaced(&mut self, values: &[T::T], valid_bits: &[u8]) -> Result<usize> {
        let mut present = Vec::with_capacity(values.len());
        present.extend(
            values
                .iter()
                .enumerate()
                .filter(|(idx, _)| crate::util::bit_util::get_bit(valid_bits, *idx))
                .map(|(_, value)| value.clone()),
        );
        self.put(&present)?;
        Ok(present.len())
    }

    /// The encoding this encoder produces.
    fn encoding(&self) -> Encoding;

    /// Estimated size, in bytes, of the data encoded so far.
    fn estimated_data_encoded_size(&self) -> usize;

    /// Estimated amount of memory, in bytes, held by this encoder.
    fn estimated_memory_size(&self) -> usize;

    /// Flushes everything buffered so far and returns the encoded bytes.
    fn flush_buffer(&mut self) -> Result<Bytes>;
}
/// Builds a boxed encoder for `encoding` over values of type `T`.
///
/// `descr` is consulted for the column's type length, which the
/// `BYTE_STREAM_SPLIT` encoder needs for `FIXED_LEN_BYTE_ARRAY` columns.
///
/// # Errors
///
/// * A general error for `RLE_DICTIONARY` / `PLAIN_DICTIONARY`, which cannot be
///   built through this function.
/// * A not-yet-implemented error for any other encoding not listed below.
pub fn get_encoder<T: DataType>(
    encoding: Encoding,
    descr: &ColumnDescPtr,
) -> Result<Box<dyn Encoder<T>>> {
    if matches!(
        encoding,
        Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
    ) {
        return Err(general_err!(
            "Cannot initialize this encoding through this function"
        ));
    }

    let encoder: Box<dyn Encoder<T>> = match encoding {
        Encoding::PLAIN => Box::new(PlainEncoder::new()),
        Encoding::RLE => Box::new(RleValueEncoder::new()),
        Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()),
        Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayEncoder::new()),
        Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()),
        // Fixed-length byte arrays have a per-column width, so they need the
        // variable-width flavour of the byte-stream-split encoder.
        Encoding::BYTE_STREAM_SPLIT
            if matches!(T::get_physical_type(), Type::FIXED_LEN_BYTE_ARRAY) =>
        {
            Box::new(VariableWidthByteStreamSplitEncoder::new(
                descr.type_length(),
            ))
        }
        Encoding::BYTE_STREAM_SPLIT => Box::new(ByteStreamSplitEncoder::new()),
        other => return Err(nyi_err!("Encoding {} is not supported", other)),
    };
    Ok(encoder)
}
/// Encoder for the `PLAIN` encoding.
///
/// Encoded bytes accumulate in `buffer`. `bit_writer` is handed to the value
/// type's encode routine together with `buffer`, and its output is appended to
/// `buffer` when the encoder is flushed.
pub struct PlainEncoder<T: DataType> {
    buffer: Vec<u8>,
    bit_writer: BitWriter,
    _phantom: PhantomData<T>,
}

impl<T: DataType> PlainEncoder<T> {
    /// Creates a `PLAIN` encoder with nothing buffered.
    pub fn new() -> Self {
        // Size argument given to `BitWriter::new`.
        const BIT_WRITER_SIZE: usize = 256;
        PlainEncoder {
            buffer: Vec::new(),
            bit_writer: BitWriter::new(BIT_WRITER_SIZE),
            _phantom: PhantomData,
        }
    }
}

impl<T: DataType> Default for PlainEncoder<T> {
    /// Same as [`PlainEncoder::new`].
    fn default() -> Self {
        PlainEncoder::new()
    }
}
impl<T: DataType> Encoder<T> for PlainEncoder<T> {
#[cold]
fn encoding(&self) -> Encoding {
Encoding::PLAIN
}
fn estimated_data_encoded_size(&self) -> usize {
self.buffer.len() + self.bit_writer.bytes_written()
}
#[inline]
fn flush_buffer(&mut self) -> Result<Bytes> {
self.buffer
.extend_from_slice(self.bit_writer.flush_buffer());
self.bit_writer.clear();
Ok(std::mem::take(&mut self.buffer).into())
}
#[inline]
fn put(&mut self, values: &[T::T]) -> Result<()> {
T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?;
Ok(())
}
fn estimated_memory_size(&self) -> usize {
self.buffer.capacity() * std::mem::size_of::<u8>() + self.bit_writer.estimated_memory_size()
}
}
/// Initial capacity, in bytes, of the buffer behind an `RleValueEncoder`.
const DEFAULT_RLE_BUFFER_LEN: usize = 1024;

/// Encoder for the `RLE` encoding of boolean values.
///
/// The inner RLE encoder is created lazily by `put` and consumed by
/// `flush_buffer`.
pub struct RleValueEncoder<T: DataType> {
    encoder: Option<RleEncoder>,
    _phantom: PhantomData<T>,
}

impl<T: DataType> Default for RleValueEncoder<T> {
    /// Same as `RleValueEncoder::new`.
    fn default() -> Self {
        RleValueEncoder::new()
    }
}
impl<T: DataType> RleValueEncoder<T> {
pub fn new() -> Self {
Self {
encoder: None,
_phantom: PhantomData,
}
}
}
impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
#[inline]
fn put(&mut self, values: &[T::T]) -> Result<()> {
ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
let rle_encoder = self.encoder.get_or_insert_with(|| {
let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN);
buffer.extend_from_slice(&[0; 4]);
RleEncoder::new_from_buf(1, buffer)
});
for value in values {
let value = value.as_u64()?;
rle_encoder.put(value)
}
Ok(())
}
#[cold]
fn encoding(&self) -> Encoding {
Encoding::RLE
}
#[inline]
fn estimated_data_encoded_size(&self) -> usize {
match self.encoder {
Some(ref enc) => enc.len(),
None => 0,
}
}
#[inline]
fn flush_buffer(&mut self) -> Result<Bytes> {
ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
let rle_encoder = self
.encoder
.take()
.expect("RLE value encoder is not initialized");
let mut buf = rle_encoder.consume();
assert!(buf.len() >= 4, "should have had padding inserted");
let len = (buf.len() - 4) as i32;
buf[..4].copy_from_slice(&len.to_le_bytes());
Ok(buf.into())
}
fn estimated_memory_size(&self) -> usize {
self.encoder
.as_ref()
.map_or(0, |enc| enc.estimated_memory_size())
}
}
/// Size passed to `BitWriter::new` for the page-header writer.
const MAX_PAGE_HEADER_WRITER_SIZE: usize = 32;
/// Size passed to `BitWriter::new` for the block-data writer.
const DEFAULT_BIT_WRITER_SIZE: usize = 1024 * 1024;
/// Number of mini blocks in each block.
const DEFAULT_NUM_MINI_BLOCKS: usize = 4;

/// Encoder for the `DELTA_BINARY_PACKED` encoding of INT32 / INT64 values.
///
/// Differences between consecutive values are buffered one block at a time in
/// `deltas`. Each full block is bit-packed into `bit_writer`, and the page
/// header is written when the encoder is flushed.
pub struct DeltaBitPackEncoder<T: DataType> {
    /// Page header: block size, mini-block count, value count, first value.
    page_header_writer: BitWriter,
    /// Bit-packed block data.
    bit_writer: BitWriter,
    /// Values received since the last flush, including the first one.
    total_values: usize,
    /// First value received since the last flush; written into the header.
    first_value: i64,
    /// Most recent value; the base for the next delta.
    current_value: i64,
    /// Deltas per block.
    block_size: usize,
    /// Deltas per mini block.
    mini_block_size: usize,
    /// Mini blocks per block.
    num_mini_blocks: usize,
    /// Number of deltas currently buffered in `deltas`.
    values_in_block: usize,
    /// Delta buffer for the block being assembled (`block_size` entries).
    deltas: Vec<i64>,
    _phantom: PhantomData<T>,
}

impl<T: DataType> Default for DeltaBitPackEncoder<T> {
    /// Same as `DeltaBitPackEncoder::new`.
    fn default() -> Self {
        DeltaBitPackEncoder::new()
    }
}
impl<T: DataType> DeltaBitPackEncoder<T> {
    /// Creates an encoder with `DEFAULT_NUM_MINI_BLOCKS` mini blocks per block.
    /// Each mini block holds 32 values for INT32 and 64 values for INT64.
    ///
    /// Other physical types are rejected by `assert_supported_type`.
    pub fn new() -> Self {
        Self::assert_supported_type();
        let mini_block_size = match T::T::PHYSICAL_TYPE {
            Type::INT32 => 32,
            Type::INT64 => 64,
            // Ruled out by `assert_supported_type` above.
            _ => unreachable!(),
        };
        let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
        let block_size = mini_block_size * num_mini_blocks;
        // Enforce that the block size is a multiple of 128.
        // NOTE(review): presumably a DELTA_BINARY_PACKED format requirement.
        assert_eq!(block_size % 128, 0);
        DeltaBitPackEncoder {
            page_header_writer: BitWriter::new(MAX_PAGE_HEADER_WRITER_SIZE),
            bit_writer: BitWriter::new(DEFAULT_BIT_WRITER_SIZE),
            total_values: 0,
            first_value: 0,
            current_value: 0, block_size, mini_block_size,
            num_mini_blocks,
            values_in_block: 0, deltas: vec![0; block_size],
            _phantom: PhantomData,
        }
    }
    /// Writes the page header into `page_header_writer`, in this order:
    /// block size, mini-block count and total value count (VLQ ints), then the
    /// first value (zigzag VLQ).
    fn write_page_header(&mut self) {
        self.page_header_writer.put_vlq_int(self.block_size as u64);
        self.page_header_writer
            .put_vlq_int(self.num_mini_blocks as u64);
        self.page_header_writer
            .put_vlq_int(self.total_values as u64);
        self.page_header_writer.put_zigzag_vlq_int(self.first_value);
    }
    /// Bit-packs the `values_in_block` buffered deltas into `bit_writer` and
    /// resets `values_in_block` to 0. Does nothing if no deltas are buffered.
    ///
    /// The layout written is: the block's minimum delta (zigzag VLQ), one
    /// bit-width byte per mini block, then each mini block's deltas (relative
    /// to the minimum) packed at that width and padded to `mini_block_size`.
    #[inline(never)]
    fn flush_block_values(&mut self) -> Result<()> {
        if self.values_in_block == 0 {
            return Ok(());
        }
        let mut min_delta = i64::MAX;
        for i in 0..self.values_in_block {
            min_delta = cmp::min(min_delta, self.deltas[i]);
        }
        self.bit_writer.put_zigzag_vlq_int(min_delta);
        // Reserve room for the per-mini-block bit widths. They are filled in
        // with `write_at` once each mini block's width is known.
        let offset = self.bit_writer.skip(self.num_mini_blocks);
        for i in 0..self.num_mini_blocks {
            // Values in this mini block: a full mini block, or whatever remains.
            let n = cmp::min(self.mini_block_size, self.values_in_block);
            if n == 0 {
                // Fill the widths of all remaining (unused) mini blocks. 0xFF
                // is used under `cfg(test)` and 0 otherwise.
                let pad_value = cfg!(test).then(|| 0xFF).unwrap_or(0);
                for j in i..self.num_mini_blocks {
                    self.bit_writer.write_at(offset + j, pad_value);
                }
                break;
            }
            let mut max_delta = i64::MIN;
            for j in 0..n {
                max_delta = cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
            }
            // Width needed for the largest (delta - min_delta) in this mini block.
            let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta)) as usize;
            self.bit_writer.write_at(offset + i, bit_width as u8);
            for j in 0..n {
                let packed_value =
                    self.subtract_u64(self.deltas[i * self.mini_block_size + j], min_delta);
                self.bit_writer.put_value(packed_value, bit_width);
            }
            // Pad a partially filled mini block with zeros up to its full size.
            for _ in n..self.mini_block_size {
                self.bit_writer.put_value(0, bit_width);
            }
            self.values_in_block -= n;
        }
        assert_eq!(
            self.values_in_block, 0,
            "Expected 0 values in block, found {}",
            self.values_in_block
        );
        Ok(())
    }
}
impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
fn put(&mut self, values: &[T::T]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
let mut idx = if self.total_values == 0 {
self.first_value = self.as_i64(values, 0);
self.current_value = self.first_value;
1
} else {
0
};
self.total_values += values.len();
while idx < values.len() {
let value = self.as_i64(values, idx);
self.deltas[self.values_in_block] = self.subtract(value, self.current_value);
self.current_value = value;
idx += 1;
self.values_in_block += 1;
if self.values_in_block == self.block_size {
self.flush_block_values()?;
}
}
Ok(())
}
#[cold]
fn encoding(&self) -> Encoding {
Encoding::DELTA_BINARY_PACKED
}
fn estimated_data_encoded_size(&self) -> usize {
self.bit_writer.bytes_written()
}
fn flush_buffer(&mut self) -> Result<Bytes> {
self.flush_block_values()?;
self.write_page_header();
let mut buffer = Vec::new();
buffer.extend_from_slice(self.page_header_writer.flush_buffer());
buffer.extend_from_slice(self.bit_writer.flush_buffer());
self.page_header_writer.clear();
self.bit_writer.clear();
self.total_values = 0;
self.first_value = 0;
self.current_value = 0;
self.values_in_block = 0;
Ok(buffer.into())
}
fn estimated_memory_size(&self) -> usize {
self.page_header_writer.estimated_memory_size()
+ self.bit_writer.estimated_memory_size()
+ self.deltas.capacity() * std::mem::size_of::<i64>()
+ std::mem::size_of::<Self>()
}
}
/// Helpers mapping the encoder's value type `T` onto the `i64` arithmetic used
/// for delta computation.
trait DeltaBitPackEncoderConversion<T: DataType> {
    /// Rejects any `T` whose physical type is not INT32 or INT64.
    fn assert_supported_type();

    /// Returns `values[index]` as an `i64`.
    fn as_i64(&self, values: &[T::T], index: usize) -> i64;

    /// Returns `left - right`, wrapping at the width of `T`'s physical type
    /// (32 bits for INT32, 64 bits for INT64).
    fn subtract(&self, left: i64, right: i64) -> i64;

    /// Like `subtract`, but returns the wrapped difference reinterpreted as
    /// unsigned (zero-extended from 32 bits for INT32).
    fn subtract_u64(&self, left: i64, right: i64) -> u64;
}

impl<T: DataType> DeltaBitPackEncoderConversion<T> for DeltaBitPackEncoder<T> {
    #[inline]
    fn assert_supported_type() {
        ensure_phys_ty!(
            Type::INT32 | Type::INT64,
            "DeltaBitPackEncoder only supports Int32Type and Int64Type"
        );
    }

    #[inline]
    fn as_i64(&self, values: &[T::T], index: usize) -> i64 {
        values[index]
            .as_i64()
            .expect("DeltaBitPackEncoder only supports Int32Type and Int64Type")
    }

    #[inline]
    fn subtract(&self, left: i64, right: i64) -> i64 {
        match T::get_physical_type() {
            // INT32 deltas wrap at 32 bits and are then sign-extended.
            Type::INT32 => (left as i32).wrapping_sub(right as i32) as i64,
            Type::INT64 => left.wrapping_sub(right),
            _ => panic!("DeltaBitPackEncoder only supports Int32Type and Int64Type"),
        }
    }

    #[inline]
    fn subtract_u64(&self, left: i64, right: i64) -> u64 {
        match T::get_physical_type() {
            // Go through u32 so the result is zero-extended, not sign-extended.
            Type::INT32 => (left as i32).wrapping_sub(right as i32) as u32 as u64,
            Type::INT64 => left.wrapping_sub(right) as u64,
            _ => panic!("DeltaBitPackEncoder only supports Int32Type and Int64Type"),
        }
    }
}
/// Encoder for the `DELTA_LENGTH_BYTE_ARRAY` encoding.
///
/// Value lengths are delta-bit-packed by `len_encoder`. The values themselves
/// are kept in `data` and appended after the encoded lengths on flush.
pub struct DeltaLengthByteArrayEncoder<T: DataType> {
    /// Encodes the length of each value.
    len_encoder: DeltaBitPackEncoder<Int32Type>,
    /// Values buffered since the last flush.
    data: Vec<ByteArray>,
    /// Total number of value bytes held in `data`.
    encoded_size: usize,
    _phantom: PhantomData<T>,
}

impl<T: DataType> DeltaLengthByteArrayEncoder<T> {
    /// Creates an encoder with nothing buffered.
    pub fn new() -> Self {
        DeltaLengthByteArrayEncoder {
            len_encoder: DeltaBitPackEncoder::new(),
            data: Vec::new(),
            encoded_size: 0,
            _phantom: PhantomData,
        }
    }
}

impl<T: DataType> Default for DeltaLengthByteArrayEncoder<T> {
    /// Same as [`DeltaLengthByteArrayEncoder::new`].
    fn default() -> Self {
        DeltaLengthByteArrayEncoder::new()
    }
}
impl<T: DataType> Encoder<T> for DeltaLengthByteArrayEncoder<T> {
    /// Feeds the lengths of `values` to the length encoder and keeps the values
    /// themselves until `flush_buffer`.
    fn put(&mut self, values: &[T::T]) -> Result<()> {
        ensure_phys_ty!(
            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
            "DeltaLengthByteArrayEncoder only supports ByteArrayType"
        );
        // NOTE(review): FIXED_LEN_BYTE_ARRAY passes the guard above, but the
        // downcast below only accepts `ByteArray` values and unwraps otherwise.
        // Confirm whether FIXED_LEN_BYTE_ARRAY is meant to be supported here.
        let val_it = || {
            values
                .iter()
                .map(|x| x.as_any().downcast_ref::<ByteArray>().unwrap())
        };
        let lengths: Vec<i32> = val_it().map(|byte_array| byte_array.len() as i32).collect();
        self.len_encoder.put(&lengths)?;
        for byte_array in val_it() {
            self.encoded_size += byte_array.len();
            self.data.push(byte_array.clone());
        }
        Ok(())
    }

    #[cold]
    fn encoding(&self) -> Encoding {
        Encoding::DELTA_LENGTH_BYTE_ARRAY
    }

    /// Block data flushed by the length encoder plus all buffered value bytes.
    fn estimated_data_encoded_size(&self) -> usize {
        self.len_encoder.estimated_data_encoded_size() + self.encoded_size
    }

    /// Emits `[encoded lengths][concatenated value bytes]` and clears the
    /// buffered values.
    fn flush_buffer(&mut self) -> Result<Bytes> {
        ensure_phys_ty!(
            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
            "DeltaLengthByteArrayEncoder only supports ByteArrayType"
        );
        let lengths = self.len_encoder.flush_buffer()?;
        // Reserve room for the lengths section plus every buffered value byte.
        let mut total_bytes = Vec::with_capacity(lengths.len() + self.encoded_size);
        total_bytes.extend_from_slice(&lengths);
        for byte_array in &self.data {
            total_bytes.extend_from_slice(byte_array.data());
        }
        self.data.clear();
        self.encoded_size = 0;
        Ok(total_bytes.into())
    }

    /// Estimates the memory held by this encoder, in bytes. It sums:
    ///
    /// * the length encoder,
    /// * the `data` vector's own allocation,
    /// * the value bytes it references (`encoded_size`), and
    /// * the struct itself.
    ///
    /// The previous version added `data.len()`, an element count rather than a
    /// byte count. The value bytes may share storage with the caller's
    /// buffers, so this presumably errs on the high side.
    fn estimated_memory_size(&self) -> usize {
        self.len_encoder.estimated_memory_size()
            + self.data.capacity() * std::mem::size_of::<ByteArray>()
            + self.encoded_size
            + std::mem::size_of::<Self>()
    }
}
/// Encoder for the `DELTA_BYTE_ARRAY` (incremental / front-compression)
/// encoding.
///
/// Each value is split into the length of the prefix it shares with the
/// previous value and the remaining suffix. The prefix lengths are
/// delta-bit-packed, and the suffixes are written with the
/// `DELTA_LENGTH_BYTE_ARRAY` encoder.
pub struct DeltaByteArrayEncoder<T: DataType> {
    /// Encodes the shared-prefix length of each value.
    prefix_len_encoder: DeltaBitPackEncoder<Int32Type>,
    /// Encodes the non-shared suffix of each value.
    suffix_writer: DeltaLengthByteArrayEncoder<ByteArrayType>,
    /// Bytes of the previously encoded value; empty after a flush.
    previous: Vec<u8>,
    _phantom: PhantomData<T>,
}

impl<T: DataType> DeltaByteArrayEncoder<T> {
    /// Creates an encoder with nothing buffered.
    pub fn new() -> Self {
        DeltaByteArrayEncoder {
            prefix_len_encoder: DeltaBitPackEncoder::new(),
            suffix_writer: DeltaLengthByteArrayEncoder::new(),
            previous: Vec::new(),
            _phantom: PhantomData,
        }
    }
}

impl<T: DataType> Default for DeltaByteArrayEncoder<T> {
    /// Same as [`DeltaByteArrayEncoder::new`].
    fn default() -> Self {
        DeltaByteArrayEncoder::new()
    }
}
impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> {
    /// Splits each value into the length of the prefix it shares with the
    /// previous value and the remaining suffix. The prefix lengths go to the
    /// delta-bit-pack encoder and the suffixes to the delta-length encoder.
    fn put(&mut self, values: &[T::T]) -> Result<()> {
        // Exactly one prefix length and one suffix per input value.
        let mut prefix_lengths: Vec<i32> = Vec::with_capacity(values.len());
        let mut suffixes: Vec<ByteArray> = Vec::with_capacity(values.len());

        let values = values
            .iter()
            .map(|x| x.as_any())
            .map(|x| match T::get_physical_type() {
                Type::BYTE_ARRAY => x.downcast_ref::<ByteArray>().unwrap(),
                Type::FIXED_LEN_BYTE_ARRAY => x.downcast_ref::<FixedLenByteArray>().unwrap(),
                _ => panic!(
                    "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
                ),
            });

        for byte_array in values {
            let current = byte_array.data();
            // Length of the longest common prefix of the previous and current value.
            let match_len = self
                .previous
                .iter()
                .zip(current)
                .take_while(|(prev, cur)| prev == cur)
                .count();
            prefix_lengths.push(match_len as i32);
            suffixes.push(byte_array.slice(match_len, byte_array.len() - match_len));
            // Remember this value for the next prefix comparison.
            self.previous.clear();
            self.previous.extend_from_slice(current);
        }
        self.prefix_len_encoder.put(&prefix_lengths)?;
        self.suffix_writer.put(&suffixes)?;
        Ok(())
    }

    #[cold]
    fn encoding(&self) -> Encoding {
        Encoding::DELTA_BYTE_ARRAY
    }

    /// Sum of the prefix-length and suffix encoders' estimates.
    fn estimated_data_encoded_size(&self) -> usize {
        self.prefix_len_encoder.estimated_data_encoded_size()
            + self.suffix_writer.estimated_data_encoded_size()
    }

    /// Emits `[encoded prefix lengths][encoded suffixes]` and forgets the
    /// previous value, so the next page starts with an empty prefix.
    fn flush_buffer(&mut self) -> Result<Bytes> {
        match T::get_physical_type() {
            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
                let lengths = self.prefix_len_encoder.flush_buffer()?;
                let suffixes = self.suffix_writer.flush_buffer()?;
                let mut total_bytes = Vec::with_capacity(lengths.len() + suffixes.len());
                total_bytes.extend_from_slice(&lengths);
                total_bytes.extend_from_slice(&suffixes);
                self.previous.clear();
                Ok(total_bytes.into())
            }
            _ => panic!(
                "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
            ),
        }
    }

    fn estimated_memory_size(&self) -> usize {
        self.prefix_len_encoder.estimated_memory_size()
            + self.suffix_writer.estimated_memory_size()
            + (self.previous.capacity() * std::mem::size_of::<u8>())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use crate::encodings::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
    use crate::util::bit_util;
    use crate::util::test_common::rand_gen::{random_bytes, RandGen};
    // Number of values generated for each round-trip test.
    const TEST_SET_SIZE: usize = 1024;
    // `get_encoder` builds every supported non-dictionary encoding. It returns
    // the expected errors for dictionary and unsupported encodings. Only
    // construction is checked, so type/encoding pairs that `put` would reject
    // (e.g. Int32 with DELTA_LENGTH_BYTE_ARRAY) are fine here.
    #[test]
    fn test_get_encoders() {
        create_and_check_encoder::<Int32Type>(0, Encoding::PLAIN, None);
        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BINARY_PACKED, None);
        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BYTE_ARRAY, None);
        create_and_check_encoder::<BoolType>(0, Encoding::RLE, None);
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::RLE_DICTIONARY,
            Some(general_err!(
                "Cannot initialize this encoding through this function"
            )),
        );
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::PLAIN_DICTIONARY,
            Some(general_err!(
                "Cannot initialize this encoding through this function"
            )),
        );
        #[allow(deprecated)]
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::BIT_PACKED,
            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
        );
    }
    // Round-trip tests: one per physical type, covering each supported encoding.
    #[test]
    fn test_bool() {
        BoolType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        BoolType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        BoolType::test(Encoding::RLE, TEST_SET_SIZE, -1);
    }
    #[test]
    fn test_i32() {
        Int32Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        Int32Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        Int32Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
        Int32Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
    }
    #[test]
    fn test_i64() {
        Int64Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        Int64Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        Int64Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
        Int64Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
    }
    #[test]
    fn test_i96() {
        Int96Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        Int96Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
    }
    #[test]
    fn test_float() {
        FloatType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        FloatType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        FloatType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
    }
    #[test]
    fn test_double() {
        DoubleType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        DoubleType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        DoubleType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
    }
    #[test]
    fn test_byte_array() {
        ByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        ByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        ByteArrayType::test(Encoding::DELTA_LENGTH_BYTE_ARRAY, TEST_SET_SIZE, -1);
        ByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, -1);
    }
    // Fixed-length byte arrays use a type length of 100 bytes.
    #[test]
    fn test_fixed_len_byte_array() {
        FixedLenByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, 100);
        FixedLenByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, 100);
        FixedLenByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, 100);
        FixedLenByteArrayType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, 100);
    }
    // The dictionary's encoded size starts at 0, grows with distinct values and
    // is unchanged by flushing the data buffer.
    #[test]
    fn test_dict_encoded_size() {
        fn run_test<T: DataType>(type_length: i32, values: &[T::T], expected_size: usize) {
            let mut encoder = create_test_dict_encoder::<T>(type_length);
            assert_eq!(encoder.dict_encoded_size(), 0);
            encoder.put(values).unwrap();
            assert_eq!(encoder.dict_encoded_size(), expected_size);
            encoder.flush_buffer().unwrap();
            assert_eq!(encoder.dict_encoded_size(), expected_size);
        }
        run_test::<BoolType>(-1, &[true, false, true, false, true], 2);
        run_test::<Int32Type>(-1, &[1i32, 2i32, 3i32, 4i32, 5i32], 20);
        run_test::<Int64Type>(-1, &[1i64, 2i64, 3i64, 4i64, 5i64], 40);
        run_test::<FloatType>(-1, &[1f32, 2f32, 3f32, 4f32, 5f32], 20);
        run_test::<DoubleType>(-1, &[1f64, 2f64, 3f64, 4f64, 5f64], 40);
        run_test::<Int96Type>(
            -1,
            &[Int96::from(vec![1, 2, 3]), Int96::from(vec![2, 3, 4])],
            24,
        );
        run_test::<ByteArrayType>(-1, &[ByteArray::from("abcd"), ByteArray::from("efj")], 15);
        run_test::<FixedLenByteArrayType>(
            2,
            &[ByteArray::from("ab").into(), ByteArray::from("bc").into()],
            4,
        );
    }
    // Checks `estimated_data_encoded_size` before `put`, after `put`, and after
    // `flush_buffer` for each encoding.
    #[test]
    fn test_estimated_data_encoded_size() {
        fn run_test<T: DataType>(
            encoding: Encoding,
            type_length: i32,
            values: &[T::T],
            initial_size: usize,
            max_size: usize,
            flush_size: usize,
        ) {
            let mut encoder = match encoding {
                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                    Box::new(create_test_dict_encoder::<T>(type_length))
                }
                _ => create_test_encoder::<T>(type_length, encoding),
            };
            assert_eq!(encoder.estimated_data_encoded_size(), initial_size);
            encoder.put(values).unwrap();
            assert_eq!(encoder.estimated_data_encoded_size(), max_size);
            encoder.flush_buffer().unwrap();
            assert_eq!(encoder.estimated_data_encoded_size(), flush_size);
        }
        // PLAIN: 1024 values * 4 bytes.
        run_test::<Int32Type>(Encoding::PLAIN, -1, &[123; 1024], 0, 4096, 0);
        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 0, 2, 0);
        // DELTA_BINARY_PACKED: the first value goes to the header, leaving 1023
        // zero deltas. That fills 7 blocks of 128, each flushed as 5 bytes
        // (1-byte min delta + 4 bit-width bytes, zero-width values). The last
        // 127 deltas are not flushed yet.
        run_test::<Int32Type>(Encoding::DELTA_BINARY_PACKED, -1, &[123; 1024], 0, 35, 0);
        let mut values = vec![];
        values.extend_from_slice(&[true; 16]);
        values.extend_from_slice(&[false; 16]);
        run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 6, 0);
        run_test::<ByteArrayType>(
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            -1,
            &[ByteArray::from("ab"), ByteArray::from("abc")],
            0,
            5, 0, // only value bytes ("ab" + "abc"); the length encoder has not flushed a block yet
        );
        run_test::<ByteArrayType>(
            Encoding::DELTA_BYTE_ARRAY,
            -1,
            &[ByteArray::from("ab"), ByteArray::from("abc")],
            0,
            3, 0, // only suffix bytes ("ab" + "c"); the length encoders have not flushed a block yet
        );
        run_test::<FloatType>(Encoding::BYTE_STREAM_SPLIT, -1, &[0.1, 0.2], 0, 8, 0);
    }
    // BYTE_STREAM_SPLIT writes byte 0 of every value, then byte 1 of every
    // value, and so on; decoding restores the original floats.
    #[test]
    fn test_byte_stream_split_example_f32() {
        let mut encoder = create_test_encoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);
        let mut decoder = create_test_decoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);
        let input = vec![
            f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
            f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
            f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6]),
        ];
        encoder.put(&input).unwrap();
        let encoded = encoder.flush_buffer().unwrap();
        assert_eq!(
            encoded,
            Bytes::from(vec![
                0xAA_u8, 0x00, 0xA3, 0xBB, 0x11, 0xB4, 0xCC, 0x22, 0xC5, 0xDD, 0x33, 0xD6
            ])
        );
        let mut decoded = vec![0.0; input.len()];
        decoder.set_data(encoded, input.len()).unwrap();
        decoder.get(&mut decoded).unwrap();
        assert_eq!(decoded, input);
    }
    // Encodes two batches with the same DELTA_BYTE_ARRAY encoder/decoder pair.
    // This checks that prefix state from the first flush does not leak into
    // the second page.
    #[test]
    fn test_issue_47() {
        let mut encoder = create_test_encoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);
        let mut decoder = create_test_decoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);
        let input = vec![
            ByteArray::from("aa"),
            ByteArray::from("aaa"),
            ByteArray::from("aa"),
            ByteArray::from("aaa"),
        ];
        let mut output = vec![ByteArray::default(); input.len()];
        let mut result = put_and_get(&mut encoder, &mut decoder, &input[..2], &mut output[..2]);
        assert!(
            result.is_ok(),
            "first put_and_get() failed with: {}",
            result.unwrap_err()
        );
        result = put_and_get(&mut encoder, &mut decoder, &input[2..], &mut output[2..]);
        assert!(
            result.is_ok(),
            "second put_and_get() failed with: {}",
            result.unwrap_err()
        );
        assert_eq!(output, input);
    }
    // Round-trip harness. Dictionary encodings go through `test_dict_internal`;
    // everything else goes through `test_internal`.
    trait EncodingTester<T: DataType> {
        fn test(enc: Encoding, total: usize, type_length: i32) {
            let result = match enc {
                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                    Self::test_dict_internal(total, type_length)
                }
                enc => Self::test_internal(enc, total, type_length),
            };
            assert!(
                result.is_ok(),
                "Expected result to be OK but got err:\n {}",
                result.unwrap_err()
            );
        }
        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()>;
        fn test_dict_internal(total: usize, type_length: i32) -> Result<()>;
    }
    impl<T: DataType + RandGen<T>> EncodingTester<T> for T {
        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()> {
            let mut encoder = create_test_encoder::<T>(type_length, enc);
            let mut decoder = create_test_decoder::<T>(type_length, enc);
            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
            let mut result_data = vec![T::T::default(); total];
            // Round 1: encode only the values whose random valid bit is set.
            // Decoding must put them back at their original (spaced) positions
            // and leave the others at their default value.
            let num_bytes = bit_util::ceil(total as i64, 8);
            let valid_bits = random_bytes(num_bytes as usize);
            let values_written = encoder.put_spaced(&values[..], &valid_bits[..])?;
            let data = encoder.flush_buffer()?;
            decoder.set_data(data, values_written)?;
            let _ = decoder.get_spaced(
                &mut result_data[..],
                values.len() - values_written,
                &valid_bits[..],
            )?;
            for i in 0..total {
                if bit_util::get_bit(&valid_bits[..], i) {
                    assert_eq!(result_data[i], values[i]);
                } else {
                    assert_eq!(result_data[i], T::T::default());
                }
            }
            // Rounds 2 and 3: dense round trips that reuse the same encoder and
            // decoder, checking both are usable again after a flush.
            let mut actual_total = put_and_get(
                &mut encoder,
                &mut decoder,
                &values[..],
                &mut result_data[..],
            )?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);
            values = <T as RandGen<T>>::gen_vec(type_length, total);
            actual_total = put_and_get(
                &mut encoder,
                &mut decoder,
                &values[..],
                &mut result_data[..],
            )?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);
            Ok(())
        }
        fn test_dict_internal(total: usize, type_length: i32) -> Result<()> {
            let mut encoder = create_test_dict_encoder::<T>(type_length);
            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
            encoder.put(&values[..])?;
            let mut data = encoder.flush_buffer()?;
            let mut decoder = create_test_dict_decoder::<T>();
            // The dictionary page is decoded with a plain decoder and installed
            // into the dictionary decoder before the data page is read.
            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
            decoder.set_dict(Box::new(dict_decoder))?;
            let mut result_data = vec![T::T::default(); total];
            decoder.set_data(data, total)?;
            let mut actual_total = decoder.get(&mut result_data)?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);
            // Second page with the same encoder. The dictionary is re-written
            // and re-installed before decoding.
            values = <T as RandGen<T>>::gen_vec(type_length, total);
            encoder.put(&values[..])?;
            data = encoder.flush_buffer()?;
            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
            decoder.set_dict(Box::new(dict_decoder))?;
            decoder.set_data(data, total)?;
            actual_total = decoder.get(&mut result_data)?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);
            Ok(())
        }
    }
    // Encodes `input`, flushes, and decodes the page into `output`. Returns the
    // value count reported by the decoder.
    fn put_and_get<T: DataType>(
        encoder: &mut Box<dyn Encoder<T>>,
        decoder: &mut Box<dyn Decoder<T>>,
        input: &[T::T],
        output: &mut [T::T],
    ) -> Result<usize> {
        encoder.put(input)?;
        let data = encoder.flush_buffer()?;
        decoder.set_data(data, input.len())?;
        decoder.get(output)
    }
    // Calls `get_encoder`. Asserts either the given error message or that the
    // built encoder reports the requested encoding.
    fn create_and_check_encoder<T: DataType>(
        type_length: i32,
        encoding: Encoding,
        err: Option<ParquetError>,
    ) {
        let desc = create_test_col_desc_ptr(type_length, T::get_physical_type());
        let encoder = get_encoder::<T>(encoding, &desc);
        match err {
            Some(parquet_error) => {
                assert_eq!(
                    encoder.err().unwrap().to_string(),
                    parquet_error.to_string()
                )
            }
            None => assert_eq!(encoder.unwrap().encoding(), encoding),
        }
    }
    // Builds a descriptor for a single primitive column named "t" with the
    // given physical type and type length.
    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
        let ty = SchemaType::primitive_type_builder("t", t)
            .with_length(type_len)
            .build()
            .unwrap();
        Arc::new(ColumnDescriptor::new(
            Arc::new(ty),
            0,
            0,
            ColumnPath::new(vec![]),
        ))
    }
    fn create_test_encoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Encoder<T>> {
        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
        get_encoder(enc, &desc).unwrap()
    }
    fn create_test_decoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Decoder<T>> {
        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
        get_decoder(desc, enc).unwrap()
    }
    fn create_test_dict_encoder<T: DataType>(type_len: i32) -> DictEncoder<T> {
        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
        DictEncoder::<T>::new(desc)
    }
    fn create_test_dict_decoder<T: DataType>() -> DictDecoder<T> {
        DictDecoder::<T>::new()
    }
}