parquet/encodings/
decoding.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains all supported decoders for Parquet.
19
20use bytes::Bytes;
21use num::traits::WrappingAdd;
22use num::FromPrimitive;
23use std::{cmp, marker::PhantomData, mem};
24
25use super::rle::RleDecoder;
26
27use crate::basic::*;
28use crate::data_type::private::ParquetValueType;
29use crate::data_type::*;
30use crate::encodings::decoding::byte_stream_split_decoder::{
31    ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
32};
33use crate::errors::{ParquetError, Result};
34use crate::schema::types::ColumnDescPtr;
35use crate::util::bit_util::{self, BitReader};
36
37mod byte_stream_split_decoder;
38
39pub(crate) mod private {
40    use super::*;
41
42    /// A trait that allows getting a [`Decoder`] implementation for a [`DataType`] with
43    /// the corresponding [`ParquetValueType`]. This is necessary to support
44    /// [`Decoder`] implementations that may not be applicable for all [`DataType`]
45    /// and by extension all [`ParquetValueType`]
46    pub trait GetDecoder {
47        fn get_decoder<T: DataType<T = Self>>(
48            descr: ColumnDescPtr,
49            encoding: Encoding,
50        ) -> Result<Box<dyn Decoder<T>>> {
51            get_decoder_default(descr, encoding)
52        }
53    }
54
55    fn get_decoder_default<T: DataType>(
56        descr: ColumnDescPtr,
57        encoding: Encoding,
58    ) -> Result<Box<dyn Decoder<T>>> {
59        match encoding {
60            Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
61            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
62                "Cannot initialize this encoding through this function"
63            )),
64            Encoding::RLE
65            | Encoding::DELTA_BINARY_PACKED
66            | Encoding::DELTA_BYTE_ARRAY
67            | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
68                "Encoding {} is not supported for type",
69                encoding
70            )),
71            e => Err(nyi_err!("Encoding {} is not supported", e)),
72        }
73    }
74
75    impl GetDecoder for bool {
76        fn get_decoder<T: DataType<T = Self>>(
77            descr: ColumnDescPtr,
78            encoding: Encoding,
79        ) -> Result<Box<dyn Decoder<T>>> {
80            match encoding {
81                Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
82                _ => get_decoder_default(descr, encoding),
83            }
84        }
85    }
86
87    impl GetDecoder for i32 {
88        fn get_decoder<T: DataType<T = Self>>(
89            descr: ColumnDescPtr,
90            encoding: Encoding,
91        ) -> Result<Box<dyn Decoder<T>>> {
92            match encoding {
93                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
94                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
95                _ => get_decoder_default(descr, encoding),
96            }
97        }
98    }
99
100    impl GetDecoder for i64 {
101        fn get_decoder<T: DataType<T = Self>>(
102            descr: ColumnDescPtr,
103            encoding: Encoding,
104        ) -> Result<Box<dyn Decoder<T>>> {
105            match encoding {
106                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
107                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
108                _ => get_decoder_default(descr, encoding),
109            }
110        }
111    }
112
113    impl GetDecoder for f32 {
114        fn get_decoder<T: DataType<T = Self>>(
115            descr: ColumnDescPtr,
116            encoding: Encoding,
117        ) -> Result<Box<dyn Decoder<T>>> {
118            match encoding {
119                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
120                _ => get_decoder_default(descr, encoding),
121            }
122        }
123    }
124    impl GetDecoder for f64 {
125        fn get_decoder<T: DataType<T = Self>>(
126            descr: ColumnDescPtr,
127            encoding: Encoding,
128        ) -> Result<Box<dyn Decoder<T>>> {
129            match encoding {
130                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
131                _ => get_decoder_default(descr, encoding),
132            }
133        }
134    }
135
136    impl GetDecoder for ByteArray {
137        fn get_decoder<T: DataType<T = Self>>(
138            descr: ColumnDescPtr,
139            encoding: Encoding,
140        ) -> Result<Box<dyn Decoder<T>>> {
141            match encoding {
142                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
143                Encoding::DELTA_LENGTH_BYTE_ARRAY => {
144                    Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
145                }
146                _ => get_decoder_default(descr, encoding),
147            }
148        }
149    }
150
151    impl GetDecoder for FixedLenByteArray {
152        fn get_decoder<T: DataType<T = Self>>(
153            descr: ColumnDescPtr,
154            encoding: Encoding,
155        ) -> Result<Box<dyn Decoder<T>>> {
156            match encoding {
157                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
158                    VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
159                )),
160                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
161                _ => get_decoder_default(descr, encoding),
162            }
163        }
164    }
165
166    impl GetDecoder for Int96 {}
167}
168
169// ----------------------------------------------------------------------
170// Decoders
171
172/// A Parquet decoder for the data type `T`.
173pub trait Decoder<T: DataType>: Send {
174    /// Sets the data to decode to be `data`, which should contain `num_values` of values
175    /// to decode.
176    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
177
178    /// Consumes values from this decoder and write the results to `buffer`. This will try
179    /// to fill up `buffer`.
180    ///
181    /// Returns the actual number of values decoded, which should be equal to
182    /// `buffer.len()` unless the remaining number of values is less than
183    /// `buffer.len()`.
184    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
185
186    /// Consume values from this decoder and write the results to `buffer`, leaving
187    /// "spaces" for null values.
188    ///
189    /// `null_count` is the number of nulls we expect to see in `buffer`, after reading.
190    /// `valid_bits` stores the valid bit for each value in the buffer. It should contain
191    ///   at least number of bits that equal to `buffer.len()`.
192    ///
193    /// Returns the actual number of values decoded.
194    ///
195    /// # Panics
196    ///
197    /// Panics if `null_count` is greater than `buffer.len()`.
198    fn get_spaced(
199        &mut self,
200        buffer: &mut [T::T],
201        null_count: usize,
202        valid_bits: &[u8],
203    ) -> Result<usize> {
204        assert!(buffer.len() >= null_count);
205
206        // TODO: check validity of the input arguments?
207        if null_count == 0 {
208            return self.get(buffer);
209        }
210
211        let num_values = buffer.len();
212        let values_to_read = num_values - null_count;
213        let values_read = self.get(buffer)?;
214        if values_read != values_to_read {
215            return Err(general_err!(
216                "Number of values read: {}, doesn't match expected: {}",
217                values_read,
218                values_to_read
219            ));
220        }
221        let mut values_to_move = values_read;
222        for i in (0..num_values).rev() {
223            if bit_util::get_bit(valid_bits, i) {
224                values_to_move -= 1;
225                buffer.swap(i, values_to_move);
226            }
227        }
228
229        Ok(num_values)
230    }
231
232    /// Returns the number of values left in this decoder stream.
233    fn values_left(&self) -> usize;
234
235    /// Returns the encoding for this decoder.
236    fn encoding(&self) -> Encoding;
237
238    /// Skip the specified number of values in this decoder stream.
239    fn skip(&mut self, num_values: usize) -> Result<usize>;
240}
241
242/// Gets a decoder for the column descriptor `descr` and encoding type `encoding`.
243///
244/// NOTE: the primitive type in `descr` MUST match the data type `T`, otherwise
245/// disastrous consequence could occur.
246pub fn get_decoder<T: DataType>(
247    descr: ColumnDescPtr,
248    encoding: Encoding,
249) -> Result<Box<dyn Decoder<T>>> {
250    use self::private::GetDecoder;
251    T::T::get_decoder(descr, encoding)
252}
253
254// ----------------------------------------------------------------------
255// PLAIN Decoding
256
/// Non-generic decoding state held by [`PlainDecoder`] and handed to the value
/// type's `set_data` / `decode` / `skip` functions.
#[derive(Default)]
pub struct PlainDecoderDetails {
    // The remaining number of values in the byte array
    pub(crate) num_values: usize,

    // The current starting index in the byte array. Not used when `T` is bool.
    pub(crate) start: usize,

    // The length for the type `T`. Only used when `T` is `FixedLenByteArrayType`
    pub(crate) type_length: i32,

    // The byte array to decode from. Not set if `T` is bool.
    pub(crate) data: Option<Bytes>,

