1use bytes::Bytes;
21use num::traits::WrappingAdd;
22use num::FromPrimitive;
23use std::{cmp, marker::PhantomData, mem};
24
25use super::rle::RleDecoder;
26
27use crate::basic::*;
28use crate::data_type::private::ParquetValueType;
29use crate::data_type::*;
30use crate::encodings::decoding::byte_stream_split_decoder::{
31 ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
32};
33use crate::errors::{ParquetError, Result};
34use crate::schema::types::ColumnDescPtr;
35use crate::util::bit_util::{self, BitReader};
36
37mod byte_stream_split_decoder;
38
39pub(crate) mod private {
40 use super::*;
41
42 pub trait GetDecoder {
47 fn get_decoder<T: DataType<T = Self>>(
48 descr: ColumnDescPtr,
49 encoding: Encoding,
50 ) -> Result<Box<dyn Decoder<T>>> {
51 get_decoder_default(descr, encoding)
52 }
53 }
54
55 fn get_decoder_default<T: DataType>(
56 descr: ColumnDescPtr,
57 encoding: Encoding,
58 ) -> Result<Box<dyn Decoder<T>>> {
59 match encoding {
60 Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
61 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
62 "Cannot initialize this encoding through this function"
63 )),
64 Encoding::RLE
65 | Encoding::DELTA_BINARY_PACKED
66 | Encoding::DELTA_BYTE_ARRAY
67 | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
68 "Encoding {} is not supported for type",
69 encoding
70 )),
71 e => Err(nyi_err!("Encoding {} is not supported", e)),
72 }
73 }
74
75 impl GetDecoder for bool {
76 fn get_decoder<T: DataType<T = Self>>(
77 descr: ColumnDescPtr,
78 encoding: Encoding,
79 ) -> Result<Box<dyn Decoder<T>>> {
80 match encoding {
81 Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
82 _ => get_decoder_default(descr, encoding),
83 }
84 }
85 }
86
87 impl GetDecoder for i32 {
88 fn get_decoder<T: DataType<T = Self>>(
89 descr: ColumnDescPtr,
90 encoding: Encoding,
91 ) -> Result<Box<dyn Decoder<T>>> {
92 match encoding {
93 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
94 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
95 _ => get_decoder_default(descr, encoding),
96 }
97 }
98 }
99
100 impl GetDecoder for i64 {
101 fn get_decoder<T: DataType<T = Self>>(
102 descr: ColumnDescPtr,
103 encoding: Encoding,
104 ) -> Result<Box<dyn Decoder<T>>> {
105 match encoding {
106 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
107 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
108 _ => get_decoder_default(descr, encoding),
109 }
110 }
111 }
112
113 impl GetDecoder for f32 {
114 fn get_decoder<T: DataType<T = Self>>(
115 descr: ColumnDescPtr,
116 encoding: Encoding,
117 ) -> Result<Box<dyn Decoder<T>>> {
118 match encoding {
119 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
120 _ => get_decoder_default(descr, encoding),
121 }
122 }
123 }
124 impl GetDecoder for f64 {
125 fn get_decoder<T: DataType<T = Self>>(
126 descr: ColumnDescPtr,
127 encoding: Encoding,
128 ) -> Result<Box<dyn Decoder<T>>> {
129 match encoding {
130 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
131 _ => get_decoder_default(descr, encoding),
132 }
133 }
134 }
135
136 impl GetDecoder for ByteArray {
137 fn get_decoder<T: DataType<T = Self>>(
138 descr: ColumnDescPtr,
139 encoding: Encoding,
140 ) -> Result<Box<dyn Decoder<T>>> {
141 match encoding {
142 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
143 Encoding::DELTA_LENGTH_BYTE_ARRAY => {
144 Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
145 }
146 _ => get_decoder_default(descr, encoding),
147 }
148 }
149 }
150
151 impl GetDecoder for FixedLenByteArray {
152 fn get_decoder<T: DataType<T = Self>>(
153 descr: ColumnDescPtr,
154 encoding: Encoding,
155 ) -> Result<Box<dyn Decoder<T>>> {
156 match encoding {
157 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
158 VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
159 )),
160 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
161 _ => get_decoder_default(descr, encoding),
162 }
163 }
164 }
165
166 impl GetDecoder for Int96 {}
167}
168
169pub trait Decoder<T: DataType>: Send {
174 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
177
178 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
185
186 fn get_spaced(
199 &mut self,
200 buffer: &mut [T::T],
201 null_count: usize,
202 valid_bits: &[u8],
203 ) -> Result<usize> {
204 assert!(buffer.len() >= null_count);
205
206 if null_count == 0 {
208 return self.get(buffer);
209 }
210
211 let num_values = buffer.len();
212 let values_to_read = num_values - null_count;
213 let values_read = self.get(buffer)?;
214 if values_read != values_to_read {
215 return Err(general_err!(
216 "Number of values read: {}, doesn't match expected: {}",
217 values_read,
218 values_to_read
219 ));
220 }
221 let mut values_to_move = values_read;
222 for i in (0..num_values).rev() {
223 if bit_util::get_bit(valid_bits, i) {
224 values_to_move -= 1;
225 buffer.swap(i, values_to_move);
226 }
227 }
228
229 Ok(num_values)
230 }
231
232 fn values_left(&self) -> usize;
234
235 fn encoding(&self) -> Encoding;
237
238 fn skip(&mut self, num_values: usize) -> Result<usize>;
240}
241
242pub fn get_decoder<T: DataType>(
247 descr: ColumnDescPtr,
248 encoding: Encoding,
249) -> Result<Box<dyn Decoder<T>>> {
250 use self::private::GetDecoder;
251 T::T::get_decoder(descr, encoding)
252}
253
254#[derive(Default)]
258pub struct PlainDecoderDetails {
259 pub(crate) num_values: usize,
261
262 pub(crate) start: usize,
264
265 pub(crate) type_length: i32,
267
268 pub(crate) data: Option<Bytes>,
270
271 pub(crate) bit_reader: Option<BitReader>,
273}
274
275pub struct PlainDecoder<T: DataType> {
281 inner: PlainDecoderDetails,
283
284 _phantom: PhantomData<T>,
287}
288
289impl<T: DataType> PlainDecoder<T> {
290 pub fn new(type_length: i32) -> Self {
292 PlainDecoder {
293 inner: PlainDecoderDetails {
294 type_length,
295 num_values: 0,
296 start: 0,
297 data: None,
298 bit_reader: None,
299 },
300 _phantom: PhantomData,
301 }
302 }
303}
304
305impl<T: DataType> Decoder<T> for PlainDecoder<T> {
306 #[inline]
307 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
308 T::T::set_data(&mut self.inner, data, num_values);
309 Ok(())
310 }
311
312 #[inline]
313 fn values_left(&self) -> usize {
314 self.inner.num_values
315 }
316
317 #[inline]
318 fn encoding(&self) -> Encoding {
319 Encoding::PLAIN
320 }
321
322 #[inline]
323 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
324 T::T::decode(buffer, &mut self.inner)
325 }
326
327 #[inline]
328 fn skip(&mut self, num_values: usize) -> Result<usize> {
329 T::T::skip(&mut self.inner, num_values)
330 }
331}
332
333pub struct DictDecoder<T: DataType> {
342 dictionary: Vec<T::T>,
344
345 has_dictionary: bool,
347
348 rle_decoder: Option<RleDecoder>,
350
351 num_values: usize,
353}
354
355impl<T: DataType> Default for DictDecoder<T> {
356 fn default() -> Self {
357 Self::new()
358 }
359}
360
361impl<T: DataType> DictDecoder<T> {
362 pub fn new() -> Self {
364 Self {
365 dictionary: vec![],
366 has_dictionary: false,
367 rle_decoder: None,
368 num_values: 0,
369 }
370 }
371
372 pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
374 let num_values = decoder.values_left();
375 self.dictionary.resize(num_values, T::T::default());
376 let _ = decoder.get(&mut self.dictionary)?;
377 self.has_dictionary = true;
378 Ok(())
379 }
380}
381
382impl<T: DataType> Decoder<T> for DictDecoder<T> {
383 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
384 let bit_width = data.as_ref()[0];
386 let mut rle_decoder = RleDecoder::new(bit_width);
387 rle_decoder.set_data(data.slice(1..));
388 self.num_values = num_values;
389 self.rle_decoder = Some(rle_decoder);
390 Ok(())
391 }
392
393 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
394 assert!(self.rle_decoder.is_some());
395 assert!(self.has_dictionary, "Must call set_dict() first!");
396
397 let rle = self.rle_decoder.as_mut().unwrap();
398 let num_values = cmp::min(buffer.len(), self.num_values);
399 rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
400 }
401
402 fn values_left(&self) -> usize {
404 self.num_values
405 }
406
407 fn encoding(&self) -> Encoding {
408 Encoding::RLE_DICTIONARY
409 }
410
411 fn skip(&mut self, num_values: usize) -> Result<usize> {
412 assert!(self.rle_decoder.is_some());
413 assert!(self.has_dictionary, "Must call set_dict() first!");
414
415 let rle = self.rle_decoder.as_mut().unwrap();
416 let num_values = cmp::min(num_values, self.num_values);
417 rle.skip(num_values)
418 }
419}
420
421pub struct RleValueDecoder<T: DataType> {
428 values_left: usize,
429 decoder: RleDecoder,
430 _phantom: PhantomData<T>,
431}
432
433impl<T: DataType> Default for RleValueDecoder<T> {
434 fn default() -> Self {
435 Self::new()
436 }
437}
438
439impl<T: DataType> RleValueDecoder<T> {
440 pub fn new() -> Self {
441 Self {
442 values_left: 0,
443 decoder: RleDecoder::new(1),
444 _phantom: PhantomData,
445 }
446 }
447}
448
449impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
450 #[inline]
451 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
452 ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
454
455 const I32_SIZE: usize = mem::size_of::<i32>();
457 let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
458 self.decoder = RleDecoder::new(1);
459 self.decoder
460 .set_data(data.slice(I32_SIZE..I32_SIZE + data_size));
461 self.values_left = num_values;
462 Ok(())
463 }
464
465 #[inline]
466 fn values_left(&self) -> usize {
467 self.values_left
468 }
469
470 #[inline]
471 fn encoding(&self) -> Encoding {
472 Encoding::RLE
473 }
474
475 #[inline]
476 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
477 let num_values = cmp::min(buffer.len(), self.values_left);
478 let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
479 self.values_left -= values_read;
480 Ok(values_read)
481 }
482
483 #[inline]
484 fn skip(&mut self, num_values: usize) -> Result<usize> {
485 let num_values = cmp::min(num_values, self.values_left);
486 let values_skipped = self.decoder.skip(num_values)?;
487 self.values_left -= values_skipped;
488 Ok(values_skipped)
489 }
490}
491
492pub struct DeltaBitPackDecoder<T: DataType> {
500 bit_reader: BitReader,
501 initialized: bool,
502
503 block_size: usize,
506 values_left: usize,
508 mini_blocks_per_block: usize,
510 values_per_mini_block: usize,
512
513 min_delta: T::T,
516 block_end_offset: usize,
518 mini_block_idx: usize,
520 mini_block_bit_widths: Vec<u8>,
522 mini_block_remaining: usize,
524
525 first_value: Option<T::T>,
527 last_value: T::T,
529}
530
531impl<T: DataType> Default for DeltaBitPackDecoder<T>
532where
533 T::T: Default + FromPrimitive + WrappingAdd + Copy,
534{
535 fn default() -> Self {
536 Self::new()
537 }
538}
539
540impl<T: DataType> DeltaBitPackDecoder<T>
541where
542 T::T: Default + FromPrimitive + WrappingAdd + Copy,
543{
544 pub fn new() -> Self {
546 Self {
547 bit_reader: BitReader::from(vec![]),
548 initialized: false,
549 block_size: 0,
550 values_left: 0,
551 mini_blocks_per_block: 0,
552 values_per_mini_block: 0,
553 min_delta: Default::default(),
554 mini_block_idx: 0,
555 mini_block_bit_widths: vec![],
556 mini_block_remaining: 0,
557 block_end_offset: 0,
558 first_value: None,
559 last_value: Default::default(),
560 }
561 }
562
563 pub fn get_offset(&self) -> usize {
565 assert!(self.initialized, "Bit reader is not initialized");
566 match self.values_left {
567 0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
573 _ => self.bit_reader.get_byte_offset(),
574 }
575 }
576
577 #[inline]
579 fn next_block(&mut self) -> Result<()> {
580 let min_delta = self
581 .bit_reader
582 .get_zigzag_vlq_int()
583 .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
584
585 self.min_delta =
586 T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
587
588 self.mini_block_bit_widths.clear();
589 self.bit_reader
590 .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
591
592 let mut offset = self.bit_reader.get_byte_offset();
593 let mut remaining = self.values_left;
594
595 for b in &mut self.mini_block_bit_widths {
597 if remaining == 0 {
598 *b = 0;
601 }
602 remaining = remaining.saturating_sub(self.values_per_mini_block);
603 offset += *b as usize * self.values_per_mini_block / 8;
604 }
605 self.block_end_offset = offset;
606
607 if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
608 return Err(eof_err!("insufficient mini block bit widths"));
609 }
610
611 self.mini_block_remaining = self.values_per_mini_block;
612 self.mini_block_idx = 0;
613
614 Ok(())
615 }
616
617 #[inline]
619 fn next_mini_block(&mut self) -> Result<()> {
620 if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
621 self.mini_block_idx += 1;
622 self.mini_block_remaining = self.values_per_mini_block;
623 Ok(())
624 } else {
625 self.next_block()
626 }
627 }
628}
629
630impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
631where
632 T::T: Default + FromPrimitive + WrappingAdd + Copy,
633{
634 #[inline]
636 fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
637 self.bit_reader = BitReader::new(data);
638 self.initialized = true;
639
640 self.block_size = self
642 .bit_reader
643 .get_vlq_int()
644 .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
645 .try_into()
646 .map_err(|_| general_err!("invalid 'block_size'"))?;
647
648 self.mini_blocks_per_block = self
649 .bit_reader
650 .get_vlq_int()
651 .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
652 .try_into()
653 .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
654
655 self.values_left = self
656 .bit_reader
657 .get_vlq_int()
658 .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
659 .try_into()
660 .map_err(|_| general_err!("invalid 'values_left'"))?;
661
662 let first_value = self
663 .bit_reader
664 .get_zigzag_vlq_int()
665 .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
666
667 self.first_value =
668 Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
669
670 if self.block_size % 128 != 0 {
671 return Err(general_err!(
672 "'block_size' must be a multiple of 128, got {}",
673 self.block_size
674 ));
675 }
676
677 if self.block_size % self.mini_blocks_per_block != 0 {
678 return Err(general_err!(
679 "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
680 self.block_size,
681 self.mini_blocks_per_block
682 ));
683 }
684
685 self.mini_block_idx = 0;
687 self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
688 self.mini_block_remaining = 0;
689 self.mini_block_bit_widths.clear();
690
691 if self.values_per_mini_block % 32 != 0 {
692 return Err(general_err!(
693 "'values_per_mini_block' must be a multiple of 32 got {}",
694 self.values_per_mini_block
695 ));
696 }
697
698 Ok(())
699 }
700
701 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
702 assert!(self.initialized, "Bit reader is not initialized");
703 if buffer.is_empty() {
704 return Ok(0);
705 }
706
707 let mut read = 0;
708 let to_read = buffer.len().min(self.values_left);
709
710 if let Some(value) = self.first_value.take() {
711 self.last_value = value;
712 buffer[0] = value;
713 read += 1;
714 self.values_left -= 1;
715 }
716
717 while read != to_read {
718 if self.mini_block_remaining == 0 {
719 self.next_mini_block()?;
720 }
721
722 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
723 let batch_to_read = self.mini_block_remaining.min(to_read - read);
724
725 let batch_read = self
726 .bit_reader
727 .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
728
729 if batch_read != batch_to_read {
730 return Err(general_err!(
731 "Expected to read {} values from miniblock got {}",
732 batch_to_read,
733 batch_read
734 ));
735 }
736
737 for v in &mut buffer[read..read + batch_read] {
740 *v = v
744 .wrapping_add(&self.min_delta)
745 .wrapping_add(&self.last_value);
746
747 self.last_value = *v;
748 }
749
750 read += batch_read;
751 self.mini_block_remaining -= batch_read;
752 self.values_left -= batch_read;
753 }
754
755 Ok(to_read)
756 }
757
758 fn values_left(&self) -> usize {
759 self.values_left
760 }
761
762 fn encoding(&self) -> Encoding {
763 Encoding::DELTA_BINARY_PACKED
764 }
765
766 fn skip(&mut self, num_values: usize) -> Result<usize> {
767 let mut skip = 0;
768 let to_skip = num_values.min(self.values_left);
769 if to_skip == 0 {
770 return Ok(0);
771 }
772
773 if let Some(value) = self.first_value.take() {
775 self.last_value = value;
776 skip += 1;
777 self.values_left -= 1;
778 }
779
780 let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
781 Type::INT32 => 32,
782 Type::INT64 => 64,
783 _ => unreachable!(),
784 };
785
786 let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
787 while skip < to_skip {
788 if self.mini_block_remaining == 0 {
789 self.next_mini_block()?;
790 }
791
792 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
793 let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
794 let mini_block_should_skip = mini_block_to_skip;
795
796 let skip_count = self
797 .bit_reader
798 .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
799
800 if skip_count != mini_block_to_skip {
801 return Err(general_err!(
802 "Expected to skip {} values from mini block got {}.",
803 mini_block_batch_size,
804 skip_count
805 ));
806 }
807
808 for v in &mut skip_buffer[0..skip_count] {
809 *v = v
810 .wrapping_add(&self.min_delta)
811 .wrapping_add(&self.last_value);
812
813 self.last_value = *v;
814 }
815
816 skip += mini_block_should_skip;
817 self.mini_block_remaining -= mini_block_should_skip;
818 self.values_left -= mini_block_should_skip;
819 }
820
821 Ok(to_skip)
822 }
823}
824
825pub struct DeltaLengthByteArrayDecoder<T: DataType> {
835 lengths: Vec<i32>,
838
839 current_idx: usize,
841
842 data: Option<Bytes>,
844
845 offset: usize,
847
848 num_values: usize,
850
851 _phantom: PhantomData<T>,
853}
854
855impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
856 fn default() -> Self {
857 Self::new()
858 }
859}
860
861impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
862 pub fn new() -> Self {
864 Self {
865 lengths: vec![],
866 current_idx: 0,
867 data: None,
868 offset: 0,
869 num_values: 0,
870 _phantom: PhantomData,
871 }
872 }
873}
874
875impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
876 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
877 match T::get_physical_type() {
878 Type::BYTE_ARRAY => {
879 let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
880 len_decoder.set_data(data.clone(), num_values)?;
881 let num_lengths = len_decoder.values_left();
882 self.lengths.resize(num_lengths, 0);
883 len_decoder.get(&mut self.lengths[..])?;
884
885 self.data = Some(data.slice(len_decoder.get_offset()..));
886 self.offset = 0;
887 self.current_idx = 0;
888 self.num_values = num_lengths;
889 Ok(())
890 }
891 _ => Err(general_err!(
892 "DeltaLengthByteArrayDecoder only support ByteArrayType"
893 )),
894 }
895 }
896
897 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
898 match T::get_physical_type() {
899 Type::BYTE_ARRAY => {
900 assert!(self.data.is_some());
901
902 let data = self.data.as_ref().unwrap();
903 let num_values = cmp::min(buffer.len(), self.num_values);
904
905 for item in buffer.iter_mut().take(num_values) {
906 let len = self.lengths[self.current_idx] as usize;
907 item.set_from_bytes(data.slice(self.offset..self.offset + len));
908
909 self.offset += len;
910 self.current_idx += 1;
911 }
912
913 self.num_values -= num_values;
914 Ok(num_values)
915 }
916 _ => Err(general_err!(
917 "DeltaLengthByteArrayDecoder only support ByteArrayType"
918 )),
919 }
920 }
921
922 fn values_left(&self) -> usize {
923 self.num_values
924 }
925
926 fn encoding(&self) -> Encoding {
927 Encoding::DELTA_LENGTH_BYTE_ARRAY
928 }
929
930 fn skip(&mut self, num_values: usize) -> Result<usize> {
931 match T::get_physical_type() {
932 Type::BYTE_ARRAY => {
933 let num_values = cmp::min(num_values, self.num_values);
934
935 let next_offset: i32 = self.lengths
936 [self.current_idx..self.current_idx + num_values]
937 .iter()
938 .sum();
939
940 self.current_idx += num_values;
941 self.offset += next_offset as usize;
942
943 self.num_values -= num_values;
944 Ok(num_values)
945 }
946 other_type => Err(general_err!(
947 "DeltaLengthByteArrayDecoder not support {}, only support byte array",
948 other_type
949 )),
950 }
951 }
952}
953
954pub struct DeltaByteArrayDecoder<T: DataType> {
964 prefix_lengths: Vec<i32>,
967
968 current_idx: usize,
970
971 suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,
974
975 previous_value: Vec<u8>,
977
978 num_values: usize,
980
981 _phantom: PhantomData<T>,
983}
984
985impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
986 fn default() -> Self {
987 Self::new()
988 }
989}
990
991impl<T: DataType> DeltaByteArrayDecoder<T> {
992 pub fn new() -> Self {
994 Self {
995 prefix_lengths: vec![],
996 current_idx: 0,
997 suffix_decoder: None,
998 previous_value: vec![],
999 num_values: 0,
1000 _phantom: PhantomData,
1001 }
1002 }
1003}
1004
1005impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1006 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1007 match T::get_physical_type() {
1008 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1009 let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1010 prefix_len_decoder.set_data(data.clone(), num_values)?;
1011 let num_prefixes = prefix_len_decoder.values_left();
1012 self.prefix_lengths.resize(num_prefixes, 0);
1013 prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1014
1015 let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1016 suffix_decoder
1017 .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1018 self.suffix_decoder = Some(suffix_decoder);
1019 self.num_values = num_prefixes;
1020 self.current_idx = 0;
1021 self.previous_value.clear();
1022 Ok(())
1023 }
1024 _ => Err(general_err!(
1025 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1026 )),
1027 }
1028 }
1029
1030 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1031 match T::get_physical_type() {
1032 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1033 let num_values = cmp::min(buffer.len(), self.num_values);
1034 let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1035 for item in buffer.iter_mut().take(num_values) {
1036 let suffix_decoder = self
1039 .suffix_decoder
1040 .as_mut()
1041 .expect("decoder not initialized");
1042 suffix_decoder.get(&mut v[..])?;
1043 let suffix = v[0].data();
1044
1045 let prefix_len = self.prefix_lengths[self.current_idx] as usize;
1047
1048 let mut result = Vec::new();
1050 result.extend_from_slice(&self.previous_value[0..prefix_len]);
1051 result.extend_from_slice(suffix);
1052
1053 let data = Bytes::from(result.clone());
1054 item.set_from_bytes(data);
1055
1056 self.previous_value = result;
1057 self.current_idx += 1;
1058 }
1059
1060 self.num_values -= num_values;
1061 Ok(num_values)
1062 }
1063 _ => Err(general_err!(
1064 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1065 )),
1066 }
1067 }
1068
1069 fn values_left(&self) -> usize {
1070 self.num_values
1071 }
1072
1073 fn encoding(&self) -> Encoding {
1074 Encoding::DELTA_BYTE_ARRAY
1075 }
1076
1077 fn skip(&mut self, num_values: usize) -> Result<usize> {
1078 let mut buffer = vec![T::T::default(); num_values];
1079 self.get(&mut buffer)
1080 }
1081}
1082
1083#[cfg(test)]
1084mod tests {
1085 use super::{super::encoding::*, *};
1086
1087 use std::f32::consts::PI as PI_f32;
1088 use std::f64::consts::PI as PI_f64;
1089 use std::sync::Arc;
1090
1091 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1092 use crate::util::test_common::rand_gen::RandGen;
1093
1094 #[test]
1095 fn test_get_decoders() {
1096 create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1098 create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1099 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1100 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1101 create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1102
1103 create_and_check_decoder::<Int32Type>(
1105 Encoding::RLE_DICTIONARY,
1106 Some(general_err!(
1107 "Cannot initialize this encoding through this function"
1108 )),
1109 );
1110 create_and_check_decoder::<Int32Type>(
1111 Encoding::PLAIN_DICTIONARY,
1112 Some(general_err!(
1113 "Cannot initialize this encoding through this function"
1114 )),
1115 );
1116 create_and_check_decoder::<Int32Type>(
1117 Encoding::DELTA_LENGTH_BYTE_ARRAY,
1118 Some(general_err!(
1119 "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1120 )),
1121 );
1122 create_and_check_decoder::<Int32Type>(
1123 Encoding::DELTA_BYTE_ARRAY,
1124 Some(general_err!(
1125 "Encoding DELTA_BYTE_ARRAY is not supported for type"
1126 )),
1127 );
1128
1129 #[allow(deprecated)]
1131 create_and_check_decoder::<Int32Type>(
1132 Encoding::BIT_PACKED,
1133 Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1134 );
1135 }
1136
1137 #[test]
1138 fn test_plain_decode_int32() {
1139 let data = [42, 18, 52];
1140 let data_bytes = Int32Type::to_byte_array(&data[..]);
1141 let mut buffer = [0; 3];
1142 test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1143 }
1144
1145 #[test]
1146 fn test_plain_skip_int32() {
1147 let data = [42, 18, 52];
1148 let data_bytes = Int32Type::to_byte_array(&data[..]);
1149 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1150 }
1151
1152 #[test]
1153 fn test_plain_skip_all_int32() {
1154 let data = [42, 18, 52];
1155 let data_bytes = Int32Type::to_byte_array(&data[..]);
1156 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1157 }
1158
1159 #[test]
1160 fn test_plain_decode_int32_spaced() {
1161 let data = [42, 18, 52];
1162 let expected_data = [0, 42, 0, 18, 0, 0, 52, 0];
1163 let data_bytes = Int32Type::to_byte_array(&data[..]);
1164 let mut buffer = [0; 8];
1165 let num_nulls = 5;
1166 let valid_bits = [0b01001010];
1167 test_plain_decode_spaced::<Int32Type>(
1168 Bytes::from(data_bytes),
1169 3,
1170 -1,
1171 &mut buffer[..],
1172 num_nulls,
1173 &valid_bits,
1174 &expected_data[..],
1175 );
1176 }
1177
1178 #[test]
1179 fn test_plain_decode_int64() {
1180 let data = [42, 18, 52];
1181 let data_bytes = Int64Type::to_byte_array(&data[..]);
1182 let mut buffer = [0; 3];
1183 test_plain_decode::<Int64Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1184 }
1185
1186 #[test]
1187 fn test_plain_skip_int64() {
1188 let data = [42, 18, 52];
1189 let data_bytes = Int64Type::to_byte_array(&data[..]);
1190 test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 2, -1, &data[2..]);
1191 }
1192
1193 #[test]
1194 fn test_plain_skip_all_int64() {
1195 let data = [42, 18, 52];
1196 let data_bytes = Int64Type::to_byte_array(&data[..]);
1197 test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 3, -1, &[]);
1198 }
1199
1200 #[test]
1201 fn test_plain_decode_float() {
1202 let data = [PI_f32, 2.414, 12.51];
1203 let data_bytes = FloatType::to_byte_array(&data[..]);
1204 let mut buffer = [0.0; 3];
1205 test_plain_decode::<FloatType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1206 }
1207
1208 #[test]
1209 fn test_plain_skip_float() {
1210 let data = [PI_f32, 2.414, 12.51];
1211 let data_bytes = FloatType::to_byte_array(&data[..]);
1212 test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1213 }
1214
1215 #[test]
1216 fn test_plain_skip_all_float() {
1217 let data = [PI_f32, 2.414, 12.51];
1218 let data_bytes = FloatType::to_byte_array(&data[..]);
1219 test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 4, -1, &[]);
1220 }
1221
1222 #[test]
1223 fn test_plain_skip_double() {
1224 let data = [PI_f64, 2.414f64, 12.51f64];
1225 let data_bytes = DoubleType::to_byte_array(&data[..]);
1226 test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1227 }
1228
1229 #[test]
1230 fn test_plain_skip_all_double() {
1231 let data = [PI_f64, 2.414f64, 12.51f64];
1232 let data_bytes = DoubleType::to_byte_array(&data[..]);
1233 test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1234 }
1235
1236 #[test]
1237 fn test_plain_decode_double() {
1238 let data = [PI_f64, 2.414f64, 12.51f64];
1239 let data_bytes = DoubleType::to_byte_array(&data[..]);
1240 let mut buffer = [0.0f64; 3];
1241 test_plain_decode::<DoubleType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1242 }
1243
1244 #[test]
1245 fn test_plain_decode_int96() {
1246 let mut data = [Int96::new(); 4];
1247 data[0].set_data(11, 22, 33);
1248 data[1].set_data(44, 55, 66);
1249 data[2].set_data(10, 20, 30);
1250 data[3].set_data(40, 50, 60);
1251 let data_bytes = Int96Type::to_byte_array(&data[..]);
1252 let mut buffer = [Int96::new(); 4];
1253 test_plain_decode::<Int96Type>(Bytes::from(data_bytes), 4, -1, &mut buffer[..], &data[..]);
1254 }
1255
1256 #[test]
1257 fn test_plain_skip_int96() {
1258 let mut data = [Int96::new(); 4];
1259 data[0].set_data(11, 22, 33);
1260 data[1].set_data(44, 55, 66);
1261 data[2].set_data(10, 20, 30);
1262 data[3].set_data(40, 50, 60);
1263 let data_bytes = Int96Type::to_byte_array(&data[..]);
1264 test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 2, -1, &data[2..]);
1265 }
1266
1267 #[test]
1268 fn test_plain_skip_all_int96() {
1269 let mut data = [Int96::new(); 4];
1270 data[0].set_data(11, 22, 33);
1271 data[1].set_data(44, 55, 66);
1272 data[2].set_data(10, 20, 30);
1273 data[3].set_data(40, 50, 60);
1274 let data_bytes = Int96Type::to_byte_array(&data[..]);
1275 test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 8, -1, &[]);
1276 }
1277
1278 #[test]
1279 fn test_plain_decode_bool() {
1280 let data = [
1281 false, true, false, false, true, false, true, true, false, true,
1282 ];
1283 let data_bytes = BoolType::to_byte_array(&data[..]);
1284 let mut buffer = [false; 10];
1285 test_plain_decode::<BoolType>(Bytes::from(data_bytes), 10, -1, &mut buffer[..], &data[..]);
1286 }
1287
1288 #[test]
1289 fn test_plain_skip_bool() {
1290 let data = [
1291 false, true, false, false, true, false, true, true, false, true,
1292 ];
1293 let data_bytes = BoolType::to_byte_array(&data[..]);
1294 test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 5, -1, &data[5..]);
1295 }
1296
1297 #[test]
1298 fn test_plain_skip_all_bool() {
1299 let data = [
1300 false, true, false, false, true, false, true, true, false, true,
1301 ];
1302 let data_bytes = BoolType::to_byte_array(&data[..]);
1303 test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 20, -1, &[]);
1304 }
1305
1306 #[test]
1307 fn test_plain_decode_byte_array() {
1308 let mut data = vec![ByteArray::new(); 2];
1309 data[0].set_data(Bytes::from(String::from("hello")));
1310 data[1].set_data(Bytes::from(String::from("parquet")));
1311 let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1312 let mut buffer = vec![ByteArray::new(); 2];
1313 test_plain_decode::<ByteArrayType>(
1314 Bytes::from(data_bytes),
1315 2,
1316 -1,
1317 &mut buffer[..],
1318 &data[..],
1319 );
1320 }
1321
1322 #[test]
1323 fn test_plain_skip_byte_array() {
1324 let mut data = vec![ByteArray::new(); 2];
1325 data[0].set_data(Bytes::from(String::from("hello")));
1326 data[1].set_data(Bytes::from(String::from("parquet")));
1327 let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1328 test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 1, -1, &data[1..]);
1329 }
1330
1331 #[test]
1332 fn test_plain_skip_all_byte_array() {
1333 let mut data = vec![ByteArray::new(); 2];
1334 data[0].set_data(Bytes::from(String::from("hello")));
1335 data[1].set_data(Bytes::from(String::from("parquet")));
1336 let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1337 test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 2, -1, &[]);
1338 }
1339
1340 #[test]
1341 fn test_plain_decode_fixed_len_byte_array() {
1342 let mut data = vec![FixedLenByteArray::default(); 3];
1343 data[0].set_data(Bytes::from(String::from("bird")));
1344 data[1].set_data(Bytes::from(String::from("come")));
1345 data[2].set_data(Bytes::from(String::from("flow")));
1346 let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1347 let mut buffer = vec![FixedLenByteArray::default(); 3];
1348 test_plain_decode::<FixedLenByteArrayType>(
1349 Bytes::from(data_bytes),
1350 3,
1351 4,
1352 &mut buffer[..],
1353 &data[..],
1354 );
1355 }
1356
1357 #[test]
1358 fn test_plain_skip_fixed_len_byte_array() {
1359 let mut data = vec![FixedLenByteArray::default(); 3];
1360 data[0].set_data(Bytes::from(String::from("bird")));
1361 data[1].set_data(Bytes::from(String::from("come")));
1362 data[2].set_data(Bytes::from(String::from("flow")));
1363 let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1364 test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 1, 4, &data[1..]);
1365 }
1366
1367 #[test]
1368 fn test_plain_skip_all_fixed_len_byte_array() {
1369 let mut data = vec![FixedLenByteArray::default(); 3];
1370 data[0].set_data(Bytes::from(String::from("bird")));
1371 data[1].set_data(Bytes::from(String::from("come")));
1372 data[2].set_data(Bytes::from(String::from("flow")));
1373 let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1374 test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
1375 }
1376
1377 fn test_plain_decode<T: DataType>(
1378 data: Bytes,
1379 num_values: usize,
1380 type_length: i32,
1381 buffer: &mut [T::T],
1382 expected: &[T::T],
1383 ) {
1384 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1385 let result = decoder.set_data(data, num_values);
1386 assert!(result.is_ok());
1387 let result = decoder.get(buffer);
1388 assert!(result.is_ok());
1389 assert_eq!(decoder.values_left(), 0);
1390 assert_eq!(buffer, expected);
1391 }
1392
1393 fn test_plain_skip<T: DataType>(
1394 data: Bytes,
1395 num_values: usize,
1396 skip: usize,
1397 type_length: i32,
1398 expected: &[T::T],
1399 ) {
1400 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1401 let result = decoder.set_data(data, num_values);
1402 assert!(result.is_ok());
1403 let skipped = decoder.skip(skip).expect("skipping values");
1404
1405 if skip >= num_values {
1406 assert_eq!(skipped, num_values);
1407
1408 let mut buffer = vec![T::T::default(); 1];
1409 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1410 assert_eq!(remaining, 0);
1411 } else {
1412 assert_eq!(skipped, skip);
1413 let mut buffer = vec![T::T::default(); num_values - skip];
1414 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1415 assert_eq!(remaining, num_values - skip);
1416 assert_eq!(decoder.values_left(), 0);
1417 assert_eq!(buffer, expected);
1418 }
1419 }
1420
1421 fn test_plain_decode_spaced<T: DataType>(
1422 data: Bytes,
1423 num_values: usize,
1424 type_length: i32,
1425 buffer: &mut [T::T],
1426 num_nulls: usize,
1427 valid_bits: &[u8],
1428 expected: &[T::T],
1429 ) {
1430 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1431 let result = decoder.set_data(data, num_values);
1432 assert!(result.is_ok());
1433 let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1434 assert!(result.is_ok());
1435 assert_eq!(num_values + num_nulls, result.unwrap());
1436 assert_eq!(decoder.values_left(), 0);
1437 assert_eq!(buffer, expected);
1438 }
1439
1440 #[test]
1441 #[should_panic(expected = "RleValueEncoder only supports BoolType")]
1442 fn test_rle_value_encode_int32_not_supported() {
1443 let mut encoder = RleValueEncoder::<Int32Type>::new();
1444 encoder.put(&[1, 2, 3, 4]).unwrap();
1445 }
1446
1447 #[test]
1448 #[should_panic(expected = "RleValueDecoder only supports BoolType")]
1449 fn test_rle_value_decode_int32_not_supported() {
1450 let mut decoder = RleValueDecoder::<Int32Type>::new();
1451 decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).unwrap();
1452 }
1453
1454 #[test]
1455 fn test_rle_value_decode_bool_decode() {
1456 let data = vec![
1458 BoolType::gen_vec(-1, 256),
1459 BoolType::gen_vec(-1, 257),
1460 BoolType::gen_vec(-1, 126),
1461 ];
1462 test_rle_value_decode::<BoolType>(data);
1463 }
1464
1465 #[test]
1466 #[should_panic(expected = "Bit reader is not initialized")]
1467 fn test_delta_bit_packed_not_initialized_offset() {
1468 let decoder = DeltaBitPackDecoder::<Int32Type>::new();
1470 decoder.get_offset();
1471 }
1472
1473 #[test]
1474 #[should_panic(expected = "Bit reader is not initialized")]
1475 fn test_delta_bit_packed_not_initialized_get() {
1476 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1478 let mut buffer = vec![];
1479 decoder.get(&mut buffer).unwrap();
1480 }
1481
1482 #[test]
1483 fn test_delta_bit_packed_int32_empty() {
1484 let data = vec![vec![0; 0]];
1485 test_delta_bit_packed_decode::<Int32Type>(data);
1486 }
1487
1488 #[test]
1489 fn test_delta_bit_packed_int32_repeat() {
1490 let block_data = vec![
1491 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1492 6, 7, 8,
1493 ];
1494 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1495 }
1496
1497 #[test]
1498 fn test_skip_delta_bit_packed_int32_repeat() {
1499 let block_data = vec![
1500 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1501 6, 7, 8,
1502 ];
1503 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10);
1504 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1505 }
1506
1507 #[test]
1508 fn test_delta_bit_packed_int32_uneven() {
1509 let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1510 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1511 }
1512
1513 #[test]
1514 fn test_skip_delta_bit_packed_int32_uneven() {
1515 let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1516 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1517 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1518 }
1519
1520 #[test]
1521 fn test_delta_bit_packed_int32_same_values() {
1522 let block_data = vec![
1523 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1524 ];
1525 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1526
1527 let block_data = vec![
1528 -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1529 -127, -127,
1530 ];
1531 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1532 }
1533
1534 #[test]
1535 fn test_skip_delta_bit_packed_int32_same_values() {
1536 let block_data = vec![
1537 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1538 ];
1539 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1540 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1541
1542 let block_data = vec![
1543 -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1544 -127, -127,
1545 ];
1546 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1547 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1548 }
1549
1550 #[test]
1551 fn test_delta_bit_packed_int32_min_max() {
1552 let block_data = vec![
1553 i32::MIN,
1554 i32::MIN,
1555 i32::MIN,
1556 i32::MAX,
1557 i32::MIN,
1558 i32::MAX,
1559 i32::MIN,
1560 i32::MAX,
1561 ];
1562 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1563 }
1564
1565 #[test]
1566 fn test_skip_delta_bit_packed_int32_min_max() {
1567 let block_data = vec![
1568 i32::MIN,
1569 i32::MIN,
1570 i32::MIN,
1571 i32::MAX,
1572 i32::MIN,
1573 i32::MAX,
1574 i32::MIN,
1575 i32::MAX,
1576 ];
1577 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1578 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1579 }
1580
1581 #[test]
1582 fn test_delta_bit_packed_int32_multiple_blocks() {
1583 let data = vec![
1585 Int32Type::gen_vec(-1, 64),
1586 Int32Type::gen_vec(-1, 128),
1587 Int32Type::gen_vec(-1, 64),
1588 ];
1589 test_delta_bit_packed_decode::<Int32Type>(data);
1590 }
1591
1592 #[test]
1593 fn test_delta_bit_packed_int32_data_across_blocks() {
1594 let data = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
1596 test_delta_bit_packed_decode::<Int32Type>(data);
1597 }
1598
1599 #[test]
1600 fn test_delta_bit_packed_int32_with_empty_blocks() {
1601 let data = vec![
1602 Int32Type::gen_vec(-1, 128),
1603 vec![0; 0],
1604 Int32Type::gen_vec(-1, 64),
1605 ];
1606 test_delta_bit_packed_decode::<Int32Type>(data);
1607 }
1608
1609 #[test]
1610 fn test_delta_bit_packed_int64_empty() {
1611 let data = vec![vec![0; 0]];
1612 test_delta_bit_packed_decode::<Int64Type>(data);
1613 }
1614
1615 #[test]
1616 fn test_delta_bit_packed_int64_min_max() {
1617 let block_data = vec![
1618 i64::MIN,
1619 i64::MAX,
1620 i64::MIN,
1621 i64::MAX,
1622 i64::MIN,
1623 i64::MAX,
1624 i64::MIN,
1625 i64::MAX,
1626 ];
1627 test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1628 }
1629
1630 #[test]
1631 fn test_delta_bit_packed_int64_multiple_blocks() {
1632 let data = vec![
1634 Int64Type::gen_vec(-1, 64),
1635 Int64Type::gen_vec(-1, 128),
1636 Int64Type::gen_vec(-1, 64),
1637 ];
1638 test_delta_bit_packed_decode::<Int64Type>(data);
1639 }
1640
    /// Decodes a hand-written 34-byte DELTA_BINARY_PACKED buffer holding three
    /// values and checks both the decoded values and the byte offsets the
    /// decoder reports before and after reading.
    #[test]
    fn test_delta_bit_packed_decoder_sample() {
        // NOTE(review): read against the Parquet DELTA_BINARY_PACKED layout,
        // the leading bytes appear to be: block size 128 (VLQ `128, 1`),
        // 4 mini-blocks, 3 values, zig-zag first value 58 (= 29), then a block
        // with zig-zag min delta 28 (= 14) and bit widths `6, 0, 0, 0` --
        // confirm against the spec. The rest is packed deltas and zero padding.
        let data_bytes = vec![
            128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0,
        ];
        let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
        decoder.set_data(data_bytes.into(), 3).unwrap();
        // Right after `set_data` the reported offset is 5.
        assert_eq!(decoder.get_offset(), 5);
        let mut result = vec![0, 0, 0];
        decoder.get(&mut result).unwrap();
        // Once all three values are read the offset equals the buffer length
        // (34 bytes).
        assert_eq!(decoder.get_offset(), 34);
        assert_eq!(result, vec![29, 43, 89]);
    }
