parquet/arrow/array_reader/fixed_len_byte_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
20use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
21use crate::arrow::record_reader::GenericRecordReader;
22use crate::arrow::record_reader::buffer::ValuesBuffer;
23use crate::arrow::schema::parquet_to_arrow_field;
24use crate::basic::{Encoding, Type};
25use crate::column::page::PageIterator;
26use crate::column::reader::decoder::ColumnValueDecoder;
27use crate::errors::{ParquetError, Result};
28use crate::schema::types::ColumnDescPtr;
29use arrow_array::{
30    ArrayRef, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
31    FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray,
32};
33use arrow_buffer::{Buffer, IntervalDayTime, i256};
34use arrow_data::ArrayDataBuilder;
35use arrow_schema::{DataType as ArrowType, IntervalUnit};
36use bytes::Bytes;
37use half::f16;
38use std::any::Any;
39use std::ops::Range;
40use std::sync::Arc;
41
42/// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
43pub fn make_fixed_len_byte_array_reader(
44    pages: Box<dyn PageIterator>,
45    column_desc: ColumnDescPtr,
46    arrow_type: Option<ArrowType>,
47) -> Result<Box<dyn ArrayReader>> {
48    // Check if Arrow type is specified, else create it from Parquet type
49    let data_type = match arrow_type {
50        Some(t) => t,
51        None => parquet_to_arrow_field(column_desc.as_ref())?
52            .data_type()
53            .clone(),
54    };
55
56    let byte_length = match column_desc.physical_type() {
57        Type::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize,
58        t => {
59            return Err(general_err!(
60                "invalid physical type for fixed length byte array reader - {}",
61                t
62            ));
63        }
64    };
65    match &data_type {
66        ArrowType::FixedSizeBinary(_) => {}
67        ArrowType::Decimal32(_, _) => {
68            if byte_length > 4 {
69                return Err(general_err!(
70                    "decimal 32 type too large, must be less then 4 bytes, got {}",
71                    byte_length
72                ));
73            }
74        }
75        ArrowType::Decimal64(_, _) => {
76            if byte_length > 8 {
77                return Err(general_err!(
78                    "decimal 64 type too large, must be less then 8 bytes, got {}",
79                    byte_length
80                ));
81            }
82        }
83        ArrowType::Decimal128(_, _) => {
84            if byte_length > 16 {
85                return Err(general_err!(
86                    "decimal 128 type too large, must be less than 16 bytes, got {}",
87                    byte_length
88                ));
89            }
90        }
91        ArrowType::Decimal256(_, _) => {
92            if byte_length > 32 {
93                return Err(general_err!(
94                    "decimal 256 type too large, must be less than 32 bytes, got {}",
95                    byte_length
96                ));
97            }
98        }
99        ArrowType::Interval(_) => {
100            if byte_length != 12 {
101                // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
102                return Err(general_err!(
103                    "interval type must consist of 12 bytes got {}",
104                    byte_length
105                ));
106            }
107        }
108        ArrowType::Float16 => {
109            if byte_length != 2 {
110                return Err(general_err!(
111                    "float 16 type must be 2 bytes, got {}",
112                    byte_length
113                ));
114            }
115        }
116        _ => {
117            return Err(general_err!(
118                "invalid data type for fixed length byte array reader - {}",
119                data_type
120            ));
121        }
122    }
123
124    Ok(Box::new(FixedLenByteArrayReader::new(
125        pages,
126        column_desc,
127        data_type,
128        byte_length,
129    )))
130}
131
/// [`ArrayReader`] for `FIXED_LEN_BYTE_ARRAY` columns, converting the decoded
/// fixed-width bytes to the requested Arrow `data_type` on [`ArrayReader::consume_batch`]
struct FixedLenByteArrayReader {
    /// The Arrow type of the arrays produced by this reader
    data_type: ArrowType,
    /// The fixed length in bytes of each value (the Parquet type length)
    byte_length: usize,
    /// Source of column pages to decode
    pages: Box<dyn PageIterator>,
    /// Definition levels from the most recently consumed batch, if any
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels from the most recently consumed batch, if any
    rep_levels_buffer: Option<Vec<i16>>,
    /// Underlying record reader accumulating values, levels and the null bitmap
    record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
}
140
141impl FixedLenByteArrayReader {
142    fn new(
143        pages: Box<dyn PageIterator>,
144        column_desc: ColumnDescPtr,
145        data_type: ArrowType,
146        byte_length: usize,
147    ) -> Self {
148        Self {
149            data_type,
150            byte_length,
151            pages,
152            def_levels_buffer: None,
153            rep_levels_buffer: None,
154            record_reader: GenericRecordReader::new(column_desc),
155        }
156    }
157}
158
impl ArrayReader for FixedLenByteArrayReader {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the Arrow type of the arrays produced by [`Self::consume_batch`]
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Reads up to `batch_size` records into the internal record reader,
    /// returning the number of records read
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    /// Converts the records buffered so far into an [`ArrayRef`] of
    /// `self.data_type`, then resets the record reader for the next batch
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let record_data = self.record_reader.consume_record_data();

