1use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
20use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
21use crate::arrow::record_reader::GenericRecordReader;
22use crate::arrow::record_reader::buffer::ValuesBuffer;
23use crate::arrow::schema::parquet_to_arrow_field;
24use crate::basic::{Encoding, Type};
25use crate::column::page::PageIterator;
26use crate::column::reader::decoder::ColumnValueDecoder;
27use crate::errors::{ParquetError, Result};
28use crate::schema::types::ColumnDescPtr;
29use arrow_array::{
30 ArrayRef, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
31 FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray,
32};
33use arrow_buffer::{Buffer, IntervalDayTime, i256};
34use arrow_data::ArrayDataBuilder;
35use arrow_schema::{DataType as ArrowType, IntervalUnit};
36use bytes::Bytes;
37use half::f16;
38use std::any::Any;
39use std::ops::Range;
40use std::sync::Arc;
41
42pub fn make_fixed_len_byte_array_reader(
44 pages: Box<dyn PageIterator>,
45 column_desc: ColumnDescPtr,
46 arrow_type: Option<ArrowType>,
47) -> Result<Box<dyn ArrayReader>> {
48 let data_type = match arrow_type {
50 Some(t) => t,
51 None => parquet_to_arrow_field(column_desc.as_ref())?
52 .data_type()
53 .clone(),
54 };
55
56 let byte_length = match column_desc.physical_type() {
57 Type::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize,
58 t => {
59 return Err(general_err!(
60 "invalid physical type for fixed length byte array reader - {}",
61 t
62 ));
63 }
64 };
65 match &data_type {
66 ArrowType::FixedSizeBinary(_) => {}
67 ArrowType::Decimal32(_, _) => {
68 if byte_length > 4 {
69 return Err(general_err!(
70 "decimal 32 type too large, must be less then 4 bytes, got {}",
71 byte_length
72 ));
73 }
74 }
75 ArrowType::Decimal64(_, _) => {
76 if byte_length > 8 {
77 return Err(general_err!(
78 "decimal 64 type too large, must be less then 8 bytes, got {}",
79 byte_length
80 ));
81 }
82 }
83 ArrowType::Decimal128(_, _) => {
84 if byte_length > 16 {
85 return Err(general_err!(
86 "decimal 128 type too large, must be less than 16 bytes, got {}",
87 byte_length
88 ));
89 }
90 }
91 ArrowType::Decimal256(_, _) => {
92 if byte_length > 32 {
93 return Err(general_err!(
94 "decimal 256 type too large, must be less than 32 bytes, got {}",
95 byte_length
96 ));
97 }
98 }
99 ArrowType::Interval(_) => {
100 if byte_length != 12 {
101 return Err(general_err!(
103 "interval type must consist of 12 bytes got {}",
104 byte_length
105 ));
106 }
107 }
108 ArrowType::Float16 => {
109 if byte_length != 2 {
110 return Err(general_err!(
111 "float 16 type must be 2 bytes, got {}",
112 byte_length
113 ));
114 }
115 }
116 _ => {
117 return Err(general_err!(
118 "invalid data type for fixed length byte array reader - {}",
119 data_type
120 ));
121 }
122 }
123
124 Ok(Box::new(FixedLenByteArrayReader::new(
125 pages,
126 column_desc,
127 data_type,
128 byte_length,
129 )))
130}
131
/// An [`ArrayReader`] for parquet `FIXED_LEN_BYTE_ARRAY` columns
struct FixedLenByteArrayReader {
    /// The Arrow type produced by [`ArrayReader::consume_batch`]
    data_type: ArrowType,
    /// The width in bytes of each fixed-length value
    byte_length: usize,
    /// The source of column pages to decode
    pages: Box<dyn PageIterator>,
    /// Definition levels from the most recently consumed batch, if any
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels from the most recently consumed batch, if any
    rep_levels_buffer: Option<Vec<i16>>,
    /// Accumulates decoded values and levels across `read_records` calls
    record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
}
140
impl FixedLenByteArrayReader {
    /// Creates a reader producing `data_type` from a column whose values are
    /// each `byte_length` bytes wide
    fn new(
        pages: Box<dyn PageIterator>,
        column_desc: ColumnDescPtr,
        data_type: ArrowType,
        byte_length: usize,
    ) -> Self {
        Self {
            data_type,
            byte_length,
            pages,
            def_levels_buffer: None,
            rep_levels_buffer: None,
            record_reader: GenericRecordReader::new(column_desc),
        }
    }
}
158
impl ArrayReader for FixedLenByteArrayReader {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Reads up to `batch_size` records into the internal record reader,
    /// returning the number of records actually read
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    /// Converts the accumulated records into an [`ArrayRef`] of `self.data_type`.
    ///
    /// The raw fixed-width bytes are first assembled into a
    /// [`FixedSizeBinaryArray`] and then converted element-wise into the target
    /// type (decimal, interval, float16), or returned as-is for
    /// `FixedSizeBinary`.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let record_data = self.record_reader.consume_record_data();

