use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::view_buffer::ViewBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::data_type::Int32Type;
use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{builder::make_view, ArrayRef};
use arrow_buffer::Buffer;
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
/// Builds an [`ArrayReader`] that decodes a byte array column into a view array.
///
/// When `arrow_type` is `None` the type is derived from the parquet column:
/// columns that map to `Utf8`/`Utf8View` are read as `Utf8View`, everything else
/// as `BinaryView`.
///
/// # Errors
///
/// Returns an error if the schema conversion fails or if the resulting type is
/// neither `BinaryView` nor `Utf8View`.
pub fn make_byte_view_array_reader(
    pages: Box<dyn PageIterator>,
    column_desc: ColumnDescPtr,
    arrow_type: Option<ArrowType>,
) -> Result<Box<dyn ArrayReader>> {
    // An explicitly requested type always wins over the one inferred from the schema.
    let data_type = if let Some(requested) = arrow_type {
        requested
    } else {
        let field = parquet_to_arrow_field(column_desc.as_ref())?;
        if matches!(field.data_type(), ArrowType::Utf8 | ArrowType::Utf8View) {
            ArrowType::Utf8View
        } else {
            ArrowType::BinaryView
        }
    };

    // Guard clause: this reader can only materialize the two view types.
    if !matches!(data_type, ArrowType::BinaryView | ArrowType::Utf8View) {
        return Err(general_err!(
            "invalid data type for byte array reader read to view type - {}",
            data_type
        ));
    }

    let record_reader = GenericRecordReader::new(column_desc);
    Ok(Box::new(ByteViewArrayReader::new(
        pages,
        data_type,
        record_reader,
    )))
}
/// An [`ArrayReader`] for variable length byte arrays that yields view arrays.
struct ByteViewArrayReader {
    /// Arrow type of the arrays produced by `consume_batch`.
    data_type: ArrowType,
    /// Source of the column's pages.
    pages: Box<dyn PageIterator>,
    /// Definition levels captured by the most recent `consume_batch`, if any.
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels captured by the most recent `consume_batch`, if any.
    rep_levels_buffer: Option<Vec<i16>>,
    /// Accumulates decoded values and levels between `consume_batch` calls.
    record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
}

impl ByteViewArrayReader {
    /// Creates a reader over `pages`; no levels are available until the first
    /// batch has been consumed.
    fn new(
        pages: Box<dyn PageIterator>,
        data_type: ArrowType,
        record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
    ) -> Self {
        let (def_levels_buffer, rep_levels_buffer) = (None, None);
        Self {
            record_reader,
            pages,
            data_type,
            def_levels_buffer,
            rep_levels_buffer,
        }
    }
}
impl ArrayReader for ByteViewArrayReader {
    /// Exposes the reader as [`Any`] so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The arrow type of the arrays returned by `consume_batch`.
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Buffers up to `batch_size` records from the page iterator and returns how
    /// many were read.
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        let records_read =
            read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
        Ok(records_read)
    }

    /// Turns everything buffered so far into an array and resets the record reader.
    ///
    /// The definition and repetition levels of the batch are stashed on `self` so
    /// that `get_def_levels`/`get_rep_levels` can serve them afterwards.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let values = self.record_reader.consume_record_data();
        let nulls = self.record_reader.consume_bitmap_buffer();
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        Ok(values.into_array(nulls, &self.data_type))
    }

    /// Skips up to `num_records` records and returns how many were skipped.
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        let records_skipped =
            skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)?;
        Ok(records_skipped)
    }

    /// Definition levels of the last consumed batch, if any were recorded.
    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer
            .as_ref()
            .map(|levels| levels.as_slice())
    }

    /// Repetition levels of the last consumed batch, if any were recorded.
    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer
            .as_ref()
            .map(|levels| levels.as_slice())
    }
}
/// A [`ColumnValueDecoder`] that decodes byte array pages into a [`ViewBuffer`].
struct ByteViewArrayColumnValueDecoder {
    /// Decoded dictionary page, populated by `set_dict`.
    dict: Option<ViewBuffer>,
    /// Decoder for the current data page, populated by `set_data`.
    decoder: Option<ByteViewArrayDecoder>,
    /// Whether decoded values must be valid UTF-8.
    validate_utf8: bool,
}

impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
    type Buffer = ViewBuffer;

    /// Creates a decoder for `desc`; UTF-8 validation is enabled when the column's
    /// converted type is `UTF8`.
    fn new(desc: &ColumnDescPtr) -> Self {
        Self {
            validate_utf8: desc.converted_type() == ConvertedType::UTF8,
            decoder: None,
            dict: None,
        }
    }

    /// Decodes the dictionary page in `buf` (always plain encoded values) and keeps
    /// it for dictionary-encoded data pages.
    ///
    /// # Errors
    ///
    /// Fails if `encoding` is not one of the encodings accepted for dictionary
    /// pages, or if decoding the page fails.
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        _is_sorted: bool,
    ) -> Result<()> {
        match encoding {
            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {}
            unsupported => {
                return Err(nyi_err!(
                    "Invalid/Unsupported encoding type for dictionary: {}",
                    unsupported
                ))
            }
        }

        let value_count = num_values as usize;
        let mut dictionary = ViewBuffer::default();
        // `usize::MAX` asks for everything; the decoder caps it at `value_count`.
        ByteViewArrayDecoderPlain::new(buf, value_count, Some(value_count), self.validate_utf8)
            .read(&mut dictionary, usize::MAX)?;

        self.dict = Some(dictionary);
        Ok(())
    }

    /// Installs a decoder for a new data page, replacing the previous one.
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()> {
        let page_decoder =
            ByteViewArrayDecoder::new(encoding, data, num_levels, num_values, self.validate_utf8)?;
        self.decoder = Some(page_decoder);
        Ok(())
    }

    /// Decodes up to `num_values` values from the current page into `out`.
    ///
    /// Fails if `set_data` has not been called yet.
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
        match self.decoder.as_mut() {
            Some(page_decoder) => page_decoder.read(out, num_values, self.dict.as_ref()),
            None => Err(general_err!("no decoder set")),
        }
    }

    /// Skips up to `num_values` values of the current page.
    ///
    /// Fails if `set_data` has not been called yet.
    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
        match self.decoder.as_mut() {
            Some(page_decoder) => page_decoder.skip(num_values, self.dict.as_ref()),
            None => Err(general_err!("no decoder set")),
        }
    }
}
/// A byte array decoder for a single data page, one variant per supported encoding.
pub enum ByteViewArrayDecoder {
    /// `PLAIN` encoded values.
    Plain(ByteViewArrayDecoderPlain),
    /// `RLE_DICTIONARY` / `PLAIN_DICTIONARY` encoded keys.
    Dictionary(ByteViewArrayDecoderDictionary),
    /// `DELTA_LENGTH_BYTE_ARRAY` encoded values.
    DeltaLength(ByteViewArrayDecoderDeltaLength),
    /// `DELTA_BYTE_ARRAY` encoded values.
    DeltaByteArray(ByteViewArrayDecoderDelta),
}

