1use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::buffer::view_buffer::ViewBuffer;
20use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
21use crate::arrow::record_reader::GenericRecordReader;
22use crate::arrow::schema::parquet_to_arrow_field;
23use crate::basic::{ConvertedType, Encoding};
24use crate::column::page::PageIterator;
25use crate::column::reader::decoder::ColumnValueDecoder;
26use crate::data_type::Int32Type;
27use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
28use crate::errors::{ParquetError, Result};
29use crate::schema::types::ColumnDescPtr;
30use arrow_array::{builder::make_view, ArrayRef};
31use arrow_buffer::Buffer;
32use arrow_data::ByteView;
33use arrow_schema::DataType as ArrowType;
34use bytes::Bytes;
35use std::any::Any;
36
37pub fn make_byte_view_array_reader(
39 pages: Box<dyn PageIterator>,
40 column_desc: ColumnDescPtr,
41 arrow_type: Option<ArrowType>,
42) -> Result<Box<dyn ArrayReader>> {
43 let data_type = match arrow_type {
45 Some(t) => t,
46 None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
47 ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
48 _ => ArrowType::BinaryView,
49 },
50 };
51
52 match data_type {
53 ArrowType::BinaryView | ArrowType::Utf8View => {
54 let reader = GenericRecordReader::new(column_desc);
55 Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
56 }
57
58 _ => Err(general_err!(
59 "invalid data type for byte array reader read to view type - {}",
60 data_type
61 )),
62 }
63}
64
65struct ByteViewArrayReader {
67 data_type: ArrowType,
68 pages: Box<dyn PageIterator>,
69 def_levels_buffer: Option<Vec<i16>>,
70 rep_levels_buffer: Option<Vec<i16>>,
71 record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
72}
73
74impl ByteViewArrayReader {
75 fn new(
76 pages: Box<dyn PageIterator>,
77 data_type: ArrowType,
78 record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
79 ) -> Self {
80 Self {
81 data_type,
82 pages,
83 def_levels_buffer: None,
84 rep_levels_buffer: None,
85 record_reader,
86 }
87 }
88}
89
90impl ArrayReader for ByteViewArrayReader {
91 fn as_any(&self) -> &dyn Any {
92 self
93 }
94
95 fn get_data_type(&self) -> &ArrowType {
96 &self.data_type
97 }
98
99 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
100 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
101 }
102
103 fn consume_batch(&mut self) -> Result<ArrayRef> {
104 let buffer = self.record_reader.consume_record_data();
105 let null_buffer = self.record_reader.consume_bitmap_buffer();
106 self.def_levels_buffer = self.record_reader.consume_def_levels();
107 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
108 self.record_reader.reset();
109
110 let array = buffer.into_array(null_buffer, &self.data_type);
111
112 Ok(array)
113 }
114
115 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
116 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
117 }
118
119 fn get_def_levels(&self) -> Option<&[i16]> {
120 self.def_levels_buffer.as_deref()
121 }
122
123 fn get_rep_levels(&self) -> Option<&[i16]> {
124 self.rep_levels_buffer.as_deref()
125 }
126}
127
128struct ByteViewArrayColumnValueDecoder {
130 dict: Option<ViewBuffer>,
131 decoder: Option<ByteViewArrayDecoder>,
132 validate_utf8: bool,
133}
134
135impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
136 type Buffer = ViewBuffer;
137
138 fn new(desc: &ColumnDescPtr) -> Self {
139 let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
140 Self {
141 dict: None,
142 decoder: None,
143 validate_utf8,
144 }
145 }
146
147 fn set_dict(
148 &mut self,
149 buf: Bytes,
150 num_values: u32,
151 encoding: Encoding,
152 _is_sorted: bool,
153 ) -> Result<()> {
154 if !matches!(
155 encoding,
156 Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
157 ) {
158 return Err(nyi_err!(
159 "Invalid/Unsupported encoding type for dictionary: {}",
160 encoding
161 ));
162 }
163
164 let mut buffer = ViewBuffer::default();
165 let mut decoder = ByteViewArrayDecoderPlain::new(
166 buf,
167 num_values as usize,
168 Some(num_values as usize),
169 self.validate_utf8,
170 );
171 decoder.read(&mut buffer, usize::MAX)?;
172 self.dict = Some(buffer);
173 Ok(())
174 }
175
176 fn set_data(
177 &mut self,
178 encoding: Encoding,
179 data: Bytes,
180 num_levels: usize,
181 num_values: Option<usize>,
182 ) -> Result<()> {
183 self.decoder = Some(ByteViewArrayDecoder::new(
184 encoding,
185 data,
186 num_levels,
187 num_values,
188 self.validate_utf8,
189 )?);
190 Ok(())
191 }
192
193 fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
194 let decoder = self
195 .decoder
196 .as_mut()
197 .ok_or_else(|| general_err!("no decoder set"))?;
198
199 decoder.read(out, num_values, self.dict.as_ref())
200 }
201
202 fn skip_values(&mut self, num_values: usize) -> Result<usize> {
203 let decoder = self
204 .decoder
205 .as_mut()
206 .ok_or_else(|| general_err!("no decoder set"))?;
207
208 decoder.skip(num_values, self.dict.as_ref())
209 }
210}
211
212pub enum ByteViewArrayDecoder {
214 Plain(ByteViewArrayDecoderPlain),
215 Dictionary(ByteViewArrayDecoderDictionary),
216 DeltaLength(ByteViewArrayDecoderDeltaLength),
217 DeltaByteArray(ByteViewArrayDecoderDelta),
218}
219
220impl ByteViewArrayDecoder {
221 pub fn new(
222 encoding: Encoding,
223 data: Bytes,
224 num_levels: usize,
225 num_values: Option<usize>,
226 validate_utf8: bool,
227 ) -> Result<Self> {
228 let decoder = match encoding {
229 Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new(
230 data,
231 num_levels,
232 num_values,
233 validate_utf8,
234 )),
235 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
236 ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
237 data, num_levels, num_values,
238 ))
239 }
240 Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
241 ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
242 ),
243 Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
244 ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
245 ),
246 _ => {
247 return Err(general_err!(
248 "unsupported encoding for byte array: {}",
249 encoding
250 ))
251 }
252 };
253
254 Ok(decoder)
255 }
256
257 pub fn read(
259 &mut self,
260 out: &mut ViewBuffer,
261 len: usize,
262 dict: Option<&ViewBuffer>,
263 ) -> Result<usize> {
264 match self {
265 ByteViewArrayDecoder::Plain(d) => d.read(out, len),
266 ByteViewArrayDecoder::Dictionary(d) => {
267 let dict = dict
268 .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
269 d.read(out, dict, len)
270 }
271 ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
272 ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
273 }
274 }
275
276 pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
278 match self {
279 ByteViewArrayDecoder::Plain(d) => d.skip(len),
280 ByteViewArrayDecoder::Dictionary(d) => {
281 let dict = dict
282 .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
283 d.skip(dict, len)
284 }
285 ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
286 ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
287 }
288 }
289}
290
291pub struct ByteViewArrayDecoderPlain {
293 buf: Bytes,
294 offset: usize,
295
296 validate_utf8: bool,
297
298 max_remaining_values: usize,
301}
302
303impl ByteViewArrayDecoderPlain {
304 pub fn new(
305 buf: Bytes,
306 num_levels: usize,
307 num_values: Option<usize>,
308 validate_utf8: bool,
309 ) -> Self {
310 Self {
311 buf,
312 offset: 0,
313 max_remaining_values: num_values.unwrap_or(num_levels),
314 validate_utf8,
315 }
316 }
317
318 pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
319 let buf = arrow_buffer::Buffer::from_bytes(self.buf.clone().into());
322 let block_id = output.append_block(buf);
323
324 let to_read = len.min(self.max_remaining_values);
325
326 let buf = self.buf.as_ref();
327 let mut read = 0;
328 output.views.reserve(to_read);
329
330 let mut utf8_validation_begin = self.offset;
331 while self.offset < self.buf.len() && read != to_read {
332 if self.offset + 4 > self.buf.len() {
333 return Err(ParquetError::EOF("eof decoding byte array".into()));
334 }
335 let len_bytes: [u8; 4] = unsafe {
336 buf.get_unchecked(self.offset..self.offset + 4)
337 .try_into()
338 .unwrap()
339 };
340 let len = u32::from_le_bytes(len_bytes);
341
342 let start_offset = self.offset + 4;
343 let end_offset = start_offset + len as usize;
344 if end_offset > buf.len() {
345 return Err(ParquetError::EOF("eof decoding byte array".into()));
346 }
347
348 if self.validate_utf8 {
349 if len < 128 {
371 } else {
374 check_valid_utf8(unsafe {
376 buf.get_unchecked(utf8_validation_begin..self.offset)
377 })?;
378 utf8_validation_begin = start_offset;
380 }
381 }
382
383 unsafe {
384 output.append_view_unchecked(block_id, start_offset as u32, len);
385 }
386 self.offset = end_offset;
387 read += 1;
388 }
389
390 if self.validate_utf8 {
392 check_valid_utf8(unsafe { buf.get_unchecked(utf8_validation_begin..self.offset) })?;
393 }
394
395 self.max_remaining_values -= to_read;
396 Ok(to_read)
397 }
398
399 pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
400 let to_skip = to_skip.min(self.max_remaining_values);
401 let mut skip = 0;
402 let buf = self.buf.as_ref();
403
404 while self.offset < self.buf.len() && skip != to_skip {
405 if self.offset + 4 > buf.len() {
406 return Err(ParquetError::EOF("eof decoding byte array".into()));
407 }
408 let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
409 let len = u32::from_le_bytes(len_bytes) as usize;
410 skip += 1;
411 self.offset = self.offset + 4 + len;
412 }
413 self.max_remaining_values -= skip;
414 Ok(skip)
415 }
416}
417
418pub struct ByteViewArrayDecoderDictionary {
419 decoder: DictIndexDecoder,
420}
421
422impl ByteViewArrayDecoderDictionary {
423 fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
424 Self {
425 decoder: DictIndexDecoder::new(data, num_levels, num_values),
426 }
427 }
428
429 fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
437 if dict.is_empty() || len == 0 {
438 return Ok(0);
439 }
440
441 let need_to_create_new_buffer = {
444 if output.buffers.len() >= dict.buffers.len() {
445 let offset = output.buffers.len() - dict.buffers.len();
446 output.buffers[offset..]
447 .iter()
448 .zip(dict.buffers.iter())
449 .any(|(a, b)| !a.ptr_eq(b))
450 } else {
451 true
452 }
453 };
454
455 if need_to_create_new_buffer {
456 for b in dict.buffers.iter() {
457 output.buffers.push(b.clone());
458 }
459 }
460
461 let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;
465
466 self.decoder.read(len, |keys| {
467 for k in keys {
468 let view = dict
469 .views
470 .get(*k as usize)
471 .ok_or_else(|| general_err!("invalid key={} for dictionary", *k))?;
472 let len = *view as u32;
473 if len <= 12 {
474 unsafe {
477 output.append_raw_view_unchecked(view);
478 }
479 } else {
480 let mut view = ByteView::from(*view);
482 view.buffer_index += base_buffer_idx;
483 unsafe {
486 output.append_raw_view_unchecked(&view.into());
487 }
488 }
489 }
490 Ok(())
491 })
492 }
493
494 fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
495 if dict.is_empty() {
496 return Ok(0);
497 }
498 self.decoder.skip(to_skip)
499 }
500}
501
502pub struct ByteViewArrayDecoderDeltaLength {
504 lengths: Vec<i32>,
505 data: Bytes,
506 length_offset: usize,
507 data_offset: usize,
508 validate_utf8: bool,
509}
510
511impl ByteViewArrayDecoderDeltaLength {
512 fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
513 let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
514 len_decoder.set_data(data.clone(), 0)?;
515 let values = len_decoder.values_left();
516
517 let mut lengths = vec![0; values];
518 len_decoder.get(&mut lengths)?;
519
520 let mut total_bytes = 0;
521
522 for l in lengths.iter() {
523 if *l < 0 {
524 return Err(ParquetError::General(
525 "negative delta length byte array length".to_string(),
526 ));
527 }
528 total_bytes += *l as usize;
529 }
530
531 if total_bytes + len_decoder.get_offset() > data.len() {
532 return Err(ParquetError::General(
533 "Insufficient delta length byte array bytes".to_string(),
534 ));
535 }
536
537 Ok(Self {
538 lengths,
539 data,
540 validate_utf8,
541 length_offset: 0,
542 data_offset: len_decoder.get_offset(),
543 })
544 }
545
546 fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
547 let to_read = len.min(self.lengths.len() - self.length_offset);
548 output.views.reserve(to_read);
549
550 let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
551
552 let bytes = arrow_buffer::Buffer::from_bytes(self.data.clone().into());
555 let block_id = output.append_block(bytes);
556
557 let mut current_offset = self.data_offset;
558 let initial_offset = current_offset;
559 for length in src_lengths {
560 unsafe { output.append_view_unchecked(block_id, current_offset as u32, *length as u32) }
565
566 current_offset += *length as usize;
567 }
568
569 if self.validate_utf8 {
571 check_valid_utf8(&self.data[initial_offset..current_offset])?;
572 }
573
574 self.data_offset = current_offset;
575 self.length_offset += to_read;
576
577 Ok(to_read)
578 }
579
580 fn skip(&mut self, to_skip: usize) -> Result<usize> {
581 let remain_values = self.lengths.len() - self.length_offset;
582 let to_skip = remain_values.min(to_skip);
583
584 let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
585 let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
586
587 self.data_offset += total_bytes;
588 self.length_offset += to_skip;
589 Ok(to_skip)
590 }
591}
592
593pub struct ByteViewArrayDecoderDelta {
595 decoder: DeltaByteArrayDecoder,
596 validate_utf8: bool,
597}
598
599impl ByteViewArrayDecoderDelta {
600 fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
601 Ok(Self {
602 decoder: DeltaByteArrayDecoder::new(data)?,
603 validate_utf8,
604 })
605 }
606
607 fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
616 output.views.reserve(len.min(self.decoder.remaining()));
617
618 let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
620
621 let buffer_id = output.buffers.len() as u32;
622
623 let read = if !self.validate_utf8 {
624 self.decoder.read(len, |bytes| {
625 let offset = array_buffer.len();
626 let view = make_view(bytes, buffer_id, offset as u32);
627 if bytes.len() > 12 {
628 array_buffer.extend_from_slice(bytes);
630 }
631
632 unsafe {
636 output.append_raw_view_unchecked(&view);
637 }
638 Ok(())
639 })?
640 } else {
641 let mut utf8_validation_buffer = Vec::with_capacity(4096);
645
646 let v = self.decoder.read(len, |bytes| {
647 let offset = array_buffer.len();
648 let view = make_view(bytes, buffer_id, offset as u32);
649 if bytes.len() > 12 {
650 array_buffer.extend_from_slice(bytes);
652 } else {
653 utf8_validation_buffer.extend_from_slice(bytes);
654 }
655
656 unsafe {
661 output.append_raw_view_unchecked(&view);
662 }
663 Ok(())
664 })?;
665 check_valid_utf8(&array_buffer)?;
666 check_valid_utf8(&utf8_validation_buffer)?;
667 v
668 };
669
670 let actual_block_id = output.append_block(Buffer::from_vec(array_buffer));
671 assert_eq!(actual_block_id, buffer_id);
672 Ok(read)
673 }
674
675 fn skip(&mut self, to_skip: usize) -> Result<usize> {
676 self.decoder.skip(to_skip)
677 }
678}
679
680pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
682 match std::str::from_utf8(val) {
683 Ok(_) => Ok(()),
684 Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
685 }
686}
687
#[cfg(test)]
mod tests {
    use arrow_array::StringViewArray;
    use arrow_buffer::Buffer;