1657
    /// Builds a DELTA_BINARY_PACKED page by hand whose second block carries a
    /// non-zero (0xFF) byte in its last bit-width slot, and checks that the
    /// decoder still yields all 419 values and consumes the whole buffer. It
    /// then checks that a buffer truncated to 12 bytes yields a descriptive
    /// error.
    ///
    /// NOTE(review): the field names in the comments below follow the Parquet
    /// DELTA_BINARY_PACKED layout (block size, mini-blocks per block, total
    /// value count, first value; then per block a min delta followed by one
    /// bit width per mini-block) -- confirm against the spec.
    #[test]
    fn test_delta_bit_packed_padding() {
        // Page header, as VLQ integers (verified with `BitReader` below).
        let header = vec![
            // 256 (two VLQ bytes) -- block size
            128,
            2,
            // 4 -- mini-blocks per block
            4,
            // 419 (two VLQ bytes) -- total value count
            128 + 35,
            3,
            // 7 -- raw VLQ of the first-value field
            7,
        ];

        // First block header: min delta 0, then bit widths 0, 1, 0, 0.
        let block1_header = vec![
            0, 0, 1, 0, 0, ];

        // Only the bit-width-1 mini-block carries data: 64 values * 1 bit = 8 bytes.
        let block1 = vec![0xFF; 8];

        // Second block header: min delta 0, then bit widths 0, 1, 2 and a
        // trailing 0xFF, presumably for a mini-block that holds no values.
        let block2_header = vec![
            0, 0, 1, 2, 0xFF, ];

        // Bit width 1 => 8 bytes, bit width 2 => 16 bytes; nothing for the
        // 0xFF slot.
        let block2 = vec![0xFF; 24];

        let data: Vec<u8> = header
            .into_iter()
            .chain(block1_header)
            .chain(block1)
            .chain(block2_header)
            .chain(block2)
            .collect();

        let length = data.len();

        let ptr = Bytes::from(data);
        // Sanity-check the header bytes by reading them back as VLQ integers.
        let mut reader = BitReader::new(ptr.clone());
        assert_eq!(reader.get_vlq_int().unwrap(), 256);
        assert_eq!(reader.get_vlq_int().unwrap(), 4);
        assert_eq!(reader.get_vlq_int().unwrap(), 419);
        assert_eq!(reader.get_vlq_int().unwrap(), 7);

        // One slot larger than the 419 values the header declares.
        let mut output = vec![0_i32; 420];

        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
        decoder.set_data(ptr.clone(), 0).unwrap();
        assert_eq!(decoder.get(&mut output).unwrap(), 419);
        // The decoder must have consumed the whole buffer.
        assert_eq!(decoder.get_offset(), length);

        // Truncated buffer: 6 header bytes + 5 block-header bytes + 1 data
        // byte, so the bit-width-1 mini-block can only supply 8 values.
        decoder.set_data(ptr.slice(..12), 0).unwrap();
        let err = decoder.get(&mut output).unwrap_err().to_string();
        assert!(
            err.contains("Expected to read 64 values from miniblock got 8"),
            "{}",
            err
        );
    }