impl ByteViewArrayDecoder {
    /// Creates the decoder matching `encoding` for the page bytes in `data`.
    ///
    /// # Errors
    ///
    /// Fails for encodings this reader does not support, or when the selected
    /// decoder cannot be initialized from `data`.
    pub fn new(
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
        validate_utf8: bool,
    ) -> Result<Self> {
        Ok(match encoding {
            Encoding::PLAIN => {
                let plain =
                    ByteViewArrayDecoderPlain::new(data, num_levels, num_values, validate_utf8);
                Self::Plain(plain)
            }
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
                let keys = ByteViewArrayDecoderDictionary::new(data, num_levels, num_values);
                Self::Dictionary(keys)
            }
            Encoding::DELTA_LENGTH_BYTE_ARRAY => {
                Self::DeltaLength(ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?)
            }
            Encoding::DELTA_BYTE_ARRAY => {
                Self::DeltaByteArray(ByteViewArrayDecoderDelta::new(data, validate_utf8)?)
            }
            unsupported => {
                return Err(general_err!(
                    "unsupported encoding for byte array: {}",
                    unsupported
                ))
            }
        })
    }

    /// Decodes up to `len` values into `out` and returns the number decoded.
    ///
    /// `dict` is only consulted by the dictionary variant, which fails without it.
    pub fn read(
        &mut self,
        out: &mut ViewBuffer,
        len: usize,
        dict: Option<&ViewBuffer>,
    ) -> Result<usize> {
        match self {
            Self::Plain(plain) => plain.read(out, len),
            Self::Dictionary(keys) => match dict {
                Some(dictionary) => keys.read(out, dictionary, len),
                None => Err(general_err!(
                    "dictionary required for dictionary encoding"
                )),
            },
            Self::DeltaLength(delta_length) => delta_length.read(out, len),
            Self::DeltaByteArray(delta) => delta.read(out, len),
        }
    }

    /// Skips up to `len` values and returns the number skipped.
    ///
    /// `dict` is only consulted by the dictionary variant, which fails without it.
    pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
        match self {
            Self::Plain(plain) => plain.skip(len),
            Self::Dictionary(keys) => match dict {
                Some(dictionary) => keys.skip(dictionary, len),
                None => Err(general_err!(
                    "dictionary required for dictionary encoding"
                )),
            },
            Self::DeltaLength(delta_length) => delta_length.skip(len),
            Self::DeltaByteArray(delta) => delta.skip(len),
        }
    }
}
/// Decoder for `PLAIN` encoded byte arrays: every value is stored as a 4-byte
/// little-endian length followed by that many bytes.
pub struct ByteViewArrayDecoderPlain {
    /// The encoded page.
    buf: Bytes,
    /// Position in `buf` of the next length prefix. Never exceeds `buf.len()`.
    offset: usize,
    /// Whether decoded values must be valid UTF-8.
    validate_utf8: bool,
    /// Upper bound on the number of values that may still be decoded.
    max_remaining_values: usize,
}

impl ByteViewArrayDecoderPlain {
    /// Creates a decoder over `buf`.
    ///
    /// The number of decodable values is capped at `num_values` when known and
    /// at `num_levels` otherwise.
    pub fn new(
        buf: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
        validate_utf8: bool,
    ) -> Self {
        Self {
            buf,
            offset: 0,
            max_remaining_values: num_values.unwrap_or(num_levels),
            validate_utf8,
        }
    }

    /// Parses the length prefix at `offset` and returns the `(start, end)` byte
    /// range of the value that follows it.
    ///
    /// Fails with an EOF error if either the prefix or the value would run past
    /// the end of `buf`, so a returned range is always in bounds.
    #[inline]
    fn value_range(buf: &[u8], offset: usize) -> Result<(usize, usize)> {
        let start = offset + 4;
        if start > buf.len() {
            return Err(ParquetError::EOF("eof decoding byte array".into()));
        }
        // The slice is exactly 4 bytes long, so the conversion cannot fail.
        let len_bytes: [u8; 4] = buf[offset..start].try_into().unwrap();
        let end = start + u32::from_le_bytes(len_bytes) as usize;
        if end > buf.len() {
            return Err(ParquetError::EOF("eof decoding byte array".into()));
        }
        Ok((start, end))
    }

    /// Decodes up to `len` values, appending one view per value to `output`.
    ///
    /// Returns the number of values actually decoded. This is smaller than the
    /// request when the page holds fewer values, so callers can detect a short
    /// read instead of being told about views that were never appended.
    ///
    /// # Errors
    ///
    /// Fails if a value is truncated, or (with UTF-8 validation enabled) if a
    /// value is not valid UTF-8.
    pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        // Share the page with `output`; the views appended below point into it.
        let block = arrow_buffer::Buffer::from_bytes(self.buf.clone().into());
        let block_id = output.append_block(block);

        let to_read = len.min(self.max_remaining_values);
        let buf: &[u8] = self.buf.as_ref();
        let mut read = 0;
        output.views.reserve(to_read);

