parquet/arrow/buffer/
offset_buffer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::buffer::bit_util::iter_set_bits_rev;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
22use arrow_buffer::{ArrowNativeType, Buffer};
23use arrow_data::ArrayDataBuilder;
24use arrow_schema::DataType as ArrowType;
25
26/// A buffer of variable-sized byte arrays that can be converted into
27/// a corresponding [`ArrayRef`]
28#[derive(Debug)]
29pub struct OffsetBuffer<I: OffsetSizeTrait> {
30    pub offsets: Vec<I>,
31    pub values: Vec<u8>,
32}
33
34impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
35    fn default() -> Self {
36        let mut offsets = Vec::new();
37        offsets.resize(1, I::default());
38        Self {
39            offsets,
40            values: Vec::new(),
41        }
42    }
43}
44
45impl<I: OffsetSizeTrait> OffsetBuffer<I> {
46    /// Returns the number of byte arrays in this buffer
47    pub fn len(&self) -> usize {
48        self.offsets.len() - 1
49    }
50
51    pub fn is_empty(&self) -> bool {
52        self.len() == 0
53    }
54
55    /// If `validate_utf8` this verifies that the first character of `data` is
56    /// the start of a UTF-8 codepoint
57    ///
58    /// Note: This does not verify that the entirety of `data` is valid
59    /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
60    /// all data has been written
61    pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
62        if validate_utf8 {
63            if let Some(&b) = data.first() {
64                // A valid code-point iff it does not start with 0b10xxxxxx
65                // Bit-magic taken from `std::str::is_char_boundary`
66                if (b as i8) < -0x40 {
67                    return Err(ParquetError::General(
68                        "encountered non UTF-8 data".to_string(),
69                    ));
70                }
71            }
72        }
73
74        self.values.extend_from_slice(data);
75
76        let index_offset = I::from_usize(self.values.len())
77            .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
78
79        self.offsets.push(index_offset);
80        Ok(())
81    }
82
83    /// Extends this buffer with a list of keys
84    ///
85    /// For each value `key` in `keys` this will insert
86    /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]`
87    ///
88    /// Note: This will validate offsets are valid
89    pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
90        &mut self,
91        keys: &[K],
92        dict_offsets: &[V],
93        dict_values: &[u8],
94    ) -> Result<()> {
95        for key in keys {
96            let index = key.as_usize();
97            if index + 1 >= dict_offsets.len() {
98                return Err(general_err!(
99                    "dictionary key beyond bounds of dictionary: 0..{}",
100                    dict_offsets.len().saturating_sub(1)
101                ));
102            }
103            let start_offset = dict_offsets[index].as_usize();
104            let end_offset = dict_offsets[index + 1].as_usize();
105
106            // Dictionary values are verified when decoding dictionary page
107            self.try_push(&dict_values[start_offset..end_offset], false)?;
108        }
109        Ok(())
110    }
111
112    /// Validates that `&self.values[start_offset..]` is a valid UTF-8 sequence
113    ///
114    /// This MUST be combined with validating that the offsets start on a character
115    /// boundary, otherwise it would be possible for the values array to be a valid UTF-8
116    /// sequence, but not the individual string slices it contains
117    ///
118    /// [`Self::try_push`] can perform this validation check on insertion
119    pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
120        match std::str::from_utf8(&self.values.as_slice()[start_offset..]) {
121            Ok(_) => Ok(()),
122            Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
123        }
124    }
125
126    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
127    pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
128        let array_data_builder = ArrayDataBuilder::new(data_type)
129            .len(self.len())
130            .add_buffer(Buffer::from_vec(self.offsets))
131            .add_buffer(Buffer::from_vec(self.values))
132            .null_bit_buffer(null_buffer);
133
134        let data = match cfg!(debug_assertions) {
135            true => array_data_builder.build().unwrap(),
136            false => unsafe { array_data_builder.build_unchecked() },
137        };
138
139        make_array(data)
140    }
141}
142
impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
    /// Spreads the `values_read` byte arrays appended after the first
    /// `read_offset` entries across `levels_read` slots, leaving empty
    /// (zero-length) entries in the null slots.
    ///
    /// Only `offsets` is rewritten; the bytes in `values` never move, so a
    /// null slot is represented by two equal consecutive offsets.
    ///
    /// The set bits of `valid_mask` give the destination slot of each
    /// non-null value. Bit positions are compared directly against absolute
    /// value positions below, so bit `i` presumably marks array slot `i`
    /// counted from the start of the buffer, not from `read_offset` — confirm
    /// against the `ValuesBuffer` trait contract.
    ///
    /// # Panics
    ///
    /// Panics if the buffer does not hold exactly `read_offset + values_read`
    /// byte arrays on entry, or if the set bits of `valid_mask` would move a
    /// value backwards or out of order.
    fn pad_nulls(
        &mut self,
        read_offset: usize,
        values_read: usize,
        levels_read: usize,
        valid_mask: &[u8],
    ) {
        assert_eq!(self.offsets.len(), read_offset + values_read + 1);
        // Grow `offsets` to one entry per level (plus the trailing end offset);
        // the appended placeholder entries are overwritten below
        self.offsets
            .resize(read_offset + levels_read + 1, I::default());

        let offsets = &mut self.offsets;

        // Exclusive upper bound of the offset entries still to be written,
        // moving backwards as values are placed
        let mut last_pos = read_offset + levels_read + 1;
        // Offset used to pad leading nulls; `values.len()` covers the case
        // where the loop below places no value at all
        let mut last_start_offset = I::from_usize(self.values.len()).unwrap();

        let values_range = read_offset..read_offset + values_read;
        // Walk the new values from last to first, pairing each with its
        // destination slot (the set bits of `valid_mask`, also last to first).
        // Working backwards means no offset is overwritten before it is read.
        for (value_pos, level_pos) in values_range
            .clone()
            .rev()
            .zip(iter_set_bits_rev(valid_mask))
        {
            // A value can only move forwards, and destinations must be
            // strictly decreasing
            assert!(level_pos >= value_pos);
            assert!(level_pos < last_pos);

            let end_offset = offsets[value_pos + 1];
            let start_offset = offsets[value_pos];

            // Fill in any nulls
            // (every slot after this value's destination, up to the previously
            // placed value, gets a zero-length range at this value's end)
            for x in &mut offsets[level_pos + 1..last_pos] {
                *x = end_offset;
            }

            // Already in place: destinations are strictly decreasing and never
            // below their value's position, so every earlier value is also in
            // place and nothing remains to be moved or padded
            if level_pos == value_pos {
                return;
            }

            // Move the value's start offset to its destination slot
            offsets[level_pos] = start_offset;
            last_pos = level_pos;
            last_start_offset = start_offset;
        }

        // Pad leading nulls up to `last_pos`: the slots between `read_offset`
        // and the first placed value get that value's start offset
        for x in &mut offsets[values_range.start + 1..last_pos] {
            *x = last_start_offset
        }
    }
}
192
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, LargeStringArray, StringArray};

    #[test]
    fn test_offset_buffer_empty() {
        let buffer = OffsetBuffer::<i32>::default();
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 0);
    }

    #[test]
    fn test_offset_buffer_append() {
        let mut buffer = OffsetBuffer::<i64>::default();
        buffer.try_push("hello".as_bytes(), true).unwrap();
        buffer.try_push("bar".as_bytes(), true).unwrap();
        buffer
            .extend_from_dictionary(&[1, 3, 0, 2], &[0, 2, 4, 5, 6], "abcdef".as_bytes())
            .unwrap();

        let array = buffer.into_array(None, ArrowType::LargeUtf8);
        let strings = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "bar", "cd", "f", "ab", "e"]
        )
    }

    #[test]
    fn test_extend_from_dictionary_out_of_bounds() {
        let mut buffer = OffsetBuffer::<i32>::default();
        assert!(buffer.is_empty());

        // Key 2 is beyond a dictionary holding two values ("ab", "cd")
        buffer
            .extend_from_dictionary(&[0_i32, 2], &[0_i32, 2, 4], "abcd".as_bytes())
            .unwrap_err();

        // Any key is out of bounds for a dictionary without offsets
        let mut buffer = OffsetBuffer::<i32>::default();
        buffer
            .extend_from_dictionary::<i32, i32>(&[0], &[], &[])
            .unwrap_err();
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_offset_buffer() {
        let mut buffer = OffsetBuffer::<i32>::default();
        for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }
        let split = std::mem::take(&mut buffer);

        let array = split.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "world", "cupcakes", "a", "b", "c"]
        );

        buffer.try_push("test".as_bytes(), false).unwrap();
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["test"]
        );
    }

    #[test]
    fn test_offset_buffer_pad_nulls() {
        let mut buffer = OffsetBuffer::<i32>::default();
        let values = ["a", "b", "c", "def", "gh"];
        for v in &values {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }

        let valid = [
            true, false, false, true, false, true, false, true, true, false, false,
        ];
        let valid_mask = Buffer::from_iter(valid.iter().copied());

        // Both trailing and leading nulls
        buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![
                Some("a"),
                None,
                None,
                Some("b"),
                None,
                Some("c"),
                None,
                Some("def"),
                Some("gh"),
                None,
                None
            ]
        );
    }

    #[test]
    fn test_utf8_validation() {
        let valid_2_byte_utf8 = &[0b11001000, 0b10001000];
        std::str::from_utf8(valid_2_byte_utf8).unwrap();
        let valid_3_byte_utf8 = &[0b11101000, 0b10001000, 0b10001000];
        std::str::from_utf8(valid_3_byte_utf8).unwrap();
        let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];
        std::str::from_utf8(valid_4_byte_utf8).unwrap();

        let mut buffer = OffsetBuffer::<i32>::default();
        buffer.try_push(valid_2_byte_utf8, true).unwrap();
        buffer.try_push(valid_3_byte_utf8, true).unwrap();
        buffer.try_push(valid_4_byte_utf8, true).unwrap();

        // Cannot append string starting with incomplete codepoint
        buffer.try_push(&valid_2_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[3..], true).unwrap_err();

        // Can append data containing an incomplete codepoint
        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();

        assert_eq!(buffer.len(), 4);
        assert_eq!(buffer.values.len(), 11);

        buffer.try_push(valid_3_byte_utf8, true).unwrap();

        // Should fail due to incomplete codepoint
        buffer.check_valid_utf8(0).unwrap_err();

        // After broken codepoint -> success
        buffer.check_valid_utf8(11).unwrap();

        // Fails if run from middle of codepoint
        buffer.check_valid_utf8(12).unwrap_err();
    }

    #[test]
    fn test_pad_nulls_empty() {
        let mut buffer = OffsetBuffer::<i32>::default();
        let valid_mask = Buffer::from_iter(std::iter::repeat(false).take(9));
        buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(strings.len(), 9);
        assert!(strings.iter().all(|x| x.is_none()))
    }
}