        let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
            .len(self.record_reader.num_values())
            .add_buffer(Buffer::from_vec(record_data.buffer))
            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());

        // NOTE(review): validation is skipped here; presumably the record
        // reader guarantees the buffer length and null bitmap are consistent
        // with `num_values` — confirm before altering how the buffers are built
        let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });

        let array: ArrayRef = match &self.data_type {
            // Decimals are stored as big-endian values, sign-extended to the
            // width of the target decimal representation
            ArrowType::Decimal32(p, s) => {
                let f = |b: &[u8]| i32::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal32Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Decimal64(p, s) => {
                let f = |b: &[u8]| i64::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal64Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Decimal128(p, s) => {
                let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Decimal256(p, s) => {
                let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Interval(unit) => {
                // The 12-byte parquet interval is three little-endian u32s:
                // months (bytes 0..4), days (4..8), milliseconds (8..12)
                match unit {
                    IntervalUnit::YearMonth => {
                        // Only the months field is representable
                        let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap());
                        Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef
                    }
                    IntervalUnit::DayTime => {
                        // Only the days and milliseconds fields are representable
                        let f = |b: &[u8]| {
                            IntervalDayTime::new(
                                i32::from_le_bytes(b[4..8].try_into().unwrap()),
                                i32::from_le_bytes(b[8..12].try_into().unwrap()),
                            )
                        };
                        Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
                    }
                    IntervalUnit::MonthDayNano => {
                        return Err(nyi_err!("MonthDayNano intervals not supported"));
                    }
                }
            }
            ArrowType::Float16 => {
                let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap());
                Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
            }
            _ => Arc::new(binary) as ArrayRef,
        };

        // Stash the level buffers for get_def_levels/get_rep_levels, then
        // reset the record reader for the next batch
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        Ok(array)
    }

    /// Skips up to `num_records` records, returning the number skipped
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}
255
/// A buffer of contiguous fixed-width values, with nulls padded in place by
/// [`ValuesBuffer::pad_nulls`]
#[derive(Default)]
struct FixedLenByteArrayBuffer {
    /// The concatenated value bytes
    buffer: Vec<u8>,
    /// The width in bytes of each element; `None` until the first read sets it
    byte_length: Option<usize>,
}
262
/// Relocates values in `buffer` from their densely-packed positions
/// (`values_range`) to the positions of their set bits in `valid_mask`,
/// working backwards so the move can be performed in place without
/// overwriting values that have not yet been relocated.
///
/// `op(buffer, level_pos_bytes, value_pos_bytes, byte_length)` performs the
/// copy of a single value.
#[inline]
fn move_values<F>(
    buffer: &mut Vec<u8>,
    byte_length: usize,
    values_range: Range<usize>,
    valid_mask: &[u8],
    mut op: F,
) where
    F: FnMut(&mut Vec<u8>, usize, usize, usize),
{
    // Pair the highest packed value index with the highest set bit, descending
    for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
        debug_assert!(level_pos >= value_pos);
        // Once source and destination coincide, all remaining values are
        // already in their final position — nothing left to move
        if level_pos <= value_pos {
            break;
        }

        let level_pos_bytes = level_pos * byte_length;
        let value_pos_bytes = value_pos * byte_length;

        op(buffer, level_pos_bytes, value_pos_bytes, byte_length)
    }
}
285
impl ValuesBuffer for FixedLenByteArrayBuffer {
    /// Spaces out the densely-packed values so each non-null value sits at the
    /// slot of its definition level, leaving zero-filled gaps for null slots
    fn pad_nulls(
        &mut self,
        read_offset: usize,
        values_read: usize,
        levels_read: usize,
        valid_mask: &[u8],
    ) {
        let byte_length = self.byte_length.unwrap_or_default();

        // The buffer must currently hold exactly the packed values
        assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length);
        // Grow to the final (level-padded) size, zero-filling the tail
        self.buffer
            .resize((read_offset + levels_read) * byte_length, 0);