1734
1735 #[test]
1736 fn test_delta_byte_array_same_arrays() {
1737 let data = vec![
1738 vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])],
1739 vec![
1740 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1741 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1742 ],
1743 vec![
1744 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1745 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1746 ],
1747 ];
1748 test_delta_byte_array_decode(data);
1749 }
1750
1751 #[test]
1752 fn test_delta_byte_array_unique_arrays() {
1753 let data = vec![
1754 vec![ByteArray::from(vec![1])],
1755 vec![ByteArray::from(vec![2, 3]), ByteArray::from(vec![4, 5, 6])],
1756 vec![
1757 ByteArray::from(vec![7, 8]),
1758 ByteArray::from(vec![9, 0, 1, 2]),
1759 ],
1760 ];
1761 test_delta_byte_array_decode(data);
1762 }
1763
1764 #[test]
1765 fn test_delta_byte_array_single_array() {
1766 let data = vec![vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])]];
1767 test_delta_byte_array_decode(data);
1768 }
1769
1770 #[test]
1771 fn test_byte_stream_split_multiple_f32() {
1772 let data = vec![
1773 vec![
1774 f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
1775 f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
1776 ],
1777 vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
1778 ];
1779 test_byte_stream_split_decode::<FloatType>(data, -1);
1780 }
1781
1782 #[test]
1783 fn test_byte_stream_split_f64() {
1784 let data = vec![vec![
1785 f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
1786 f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
1787 ]];
1788 test_byte_stream_split_decode::<DoubleType>(data, -1);
1789 }
1790
1791 #[test]
1792 fn test_byte_stream_split_multiple_i32() {
1793 let data = vec![
1794 vec![
1795 i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
1796 i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
1797 ],
1798 vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
1799 ];
1800 test_byte_stream_split_decode::<Int32Type>(data, -1);
1801 }
1802
1803 #[test]
1804 fn test_byte_stream_split_i64() {
1805 let data = vec![vec![
1806 i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
1807 i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
1808 ]];
1809 test_byte_stream_split_decode::<Int64Type>(data, -1);
1810 }
1811
1812 fn test_byte_stream_split_flba(type_width: usize) {
1813 let data = vec![
1814 vec![
1815 FixedLenByteArrayType::gen(type_width as i32),
1816 FixedLenByteArrayType::gen(type_width as i32),
1817 ],
1818 vec![FixedLenByteArrayType::gen(type_width as i32)],
1819 ];
1820 test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
1821 }
1822
1823 #[test]
1824 fn test_byte_stream_split_flba5() {
1825 test_byte_stream_split_flba(5);
1826 }
1827
1828 #[test]
1829 fn test_byte_stream_split_flba16() {
1830 test_byte_stream_split_flba(16);
1831 }
1832
1833 #[test]
1834 fn test_byte_stream_split_flba19() {
1835 test_byte_stream_split_flba(19);
1836 }
1837
1838 #[test]
1839 #[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
1840 fn test_byte_stream_split_flba_mismatch() {
1841 let data = vec![
1842 vec![
1843 FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
1844 FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
1845 ],
1846 vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])],
1847 ];
1848 test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 5);
1849 }
1850
1851 #[test]
1852 #[should_panic(expected = "Input data length is not a multiple of type width 4")]
1853 fn test_byte_stream_split_flba_bad_input() {
1854 let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
1855 decoder
1856 .set_data(Bytes::from(vec![1, 2, 3, 4, 5]), 1)
1857 .unwrap();
1858 }
1859
1860 #[test]
1861 fn test_skip_byte_stream_split() {
1862 let block_data = vec![0.3, 0.4, 0.1, 4.10];
1863 test_skip::<FloatType>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
1864 test_skip::<DoubleType>(
1865 block_data.into_iter().map(|x| x as f64).collect(),
1866 Encoding::BYTE_STREAM_SPLIT,
1867 100,
1868 );
1869 }
1870
1871 #[test]
1872 fn test_skip_byte_stream_split_ints() {
1873 let block_data = vec![3, 4, 1, 5];
1874 test_skip::<Int32Type>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
1875 test_skip::<Int64Type>(
1876 block_data.into_iter().map(|x| x as i64).collect(),
1877 Encoding::BYTE_STREAM_SPLIT,
1878 100,
1879 );
1880 }
1881
1882 fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
1883 test_encode_decode::<T>(data, Encoding::RLE, -1);
1884 }
1885
1886 fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
1887 test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
1888 }
1889
1890 fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
1891 test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
1892 }
1893
1894 fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
1895 test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
1896 }
1897
1898 fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
1903 let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
1904
1905 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
1907
1908 for v in &data[..] {
1909 encoder.put(&v[..]).expect("ok to encode");
1910 }
1911 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
1912
1913 let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
1915
1916 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
1918
1919 let mut result = vec![T::T::default(); expected.len()];
1920 decoder
1921 .set_data(bytes, expected.len())
1922 .expect("ok to set data");
1923 let mut result_num_values = 0;
1924 while decoder.values_left() > 0 {
1925 result_num_values += decoder
1926 .get(&mut result[result_num_values..])
1927 .expect("ok to decode");
1928 }
1929 assert_eq!(result_num_values, expected.len());
1930 assert_eq!(result, expected);
1931 }
1932
1933 fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
1934 let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
1937
1938 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
1940
1941 encoder.put(&data).expect("ok to encode");
1942
1943 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
1944
1945 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
1946 decoder.set_data(bytes, data.len()).expect("ok to set data");
1947
1948 if skip >= data.len() {
1949 let skipped = decoder.skip(skip).expect("ok to skip");
1950 assert_eq!(skipped, data.len());
1951
1952 let skipped_again = decoder.skip(skip).expect("ok to skip again");
1953 assert_eq!(skipped_again, 0);
1954 } else {
1955 let skipped = decoder.skip(skip).expect("ok to skip");
1956 assert_eq!(skipped, skip);
1957
1958 let remaining = data.len() - skip;
1959
1960 let expected = &data[skip..];
1961 let mut buffer = vec![T::T::default(); remaining];
1962 let fetched = decoder.get(&mut buffer).expect("ok to decode");
1963 assert_eq!(remaining, fetched);
1964 assert_eq!(&buffer, expected);
1965 }
1966 }
1967
1968 fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
1969 let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
1970 let decoder = get_decoder::<T>(descr, encoding);
1971 match err {
1972 Some(parquet_error) => {
1973 assert_eq!(
1974 decoder.err().unwrap().to_string(),
1975 parquet_error.to_string()
1976 );
1977 }
1978 None => {
1979 assert_eq!(decoder.unwrap().encoding(), encoding);
1980 }
1981 }
1982 }
1983
1984 fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
1986 let ty = SchemaType::primitive_type_builder("t", t)
1987 .with_length(type_len)
1988 .build()
1989 .unwrap();
1990 Arc::new(ColumnDescriptor::new(
1991 Arc::new(ty),
1992 0,
1993 0,
1994 ColumnPath::new(vec![]),
1995 ))
1996 }
1997
1998 fn usize_to_bytes(v: usize) -> [u8; 4] {
1999 (v as u32).to_ne_bytes()
2000 }
2001
    /// Test-only helper trait that turns a slice of `T::T` values into a raw
    /// byte vector. The plain decoder tests in this module wrap the result in
    /// `Bytes` and hand it to `PlainDecoder`.
    trait ToByteArray<T: DataType> {
        // The `to_*` name takes no `self` (it is an associated function), which
        // trips clippy's self-convention lint; silence it here.
        #[allow(clippy::wrong_self_convention)]
        fn to_byte_array(data: &[T::T]) -> Vec<u8>;
    }