        // Start of the region that has been decoded but not yet UTF-8 validated.
        let mut utf8_validation_begin = self.offset;
        while self.offset < buf.len() && read != to_read {
            let (start_offset, end_offset) = Self::value_range(buf, self.offset)?;
            // Lossless: the span was derived from a `u32` length prefix.
            let len = (end_offset - start_offset) as u32;

            // Validation is batched over many values at once. A length below 128
            // is encoded as `[len, 0, 0, 0]`: four ASCII bytes that are valid
            // UTF-8 themselves and that separate neighbouring values, so the
            // pending region can simply grow across such prefixes. A larger
            // length may not be valid UTF-8, so the pending region is flushed up
            // to this prefix and restarted right after it.
            if self.validate_utf8 && len >= 128 {
                check_valid_utf8(&buf[utf8_validation_begin..self.offset])?;
                utf8_validation_begin = start_offset;
            }

            // SAFETY: `block_id` was returned by `append_block` above for a block
            // holding `buf`, and `value_range` verified that
            // `start_offset..end_offset` lies within `buf`.
            unsafe {
                output.append_view_unchecked(block_id, start_offset as u32, len);
            }
            self.offset = end_offset;
            read += 1;
        }

        // Validate whatever is still pending after the last value.
        if self.validate_utf8 {
            check_valid_utf8(&buf[utf8_validation_begin..self.offset])?;
        }

        self.max_remaining_values -= read;
        Ok(read)
    }

    /// Skips up to `to_skip` values and returns the number actually skipped.
    ///
    /// # Errors
    ///
    /// Fails if a value is truncated. Rejecting such values here keeps `offset`
    /// within the buffer for any later call to `read`.
    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let to_skip = to_skip.min(self.max_remaining_values);
        let mut skip = 0;
        let buf: &[u8] = self.buf.as_ref();
        while self.offset < buf.len() && skip != to_skip {
            let (_, end_offset) = Self::value_range(buf, self.offset)?;
            self.offset = end_offset;
            skip += 1;
        }
        self.max_remaining_values -= skip;
        Ok(skip)
    }
}
/// Decoder for dictionary encoded pages: decodes keys and copies the matching
/// dictionary views into the output.
pub struct ByteViewArrayDecoderDictionary {
    /// Decoder for the dictionary keys of the page.
    decoder: DictIndexDecoder,
}

impl ByteViewArrayDecoderDictionary {
    /// Creates a key decoder over the page bytes in `data`.
    fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
        let decoder = DictIndexDecoder::new(data, num_levels, num_values);
        Self { decoder }
    }

    /// Decodes up to `len` keys and appends the view of each referenced dictionary
    /// entry to `output`. Returns the number of values decoded.
    ///
    /// Returns `Ok(0)` without touching `output` when the dictionary is empty or
    /// `len` is zero, and fails when a key has no entry in `dict`.
    fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
        if dict.is_empty() || len == 0 {
            return Ok(0);
        }

        // The dictionary's buffers only need to be appended if they are not already
        // the trailing buffers of `output` (e.g. from a previous call).
        let dict_buffer_count = dict.buffers.len();
        let output_buffer_count = output.buffers.len();
        let already_attached = output_buffer_count >= dict_buffer_count
            && output.buffers[output_buffer_count - dict_buffer_count..]
                .iter()
                .zip(dict.buffers.iter())
                .all(|(present, wanted)| present.ptr_eq(wanted));
        if !already_attached {
            output.buffers.extend(dict.buffers.iter().cloned());
        }

        // Index in `output` of the dictionary's first buffer.
        let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;

        self.decoder.read(len, |keys| {
            for key in keys {
                let dict_view = match dict.views.get(*key as usize) {
                    Some(found) => found,
                    None => return Err(general_err!("invalid key={} for dictionary", *key)),
                };

                // The low 32 bits of a view are read as its length here; values of
                // at most 12 bytes reference no buffer and are copied verbatim,
                // longer ones need their buffer index shifted into `output`.
                let value_len = *dict_view as u32;
                let out_view: u128 = if value_len <= 12 {
                    *dict_view
                } else {
                    let mut relocated = ByteView::from(*dict_view);
                    relocated.buffer_index += base_buffer_idx;
                    relocated.into()
                };
                // NOTE(review): soundness relies on the dictionary views being valid
                // for `dict.buffers`, which were attached to `output` above.
                unsafe {
                    output.append_raw_view_unchecked(&out_view);
                }
            }
            Ok(())
        })
    }

    /// Skips up to `to_skip` keys and returns the number skipped; an empty
    /// dictionary skips nothing.
    fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
        if dict.is_empty() {
            Ok(0)
        } else {
            self.decoder.skip(to_skip)
        }
    }
}
/// Decoder for `DELTA_LENGTH_BYTE_ARRAY` pages: all value lengths come first
/// (delta bit-packed), followed by the value bytes stored back to back.
pub struct ByteViewArrayDecoderDeltaLength {
    /// Length of every value in the page; all are non-negative (checked in `new`).
    lengths: Vec<i32>,
    /// The whole page, including the encoded lengths.
    data: Bytes,
    /// Index into `lengths` of the next value to decode.
    length_offset: usize,
    /// Position in `data` of the next value's first byte.
    data_offset: usize,
    /// Whether decoded values must be valid UTF-8.
    validate_utf8: bool,
}