    // Read `data` bit by bit. Only set if `T` is bool.
    pub(crate) bit_reader: Option<BitReader>,
}
274
/// Plain decoding that supports all types.
///
/// Values are encoded back to back. For native types, data is encoded as little endian.
/// Floating point types are encoded in IEEE.
/// See [`PlainEncoder`](crate::encoding::PlainEncoder) for more information.
pub struct PlainDecoder<T: DataType> {
    // The binary details needed for decoding; all decoding work is delegated to
    // the value type `T::T` operating on this state.
    inner: PlainDecoderDetails,

    // To allow `T` in the generic parameter for this struct. This doesn't take any
    // space.
    _phantom: PhantomData<T>,
}
288
289impl<T: DataType> PlainDecoder<T> {
290    /// Creates new plain decoder.
291    pub fn new(type_length: i32) -> Self {
292        PlainDecoder {
293            inner: PlainDecoderDetails {
294                type_length,
295                num_values: 0,
296                start: 0,
297                data: None,
298                bit_reader: None,
299            },
300            _phantom: PhantomData,
301        }
302    }
303}
304
305impl<T: DataType> Decoder<T> for PlainDecoder<T> {
306    #[inline]
307    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
308        T::T::set_data(&mut self.inner, data, num_values);
309        Ok(())
310    }
311
312    #[inline]
313    fn values_left(&self) -> usize {
314        self.inner.num_values
315    }
316
317    #[inline]
318    fn encoding(&self) -> Encoding {
319        Encoding::PLAIN
320    }
321
322    #[inline]
323    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
324        T::T::decode(buffer, &mut self.inner)
325    }
326
327    #[inline]
328    fn skip(&mut self, num_values: usize) -> Result<usize> {
329        T::T::skip(&mut self.inner, num_values)
330    }
331}
332
333// ----------------------------------------------------------------------
334// RLE_DICTIONARY/PLAIN_DICTIONARY Decoding
335
/// Dictionary decoder.
///
/// The dictionary encoding builds a dictionary of values encountered in a given column.
/// The dictionary is stored in a dictionary page per column chunk.
/// See [`DictEncoder`](crate::encoding::DictEncoder) for more information.
pub struct DictDecoder<T: DataType> {
    // The dictionary, which maps ids to the values
    dictionary: Vec<T::T>,

    // Whether `dictionary` has been initialized (set by `set_dict`)
    has_dictionary: bool,

    // The decoder for the value ids; `None` until `set_data` is called
    rle_decoder: Option<RleDecoder>,

    // Number of values left in the data stream
    num_values: usize,
}
354
355impl<T: DataType> Default for DictDecoder<T> {
356    fn default() -> Self {
357        Self::new()
358    }
359}
360
361impl<T: DataType> DictDecoder<T> {
362    /// Creates new dictionary decoder.
363    pub fn new() -> Self {
364        Self {
365            dictionary: vec![],
366            has_dictionary: false,
367            rle_decoder: None,
368            num_values: 0,
369        }
370    }
371
372    /// Decodes and sets values for dictionary using `decoder` decoder.
373    pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
374        let num_values = decoder.values_left();
375        self.dictionary.resize(num_values, T::T::default());
376        let _ = decoder.get(&mut self.dictionary)?;
377        self.has_dictionary = true;
378        Ok(())
379    }
380}
381
382impl<T: DataType> Decoder<T> for DictDecoder<T> {
383    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
384        // First byte in `data` is bit width
385        let bit_width = data.as_ref()[0];
386        let mut rle_decoder = RleDecoder::new(bit_width);
387        rle_decoder.set_data(data.slice(1..));
388        self.num_values = num_values;
389        self.rle_decoder = Some(rle_decoder);
390        Ok(())
391    }
392
393    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
394        assert!(self.rle_decoder.is_some());
395        assert!(self.has_dictionary, "Must call set_dict() first!");
396
397        let rle = self.rle_decoder.as_mut().unwrap();
398        let num_values = cmp::min(buffer.len(), self.num_values);
399        rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
400    }
401
402    /// Number of values left in this decoder stream
403    fn values_left(&self) -> usize {
404        self.num_values
405    }
406
407    fn encoding(&self) -> Encoding {
408        Encoding::RLE_DICTIONARY
409    }
410
411    fn skip(&mut self, num_values: usize) -> Result<usize> {
412        assert!(self.rle_decoder.is_some());
413        assert!(self.has_dictionary, "Must call set_dict() first!");
414
415        let rle = self.rle_decoder.as_mut().unwrap();
416        let num_values = cmp::min(num_values, self.num_values);
417        rle.skip(num_values)
418    }
419}
420
421// ----------------------------------------------------------------------
422// RLE Decoding
423
/// RLE/Bit-Packing hybrid decoding for values.
/// Currently is used only for data pages v2 and supports boolean types.
/// See [`RleValueEncoder`](crate::encoding::RleValueEncoder) for more information.
pub struct RleValueDecoder<T: DataType> {
    // Number of values that can still be read or skipped
    values_left: usize,
    // Underlying RLE decoder; created with a bit width of 1
    decoder: RleDecoder,
    // Placeholder to allow `T` as generic parameter
    _phantom: PhantomData<T>,
}
432
433impl<T: DataType> Default for RleValueDecoder<T> {
434    fn default() -> Self {
435        Self::new()
436    }
437}
438
439impl<T: DataType> RleValueDecoder<T> {
440    pub fn new() -> Self {
441        Self {
442            values_left: 0,
443            decoder: RleDecoder::new(1),
444            _phantom: PhantomData,
445        }
446    }
447}
448
449impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
450    #[inline]
451    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
452        // Only support RLE value reader for boolean values with bit width of 1.
453        ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
454
455        // We still need to remove prefix of i32 from the stream.
456        const I32_SIZE: usize = mem::size_of::<i32>();
457        let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
458        self.decoder = RleDecoder::new(1);
459        self.decoder
460            .set_data(data.slice(I32_SIZE..I32_SIZE + data_size));
461        self.values_left = num_values;
462        Ok(())
463    }
464
465    #[inline]
466    fn values_left(&self) -> usize {
467        self.values_left
468    }
469
470    #[inline]
471    fn encoding(&self) -> Encoding {
472        Encoding::RLE
473    }
474
475    #[inline]
476    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
477        let num_values = cmp::min(buffer.len(), self.values_left);
478        let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
479        self.values_left -= values_read;
480        Ok(values_read)
481    }
482
483    #[inline]
484    fn skip(&mut self, num_values: usize) -> Result<usize> {
485        let num_values = cmp::min(num_values, self.values_left);
486        let values_skipped = self.decoder.skip(num_values)?;
487        self.values_left -= values_skipped;
488        Ok(values_skipped)
489    }
490}
491
492// ----------------------------------------------------------------------
493// DELTA_BINARY_PACKED Decoding
494
/// Delta binary packed decoder.
/// Supports INT32 and INT64 types.
/// See [`DeltaBitPackEncoder`](crate::encoding::DeltaBitPackEncoder) for more
/// information.
pub struct DeltaBitPackDecoder<T: DataType> {
    /// Reader over the page data; replaced on every `set_data`
    bit_reader: BitReader,
    /// Whether `set_data` has been called
    initialized: bool,

    // Header info
    /// The number of values in each block
    block_size: usize,
    /// The number of values that remain to be read in the current page
    values_left: usize,
    /// The number of mini-blocks in each block
    mini_blocks_per_block: usize,
    /// The number of values in each mini block
    values_per_mini_block: usize,

    // Per block info
    /// The minimum delta in the block
    min_delta: T::T,
    /// The byte offset of the end of the current block
    block_end_offset: usize,
    /// The index on the current mini block
    mini_block_idx: usize,
    /// The bit widths of each mini block in the current block
    mini_block_bit_widths: Vec<u8>,
    /// The number of values remaining in the current mini block
    mini_block_remaining: usize,