        let values_range = read_offset..read_offset + values_read;
        // Heuristic cutoff: for narrow values a simple per-byte loop is used,
        // while wider values use a slice copy — presumably the slice machinery
        // only pays off beyond this width (TODO: confirm with a benchmark)
        const VEC_CUTOFF: usize = 4;
        if byte_length > VEC_CUTOFF {
            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
                // split_at_mut lets us borrow the source (before the split) and
                // destination (after it) simultaneously; move_values guarantees
                // value_pos_bytes < level_pos_bytes, so the source lies wholly
                // in the first half
                let split = buffer.split_at_mut(level_pos_bytes);
                let dst = &mut split.1[..byte_length];
                let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length];
                dst.copy_from_slice(src);
            };
            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
        } else {
            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
                for i in 0..byte_length {
                    buffer[level_pos_bytes + i] = buffer[value_pos_bytes + i]
                }
            };
            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
        }
    }
}
324
/// A [`ColumnValueDecoder`] for fixed-length byte arrays
struct ValueDecoder {
    /// The width in bytes of each value, taken from the column descriptor
    byte_length: usize,
    /// The raw dictionary page contents, set by [`ColumnValueDecoder::set_dict`]
    dict_page: Option<Bytes>,
    /// The decoder for the current data page, set by [`ColumnValueDecoder::set_data`]
    decoder: Option<Decoder>,
}
330
impl ColumnValueDecoder for ValueDecoder {
    type Buffer = FixedLenByteArrayBuffer;

    fn new(col: &ColumnDescPtr) -> Self {
        Self {
            byte_length: col.type_length() as usize,
            dict_page: None,
            decoder: None,
        }
    }