impl ByteViewArrayDecoderDeltaLength {
    /// Decodes the length section of `data` and checks it against the page size.
    ///
    /// # Errors
    ///
    /// Fails if the lengths cannot be decoded, if any length is negative, or if
    /// the page is too short to hold all values.
    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
        len_decoder.set_data(data.clone(), 0)?;
        let values = len_decoder.values_left();
        let mut lengths = vec![0; values];
        len_decoder.get(&mut lengths)?;

        let mut total_bytes = 0;
        for l in lengths.iter() {
            if *l < 0 {
                return Err(ParquetError::General(
                    "negative delta length byte array length".to_string(),
                ));
            }
            total_bytes += *l as usize;
        }

        // The value bytes start where the length decoder stopped.
        let data_offset = len_decoder.get_offset();
        if total_bytes + data_offset > data.len() {
            return Err(ParquetError::General(
                "Insufficient delta length byte array bytes".to_string(),
            ));
        }

        Ok(Self {
            lengths,
            data,
            validate_utf8,
            length_offset: 0,
            data_offset,
        })
    }

    /// Decodes up to `len` values, appending one view per value to `output`.
    /// Returns the number of values decoded.
    ///
    /// # Errors
    ///
    /// With UTF-8 validation enabled, fails if any value is not valid UTF-8. In
    /// that case nothing is appended to `output`.
    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        let to_read = len.min(self.lengths.len() - self.length_offset);
        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
        let initial_offset = self.data_offset;

        if self.validate_utf8 {
            let data: &[u8] = self.data.as_ref();
            // The values are contiguous, so the bytes can be validated in one go.
            // That alone would accept a multi-byte character split across two
            // neighbouring values, so each non-empty value must also start on a
            // character boundary. Both checks together imply that every value is
            // valid UTF-8 on its own.
            let mut start = initial_offset;
            for length in src_lengths {
                let length = *length as usize;
                // `data[start]` is in bounds: `new` verified that all values fit.
                if length != 0 && (data[start] & 0xC0) == 0x80 {
                    return Err(ParquetError::General(
                        "encountered non UTF-8 data: value does not start on a character boundary"
                            .to_string(),
                    ));
                }
                start += length;
            }
            check_valid_utf8(&data[initial_offset..start])?;
        }

        output.views.reserve(to_read);
        // Share the page with `output`; the views appended below point into it.
        let bytes = arrow_buffer::Buffer::from_bytes(self.data.clone().into());
        let block_id = output.append_block(bytes);

        let mut current_offset = initial_offset;
        for length in src_lengths {
            // SAFETY: `block_id` refers to the block holding `self.data`, and `new`
            // verified that the non-negative lengths all fit within `self.data`
            // starting at the end of the length section.
            unsafe { output.append_view_unchecked(block_id, current_offset as u32, *length as u32) }
            current_offset += *length as usize;
        }

        self.data_offset = current_offset;
        self.length_offset += to_read;
        Ok(to_read)
    }

    /// Skips up to `to_skip` values and returns the number skipped.
    fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let remain_values = self.lengths.len() - self.length_offset;
        let to_skip = remain_values.min(to_skip);
        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
        self.data_offset += total_bytes;
        self.length_offset += to_skip;
        Ok(to_skip)
    }
}
pub struct ByteViewArrayDecoderDelta {
decoder: DeltaByteArrayDecoder,
validate_utf8: bool,
}
impl ByteViewArrayDecoderDelta {
fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
Ok(Self {
decoder: DeltaByteArrayDecoder::new(data)?,
validate_utf8,
})
}
fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
output.views.reserve(len.min(self.decoder.remaining()));
let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
let buffer_id = output.buffers.len() as u32;
let read = if !self.validate_utf8 {
self.decoder.read(len, |bytes| {
let offset = array_buffer.len();
let view = make_view(bytes, buffer_id, offset as u32);
if bytes.len() > 12 {
array_buffer.extend_from_slice(bytes);
}
unsafe {
output.append_raw_view_unchecked(&view);
}
Ok(())
})?
} else {
let mut utf8_validation_buffer = Vec::with_capacity(4096);
let v = self.decoder.read(len, |bytes| {
let offset = array_buffer.len();
let view = make_view(bytes, buffer_id, offset as u32);
if bytes.len() > 12 {
array_buffer.extend_from_slice(bytes);
} else {
utf8_validation_buffer.extend_from_slice(bytes);
}
unsafe {
output.append_raw_view_unchecked(&view);
}
Ok(())
})?;
check_valid_utf8(&array_buffer)?;
check_valid_utf8(&utf8_validation_buffer)?;
v
};
let actual_block_id = output.append_block(Buffer::from_vec(array_buffer));
assert_eq!(actual_block_id, buffer_id);
Ok(read)
}
fn skip(&mut self, to_skip: usize) -> Result<usize> {
self.decoder.skip(to_skip)
}
}
/// Checks that `val` is valid UTF-8.
///
/// # Errors
///
/// Returns a general parquet error describing the first invalid sequence.
pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
    std::str::from_utf8(val)
        .map(|_| ())
        .map_err(|e| general_err!("encountered non UTF-8 data: {}", e))
}
#[cfg(test)]
mod tests {
    use arrow_array::StringViewArray;
    use arrow_buffer::Buffer;
    use crate::{
        arrow::{
            array_reader::test_util::{byte_array_all_encodings, utf8_column},
            buffer::view_buffer::ViewBuffer,
            record_reader::buffer::ValuesBuffer,
        },
        basic::Encoding,
        column::reader::decoder::ColumnValueDecoder,
    };
    use super::*;

