parquet/arrow/array_reader/byte_view_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::view_buffer::ViewBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::data_type::Int32Type;
use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{builder::make_view, ArrayRef};
use arrow_buffer::Buffer;
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;

/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
pub fn make_byte_view_array_reader(
    pages: Box<dyn PageIterator>,
    column_desc: ColumnDescPtr,
    arrow_type: Option<ArrowType>,
) -> Result<Box<dyn ArrayReader>> {
    // Check if Arrow type is specified, else create it from Parquet type
    let data_type = match arrow_type {
        Some(t) => t,
        None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
            ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
            _ => ArrowType::BinaryView,
        },
    };

    match data_type {
        ArrowType::BinaryView | ArrowType::Utf8View => {
            let reader = GenericRecordReader::new(column_desc);
            Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
        }

        _ => Err(general_err!(
            "invalid data type for byte array reader read to view type - {}",
            data_type
        )),
    }
}

/// An [`ArrayReader`] for variable length byte arrays
struct ByteViewArrayReader {
    data_type: ArrowType,
    pages: Box<dyn PageIterator>,
    def_levels_buffer: Option<Vec<i16>>,
    rep_levels_buffer: Option<Vec<i16>>,
    record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
}

impl ByteViewArrayReader {
    fn new(
        pages: Box<dyn PageIterator>,
        data_type: ArrowType,
        record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
    ) -> Self {
        Self {
            data_type,
            pages,
            def_levels_buffer: None,
            rep_levels_buffer: None,
            record_reader,
        }
    }
}

impl ArrayReader for ByteViewArrayReader {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let buffer = self.record_reader.consume_record_data();
        let null_buffer = self.record_reader.consume_bitmap_buffer();
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        let array = buffer.into_array(null_buffer, &self.data_type);

        Ok(array)
    }

    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}

/// A [`ColumnValueDecoder`] for variable length byte arrays
struct ByteViewArrayColumnValueDecoder {
    dict: Option<ViewBuffer>,
    decoder: Option<ByteViewArrayDecoder>,
    validate_utf8: bool,
}

impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
    type Buffer = ViewBuffer;

    fn new(desc: &ColumnDescPtr) -> Self {
        let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
        Self {
            dict: None,
            decoder: None,
            validate_utf8,
        }
    }

    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        _is_sorted: bool,
    ) -> Result<()> {
        if !matches!(
            encoding,
            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
        ) {
            return Err(nyi_err!(
                "Invalid/Unsupported encoding type for dictionary: {}",
                encoding
            ));
        }

        let mut buffer = ViewBuffer::default();
        let mut decoder = ByteViewArrayDecoderPlain::new(
            buf,
            num_values as usize,
            Some(num_values as usize),
            self.validate_utf8,
        );
        decoder.read(&mut buffer, usize::MAX)?;
        self.dict = Some(buffer);
        Ok(())
    }

    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()> {
        self.decoder = Some(ByteViewArrayDecoder::new(
            encoding,
            data,
            num_levels,
            num_values,
            self.validate_utf8,
        )?);
        Ok(())
    }

    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
        let decoder = self
            .decoder
            .as_mut()
            .ok_or_else(|| general_err!("no decoder set"))?;

        decoder.read(out, num_values, self.dict.as_ref())
    }

    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
        let decoder = self
            .decoder
            .as_mut()
            .ok_or_else(|| general_err!("no decoder set"))?;

        decoder.skip(num_values, self.dict.as_ref())
    }
}

/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`]
pub enum ByteViewArrayDecoder {
    Plain(ByteViewArrayDecoderPlain),
    Dictionary(ByteViewArrayDecoderDictionary),
    DeltaLength(ByteViewArrayDecoderDeltaLength),
    DeltaByteArray(ByteViewArrayDecoderDelta),
}

impl ByteViewArrayDecoder {
    pub fn new(
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
        validate_utf8: bool,
    ) -> Result<Self> {
        let decoder = match encoding {
            Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new(
                data,
                num_levels,
                num_values,
                validate_utf8,
            )),
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
                ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
                    data, num_levels, num_values,
                ))
            }
            Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
                ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
            ),
            Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
                ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
            ),
            _ => {
                return Err(general_err!(
                    "unsupported encoding for byte array: {}",
                    encoding
                ))
            }
        };

        Ok(decoder)
    }

    /// Read up to `len` values to `out` with the optional dictionary
    pub fn read(
        &mut self,
        out: &mut ViewBuffer,
        len: usize,
        dict: Option<&ViewBuffer>,
    ) -> Result<usize> {
        match self {
            ByteViewArrayDecoder::Plain(d) => d.read(out, len),
            ByteViewArrayDecoder::Dictionary(d) => {
                let dict = dict
                    .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
                d.read(out, dict, len)
            }
            ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
            ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
        }
    }

    /// Skip `len` values
    pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
        match self {
            ByteViewArrayDecoder::Plain(d) => d.skip(len),
            ByteViewArrayDecoder::Dictionary(d) => {
                let dict = dict
                    .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
                d.skip(dict, len)
            }
            ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
            ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
        }
    }
}

/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderPlain {
    buf: Bytes,
    offset: usize,

    validate_utf8: bool,

    /// This is a maximum as the null count is not always known, e.g. value data from
    /// a v1 data page
    max_remaining_values: usize,
}

impl ByteViewArrayDecoderPlain {
    pub fn new(
        buf: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
        validate_utf8: bool,
    ) -> Self {
        Self {
            buf,
            offset: 0,
            max_remaining_values: num_values.unwrap_or(num_levels),
            validate_utf8,
        }
    }

    pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        // Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy
        // Then we convert `arrow_buffer::Bytes` into `arrow_buffer::Buffer`, which is also zero copy
        let buf = arrow_buffer::Buffer::from_bytes(self.buf.clone().into());
        let block_id = output.append_block(buf);

        let to_read = len.min(self.max_remaining_values);

        let buf = self.buf.as_ref();
        let mut read = 0;
        output.views.reserve(to_read);

        let mut utf8_validation_begin = self.offset;
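        // Parquet PLAIN-encoded BYTE_ARRAY data stores each value as a 4-byte little-endian
        // length followed by that many bytes, e.g. "hi" is encoded as [2, 0, 0, 0, b'h', b'i'].
        // The loop below walks this layout, recording each value as a view into the block
        // appended above instead of copying the bytes.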
        while self.offset < self.buf.len() && read != to_read {
            if self.offset + 4 > self.buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }
            let len_bytes: [u8; 4] = unsafe {
                buf.get_unchecked(self.offset..self.offset + 4)
                    .try_into()
                    .unwrap()
            };
            let len = u32::from_le_bytes(len_bytes);

            let start_offset = self.offset + 4;
            let end_offset = start_offset + len as usize;
            if end_offset > buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }

            if self.validate_utf8 {
                // It seems you are trying to understand what's going on here; take a breath and be patient.
                // Utf-8 validation is a non-trivial task, and here are some background facts:
                // (1) Validating one 2048-byte string is much faster than validating 128 16-byte strings,
                //     as shown in https://github.com/apache/arrow-rs/pull/6009#issuecomment-2211174229,
                //     potentially because the SIMD operations favor longer strings.
                // (2) Practical strings are short: 99% of strings are smaller than 100 bytes, as shown in
                //     https://www.vldb.org/pvldb/vol17/p148-zeng.pdf, Figure 5f.
                // (3) Parquet plain encoding makes utf-8 validation harder,
                //     because it stores the length of each string right before the string.
                //     Naive utf-8 validation is therefore slow, because it needs to skip the length bytes,
                //     i.e., it cannot validate the buffer in one pass and must instead validate strings chunk by chunk.
                //
                // Given the above observations, the goal is to do batch validation as much as possible.
                // The key idea is that if the length is smaller than 128 (99% of the case), then the length bytes themselves are valid utf-8:
                // if the length is smaller than 128, its 4-byte little-endian encoding is [len, 0, 0, 0].
                // Each of those bytes is a valid ASCII character, so together they are valid utf-8,
                // and since they are all smaller than 128, they won't break a utf-8 code point (won't mess with later bytes).
                //
                // The implementation keeps a watermark `utf8_validation_begin` that tracks the beginning of the not-yet-validated part of the buffer.
                // If the length is smaller than 128, we simply continue to the next string.
                // If the length is 128 or larger, we validate the buffer up to the length bytes, and move the watermark to the beginning of the next string.
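                // For example, the plain-encoded values ["hi", "yo"] are laid out as
                // [2, 0, 0, 0, b'h', b'i', 2, 0, 0, 0, b'y', b'o']; every byte of the length
                // prefixes is below 128, so the whole run can be validated as a single
                // utf-8 chunk at the end.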
                if len < 128 {
                    // fast path, move to the next string.
                    // the len bytes are valid utf8.
                } else {
                    // unfortunately, the len bytes may not be valid utf8; we need to stop and validate everything before them.
                    check_valid_utf8(unsafe {
                        buf.get_unchecked(utf8_validation_begin..self.offset)
                    })?;
                    // move the watermark past the len bytes, to the beginning of the string data.
                    utf8_validation_begin = start_offset;
                }
            }

            unsafe {
                output.append_view_unchecked(block_id, start_offset as u32, len);
            }
            self.offset = end_offset;
            read += 1;
        }

        // validate the last part of the buffer
        if self.validate_utf8 {
            check_valid_utf8(unsafe { buf.get_unchecked(utf8_validation_begin..self.offset) })?;
        }

        self.max_remaining_values -= to_read;
        Ok(to_read)
    }

    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let to_skip = to_skip.min(self.max_remaining_values);
        let mut skip = 0;
        let buf = self.buf.as_ref();

        while self.offset < self.buf.len() && skip != to_skip {
            if self.offset + 4 > buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }
            let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
            let len = u32::from_le_bytes(len_bytes) as usize;
            skip += 1;
            self.offset = self.offset + 4 + len;
        }
        self.max_remaining_values -= skip;
        Ok(skip)
    }
}

pub struct ByteViewArrayDecoderDictionary {
    decoder: DictIndexDecoder,
}

impl ByteViewArrayDecoderDictionary {
    fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
        Self {
            decoder: DictIndexDecoder::new(data, num_levels, num_values),
        }
    }

    /// Reads the next indexes from `self.decoder`. The indexes are assumed to be
    /// indexes into `dict`, and the decoded values are written to `output`.
    ///
    /// Assumptions / Optimization
    /// This function checks whether `dict.buffers()` are already the last buffers in `output`,
    /// and if so reuses the dictionary page buffers directly without copying data.
    fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
        if dict.is_empty() || len == 0 {
            return Ok(0);
        }

        // Check if the last few buffers of `output` are the same as the `dict` buffers.
        // This avoids creating new buffers when the same dictionary is used across multiple `read` calls.
        let need_to_create_new_buffer = {
            if output.buffers.len() >= dict.buffers.len() {
                let offset = output.buffers.len() - dict.buffers.len();
                output.buffers[offset..]
                    .iter()
                    .zip(dict.buffers.iter())
                    .any(|(a, b)| !a.ptr_eq(b))
            } else {
                true
            }
        };

        if need_to_create_new_buffer {
            for b in dict.buffers.iter() {
                output.buffers.push(b.clone());
            }
        }

        // Calculate the offset of the dictionary buffers in the output buffers
        // For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
        // then the base_buffer_idx is 5 - 2 = 3
        let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;

        self.decoder.read(len, |keys| {
            for k in keys {
                let view = dict
                    .views
                    .get(*k as usize)
                    .ok_or_else(|| general_err!("invalid key={} for dictionary", *k))?;
                let len = *view as u32;
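                // A view is a 128-bit value whose low 32 bits hold the length. Values of 12
                // bytes or fewer are stored entirely inline in the view; longer values carry a
                // (buffer index, offset) pair that must be remapped from the dictionary's
                // buffer numbering into `output`'s buffer list.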
                if len <= 12 {
                    // directly append the view if it is inlined
                    // Safety: the view is from the dictionary, so it is valid
                    unsafe {
                        output.append_raw_view_unchecked(view);
                    }
                } else {
                    // correct the buffer index and append the view
                    let mut view = ByteView::from(*view);
                    view.buffer_index += base_buffer_idx;
                    // Safety: the view is from the dictionary, and
                    // we corrected the buffer index to point into the output buffers, so it is valid
                    unsafe {
                        output.append_raw_view_unchecked(&view.into());
                    }
                }
            }
            Ok(())
        })
    }

    fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
        if dict.is_empty() {
            return Ok(0);
        }
        self.decoder.skip(to_skip)
    }
}

/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
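///
/// This encoding stores all value lengths up front (delta bit-packed), followed by the
/// concatenated value bytes, so the string data itself forms one contiguous region.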
pub struct ByteViewArrayDecoderDeltaLength {
    lengths: Vec<i32>,
    data: Bytes,
    length_offset: usize,
    data_offset: usize,
    validate_utf8: bool,
}

impl ByteViewArrayDecoderDeltaLength {
    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
        len_decoder.set_data(data.clone(), 0)?;
        let values = len_decoder.values_left();

        let mut lengths = vec![0; values];
        len_decoder.get(&mut lengths)?;

        let mut total_bytes = 0;

        for l in lengths.iter() {
            if *l < 0 {
                return Err(ParquetError::General(
                    "negative delta length byte array length".to_string(),
                ));
            }
            total_bytes += *l as usize;
        }

        if total_bytes + len_decoder.get_offset() > data.len() {
            return Err(ParquetError::General(
                "Insufficient delta length byte array bytes".to_string(),
            ));
        }

        Ok(Self {
            lengths,
            data,
            validate_utf8,
            length_offset: 0,
            data_offset: len_decoder.get_offset(),
        })
    }

    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        let to_read = len.min(self.lengths.len() - self.length_offset);
        output.views.reserve(to_read);

        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];

        // Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy
        // Then we convert `arrow_buffer::Bytes` into `arrow_buffer::Buffer`, which is also zero copy
        let bytes = arrow_buffer::Buffer::from_bytes(self.data.clone().into());
        let block_id = output.append_block(bytes);

        let mut current_offset = self.data_offset;
        let initial_offset = current_offset;
        for length in src_lengths {
            // # Safety
            // The length comes from the delta length decoder, so it is valid
            // `current_offset` is accumulated from those lengths, so it is valid
            // `current_offset + length` is guaranteed to be within the bounds of `data`, as checked in `new`
            unsafe { output.append_view_unchecked(block_id, current_offset as u32, *length as u32) }

            current_offset += *length as usize;
        }

        // Delta length encoding stores the strings contiguously, so we can validate utf8 in one pass
        if self.validate_utf8 {
            check_valid_utf8(&self.data[initial_offset..current_offset])?;
        }

        self.data_offset = current_offset;
        self.length_offset += to_read;

        Ok(to_read)
    }

    fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let remain_values = self.lengths.len() - self.length_offset;
        let to_skip = remain_values.min(to_skip);

        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();

        self.data_offset += total_bytes;
        self.length_offset += to_skip;
        Ok(to_skip)
    }
}

/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDelta {
    decoder: DeltaByteArrayDecoder,
    validate_utf8: bool,
}

impl ByteViewArrayDecoderDelta {
    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
        Ok(Self {
            decoder: DeltaByteArrayDecoder::new(data)?,
            validate_utf8,
        })
    }

    // Unlike other encodings, we need to copy the data.
    //
    // DeltaByteArray data is stored using shared prefixes/suffixes, which results in
    // potentially non-contiguous strings, while the Arrow view layout requires each
    // string to be contiguous in memory.
    //
    // <https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
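    //
    // For example, ["apple", "applet", "apply"] is stored as prefix lengths [0, 5, 4] plus
    // suffixes ["apple", "t", "y"]; reconstructing "applet" reuses "apple" from the previous
    // value, so the decoded bytes are not contiguous anywhere in the page.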
    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        output.views.reserve(len.min(self.decoder.remaining()));

        // the array buffer only holds long strings; short strings are inlined into the views
        let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);

        let buffer_id = output.buffers.len() as u32;

        let read = if !self.validate_utf8 {
            self.decoder.read(len, |bytes| {
                let offset = array_buffer.len();
                let view = make_view(bytes, buffer_id, offset as u32);
                if bytes.len() > 12 {
                    // only copy the data to the buffer if the string cannot be inlined.
                    array_buffer.extend_from_slice(bytes);
                }

                // # Safety
                // `buffer_id` refers to the data block appended to `output` at the end of this function
                // The offset is the current length of `array_buffer`, so it is valid
                unsafe {
                    output.append_raw_view_unchecked(&view);
                }
                Ok(())
            })?
        } else {
            // the utf8 validation buffer holds only short strings. These short
            // strings are inlined into the views, but we copy them into a
            // contiguous buffer to accelerate validation.
            let mut utf8_validation_buffer = Vec::with_capacity(4096);

            let v = self.decoder.read(len, |bytes| {
                let offset = array_buffer.len();
                let view = make_view(bytes, buffer_id, offset as u32);
                if bytes.len() > 12 {
                    // only copy the data to the buffer if the string cannot be inlined.
                    array_buffer.extend_from_slice(bytes);
                } else {
                    utf8_validation_buffer.extend_from_slice(bytes);
                }

                // # Safety
                // `buffer_id` refers to the data block appended to `output` at the end of this function
                // The offset is the current length of `array_buffer`, so it is valid
                // Utf-8 validation is done later
                unsafe {
                    output.append_raw_view_unchecked(&view);
                }
                Ok(())
            })?;
            check_valid_utf8(&array_buffer)?;
            check_valid_utf8(&utf8_validation_buffer)?;
            v
        };
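        // The views above were created with `buffer_id = output.buffers.len()` captured before
        // this block existed, so appending the data buffer now must place it at exactly that
        // index, as the assert below checks.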
        let actual_block_id = output.append_block(Buffer::from_vec(array_buffer));
        assert_eq!(actual_block_id, buffer_id);
        Ok(read)
    }

    fn skip(&mut self, to_skip: usize) -> Result<usize> {
        self.decoder.skip(to_skip)
    }
}

/// Check that `val` is a valid UTF-8 sequence
pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
    match std::str::from_utf8(val) {
        Ok(_) => Ok(()),
        Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
    }
}

#[cfg(test)]
mod tests {
    use arrow_array::StringViewArray;
    use arrow_buffer::Buffer;

    use crate::{
        arrow::{
            array_reader::test_util::{byte_array_all_encodings, utf8_column},
            buffer::view_buffer::ViewBuffer,
            record_reader::buffer::ValuesBuffer,
        },
        basic::Encoding,
        column::reader::decoder::ColumnValueDecoder,
    };

    use super::*;

    #[test]
    fn test_byte_array_string_view_decoder() {
        let (pages, encoded_dictionary) =
            byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        for (encoding, page) in pages {
            let mut output = ViewBuffer::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

            assert_eq!(output.views.len(), 4);

            let valid = [false, false, true, true, false, true, true, false, false];
            let valid_buffer = Buffer::from_iter(valid.iter().cloned());

            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), &ArrowType::Utf8View);
            let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

            assert_eq!(
                strings.iter().collect::<Vec<_>>(),
                vec![
                    None,
                    None,
                    Some("hello"),
                    Some("world"),
                    None,
                    Some("large payload over 12 bytes"),
                    Some("b"),
                    None,
                    None,
                ]
            );
        }
    }
751}