    /// The first value from the block header if not consumed
    first_value: Option<T::T>,
    /// The last value to compute offsets from
    last_value: T::T,
}
530
531impl<T: DataType> Default for DeltaBitPackDecoder<T>
532where
533    T::T: Default + FromPrimitive + WrappingAdd + Copy,
534{
535    fn default() -> Self {
536        Self::new()
537    }
538}
539
540impl<T: DataType> DeltaBitPackDecoder<T>
541where
542    T::T: Default + FromPrimitive + WrappingAdd + Copy,
543{
544    /// Creates new delta bit packed decoder.
545    pub fn new() -> Self {
546        Self {
547            bit_reader: BitReader::from(vec![]),
548            initialized: false,
549            block_size: 0,
550            values_left: 0,
551            mini_blocks_per_block: 0,
552            values_per_mini_block: 0,
553            min_delta: Default::default(),
554            mini_block_idx: 0,
555            mini_block_bit_widths: vec![],
556            mini_block_remaining: 0,
557            block_end_offset: 0,
558            first_value: None,
559            last_value: Default::default(),
560        }
561    }
562
563    /// Returns the current offset
564    pub fn get_offset(&self) -> usize {
565        assert!(self.initialized, "Bit reader is not initialized");
566        match self.values_left {
567            // If we've exhausted this page report the end of the current block
568            // as we may not have consumed the trailing padding
569            //
570            // The max is necessary to handle pages which don't contain more than
571            // one value and therefore have no blocks, but still contain a page header
572            0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
573            _ => self.bit_reader.get_byte_offset(),
574        }
575    }
576
577    /// Initializes the next block and the first mini block within it
578    #[inline]
579    fn next_block(&mut self) -> Result<()> {
580        let min_delta = self
581            .bit_reader
582            .get_zigzag_vlq_int()
583            .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
584
585        self.min_delta =
586            T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
587
588        self.mini_block_bit_widths.clear();
589        self.bit_reader
590            .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
591
592        let mut offset = self.bit_reader.get_byte_offset();
593        let mut remaining = self.values_left;
594
595        // Compute the end offset of the current block
596        for b in &mut self.mini_block_bit_widths {
597            if remaining == 0 {
598                // Specification requires handling arbitrary bit widths
599                // for trailing mini blocks
600                *b = 0;
601            }
602            remaining = remaining.saturating_sub(self.values_per_mini_block);
603            offset += *b as usize * self.values_per_mini_block / 8;
604        }
605        self.block_end_offset = offset;
606
607        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
608            return Err(eof_err!("insufficient mini block bit widths"));
609        }
610
611        self.mini_block_remaining = self.values_per_mini_block;
612        self.mini_block_idx = 0;
613
614        Ok(())
615    }
616
617    /// Initializes the next mini block
618    #[inline]
619    fn next_mini_block(&mut self) -> Result<()> {
620        if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
621            self.mini_block_idx += 1;
622            self.mini_block_remaining = self.values_per_mini_block;
623            Ok(())
624        } else {
625            self.next_block()
626        }
627    }
628}
629
630impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
631where
632    T::T: Default + FromPrimitive + WrappingAdd + Copy,
633{
634    // # of total values is derived from encoding
635    #[inline]
636    fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
637        self.bit_reader = BitReader::new(data);
638        self.initialized = true;
639
640        // Read header information
641        self.block_size = self
642            .bit_reader
643            .get_vlq_int()
644            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
645            .try_into()
646            .map_err(|_| general_err!("invalid 'block_size'"))?;
647
648        self.mini_blocks_per_block = self
649            .bit_reader
650            .get_vlq_int()
651            .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
652            .try_into()
653            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
654
655        self.values_left = self
656            .bit_reader
657            .get_vlq_int()
658            .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
659            .try_into()
660            .map_err(|_| general_err!("invalid 'values_left'"))?;
661
662        let first_value = self
663            .bit_reader
664            .get_zigzag_vlq_int()
665            .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
666
667        self.first_value =
668            Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
669
670        if self.block_size % 128 != 0 {
671            return Err(general_err!(
672                "'block_size' must be a multiple of 128, got {}",
673                self.block_size
674            ));
675        }
676
677        if self.block_size % self.mini_blocks_per_block != 0 {
678            return Err(general_err!(
679                "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
680                self.block_size,
681                self.mini_blocks_per_block
682            ));
683        }
684
685        // Reset decoding state
686        self.mini_block_idx = 0;
687        self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
688        self.mini_block_remaining = 0;
689        self.mini_block_bit_widths.clear();
690
691        if self.values_per_mini_block % 32 != 0 {
692            return Err(general_err!(
693                "'values_per_mini_block' must be a multiple of 32 got {}",
694                self.values_per_mini_block
695            ));
696        }
697
698        Ok(())
699    }
700
701    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
702        assert!(self.initialized, "Bit reader is not initialized");
703        if buffer.is_empty() {
704            return Ok(0);
705        }
706
707        let mut read = 0;
708        let to_read = buffer.len().min(self.values_left);
709
710        if let Some(value) = self.first_value.take() {
711            self.last_value = value;
712            buffer[0] = value;
713            read += 1;
714            self.values_left -= 1;
715        }
716
717        while read != to_read {
718            if self.mini_block_remaining == 0 {
719                self.next_mini_block()?;
720            }
721
722            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
723            let batch_to_read = self.mini_block_remaining.min(to_read - read);
724
725            let batch_read = self
726                .bit_reader
727                .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
728
729            if batch_read != batch_to_read {
730                return Err(general_err!(
731                    "Expected to read {} values from miniblock got {}",
732                    batch_to_read,
733                    batch_read
734                ));
735            }
736
737            // At this point we have read the deltas to `buffer` we now need to offset
738            // these to get back to the original values that were encoded
739            for v in &mut buffer[read..read + batch_read] {
740                // It is OK for deltas to contain "overflowed" values after encoding,
741                // e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
742                // restore original value.
743                *v = v
744                    .wrapping_add(&self.min_delta)
745                    .wrapping_add(&self.last_value);
746
747                self.last_value = *v;
748            }
749
750            read += batch_read;
751            self.mini_block_remaining -= batch_read;
752            self.values_left -= batch_read;
753        }
754
755        Ok(to_read)
756    }
757
758    fn values_left(&self) -> usize {
759        self.values_left
760    }
761
762    fn encoding(&self) -> Encoding {
763        Encoding::DELTA_BINARY_PACKED
764    }
765
766    fn skip(&mut self, num_values: usize) -> Result<usize> {
767        let mut skip = 0;
768        let to_skip = num_values.min(self.values_left);
769        if to_skip == 0 {
770            return Ok(0);
771        }
772
773        // try to consume first value in header.
774        if let Some(value) = self.first_value.take() {
775            self.last_value = value;
776            skip += 1;
777            self.values_left -= 1;
778        }
779
780        let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
781            Type::INT32 => 32,
782            Type::INT64 => 64,
783            _ => unreachable!(),
784        };
785
786        let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
787        while skip < to_skip {
788            if self.mini_block_remaining == 0 {
789                self.next_mini_block()?;
790            }
791
792            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
793            let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
794            let mini_block_should_skip = mini_block_to_skip;
795
796            let skip_count = self
797                .bit_reader
798                .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
799
800            if skip_count != mini_block_to_skip {
801                return Err(general_err!(
802                    "Expected to skip {} values from mini block got {}.",
803                    mini_block_batch_size,
804                    skip_count
805                ));
806            }
807
808            for v in &mut skip_buffer[0..skip_count] {
809                *v = v
810                    .wrapping_add(&self.min_delta)
811                    .wrapping_add(&self.last_value);
812
813                self.last_value = *v;
814            }
815
816            skip += mini_block_should_skip;
817            self.mini_block_remaining -= mini_block_should_skip;
818            self.values_left -= mini_block_should_skip;
819        }
820
821        Ok(to_skip)
822    }
823}
824
825// ----------------------------------------------------------------------
826// DELTA_LENGTH_BYTE_ARRAY Decoding
827
/// Delta length byte array decoder.
///
/// Only applied to byte arrays to separate the length values and the data, the lengths
/// are encoded using DELTA_BINARY_PACKED encoding.
/// See [`DeltaLengthByteArrayEncoder`](crate::encoding::DeltaLengthByteArrayEncoder)
/// for more information.
pub struct DeltaLengthByteArrayDecoder<T: DataType> {
    // Lengths for each byte array in `data`, decoded up front by `set_data`
    // TODO: add memory tracker to this
    lengths: Vec<i32>,

    // Current index into `lengths`
    current_idx: usize,

    // Concatenated byte array data; `None` until `set_data` is called
    data: Option<Bytes>,

    // Offset into `data`, always point to the beginning of next byte array.
    offset: usize,

    // Number of values left in this decoder stream
    num_values: usize,