    /// Decodes the same four strings from a page of every encoding, pads the
    /// result with nulls and checks the resulting `StringViewArray`.
    #[test]
    fn test_byte_array_string_view_decoder() {
        let values = vec!["hello", "world", "large payload over 12 bytes", "b"];
        let (pages, encoded_dictionary) = byte_array_all_encodings(values);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);
        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        // Validity of the nine output slots; the four `true` slots take the values.
        let validity = [false, false, true, true, false, true, true, false, false];
        let expected = vec![
            None,
            None,
            Some("hello"),
            Some("world"),
            None,
            Some("large payload over 12 bytes"),
            Some("b"),
            None,
            None,
        ];

        for (encoding, page) in pages {
            let mut output = ViewBuffer::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            // (values requested, values expected back); the page holds four values,
            // so the final request comes back empty.
            for (requested, decoded) in [(1, 1), (1, 1), (2, 2), (4, 0)] {
                assert_eq!(decoder.read(&mut output, requested).unwrap(), decoded);
            }
            assert_eq!(output.views.len(), 4);

            let null_mask = Buffer::from_iter(validity.iter().cloned());
            output.pad_nulls(0, 4, validity.len(), null_mask.as_slice());

            let array = output.into_array(Some(null_mask), &ArrowType::Utf8View);
            let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();
            assert_eq!(strings.iter().collect::<Vec<_>>(), expected);
        }
    }
}