        // First assemble the raw fixed-width bytes and the null bitmap into a
        // FixedSizeBinaryArray; the type-specific conversions below then
        // reinterpret each slot
        let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
            .len(self.record_reader.num_values())
            .add_buffer(Buffer::from_vec(record_data.buffer))
            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());

        // Validation is skipped for performance: the value buffer and null
        // bitmap are produced together by the record reader above
        let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });

        // TODO: An improvement might be to do this conversion on read
        // Note the conversions below apply to all elements regardless of null slots as the
        // conversion lambdas are all infallible. This improves performance by avoiding a branch in
        // the inner loop (see docs for `PrimitiveArray::from_unary`).
        let array: ArrayRef = match &self.data_type {
            ArrowType::Decimal32(p, s) => {
                // Parquet stores decimals big-endian; sign-extend short values to i32
                let f = |b: &[u8]| i32::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal32Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Decimal64(p, s) => {
                let f = |b: &[u8]| i64::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal64Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Decimal128(p, s) => {
                let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Decimal256(p, s) => {
                let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Interval(unit) => {
                // An interval is stored as 3x 32-bit unsigned integers storing months, days,
                // and milliseconds
                match unit {
                    IntervalUnit::YearMonth => {
                        // Only the months field (bytes 0..4) is used
                        let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap());
                        Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef
                    }
                    IntervalUnit::DayTime => {
                        // Days (bytes 4..8) and milliseconds (bytes 8..12);
                        // the months field is ignored
                        let f = |b: &[u8]| {
                            IntervalDayTime::new(
                                i32::from_le_bytes(b[4..8].try_into().unwrap()),
                                i32::from_le_bytes(b[8..12].try_into().unwrap()),
                            )
                        };
                        Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
                    }
                    IntervalUnit::MonthDayNano => {
                        return Err(nyi_err!("MonthDayNano intervals not supported"));
                    }
                }
            }
            ArrowType::Float16 => {
                // Each value is a 2-byte little-endian IEEE 754 half float
                let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap());
                Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
            }
            // FixedSizeBinary (validated in make_fixed_len_byte_array_reader)
            _ => Arc::new(binary) as ArrayRef,
        };

        // Stash the levels for get_def_levels / get_rep_levels before resetting
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        Ok(array)
    }

    /// Skips up to `num_records` records, returning the number skipped
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}
255
/// Buffer of contiguous fixed length values filled by [`ValueDecoder`]
#[derive(Default)]
struct FixedLenByteArrayBuffer {
    // Concatenated value bytes; the length is a multiple of `byte_length`
    buffer: Vec<u8>,
    /// The length of each element in bytes
    byte_length: Option<usize>,
}
262
263#[inline]
264fn move_values<F>(
265    buffer: &mut Vec<u8>,
266    byte_length: usize,
267    values_range: Range<usize>,
268    valid_mask: &[u8],
269    mut op: F,
270) where
271    F: FnMut(&mut Vec<u8>, usize, usize, usize),
272{
273    for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
274        debug_assert!(level_pos >= value_pos);
275        if level_pos <= value_pos {
276            break;
277        }
278
279        let level_pos_bytes = level_pos * byte_length;
280        let value_pos_bytes = value_pos * byte_length;
281
282        op(buffer, level_pos_bytes, value_pos_bytes, byte_length)
283    }
284}
285
impl ValuesBuffer for FixedLenByteArrayBuffer {
    /// Spaces out the densely packed values decoded so far so that each value
    /// occupies the slot of its level, leaving zeroed bytes in null slots.
    ///
    /// `valid_mask` is a packed bitmask with a set bit for each non-null slot
    /// among the `levels_read` levels starting at `read_offset`.
    fn pad_nulls(
        &mut self,
        read_offset: usize,
        values_read: usize,
        levels_read: usize,
        valid_mask: &[u8],
    ) {
        // `byte_length` is set on first decode; defaults to 0 if nothing was
        // decoded (e.g. all values null), making the moves below no-ops
        let byte_length = self.byte_length.unwrap_or_default();

        assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length);
        // Grow to the full level count; new bytes are zeroed
        self.buffer
            .resize((read_offset + levels_read) * byte_length, 0);

