1use crate::arrow::buffer::bit_util::iter_set_bits_rev;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
22use arrow_buffer::{ArrowNativeType, Buffer};
23use arrow_data::ArrayDataBuilder;
24use arrow_schema::DataType as ArrowType;
25
/// A growable buffer of variable-length byte arrays, stored as one contiguous
/// `values` byte vector plus an `offsets` vector delimiting each element.
///
/// Element `i` occupies `values[offsets[i]..offsets[i + 1]]`, so `offsets`
/// holds one more entry than there are elements (a freshly defaulted buffer
/// has a single offset and no elements). This is the same two-buffer layout
/// that `into_array` hands to Arrow to build a string/binary array.
///
/// Both fields are public; code mutating them directly is responsible for
/// keeping that relationship intact.
#[derive(Debug)]
pub struct OffsetBuffer<I: OffsetSizeTrait> {
    /// Element boundaries into `values`; expected to satisfy
    /// `offsets.len() == len() + 1`.
    pub offsets: Vec<I>,
    /// The concatenated bytes of every element, in order.
    pub values: Vec<u8>,
}
33
34impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
35 fn default() -> Self {
36 let mut offsets = Vec::new();
37 offsets.resize(1, I::default());
38 Self {
39 offsets,
40 values: Vec::new(),
41 }
42 }
43}
44
45impl<I: OffsetSizeTrait> OffsetBuffer<I> {
46 pub fn len(&self) -> usize {
48 self.offsets.len() - 1
49 }
50
51 pub fn is_empty(&self) -> bool {
52 self.len() == 0
53 }
54
55 pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
62 if validate_utf8 {
63 if let Some(&b) = data.first() {
64 if (b as i8) < -0x40 {
67 return Err(ParquetError::General(
68 "encountered non UTF-8 data".to_string(),
69 ));
70 }
71 }
72 }
73
74 self.values.extend_from_slice(data);
75
76 let index_offset = I::from_usize(self.values.len())
77 .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
78
79 self.offsets.push(index_offset);
80 Ok(())
81 }
82
83 pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
90 &mut self,
91 keys: &[K],
92 dict_offsets: &[V],
93 dict_values: &[u8],
94 ) -> Result<()> {
95 for key in keys {
96 let index = key.as_usize();
97 if index + 1 >= dict_offsets.len() {
98 return Err(general_err!(
99 "dictionary key beyond bounds of dictionary: 0..{}",
100 dict_offsets.len().saturating_sub(1)
101 ));
102 }
103 let start_offset = dict_offsets[index].as_usize();
104 let end_offset = dict_offsets[index + 1].as_usize();
105
106 self.try_push(&dict_values[start_offset..end_offset], false)?;
108 }
109 Ok(())
110 }
111
112 pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
120 match std::str::from_utf8(&self.values.as_slice()[start_offset..]) {
121 Ok(_) => Ok(()),
122 Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
123 }
124 }
125
126 pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
128 let array_data_builder = ArrayDataBuilder::new(data_type)
129 .len(self.len())
130 .add_buffer(Buffer::from_vec(self.offsets))
131 .add_buffer(Buffer::from_vec(self.values))
132 .null_bit_buffer(null_buffer);
133
134 let data = match cfg!(debug_assertions) {
135 true => array_data_builder.build().unwrap(),
136 false => unsafe { array_data_builder.build_unchecked() },
137 };
138
139 make_array(data)
140 }
141}
142
impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
    /// Spreads the `values_read` densely packed elements that follow
    /// `read_offset` out across `levels_read` slots, inserting zero-length
    /// entries for nulls.
    ///
    /// On entry the buffer must hold exactly `read_offset + values_read`
    /// elements (asserted). On exit it holds `read_offset + levels_read`
    /// elements. Each slot `p` whose bit is set in `valid_mask` receives the
    /// next packed value's `start..end` offsets. Every other slot becomes an
    /// empty `x..x` entry. Only `offsets` is rewritten; `values` is untouched.
    ///
    /// `valid_mask` is indexed by absolute slot position, since its set-bit
    /// positions are used directly as indices into `offsets`.
    /// NOTE(review): this presumably assumes the mask has exactly
    /// `values_read` set bits in `read_offset..read_offset + levels_read`;
    /// confirm with callers.
    ///
    /// # Panics
    ///
    /// Panics if the offsets length does not match
    /// `read_offset + values_read + 1`, or if a set bit would move a value
    /// to a lower position or out of order.
    fn pad_nulls(
        &mut self,
        read_offset: usize,
        values_read: usize,
        levels_read: usize,
        valid_mask: &[u8],
    ) {
        assert_eq!(self.offsets.len(), read_offset + values_read + 1);
        self.offsets
            .resize(read_offset + levels_read + 1, I::default());

        let offsets = &mut self.offsets;

        // One past the lowest offset index already finalised; everything at
        // or above it has been written.
        let mut last_pos = read_offset + levels_read + 1;
        // Start offset of the most recently placed value, used to pad any
        // leading nulls once the loop finishes. `values.len()` was already
        // accepted as an `I` when it was pushed, so this `unwrap` is expected
        // not to fail.
        let mut last_start_offset = I::from_usize(self.values.len()).unwrap();

        // Walk values and set bits from the back. Values only ever move to
        // higher positions (level_pos >= value_pos), so writing at indices
        // >= level_pos never clobbers offsets still to be read at <= value_pos.
        let values_range = read_offset..read_offset + values_read;
        for (value_pos, level_pos) in values_range
            .clone()
            .rev()
            .zip(iter_set_bits_rev(valid_mask))
        {
            assert!(level_pos >= value_pos);
            assert!(level_pos < last_pos);

            let end_offset = offsets[value_pos + 1];
            let start_offset = offsets[value_pos];

            // Slots after this value, up to the previously placed value, are
            // nulls: give them zero-length entries ending at `end_offset`.
            for x in &mut offsets[level_pos + 1..last_pos] {
                *x = end_offset;
            }

            // This value is already in its final slot. Set-bit positions
            // strictly decrease and are each >= their value position, so
            // every earlier value must already be in place too. Nothing is
            // left to move or pad.
            if level_pos == value_pos {
                return;
            }

            offsets[level_pos] = start_offset;
            last_pos = level_pos;
            last_start_offset = start_offset;
        }

        // Pad the leading nulls between `read_offset` and the first placed
        // value with zero-length entries.
        for x in &mut offsets[values_range.start + 1..last_pos] {
            *x = last_start_offset
        }
    }
}
192
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, LargeStringArray, StringArray};

    /// A default buffer converts to an empty Utf8 array.
    #[test]
    fn test_offset_buffer_empty() {
        let array = OffsetBuffer::<i32>::default().into_array(None, ArrowType::Utf8);
        let strings = array
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("Utf8 array should downcast to StringArray");
        assert_eq!(0, strings.len());
    }

    /// Direct pushes and dictionary lookups append in order.
    #[test]
    fn test_offset_buffer_append() {
        let mut buffer = OffsetBuffer::<i64>::default();
        for word in ["hello", "bar"] {
            buffer.try_push(word.as_bytes(), true).unwrap();
        }

        let dict_offsets = [0, 2, 4, 5, 6];
        buffer
            .extend_from_dictionary(&[1, 3, 0, 2], &dict_offsets, b"abcdef")
            .unwrap();

        let array = buffer.into_array(None, ArrowType::LargeUtf8);
        let strings = array
            .as_any()
            .downcast_ref::<LargeStringArray>()
            .unwrap();
        let actual: Vec<&str> = strings.iter().map(Option::unwrap).collect();
        assert_eq!(actual, ["hello", "bar", "cd", "f", "ab", "e"]);
    }

    /// `mem::take` moves the contents out and leaves a usable empty buffer.
    #[test]
    fn test_offset_buffer() {
        let words = ["hello", "world", "cupcakes", "a", "b", "c"];
        let mut buffer = OffsetBuffer::<i32>::default();
        words
            .iter()
            .for_each(|w| buffer.try_push(w.as_bytes(), false).unwrap());

        let taken = std::mem::take(&mut buffer);
        let taken_array = taken.into_array(None, ArrowType::Utf8);
        let taken_strings = taken_array
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let taken_values: Vec<&str> = taken_strings.iter().map(Option::unwrap).collect();
        assert_eq!(taken_values, words);

        buffer.try_push(b"test", false).unwrap();
        let rest_array = buffer.into_array(None, ArrowType::Utf8);
        let rest_strings = rest_array
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let rest_values: Vec<&str> = rest_strings.iter().map(Option::unwrap).collect();
        assert_eq!(rest_values, ["test"]);
    }

    /// Packed values are spread out to the set bits of the validity mask.
    #[test]
    fn test_offset_buffer_pad_nulls() {
        let values = ["a", "b", "c", "def", "gh"];
        let mut buffer = OffsetBuffer::<i32>::default();
        values
            .iter()
            .for_each(|v| buffer.try_push(v.as_bytes(), false).unwrap());

        let validity = [
            true, false, false, true, false, true, false, true, true, false, false,
        ];
        let valid_mask = Buffer::from_iter(validity.iter().copied());

        // The first value already sits in slot 0; pad the other 4 out to 10 slots.
        buffer.pad_nulls(1, values.len() - 1, validity.len() - 1, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        let expected = [
            Some("a"),
            None,
            None,
            Some("b"),
            None,
            Some("c"),
            None,
            Some("def"),
            Some("gh"),
            None,
            None,
        ];
        assert_eq!(strings.iter().collect::<Vec<_>>(), expected);
    }

    /// Push-time validation only rejects leading continuation bytes; full
    /// validation is done by `check_valid_utf8`.
    #[test]
    fn test_utf8_validation() {
        let two_byte: &[u8] = &[0b11001000, 0b10001000];
        let three_byte: &[u8] = &[0b11101000, 0b10001000, 0b10001000];
        let four_byte: &[u8] = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];
        for seq in [two_byte, three_byte, four_byte] {
            std::str::from_utf8(seq).unwrap();
        }

        let mut buffer = OffsetBuffer::<i32>::default();
        for seq in [two_byte, three_byte, four_byte] {
            buffer.try_push(seq, true).unwrap();
        }

        // Slices starting on a continuation byte are rejected on push
        let mid_char_slices = [
            &two_byte[1..],
            &three_byte[1..],
            &three_byte[2..],
            &four_byte[1..],
            &four_byte[2..],
            &four_byte[3..],
        ];
        for seq in mid_char_slices {
            buffer.try_push(seq, true).unwrap_err();
        }

        // Only the first byte is checked on push, so this invalid pair is accepted
        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();

        assert_eq!(buffer.len(), 4);
        assert_eq!(buffer.values.len(), 11);

        buffer.try_push(three_byte, true).unwrap();

        // The whole buffer contains the lone continuation byte pushed above
        buffer.check_valid_utf8(0).unwrap_err();
        // Starting at the last pushed character is valid
        buffer.check_valid_utf8(11).unwrap();
        // Starting in the middle of that character is not
        buffer.check_valid_utf8(12).unwrap_err();
    }

    /// Padding an empty buffer yields an all-null array of the requested length.
    #[test]
    fn test_pad_nulls_empty() {
        let mut buffer = OffsetBuffer::<i32>::default();
        let valid_mask = Buffer::from_iter((0..9).map(|_| false));
        buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(strings.len(), 9);
        assert!(!strings.iter().any(|s| s.is_some()))
    }
}