    // Placeholder to allow `T` as generic parameter
    _phantom: PhantomData<T>,
}
854
855impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
856    fn default() -> Self {
857        Self::new()
858    }
859}
860
861impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
862    /// Creates new delta length byte array decoder.
863    pub fn new() -> Self {
864        Self {
865            lengths: vec![],
866            current_idx: 0,
867            data: None,
868            offset: 0,
869            num_values: 0,
870            _phantom: PhantomData,
871        }
872    }
873}
874
875impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
876    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
877        match T::get_physical_type() {
878            Type::BYTE_ARRAY => {
879                let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
880                len_decoder.set_data(data.clone(), num_values)?;
881                let num_lengths = len_decoder.values_left();
882                self.lengths.resize(num_lengths, 0);
883                len_decoder.get(&mut self.lengths[..])?;
884
885                self.data = Some(data.slice(len_decoder.get_offset()..));
886                self.offset = 0;
887                self.current_idx = 0;
888                self.num_values = num_lengths;
889                Ok(())
890            }
891            _ => Err(general_err!(
892                "DeltaLengthByteArrayDecoder only support ByteArrayType"
893            )),
894        }
895    }
896
897    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
898        match T::get_physical_type() {
899            Type::BYTE_ARRAY => {
900                assert!(self.data.is_some());
901
902                let data = self.data.as_ref().unwrap();
903                let num_values = cmp::min(buffer.len(), self.num_values);
904
905                for item in buffer.iter_mut().take(num_values) {
906                    let len = self.lengths[self.current_idx] as usize;
907                    item.set_from_bytes(data.slice(self.offset..self.offset + len));
908
909                    self.offset += len;
910                    self.current_idx += 1;
911                }
912
913                self.num_values -= num_values;
914                Ok(num_values)
915            }
916            _ => Err(general_err!(
917                "DeltaLengthByteArrayDecoder only support ByteArrayType"
918            )),
919        }
920    }
921
922    fn values_left(&self) -> usize {
923        self.num_values
924    }
925
926    fn encoding(&self) -> Encoding {
927        Encoding::DELTA_LENGTH_BYTE_ARRAY
928    }
929
930    fn skip(&mut self, num_values: usize) -> Result<usize> {
931        match T::get_physical_type() {
932            Type::BYTE_ARRAY => {
933                let num_values = cmp::min(num_values, self.num_values);
934
935                let next_offset: i32 = self.lengths
936                    [self.current_idx..self.current_idx + num_values]
937                    .iter()
938                    .sum();
939
940                self.current_idx += num_values;
941                self.offset += next_offset as usize;
942
943                self.num_values -= num_values;
944                Ok(num_values)
945            }
946            other_type => Err(general_err!(
947                "DeltaLengthByteArrayDecoder not support {}, only support byte array",
948                other_type
949            )),
950        }
951    }
952}
953
954// ----------------------------------------------------------------------
955// DELTA_BYTE_ARRAY Decoding
956
/// Delta byte array decoder.
///
/// Each value is stored as the length of the prefix it shares with the previous
/// value plus the remaining suffix bytes.
/// Prefix lengths are encoded using `DELTA_BINARY_PACKED` encoding, suffixes are stored
/// using `DELTA_LENGTH_BYTE_ARRAY` encoding.
/// See [`DeltaByteArrayEncoder`](crate::encoding::DeltaByteArrayEncoder) for more
/// information.
pub struct DeltaByteArrayDecoder<T: DataType> {
    // Prefix length of every byte array; filled in by `set_data`.
    // TODO: add memory tracker to this
    prefix_lengths: Vec<i32>,

    // The current index into `prefix_lengths`, i.e. the next value to decode.
    current_idx: usize,

    // Decoder for all suffixes, the # of which should be the same as
    // `prefix_lengths.len()`; `None` until `set_data` is called.
    suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,

    // The last decoded byte array, used to derive the prefix of the current one.
    previous_value: Vec<u8>,

    // Number of values left (not yet decoded).
    num_values: usize,