        let values_range = read_offset..read_offset + values_read;
        // Move the bytes from value_pos to level_pos. For values of `byte_length` <= 4,
        // the simple loop is preferred as the compiler can eliminate the loop via unrolling.
        // For `byte_length > 4`, we instead copy from non-overlapping slices. This allows
        // the loop to be vectorized, yielding much better performance.
        const VEC_CUTOFF: usize = 4;
        if byte_length > VEC_CUTOFF {
            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
                // level_pos > value_pos, so the source lies entirely before the
                // split point and the two slices cannot overlap
                let split = buffer.split_at_mut(level_pos_bytes);
                let dst = &mut split.1[..byte_length];
                let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length];
                dst.copy_from_slice(src);
            };
            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
        } else {
            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
                for i in 0..byte_length {
                    buffer[level_pos_bytes + i] = buffer[value_pos_bytes + i]
                }
            };
            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
        }
    }
}
324
/// [`ColumnValueDecoder`] for `FIXED_LEN_BYTE_ARRAY` columns
struct ValueDecoder {
    // The fixed length in bytes of each value (the Parquet type length)
    byte_length: usize,
    // Raw bytes of the current dictionary page, if one has been set
    dict_page: Option<Bytes>,
    // Decoder for the current data page, initialised by `set_data`
    decoder: Option<Decoder>,
}
330
impl ColumnValueDecoder for ValueDecoder {
    type Buffer = FixedLenByteArrayBuffer;

    /// Creates a decoder for the column described by `col`
    fn new(col: &ColumnDescPtr) -> Self {
        Self {
            byte_length: col.type_length() as usize,
            dict_page: None,
            decoder: None,
        }
    }