    use crate::{
        arrow::{
            array_reader::test_util::{byte_array_all_encodings, utf8_column},
            buffer::view_buffer::ViewBuffer,
            record_reader::buffer::ValuesBuffer,
        },
        basic::Encoding,
        column::reader::decoder::ColumnValueDecoder,
    };

    use super::*;

    /// Decodes the same four strings under every encoding, reading in uneven
    /// chunks, then pads nulls and checks the resulting `StringViewArray`.
    #[test]
    fn test_byte_array_string_view_decoder() {
        let (pages, encoded_dictionary) =
            byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);
        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        let valid = [false, false, true, true, false, true, true, false, false];
        let expected = vec![
            None,
            None,
            Some("hello"),
            Some("world"),
            None,
            Some("large payload over 12 bytes"),
            Some("b"),
            None,
            None,
        ];

        for (encoding, page) in pages {
            let mut output = ViewBuffer::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            // (requested, expected decoded); the last request finds the page exhausted.
            for (requested, decoded) in [(1, 1), (1, 1), (2, 2), (4, 0)] {
                assert_eq!(decoder.read(&mut output, requested).unwrap(), decoded);
            }
            assert_eq!(output.views.len(), 4);

            let valid_buffer = Buffer::from_iter(valid.iter().copied());
            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), &ArrowType::Utf8View);
            let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

            assert_eq!(strings.iter().collect::<Vec<_>>(), expected);
        }
    }
}