    // Zero-sized placeholder that allows `T` as a generic parameter.
    _phantom: PhantomData<T>,
}
984
985impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
986    fn default() -> Self {
987        Self::new()
988    }
989}
990
991impl<T: DataType> DeltaByteArrayDecoder<T> {
992    /// Creates new delta byte array decoder.
993    pub fn new() -> Self {
994        Self {
995            prefix_lengths: vec![],
996            current_idx: 0,
997            suffix_decoder: None,
998            previous_value: vec![],
999            num_values: 0,
1000            _phantom: PhantomData,
1001        }
1002    }
1003}
1004
1005impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1006    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1007        match T::get_physical_type() {
1008            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1009                let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1010                prefix_len_decoder.set_data(data.clone(), num_values)?;
1011                let num_prefixes = prefix_len_decoder.values_left();
1012                self.prefix_lengths.resize(num_prefixes, 0);
1013                prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1014
1015                let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1016                suffix_decoder
1017                    .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1018                self.suffix_decoder = Some(suffix_decoder);
1019                self.num_values = num_prefixes;
1020                self.current_idx = 0;
1021                self.previous_value.clear();
1022                Ok(())
1023            }
1024            _ => Err(general_err!(
1025                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1026            )),
1027        }
1028    }
1029
1030    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1031        match T::get_physical_type() {
1032            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1033                let num_values = cmp::min(buffer.len(), self.num_values);
1034                let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1035                for item in buffer.iter_mut().take(num_values) {
1036                    // Process suffix
1037                    // TODO: this is awkward - maybe we should add a non-vectorized API?
1038                    let suffix_decoder = self
1039                        .suffix_decoder
1040                        .as_mut()
1041                        .expect("decoder not initialized");
1042                    suffix_decoder.get(&mut v[..])?;
1043                    let suffix = v[0].data();
1044
1045                    // Extract current prefix length, can be 0
1046                    let prefix_len = self.prefix_lengths[self.current_idx] as usize;
1047
1048                    // Concatenate prefix with suffix
1049                    let mut result = Vec::new();
1050                    result.extend_from_slice(&self.previous_value[0..prefix_len]);
1051                    result.extend_from_slice(suffix);
1052
1053                    let data = Bytes::from(result.clone());
1054                    item.set_from_bytes(data);
1055
1056                    self.previous_value = result;
1057                    self.current_idx += 1;
1058                }
1059
1060                self.num_values -= num_values;
1061                Ok(num_values)
1062            }
1063            _ => Err(general_err!(
1064                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1065            )),
1066        }
1067    }
1068
1069    fn values_left(&self) -> usize {
1070        self.num_values
1071    }
1072
1073    fn encoding(&self) -> Encoding {
1074        Encoding::DELTA_BYTE_ARRAY
1075    }
1076
1077    fn skip(&mut self, num_values: usize) -> Result<usize> {
1078        let mut buffer = vec![T::T::default(); num_values];
1079        self.get(&mut buffer)
1080    }
1081}
1082
1083#[cfg(test)]
1084mod tests {
1085    use super::{super::encoding::*, *};
1086
1087    use std::f32::consts::PI as PI_f32;
1088    use std::f64::consts::PI as PI_f64;
1089    use std::sync::Arc;
1090
1091    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1092    use crate::util::test_common::rand_gen::RandGen;
1093
1094    #[test]
1095    fn test_get_decoders() {
1096        // supported encodings
1097        create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1098        create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1099        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1100        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1101        create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1102
1103        // error when initializing
1104        create_and_check_decoder::<Int32Type>(
1105            Encoding::RLE_DICTIONARY,
1106            Some(general_err!(
1107                "Cannot initialize this encoding through this function"
1108            )),
1109        );
1110        create_and_check_decoder::<Int32Type>(
1111            Encoding::PLAIN_DICTIONARY,
1112            Some(general_err!(
1113                "Cannot initialize this encoding through this function"
1114            )),
1115        );
1116        create_and_check_decoder::<Int32Type>(
1117            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1118            Some(general_err!(
1119                "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1120            )),
1121        );
1122        create_and_check_decoder::<Int32Type>(
1123            Encoding::DELTA_BYTE_ARRAY,
1124            Some(general_err!(
1125                "Encoding DELTA_BYTE_ARRAY is not supported for type"
1126            )),
1127        );
1128
1129        // unsupported
1130        #[allow(deprecated)]
1131        create_and_check_decoder::<Int32Type>(
1132            Encoding::BIT_PACKED,
1133            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1134        );
1135    }
1136
1137    #[test]
1138    fn test_plain_decode_int32() {
1139        let data = [42, 18, 52];
1140        let data_bytes = Int32Type::to_byte_array(&data[..]);
1141        let mut buffer = [0; 3];
1142        test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1143    }
1144
1145    #[test]
1146    fn test_plain_skip_int32() {
1147        let data = [42, 18, 52];
1148        let data_bytes = Int32Type::to_byte_array(&data[..]);
1149        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1150    }
1151
1152    #[test]
1153    fn test_plain_skip_all_int32() {
1154        let data = [42, 18, 52];
1155        let data_bytes = Int32Type::to_byte_array(&data[..]);
1156        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1157    }
1158
1159    #[test]
1160    fn test_plain_decode_int32_spaced() {
1161        let data = [42, 18, 52];
1162        let expected_data = [0, 42, 0, 18, 0, 0, 52, 0];
1163        let data_bytes = Int32Type::to_byte_array(&data[..]);
1164        let mut buffer = [0; 8];
1165        let num_nulls = 5;
1166        let valid_bits = [0b01001010];
1167        test_plain_decode_spaced::<Int32Type>(
1168            Bytes::from(data_bytes),
1169            3,
1170            -1,
1171            &mut buffer[..],
1172            num_nulls,
1173            &valid_bits,
1174            &expected_data[..],
1175        );
1176    }
1177
1178    #[test]
1179    fn test_plain_decode_int64() {
1180        let data = [42, 18, 52];
1181        let data_bytes = Int64Type::to_byte_array(&data[..]);
1182        let mut buffer = [0; 3];
1183        test_plain_decode::<Int64Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1184    }
1185
1186    #[test]
1187    fn test_plain_skip_int64() {
1188        let data = [42, 18, 52];
1189        let data_bytes = Int64Type::to_byte_array(&data[..]);
1190        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 2, -1, &data[2..]);
1191    }
1192
1193    #[test]
1194    fn test_plain_skip_all_int64() {
1195        let data = [42, 18, 52];
1196        let data_bytes = Int64Type::to_byte_array(&data[..]);
1197        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 3, -1, &[]);
1198    }
1199
1200    #[test]
1201    fn test_plain_decode_float() {
1202        let data = [PI_f32, 2.414, 12.51];
1203        let data_bytes = FloatType::to_byte_array(&data[..]);
1204        let mut buffer = [0.0; 3];
1205        test_plain_decode::<FloatType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1206    }
1207
1208    #[test]
1209    fn test_plain_skip_float() {
1210        let data = [PI_f32, 2.414, 12.51];
1211        let data_bytes = FloatType::to_byte_array(&data[..]);
1212        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1213    }
1214
1215    #[test]
1216    fn test_plain_skip_all_float() {
1217        let data = [PI_f32, 2.414, 12.51];
1218        let data_bytes = FloatType::to_byte_array(&data[..]);
1219        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 4, -1, &[]);
1220    }
1221
1222    #[test]
1223    fn test_plain_skip_double() {
1224        let data = [PI_f64, 2.414f64, 12.51f64];
1225        let data_bytes = DoubleType::to_byte_array(&data[..]);
1226        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1227    }
1228
1229    #[test]
1230    fn test_plain_skip_all_double() {
1231        let data = [PI_f64, 2.414f64, 12.51f64];
1232        let data_bytes = DoubleType::to_byte_array(&data[..]);
1233        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1234    }
1235
1236    #[test]
1237    fn test_plain_decode_double() {
1238        let data = [PI_f64, 2.414f64, 12.51f64];
1239        let data_bytes = DoubleType::to_byte_array(&data[..]);
1240        let mut buffer = [0.0f64; 3];
1241        test_plain_decode::<DoubleType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1242    }
1243
1244    #[test]
1245    fn test_plain_decode_int96() {
1246        let mut data = [Int96::new(); 4];
1247        data[0].set_data(11, 22, 33);
1248        data[1].set_data(44, 55, 66);
1249        data[2].set_data(10, 20, 30);
1250        data[3].set_data(40, 50, 60);
1251        let data_bytes = Int96Type::to_byte_array(&data[..]);
1252        let mut buffer = [Int96::new(); 4];
1253        test_plain_decode::<Int96Type>(Bytes::from(data_bytes), 4, -1, &mut buffer[..], &data[..]);
1254    }
1255
1256    #[test]
1257    fn test_plain_skip_int96() {
1258        let mut data = [Int96::new(); 4];
1259        data[0].set_data(11, 22, 33);
1260        data[1].set_data(44, 55, 66);
1261        data[2].set_data(10, 20, 30);
1262        data[3].set_data(40, 50, 60);
1263        let data_bytes = Int96Type::to_byte_array(&data[..]);
1264        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 2, -1, &data[2..]);
1265    }
1266
1267    #[test]
1268    fn test_plain_skip_all_int96() {
1269        let mut data = [Int96::new(); 4];
1270        data[0].set_data(11, 22, 33);
1271        data[1].set_data(44, 55, 66);
1272        data[2].set_data(10, 20, 30);
1273        data[3].set_data(40, 50, 60);
1274        let data_bytes = Int96Type::to_byte_array(&data[..]);
1275        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 8, -1, &[]);
1276    }
1277
1278    #[test]
1279    fn test_plain_decode_bool() {
1280        let data = [
1281            false, true, false, false, true, false, true, true, false, true,
1282        ];
1283        let data_bytes = BoolType::to_byte_array(&data[..]);
1284        let mut buffer = [false; 10];
1285        test_plain_decode::<BoolType>(Bytes::from(data_bytes), 10, -1, &mut buffer[..], &data[..]);
1286    }
1287
1288    #[test]
1289    fn test_plain_skip_bool() {
1290        let data = [
1291            false, true, false, false, true, false, true, true, false, true,
1292        ];
1293        let data_bytes = BoolType::to_byte_array(&data[..]);
1294        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 5, -1, &data[5..]);
1295    }
1296
1297    #[test]
1298    fn test_plain_skip_all_bool() {
1299        let data = [
1300            false, true, false, false, true, false, true, true, false, true,
1301        ];
1302        let data_bytes = BoolType::to_byte_array(&data[..]);
1303        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 20, -1, &[]);
1304    }
1305
1306    #[test]
1307    fn test_plain_decode_byte_array() {
1308        let mut data = vec![ByteArray::new(); 2];
1309        data[0].set_data(Bytes::from(String::from("hello")));
1310        data[1].set_data(Bytes::from(String::from("parquet")));
1311        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1312        let mut buffer = vec![ByteArray::new(); 2];
1313        test_plain_decode::<ByteArrayType>(
1314            Bytes::from(data_bytes),
1315            2,
1316            -1,
1317            &mut buffer[..],
1318            &data[..],
1319        );
1320    }
1321
1322    #[test]
1323    fn test_plain_skip_byte_array() {
1324        let mut data = vec![ByteArray::new(); 2];
1325        data[0].set_data(Bytes::from(String::from("hello")));
1326        data[1].set_data(Bytes::from(String::from("parquet")));
1327        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1328        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 1, -1, &data[1..]);
1329    }
1330
1331    #[test]
1332    fn test_plain_skip_all_byte_array() {
1333        let mut data = vec![ByteArray::new(); 2];
1334        data[0].set_data(Bytes::from(String::from("hello")));
1335        data[1].set_data(Bytes::from(String::from("parquet")));
1336        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1337        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 2, -1, &[]);
1338    }
1339
1340    #[test]
1341    fn test_plain_decode_fixed_len_byte_array() {
1342        let mut data = vec![FixedLenByteArray::default(); 3];
1343        data[0].set_data(Bytes::from(String::from("bird")));
1344        data[1].set_data(Bytes::from(String::from("come")));
1345        data[2].set_data(Bytes::from(String::from("flow")));
1346        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1347        let mut buffer = vec![FixedLenByteArray::default(); 3];
1348        test_plain_decode::<FixedLenByteArrayType>(
1349            Bytes::from(data_bytes),
1350            3,
1351            4,
1352            &mut buffer[..],
1353            &data[..],
1354        );
1355    }
1356
1357    #[test]
1358    fn test_plain_skip_fixed_len_byte_array() {
1359        let mut data = vec![FixedLenByteArray::default(); 3];
1360        data[0].set_data(Bytes::from(String::from("bird")));
1361        data[1].set_data(Bytes::from(String::from("come")));
1362        data[2].set_data(Bytes::from(String::from("flow")));
1363        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1364        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 1, 4, &data[1..]);
1365    }
1366
1367    #[test]
1368    fn test_plain_skip_all_fixed_len_byte_array() {
1369        let mut data = vec![FixedLenByteArray::default(); 3];
1370        data[0].set_data(Bytes::from(String::from("bird")));
1371        data[1].set_data(Bytes::from(String::from("come")));
1372        data[2].set_data(Bytes::from(String::from("flow")));
1373        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1374        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
1375    }
1376
1377    fn test_plain_decode<T: DataType>(
1378        data: Bytes,
1379        num_values: usize,
1380        type_length: i32,
1381        buffer: &mut [T::T],
1382        expected: &[T::T],
1383    ) {
1384        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1385        let result = decoder.set_data(data, num_values);
1386        assert!(result.is_ok());
1387        let result = decoder.get(buffer);
1388        assert!(result.is_ok());
1389        assert_eq!(decoder.values_left(), 0);
1390        assert_eq!(buffer, expected);
1391    }
1392
1393    fn test_plain_skip<T: DataType>(
1394        data: Bytes,
1395        num_values: usize,
1396        skip: usize,
1397        type_length: i32,
1398        expected: &[T::T],
1399    ) {
1400        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1401        let result = decoder.set_data(data, num_values);
1402        assert!(result.is_ok());
1403        let skipped = decoder.skip(skip).expect("skipping values");
1404
1405        if skip >= num_values {
1406            assert_eq!(skipped, num_values);
1407
1408            let mut buffer = vec![T::T::default(); 1];
1409            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1410            assert_eq!(remaining, 0);
1411        } else {
1412            assert_eq!(skipped, skip);
1413            let mut buffer = vec![T::T::default(); num_values - skip];
1414            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1415            assert_eq!(remaining, num_values - skip);
1416            assert_eq!(decoder.values_left(), 0);
1417            assert_eq!(buffer, expected);
1418        }
1419    }
1420
1421    fn test_plain_decode_spaced<T: DataType>(
1422        data: Bytes,
1423        num_values: usize,
1424        type_length: i32,
1425        buffer: &mut [T::T],
1426        num_nulls: usize,
1427        valid_bits: &[u8],
1428        expected: &[T::T],
1429    ) {
1430        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1431        let result = decoder.set_data(data, num_values);
1432        assert!(result.is_ok());
1433        let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1434        assert!(result.is_ok());
1435        assert_eq!(num_values + num_nulls, result.unwrap());
1436        assert_eq!(decoder.values_left(), 0);
1437        assert_eq!(buffer, expected);
1438    }
1439
1440    #[test]
1441    #[should_panic(expected = "RleValueEncoder only supports BoolType")]
1442    fn test_rle_value_encode_int32_not_supported() {
1443        let mut encoder = RleValueEncoder::<Int32Type>::new();
1444        encoder.put(&[1, 2, 3, 4]).unwrap();
1445    }
1446
1447    #[test]
1448    #[should_panic(expected = "RleValueDecoder only supports BoolType")]
1449    fn test_rle_value_decode_int32_not_supported() {
1450        let mut decoder = RleValueDecoder::<Int32Type>::new();
1451        decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).unwrap();
1452    }
1453
1454    #[test]
1455    fn test_rle_value_decode_bool_decode() {
1456        // Test multiple 'put' calls on the same encoder
1457        let data = vec![
1458            BoolType::gen_vec(-1, 256),
1459            BoolType::gen_vec(-1, 257),
1460            BoolType::gen_vec(-1, 126),
1461        ];
1462        test_rle_value_decode::<BoolType>(data);
1463    }
1464
1465    #[test]
1466    #[should_panic(expected = "Bit reader is not initialized")]
1467    fn test_delta_bit_packed_not_initialized_offset() {
1468        // Fail if set_data() is not called before get_offset()
1469        let decoder = DeltaBitPackDecoder::<Int32Type>::new();
1470        decoder.get_offset();
1471    }
1472
1473    #[test]
1474    #[should_panic(expected = "Bit reader is not initialized")]
1475    fn test_delta_bit_packed_not_initialized_get() {
1476        // Fail if set_data() is not called before get()
1477        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1478        let mut buffer = vec![];
1479        decoder.get(&mut buffer).unwrap();
1480    }
1481
1482    #[test]
1483    fn test_delta_bit_packed_int32_empty() {
1484        let data = vec![vec![0; 0]];
1485        test_delta_bit_packed_decode::<Int32Type>(data);
1486    }
1487
1488    #[test]
1489    fn test_delta_bit_packed_int32_repeat() {
1490        let block_data = vec![
1491            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1492            6, 7, 8,
1493        ];
1494        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1495    }
1496
1497    #[test]
1498    fn test_skip_delta_bit_packed_int32_repeat() {
1499        let block_data = vec![
1500            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1501            6, 7, 8,
1502        ];
1503        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10);
1504        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1505    }
1506
1507    #[test]
1508    fn test_delta_bit_packed_int32_uneven() {
1509        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1510        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1511    }
1512
1513    #[test]
1514    fn test_skip_delta_bit_packed_int32_uneven() {
1515        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1516        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1517        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1518    }
1519
1520    #[test]
1521    fn test_delta_bit_packed_int32_same_values() {
1522        let block_data = vec![
1523            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1524        ];
1525        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1526
1527        let block_data = vec![
1528            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1529            -127, -127,
1530        ];
1531        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1532    }
1533
1534    #[test]
1535    fn test_skip_delta_bit_packed_int32_same_values() {
1536        let block_data = vec![
1537            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1538        ];
1539        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1540        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1541
1542        let block_data = vec![
1543            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1544            -127, -127,
1545        ];
1546        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1547        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1548    }
1549
1550    #[test]
1551    fn test_delta_bit_packed_int32_min_max() {
1552        let block_data = vec![
1553            i32::MIN,
1554            i32::MIN,
1555            i32::MIN,
1556            i32::MAX,
1557            i32::MIN,
1558            i32::MAX,
1559            i32::MIN,
1560            i32::MAX,
1561        ];
1562        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1563    }
1564
1565    #[test]
1566    fn test_skip_delta_bit_packed_int32_min_max() {
1567        let block_data = vec![
1568            i32::MIN,
1569            i32::MIN,
1570            i32::MIN,
1571            i32::MAX,
1572            i32::MIN,
1573            i32::MAX,
1574            i32::MIN,
1575            i32::MAX,
1576        ];
1577        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1578        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1579    }
1580
1581    #[test]
1582    fn test_delta_bit_packed_int32_multiple_blocks() {
1583        // Test multiple 'put' calls on the same encoder
1584        let data = vec![
1585            Int32Type::gen_vec(-1, 64),
1586            Int32Type::gen_vec(-1, 128),
1587            Int32Type::gen_vec(-1, 64),
1588        ];
1589        test_delta_bit_packed_decode::<Int32Type>(data);
1590    }
1591
1592    #[test]
1593    fn test_delta_bit_packed_int32_data_across_blocks() {
1594        // Test multiple 'put' calls on the same encoder
1595        let data = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
1596        test_delta_bit_packed_decode::<Int32Type>(data);
1597    }
1598
1599    #[test]
1600    fn test_delta_bit_packed_int32_with_empty_blocks() {
1601        let data = vec![
1602            Int32Type::gen_vec(-1, 128),
1603            vec![0; 0],
1604            Int32Type::gen_vec(-1, 64),
1605        ];
1606        test_delta_bit_packed_decode::<Int32Type>(data);
1607    }
1608
1609    #[test]
1610    fn test_delta_bit_packed_int64_empty() {
1611        let data = vec![vec![0; 0]];
1612        test_delta_bit_packed_decode::<Int64Type>(data);
1613    }
1614
1615    #[test]
1616    fn test_delta_bit_packed_int64_min_max() {
1617        let block_data = vec![
1618            i64::MIN,
1619            i64::MAX,
1620            i64::MIN,
1621            i64::MAX,
1622            i64::MIN,
1623            i64::MAX,
1624            i64::MIN,
1625            i64::MAX,
1626        ];
1627        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1628    }
1629
1630    #[test]
1631    fn test_delta_bit_packed_int64_multiple_blocks() {
1632        // Test multiple 'put' calls on the same encoder
1633        let data = vec![
1634            Int64Type::gen_vec(-1, 64),
1635            Int64Type::gen_vec(-1, 128),
1636            Int64Type::gen_vec(-1, 64),
1637        ];
1638        test_delta_bit_packed_decode::<Int64Type>(data);
1639    }
1640
1641    #[test]
1642    fn test_delta_bit_packed_decoder_sample() {
1643        let data_bytes = vec![
1644            128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1645            0, 0, 0, 0, 0, 0,
1646        ];
1647        let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
1648        decoder.set_data(data_bytes.into(), 3).unwrap();
1649        // check exact offsets, because when reading partial values we end up with
1650        // some data not being read from bit reader
1651        assert_eq!(decoder.get_offset(), 5);
1652        let mut result = vec![0, 0, 0];
1653        decoder.get(&mut result).unwrap();
1654        assert_eq!(decoder.get_offset(), 34);
1655        assert_eq!(result, vec![29, 43, 89]);
1656    }
1657
1658    #[test]
1659    fn test_delta_bit_packed_padding() {
1660        // Page header
1661        let header = vec![
1662            // Page Header
1663
1664            // Block Size - 256
1665            128,
1666            2,
1667            // Miniblocks in block,
1668            4,
1669            // Total value count - 419
1670            128 + 35,
1671            3,
1672            // First value - 7
1673            7,
1674        ];
1675
1676        // Block Header
1677        let block1_header = vec![
1678            0, // Min delta
1679            0, 1, 0, 0, // Bit widths
1680        ];
1681
1682        // Mini-block 1 - bit width 0 => 0 bytes
1683        // Mini-block 2 - bit width 1 => 8 bytes
1684        // Mini-block 3 - bit width 0 => 0 bytes
1685        // Mini-block 4 - bit width 0 => 0 bytes
1686        let block1 = vec![0xFF; 8];
1687
1688        // Block Header
1689        let block2_header = vec![
1690            0, // Min delta
1691            0, 1, 2, 0xFF, // Bit widths, including non-zero padding
1692        ];
1693
1694        // Mini-block 1 - bit width 0 => 0 bytes
1695        // Mini-block 2 - bit width 1 => 8 bytes
1696        // Mini-block 3 - bit width 2 => 16 bytes
1697        // Mini-block 4 - padding => no bytes
1698        let block2 = vec![0xFF; 24];
1699
1700        let data: Vec<u8> = header
1701            .into_iter()
1702            .chain(block1_header)
1703            .chain(block1)
1704            .chain(block2_header)
1705            .chain(block2)
1706            .collect();
1707
1708        let length = data.len();
1709
1710        let ptr = Bytes::from(data);
1711        let mut reader = BitReader::new(ptr.clone());
1712        assert_eq!(reader.get_vlq_int().unwrap(), 256);
1713        assert_eq!(reader.get_vlq_int().unwrap(), 4);
1714        assert_eq!(reader.get_vlq_int().unwrap(), 419);
1715        assert_eq!(reader.get_vlq_int().unwrap(), 7);
1716
1717        // Test output buffer larger than needed and not exact multiple of block size
1718        let mut output = vec![0_i32; 420];
1719
1720        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1721        decoder.set_data(ptr.clone(), 0).unwrap();
1722        assert_eq!(decoder.get(&mut output).unwrap(), 419);
1723        assert_eq!(decoder.get_offset(), length);
1724
1725        // Test with truncated buffer
1726        decoder.set_data(ptr.slice(..12), 0).unwrap();
1727        let err = decoder.get(&mut output).unwrap_err().to_string();
1728        assert!(
1729            err.contains("Expected to read 64 values from miniblock got 8"),
1730            "{}",
1731            err
1732        );
1733    }
1734
/// Round-trips three `put()` batches through `DELTA_BYTE_ARRAY` where every
/// byte array holds the same six bytes.
#[test]
fn test_delta_byte_array_same_arrays() {
    // Each call yields a fresh copy of the same value.
    let repeated = || ByteArray::from(vec![1, 2, 3, 4, 5, 6]);

    let batches = vec![
        vec![repeated()],
        vec![repeated(), repeated()],
        vec![repeated(), repeated()],
    ];
    test_delta_byte_array_decode(batches);
}
1750
/// Round-trips three `put()` batches through `DELTA_BYTE_ARRAY` where every
/// byte array has different contents and lengths vary from one to four bytes.
#[test]
fn test_delta_byte_array_unique_arrays() {
    // Small local constructor so the batches below read as plain byte lists.
    fn arr(raw: &[u8]) -> ByteArray {
        ByteArray::from(raw.to_vec())
    }

    let batches = vec![
        vec![arr(&[1])],
        vec![arr(&[2, 3]), arr(&[4, 5, 6])],
        vec![arr(&[7, 8]), arr(&[9, 0, 1, 2])],
    ];
    test_delta_byte_array_decode(batches);
}
1763
/// Round-trips a single `put()` batch containing exactly one byte array.
#[test]
fn test_delta_byte_array_single_array() {
    let only_batch = vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])];
    test_delta_byte_array_decode(vec![only_batch]);
}
1769
/// Round-trips `f32` values through `BYTE_STREAM_SPLIT` using two `put()`
/// batches; the values are spelled as little-endian byte patterns.
#[test]
fn test_byte_stream_split_multiple_f32() {
    let first_put = vec![
        f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
        f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
    ];
    let second_put = vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])];

    test_byte_stream_split_decode::<FloatType>(vec![first_put, second_put], -1);
}
1781
/// Round-trips two `f64` values through `BYTE_STREAM_SPLIT` in a single
/// `put()` batch; the values are spelled as little-endian byte patterns.
#[test]
fn test_byte_stream_split_f64() {
    let raw: [[u8; 8]; 2] = [
        [0, 1, 2, 3, 4, 5, 6, 7],
        [8, 9, 10, 11, 12, 13, 14, 15],
    ];
    let values: Vec<f64> = raw.iter().map(|bytes| f64::from_le_bytes(*bytes)).collect();

    test_byte_stream_split_decode::<DoubleType>(vec![values], -1);
}
1790
/// Round-trips `i32` values through `BYTE_STREAM_SPLIT` using two `put()`
/// batches; the values are spelled as little-endian byte patterns.
#[test]
fn test_byte_stream_split_multiple_i32() {
    let first_put = vec![
        i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
        i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
    ];
    let second_put = vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])];

    test_byte_stream_split_decode::<Int32Type>(vec![first_put, second_put], -1);
}
1802
/// Round-trips two `i64` values through `BYTE_STREAM_SPLIT` in a single
/// `put()` batch; the values are spelled as little-endian byte patterns.
#[test]
fn test_byte_stream_split_i64() {
    let raw: [[u8; 8]; 2] = [
        [0, 1, 2, 3, 4, 5, 6, 7],
        [8, 9, 10, 11, 12, 13, 14, 15],
    ];
    let values: Vec<i64> = raw.iter().map(|bytes| i64::from_le_bytes(*bytes)).collect();

    test_byte_stream_split_decode::<Int64Type>(vec![values], -1);
}
1811
1812    fn test_byte_stream_split_flba(type_width: usize) {
1813        let data = vec![
1814            vec![
1815                FixedLenByteArrayType::gen(type_width as i32),
1816                FixedLenByteArrayType::gen(type_width as i32),
1817            ],
1818            vec![FixedLenByteArrayType::gen(type_width as i32)],
1819        ];
1820        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
1821    }
1822
/// `BYTE_STREAM_SPLIT` round trip for fixed-length byte arrays of width 5.
#[test]
fn test_byte_stream_split_flba5() {
    let type_width = 5;
    test_byte_stream_split_flba(type_width);
}
1827
/// `BYTE_STREAM_SPLIT` round trip for fixed-length byte arrays of width 16.
#[test]
fn test_byte_stream_split_flba16() {
    let type_width = 16;
    test_byte_stream_split_flba(type_width);
}
1832
/// `BYTE_STREAM_SPLIT` round trip for fixed-length byte arrays of width 19.
#[test]
fn test_byte_stream_split_flba19() {
    let type_width = 19;
    test_byte_stream_split_flba(type_width);
}
1837
/// Feeding a 4-byte value into a column configured for 5-byte values must
/// panic with the size-mismatch message.
#[test]
#[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
fn test_byte_stream_split_flba_mismatch() {
    // First batch matches the configured width of 5.
    let well_sized = vec![
        FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
        FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
    ];
    // Second batch is one byte short.
    let too_short = vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])];

    test_byte_stream_split_decode::<FixedLenByteArrayType>(vec![well_sized, too_short], 5);
}
1850
/// Handing five bytes to a decoder built for width 4 must panic with the
/// "not a multiple of type width" message.
#[test]
#[should_panic(expected = "Input data length is not a multiple of type width 4")]
fn test_byte_stream_split_flba_bad_input() {
    let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
    // Five bytes cannot be divided into 4-byte values.
    let five_bytes = Bytes::from(vec![1, 2, 3, 4, 5]);
    decoder.set_data(five_bytes, 1).unwrap();
}
1859
/// Exercises `skip()` on `BYTE_STREAM_SPLIT` floating-point data: a partial
/// skip (2 of 4) for `f32` and an over-long skip (100 of 4) for `f64`.
#[test]
fn test_skip_byte_stream_split() {
    let floats: Vec<f32> = vec![0.3, 0.4, 0.1, 4.10];
    // Widen the same values so both runs see matching inputs.
    let doubles: Vec<f64> = floats.iter().map(|&x| f64::from(x)).collect();

    test_skip::<FloatType>(floats, Encoding::BYTE_STREAM_SPLIT, 2);
    test_skip::<DoubleType>(doubles, Encoding::BYTE_STREAM_SPLIT, 100);
}
1870
/// Exercises `skip()` on `BYTE_STREAM_SPLIT` integer data: a partial skip
/// (2 of 4) for `i32` and an over-long skip (100 of 4) for `i64`.
#[test]
fn test_skip_byte_stream_split_ints() {
    let narrow: Vec<i32> = vec![3, 4, 1, 5];
    // Widen the same values so both runs see matching inputs.
    let wide: Vec<i64> = narrow.iter().map(|&x| i64::from(x)).collect();

    test_skip::<Int32Type>(narrow, Encoding::BYTE_STREAM_SPLIT, 2);
    test_skip::<Int64Type>(wide, Encoding::BYTE_STREAM_SPLIT, 100);
}
1881
1882    fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
1883        test_encode_decode::<T>(data, Encoding::RLE, -1);
1884    }
1885
1886    fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
1887        test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
1888    }
1889
1890    fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
1891        test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
1892    }
1893
1894    fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
1895        test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
1896    }
1897
1898    // Input data represents vector of data slices to write (test multiple `put()` calls)
1899    // For example,
1900    //   vec![vec![1, 2, 3]] invokes `put()` once and writes {1, 2, 3}
1901    //   vec![vec![1, 2], vec![3]] invokes `put()` twice and writes {1, 2, 3}
1902    fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
1903        let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
1904
1905        // Encode data
1906        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
1907
1908        for v in &data[..] {
1909            encoder.put(&v[..]).expect("ok to encode");
1910        }
1911        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
1912
1913        // Flatten expected data as contiguous array of values
1914        let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
1915
1916        // Decode data and compare with original
1917        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
1918
1919        let mut result = vec![T::T::default(); expected.len()];
1920        decoder
1921            .set_data(bytes, expected.len())
1922            .expect("ok to set data");
1923        let mut result_num_values = 0;
1924        while decoder.values_left() > 0 {
1925            result_num_values += decoder
1926                .get(&mut result[result_num_values..])
1927                .expect("ok to decode");
1928        }
1929        assert_eq!(result_num_values, expected.len());
1930        assert_eq!(result, expected);
1931    }
1932
1933    fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
1934        // Type length should not really matter for encode/decode test,
1935        // otherwise change it based on type
1936        let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
1937
1938        // Encode data
1939        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
1940
1941        encoder.put(&data).expect("ok to encode");
1942
1943        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
1944
1945        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
1946        decoder.set_data(bytes, data.len()).expect("ok to set data");
1947
1948        if skip >= data.len() {
1949            let skipped = decoder.skip(skip).expect("ok to skip");
1950            assert_eq!(skipped, data.len());
1951
1952            let skipped_again = decoder.skip(skip).expect("ok to skip again");
1953            assert_eq!(skipped_again, 0);
1954        } else {
1955            let skipped = decoder.skip(skip).expect("ok to skip");
1956            assert_eq!(skipped, skip);
1957
1958            let remaining = data.len() - skip;
1959
1960            let expected = &data[skip..];
1961            let mut buffer = vec![T::T::default(); remaining];
1962            let fetched = decoder.get(&mut buffer).expect("ok to decode");
1963            assert_eq!(remaining, fetched);
1964            assert_eq!(&buffer, expected);
1965        }
1966    }
1967
1968    fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
1969        let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
1970        let decoder = get_decoder::<T>(descr, encoding);
1971        match err {
1972            Some(parquet_error) => {
1973                assert_eq!(
1974                    decoder.err().unwrap().to_string(),
1975                    parquet_error.to_string()
1976                );
1977            }
1978            None => {
1979                assert_eq!(decoder.unwrap().encoding(), encoding);
1980            }
1981        }
1982    }
1983
1984    // Creates test column descriptor.
1985    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
1986        let ty = SchemaType::primitive_type_builder("t", t)
1987            .with_length(type_len)
1988            .build()
1989            .unwrap();
1990        Arc::new(ColumnDescriptor::new(
1991            Arc::new(ty),
1992            0,
1993            0,
1994            ColumnPath::new(vec![]),
1995        ))
1996    }
1997
/// Converts `v` into the four native-endian bytes of its `u32` value.
///
/// Used by the byte-array `ToByteArray` implementation to build a 4-byte
/// length prefix.
///
/// # Panics
///
/// Panics if `v` does not fit in a `u32`. A bare `as u32` cast would silently
/// truncate and yield a prefix that no longer matches the payload length.
fn usize_to_bytes(v: usize) -> [u8; 4] {
    assert!(
        v <= u32::MAX as usize,
        "length {} does not fit in a 4-byte prefix",
        v
    );
    // Lossless after the check above.
    (v as u32).to_ne_bytes()
}
2001
2002    /// A util trait to convert slices of different types to byte arrays
2003    trait ToByteArray<T: DataType> {
2004        #[allow(clippy::wrong_self_convention)]
2005        fn to_byte_array(data: &[T::T]) -> Vec<u8>;
2006    }
2007
2008    macro_rules! to_byte_array_impl {
2009        ($ty: ty) => {
2010            impl ToByteArray<$ty> for $ty {
2011                #[allow(clippy::wrong_self_convention)]
2012                fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
2013                    <$ty as DataType>::T::slice_as_bytes(data).to_vec()
2014                }
2015            }
2016        };
2017    }
2018
2019    to_byte_array_impl!(Int32Type);
2020    to_byte_array_impl!(Int64Type);
2021    to_byte_array_impl!(FloatType);
2022    to_byte_array_impl!(DoubleType);
2023
2024    impl ToByteArray<BoolType> for BoolType {
2025        #[allow(clippy::wrong_self_convention)]
2026        fn to_byte_array(data: &[bool]) -> Vec<u8> {
2027            let mut v = vec![];
2028            for (i, item) in data.iter().enumerate() {
2029                if i % 8 == 0 {
2030                    v.push(0);
2031                }
2032                if *item {
2033                    v[i / 8] |= 1 << (i % 8);
2034                }
2035            }
2036            v
2037        }
2038    }
2039
2040    impl ToByteArray<Int96Type> for Int96Type {
2041        #[allow(clippy::wrong_self_convention)]
2042        fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2043            let mut v = vec![];
2044            for d in data {
2045                v.extend_from_slice(d.as_bytes());
2046            }
2047            v
2048        }
2049    }
2050
2051    impl ToByteArray<ByteArrayType> for ByteArrayType {
2052        #[allow(clippy::wrong_self_convention)]
2053        fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2054            let mut v = vec![];
2055            for d in data {
2056                let buf = d.data();
2057                let len = &usize_to_bytes(buf.len());
2058                v.extend_from_slice(len);
2059                v.extend(buf);
2060            }
2061            v
2062        }
2063    }
2064
2065    impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2066        #[allow(clippy::wrong_self_convention)]
2067        fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2068            let mut v = vec![];
2069            for d in data {
2070                let buf = d.data();
2071                v.extend(buf);
2072            }
2073            v
2074        }
2075    }
2076}