    /// Stores the raw dictionary page for later use by dictionary-encoded data
    /// pages, after validating its encoding and length
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        _is_sorted: bool,
    ) -> Result<()> {
        if !matches!(
            encoding,
            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
        ) {
            return Err(nyi_err!(
                "Invalid/Unsupported encoding type for dictionary: {}",
                encoding
            ));
        }
        // The page must contain at least `num_values` fixed-width entries
        let expected_len = num_values as usize * self.byte_length;
        if expected_len > buf.len() {
            return Err(general_err!(
                "too few bytes in dictionary page, expected {} got {}",
                expected_len,
                buf.len()
            ));
        }

        self.dict_page = Some(buf);
        Ok(())
    }

    /// Initialises the decoder for a new data page with the given `encoding`
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()> {
        self.decoder = Some(match encoding {
            Encoding::PLAIN => Decoder::Plain {
                buf: data,
                offset: 0,
            },
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
                decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
            },
            Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
                decoder: DeltaByteArrayDecoder::new(data)?,
            },
            Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
                buf: data,
                offset: 0,
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding for fixed length byte array: {}",
                    encoding
                ));
            }
        });
        Ok(())
    }

    /// Decodes up to `num_values` values from the current page into `out`,
    /// returning the number of values decoded
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
        // The output buffer's element width must match this column's width
        match out.byte_length {
            Some(x) => assert_eq!(x, self.byte_length),
            None => out.byte_length = Some(self.byte_length),
        }

        match self.decoder.as_mut().unwrap() {
            Decoder::Plain { offset, buf } => {
                // Clamp to the number of whole values remaining in the page
                let to_read =
                    (num_values * self.byte_length).min(buf.len() - *offset) / self.byte_length;
                let end_offset = *offset + to_read * self.byte_length;
                out.buffer
                    .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
                *offset = end_offset;
                Ok(to_read)
            }
            Decoder::Dict { decoder } => {
                let dict = self.dict_page.as_ref().unwrap();
                // All data must be NULL
                if dict.is_empty() {
                    return Ok(0);
                }

                // Gather each indexed entry from the dictionary page
                decoder.read(num_values, |keys| {
                    out.buffer.reserve(keys.len() * self.byte_length);
                    for key in keys {
                        let offset = *key as usize * self.byte_length;
                        let val = &dict.as_ref()[offset..offset + self.byte_length];
                        out.buffer.extend_from_slice(val);
                    }
                    Ok(())
                })
            }
            Decoder::Delta { decoder } => {
                let to_read = num_values.min(decoder.remaining());
                out.buffer.reserve(to_read * self.byte_length);

                decoder.read(to_read, |slice| {
                    // DELTA_BYTE_ARRAY can encode variable lengths; enforce
                    // the fixed width this column requires
                    if slice.len() != self.byte_length {
                        return Err(general_err!(
                            "encountered array with incorrect length, got {} expected {}",
                            slice.len(),
                            self.byte_length
                        ));
                    }
                    out.buffer.extend_from_slice(slice);
                    Ok(())
                })
            }
            Decoder::ByteStreamSplit { buf, offset } => {
                // we have n=`byte_length` streams of length `buf.len/byte_length`
                // to read value i, we need the i'th byte from each of the streams
                // so `offset` should be the value offset, not the byte offset
                let total_values = buf.len() / self.byte_length;
                let to_read = num_values.min(total_values - *offset);

                // now read the n streams and reassemble values into the output buffer
                read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length);

                *offset += to_read;
                Ok(to_read)
            }
        }
    }

    /// Skips up to `num_values` values in the current page, returning the
    /// number of values skipped
    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
        match self.decoder.as_mut().unwrap() {
            Decoder::Plain { offset, buf } => {
                let to_read = num_values.min((buf.len() - *offset) / self.byte_length);
                *offset += to_read * self.byte_length;
                Ok(to_read)
            }
            Decoder::Dict { decoder } => decoder.skip(num_values),
            Decoder::Delta { decoder } => decoder.skip(num_values),
            Decoder::ByteStreamSplit { offset, buf } => {
                // `offset` counts values, not bytes
                let total_values = buf.len() / self.byte_length;
                let to_read = num_values.min(total_values - *offset);
                *offset += to_read;
                Ok(to_read)
            }
        }
    }
}
486
/// Reassembles `num_values` BYTE_STREAM_SPLIT encoded values from `src`,
/// appending the decoded values to `dst`.
///
/// `src` is laid out like an NxM matrix where N == `data_width` and
/// M == `src.len() / data_width`. Each "row" of the matrix is a stream of
/// bytes, with stream `j` containing the `j`-th byte of each value; each
/// "column" is a single value. Values are read column-wise starting at value
/// index `offset`.
///
/// Takes `src` as a plain shared slice: the function only reads from it, so
/// an exclusive `&mut Bytes` borrow (as callers hold) coerces via deref.
///
/// # Panics
///
/// Panics if `src` holds fewer than `offset + num_values` values per stream.
fn read_byte_stream_split(
    dst: &mut Vec<u8>,
    src: &[u8],
    offset: usize,
    num_values: usize,
    data_width: usize,
) {
    let stride = src.len() / data_width;
    let idx = dst.len();
    // Zero-fill the output region, then scatter each stream into place
    dst.resize(idx + num_values * data_width, 0u8);
    let dst_slc = &mut dst[idx..];
    for j in 0..data_width {
        // The `num_values` bytes of stream `j`, starting at value `offset`
        let src_slc = &src[offset + j * stride..offset + j * stride + num_values];
        for (i, &byte) in src_slc.iter().enumerate() {
            dst_slc[i * data_width + j] = byte;
        }
    }
}
510
/// Decoding state for the current data page
enum Decoder {
    /// PLAIN encoded values; `offset` is the byte offset of the next value in `buf`
    Plain { buf: Bytes, offset: usize },
    /// RLE_DICTIONARY / PLAIN_DICTIONARY encoded indices into the dictionary page
    Dict { decoder: DictIndexDecoder },
    /// DELTA_BYTE_ARRAY encoded values
    Delta { decoder: DeltaByteArrayDecoder },
    /// BYTE_STREAM_SPLIT encoded values; `offset` is the index (in values, not
    /// bytes) of the next value to decode
    ByteStreamSplit { buf: Bytes, offset: usize },
}
517
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use arrow::datatypes::Field;
    use arrow::error::Result as ArrowResult;
    use arrow_array::{Array, ListArray};
    use arrow_array::{Decimal256Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    /// Round-trips a nullable list of Decimal256 values (a FIXED_LEN_BYTE_ARRAY
    /// column) through an in-memory Parquet file
    #[test]
    fn test_decimal_list() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
        let data = ArrayDataBuilder::new(ArrowType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        // Write the batch to an in-memory Parquet file
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Read it back with a batch size of 3 rows
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        // 7 rows at batch size 3 => batches of 3, 3 and 1 rows
        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }
}
566}