2007
2008 macro_rules! to_byte_array_impl {
2009 ($ty: ty) => {
2010 impl ToByteArray<$ty> for $ty {
2011 #[allow(clippy::wrong_self_convention)]
2012 fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
2013 <$ty as DataType>::T::slice_as_bytes(data).to_vec()
2014 }
2015 }
2016 };
2017 }
2018
2019 to_byte_array_impl!(Int32Type);
2020 to_byte_array_impl!(Int64Type);
2021 to_byte_array_impl!(FloatType);
2022 to_byte_array_impl!(DoubleType);
2023
2024 impl ToByteArray<BoolType> for BoolType {
2025 #[allow(clippy::wrong_self_convention)]
2026 fn to_byte_array(data: &[bool]) -> Vec<u8> {
2027 let mut v = vec![];
2028 for (i, item) in data.iter().enumerate() {
2029 if i % 8 == 0 {
2030 v.push(0);
2031 }
2032 if *item {
2033 v[i / 8] |= 1 << (i % 8);
2034 }
2035 }
2036 v
2037 }
2038 }
2039
2040 impl ToByteArray<Int96Type> for Int96Type {
2041 #[allow(clippy::wrong_self_convention)]
2042 fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2043 let mut v = vec![];
2044 for d in data {
2045 v.extend_from_slice(d.as_bytes());
2046 }
2047 v
2048 }
2049 }
2050
2051 impl ToByteArray<ByteArrayType> for ByteArrayType {
2052 #[allow(clippy::wrong_self_convention)]
2053 fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2054 let mut v = vec![];
2055 for d in data {
2056 let buf = d.data();
2057 let len = &usize_to_bytes(buf.len());
2058 v.extend_from_slice(len);
2059 v.extend(buf);
2060 }
2061 v
2062 }
2063 }
2064
2065 impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2066 #[allow(clippy::wrong_self_convention)]
2067 fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2068 let mut v = vec![];
2069 for d in data {
2070 let buf = d.data();
2071 v.extend(buf);
2072 }
2073 v
2074 }
2075 }
2076}