    /// Stores the raw dictionary page after validating its encoding and that
    /// it contains at least `num_values` values of `self.byte_length` bytes
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        _is_sorted: bool,
    ) -> Result<()> {
        if !matches!(
            encoding,
            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
        ) {
            return Err(nyi_err!(
                "Invalid/Unsupported encoding type for dictionary: {}",
                encoding
            ));
        }
        // The page may legally contain trailing bytes, but must hold at
        // least num_values full-width values
        let expected_len = num_values as usize * self.byte_length;
        if expected_len > buf.len() {
            return Err(general_err!(
                "too few bytes in dictionary page, expected {} got {}",
                expected_len,
                buf.len()
            ));
        }

        self.dict_page = Some(buf);
        Ok(())
    }

    /// Installs the decoder for a new data page based on its encoding
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()> {
        self.decoder = Some(match encoding {
            Encoding::PLAIN => Decoder::Plain {
                buf: data,
                offset: 0,
            },
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
                decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
            },
            Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
                decoder: DeltaByteArrayDecoder::new(data)?,
            },
            Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
                buf: data,
                offset: 0,
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding for fixed length byte array: {}",
                    encoding
                ));
            }
        });
        Ok(())
    }

    /// Decodes up to `num_values` values into `out`, returning the number of
    /// values actually read (which may be less at the end of a page)
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
        // Pin the output buffer to this column's byte length; mixing widths
        // in one buffer is a logic error
        match out.byte_length {
            Some(x) => assert_eq!(x, self.byte_length),
            None => out.byte_length = Some(self.byte_length),
        }

        match self.decoder.as_mut().unwrap() {
            Decoder::Plain { offset, buf } => {
                // Clamp to the remaining whole values left in the page
                let to_read =
                    (num_values * self.byte_length).min(buf.len() - *offset) / self.byte_length;
                let end_offset = *offset + to_read * self.byte_length;
                out.buffer
                    .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
                *offset = end_offset;
                Ok(to_read)
            }
            Decoder::Dict { decoder } => {
                let dict = self.dict_page.as_ref().unwrap();
                // NOTE(review): an empty dictionary page appears to be treated
                // as "no values decodable" — confirm this matches the writer's
                // behavior for all-null pages
                if dict.is_empty() {
                    return Ok(0);
                }

                // Resolve each dictionary index to its fixed-width bytes
                decoder.read(num_values, |keys| {
                    out.buffer.reserve(keys.len() * self.byte_length);
                    for key in keys {
                        let offset = *key as usize * self.byte_length;
                        let val = &dict.as_ref()[offset..offset + self.byte_length];
                        out.buffer.extend_from_slice(val);
                    }
                    Ok(())
                })
            }
            Decoder::Delta { decoder } => {
                let to_read = num_values.min(decoder.remaining());
                out.buffer.reserve(to_read * self.byte_length);

                decoder.read(to_read, |slice| {
                    // DELTA_BYTE_ARRAY permits variable lengths; every value in
                    // a fixed-length column must match the declared width
                    if slice.len() != self.byte_length {
                        return Err(general_err!(
                            "encountered array with incorrect length, got {} expected {}",
                            slice.len(),
                            self.byte_length
                        ));
                    }
                    out.buffer.extend_from_slice(slice);
                    Ok(())
                })
            }
            Decoder::ByteStreamSplit { buf, offset } => {
                // `offset` counts whole values (not bytes) consumed so far
                let total_values = buf.len() / self.byte_length;
                let to_read = num_values.min(total_values - *offset);

                read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length);

                *offset += to_read;
                Ok(to_read)
            }
        }
    }

    /// Skips up to `num_values` values in the current page, returning the
    /// number actually skipped
    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
        match self.decoder.as_mut().unwrap() {
            Decoder::Plain { offset, buf } => {
                let to_read = num_values.min((buf.len() - *offset) / self.byte_length);
                *offset += to_read * self.byte_length;
                Ok(to_read)
            }
            Decoder::Dict { decoder } => decoder.skip(num_values),
            Decoder::Delta { decoder } => decoder.skip(num_values),
            Decoder::ByteStreamSplit { offset, buf } => {
                let total_values = buf.len() / self.byte_length;
                let to_read = num_values.min(total_values - *offset);
                *offset += to_read;
                Ok(to_read)
            }
        }
    }
}
486
/// Decodes `num_values` BYTE_STREAM_SPLIT-encoded values from `src` into
/// `dst`, starting `offset` values into each stream.
///
/// In BYTE_STREAM_SPLIT the k-th byte of every value is stored contiguously
/// in the k-th of `data_width` equal-length streams; this transposes the
/// streams back into `num_values` contiguous `data_width`-byte values,
/// appended to `dst`.
///
/// The caller must ensure `offset + num_values` does not exceed
/// `src.len() / data_width` values.
fn read_byte_stream_split(
    dst: &mut Vec<u8>,
    // Takes a shared slice: the source is only read. (`&mut Bytes` callers
    // continue to work via deref coercion.)
    src: &[u8],
    offset: usize,
    num_values: usize,
    data_width: usize,
) {
    // Each of the data_width streams holds one byte per encoded value
    let stride = src.len() / data_width;
    let idx = dst.len();
    dst.resize(idx + num_values * data_width, 0u8);
    let dst_slc = &mut dst[idx..idx + num_values * data_width];
    for j in 0..data_width {
        // Byte j of every requested value, contiguous within stream j
        let src_slc = &src[offset + j * stride..offset + j * stride + num_values];
        for i in 0..num_values {
            dst_slc[i * data_width + j] = src_slc[i];
        }
    }
}
510
/// Per-page decoding state for [`ValueDecoder`], selected by the page's
/// encoding in [`ColumnValueDecoder::set_data`]
enum Decoder {
    /// PLAIN: values stored back-to-back; `offset` is the byte position consumed
    Plain { buf: Bytes, offset: usize },
    /// RLE_DICTIONARY / PLAIN_DICTIONARY: indices into the dictionary page
    Dict { decoder: DictIndexDecoder },
    /// DELTA_BYTE_ARRAY encoded values
    Delta { decoder: DeltaByteArrayDecoder },
    /// BYTE_STREAM_SPLIT: value bytes transposed into per-byte streams;
    /// `offset` counts whole values consumed
    ByteStreamSplit { buf: Bytes, offset: usize },
}
517
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use arrow::datatypes::Field;
    use arrow::error::Result as ArrowResult;
    use arrow_array::{Array, ListArray};
    use arrow_array::{Decimal256Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    /// Round-trips a nullable list of Decimal256 values through an Arrow
    /// parquet writer and reader, verifying batch-by-batch equality
    #[test]
    fn test_decimal_list() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        // 7 list slots with explicit offsets and a null mask (slots 3 and 5
        // are null), sharing the decimal child array
        let data = ArrayDataBuilder::new(ArrowType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Read back with batch size 3 to exercise batching across list slots
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }
}
566}