use std::any::Any;
use std::marker::PhantomData;
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, OffsetSizeTrait, new_empty_array};
use arrow_buffer::ArrowNativeType;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;

use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::encodings::rle::RleDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::FromBytes;

/// A macro that reduces the verbosity of dispatching [`make_byte_array_dictionary_reader`]
/// over the supported dictionary key and value types
macro_rules! make_reader {
    (
        ($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr, $v:expr) {
            $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
        }
    ) => {
        match (($k, $v)) {
            $(
                ($key_arrow, $value_arrow) => {
                    let reader = GenericRecordReader::new($column_desc);
                    Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
                        $pages, $data_type, reader,
                    )))
                }
            )+
            _ => Err(general_err!(
                "unsupported data type for byte array dictionary reader - {}",
                $data_type
            )),
        }
    }
}
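
// For orientation, a rough sketch of what a single `make_reader!` arm expands to
// (illustrative only; the actual code is generated by the macro above). For the
// `(ArrowType::Int32, ArrowType::Utf8 | ...)` pattern, which maps to `(i32, i32)`,
// the expansion is approximately:
//
//     (ArrowType::Int32, ArrowType::Utf8) => {
//         let reader = GenericRecordReader::new(column_desc);
//         Ok(Box::new(ByteArrayDictionaryReader::<i32, i32>::new(
//             pages, data_type, reader,
//         )))
//     }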

/// Returns an [`ArrayReader`] that decodes the provided byte array column
/// to a `DictionaryArray` with the requested key and value types
pub fn make_byte_array_dictionary_reader(
    pages: Box<dyn PageIterator>,
    column_desc: ColumnDescPtr,
    arrow_type: Option<ArrowType>,
) -> Result<Box<dyn ArrayReader>> {
    // Check if Arrow type is specified, else create it from the Parquet type
    let data_type = match arrow_type {
        Some(t) => t,
        None => parquet_to_arrow_field(column_desc.as_ref())?
            .data_type()
            .clone(),
    };

    match &data_type {
        ArrowType::Dictionary(key_type, value_type) => {
            make_reader! {
                (pages, column_desc, data_type) => match (key_type.as_ref(), value_type.as_ref()) {
                    (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u8, i32),
                    (ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
                    (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i8, i32),
                    (ArrowType::Int8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i8, i64),
                    (ArrowType::UInt16, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u16, i32),
                    (ArrowType::UInt16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u16, i64),
                    (ArrowType::Int16, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i16, i32),
                    (ArrowType::Int16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i16, i64),
                    (ArrowType::UInt32, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u32, i32),
                    (ArrowType::UInt32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u32, i64),
                    (ArrowType::Int32, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i32, i32),
                    (ArrowType::Int32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i32, i64),
                    (ArrowType::UInt64, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u64, i32),
                    (ArrowType::UInt64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u64, i64),
                    (ArrowType::Int64, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i64, i32),
                    (ArrowType::Int64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i64, i64),
                }
            }
        }
        _ => Err(general_err!(
            "invalid non-dictionary data type for byte array dictionary reader - {}",
            data_type
        )),
    }
}
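
// A minimal sketch of how the returned reader is typically driven, assuming the
// caller has already obtained a `PageIterator` and `ColumnDescPtr` (named `pages`
// and `column_desc` here purely for illustration):
//
//     let mut reader = make_byte_array_dictionary_reader(pages, column_desc, None)?;
//     while reader.read_records(1024)? != 0 {
//         let batch: ArrayRef = reader.consume_batch()?;
//         // `batch` is a dictionary array with the key/value types selected above
//     }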

/// An [`ArrayReader`] for dictionary encoded variable length byte arrays
struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
    data_type: ArrowType,
    pages: Box<dyn PageIterator>,
    def_levels_buffer: Option<Vec<i16>>,
    rep_levels_buffer: Option<Vec<i16>>,
    record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
}

impl<K, V> ByteArrayDictionaryReader<K, V>
where
    K: FromBytes + Ord + ArrowNativeType,
    V: OffsetSizeTrait,
{
    fn new(
        pages: Box<dyn PageIterator>,
        data_type: ArrowType,
        record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
    ) -> Self {
        Self {
            data_type,
            pages,
            def_levels_buffer: None,
            rep_levels_buffer: None,
            record_reader,
        }
    }
}

impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
where
    K: FromBytes + Ord + ArrowNativeType,
    V: OffsetSizeTrait,
{
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    fn consume_batch(&mut self) -> Result<ArrayRef> {
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();

        if self.record_reader.num_values() == 0 {
            return Ok(new_empty_array(&self.data_type));
        }

        let buffer = self.record_reader.consume_record_data();
        let null_buffer = self.record_reader.consume_bitmap_buffer();
        let array = buffer.into_array(null_buffer, &self.data_type)?;
        self.record_reader.reset();

        Ok(array)
    }

    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}

/// The decoding state of a [`DictionaryDecoder`]
enum MaybeDictionaryDecoder {
    /// Decoding a dictionary encoded data page
    Dict {
        decoder: RleDecoder,
        /// This is an upper bound, as the null count is not always known up front
        max_remaining_values: usize,
    },
    /// Decoding a non-dictionary encoded data page
    Fallback(ByteArrayDecoder),
}

/// A [`ColumnValueDecoder`] for dictionary encoded variable length byte arrays
struct DictionaryDecoder<K, V> {
    /// The current dictionary
    dict: Option<ArrayRef>,

    /// Decoder for the current data page
    decoder: Option<MaybeDictionaryDecoder>,

    /// Should strings be validated as UTF-8
    validate_utf8: bool,

    /// The Arrow type of the dictionary values
    value_type: ArrowType,

    phantom: PhantomData<(K, V)>,
}

impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
where
    K: FromBytes + Ord + ArrowNativeType,
    V: OffsetSizeTrait,
{
    type Buffer = DictionaryBuffer<K, V>;

    fn new(col: &ColumnDescPtr) -> Self {
        let validate_utf8 = col.converted_type() == ConvertedType::UTF8;

        let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) {
            (true, true) => ArrowType::LargeUtf8,
            (true, false) => ArrowType::LargeBinary,
            (false, true) => ArrowType::Utf8,
            (false, false) => ArrowType::Binary,
        };

        Self {
            dict: None,
            decoder: None,
            validate_utf8,
            value_type,
            phantom: Default::default(),
        }
    }

    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        _is_sorted: bool,
    ) -> Result<()> {
        if !matches!(
            encoding,
            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
        ) {
            return Err(nyi_err!(
                "Invalid/Unsupported encoding type for dictionary: {}",
                encoding
            ));
        }

        if K::from_usize(num_values as usize).is_none() {
            return Err(general_err!("dictionary too large for index type"));
        }

        let len = num_values as usize;
        let mut buffer = OffsetBuffer::<V>::default();
        let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
        decoder.read(&mut buffer, usize::MAX)?;

        let array = buffer.into_array(None, self.value_type.clone());
        self.dict = Some(Arc::new(array));
        Ok(())
    }

    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()> {
        let decoder = match encoding {
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
                // Dictionary encoded data pages prefix the RLE/bit-packed key data
                // with a single byte containing the bit width of the keys
                let bit_width = data[0];
                let mut decoder = RleDecoder::new(bit_width);
                decoder.set_data(data.slice(1..))?;
                MaybeDictionaryDecoder::Dict {
                    decoder,
                    max_remaining_values: num_values.unwrap_or(num_levels),
                }
            }
            _ => MaybeDictionaryDecoder::Fallback(ByteArrayDecoder::new(
                encoding,
                data,
                num_levels,
                num_values,
                self.validate_utf8,
            )?),
        };

        self.decoder = Some(decoder);
        Ok(())
    }

    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
        match self.decoder.as_mut().expect("decoder set") {
            MaybeDictionaryDecoder::Fallback(decoder) => {
                decoder.read(out.spill_values()?, num_values, None)
            }
            MaybeDictionaryDecoder::Dict {
                decoder,
                max_remaining_values,
            } => {
                let len = num_values.min(*max_remaining_values);

                let dict = self
                    .dict
                    .as_ref()
                    .ok_or_else(|| general_err!("missing dictionary page for column"))?;

                assert_eq!(dict.data_type(), &self.value_type);

                if dict.is_empty() {
                    // An empty dictionary means all encoded values are null
                    return Ok(0);
                }

                match out.as_keys(dict) {
                    Some(keys) => {
                        // The output buffer can share this dictionary, so only the
                        // keys need to be decoded
                        let start = keys.len();
                        keys.resize(start + len, K::default());
                        let len = decoder.get_batch(&mut keys[start..])?;
                        keys.truncate(start + len);
                        *max_remaining_values -= len;
                        Ok(len)
                    }
                    None => {
                        // The output buffer cannot share this dictionary (e.g. it has
                        // spilled to values), so the keys must be materialized
                        let values = out.spill_values()?;
                        let mut keys = vec![K::default(); len];
                        let len = decoder.get_batch(&mut keys)?;

                        assert_eq!(dict.data_type(), &self.value_type);

                        let data = dict.to_data();
                        let dict_buffers = data.buffers();
                        let dict_offsets = dict_buffers[0].typed_data::<V>();
                        let dict_values = dict_buffers[1].as_slice();

                        values.extend_from_dictionary(&keys[..len], dict_offsets, dict_values)?;
                        *max_remaining_values -= len;
                        Ok(len)
                    }
                }
            }
        }
    }

    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
        match self.decoder.as_mut().expect("decoder set") {
            MaybeDictionaryDecoder::Fallback(decoder) => decoder.skip::<V>(num_values, None),
            MaybeDictionaryDecoder::Dict {
                decoder,
                max_remaining_values,
            } => {
                let num_values = num_values.min(*max_remaining_values);
                *max_remaining_values -= num_values;
                decoder.skip(num_values)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use arrow::compute::cast;
    use arrow_array::{Array, StringArray};
    use arrow_buffer::Buffer;

    use crate::arrow::array_reader::test_util::{
        byte_array_all_encodings, encode_dictionary, utf8_column,
    };
    use crate::arrow::record_reader::buffer::ValuesBuffer;
    use crate::data_type::ByteArray;

    use super::*;

    fn utf8_dictionary() -> ArrowType {
        ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8))
    }

    #[test]
    fn test_dictionary_preservation() {
        let data_type = utf8_dictionary();

        let data: Vec<_> = vec!["0", "1", "0", "1", "2", "1", "2"]
            .into_iter()
            .map(ByteArray::from)
            .collect();
        let (dict, encoded) = encode_dictionary(&data);

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);

        decoder
            .set_dict(dict, 3, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        decoder
            .set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len()))
            .unwrap();

        let mut output = DictionaryBuffer::<i32, i32>::default();
        assert_eq!(decoder.read(&mut output, 3).unwrap(), 3);

        let mut valid = vec![false, false, true, true, false, true];
        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
        output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice());

        assert!(matches!(output, DictionaryBuffer::Dict { .. }));

        assert_eq!(decoder.read(&mut output, 4).unwrap(), 4);

        valid.extend_from_slice(&[false, false, true, true, false, true, true, false]);
        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
        output.pad_nulls(6, 4, 8, valid_buffer.as_slice());

        assert!(matches!(output, DictionaryBuffer::Dict { .. }));

        let array = output.into_array(Some(valid_buffer), &data_type).unwrap();
        assert_eq!(array.data_type(), &data_type);

        let array = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 14);

        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![
                None,
                None,
                Some("0"),
                Some("1"),
                None,
                Some("0"),
                None,
                None,
                Some("1"),
                Some("2"),
                None,
                Some("1"),
                Some("2"),
                None
            ]
        )
    }

    #[test]
    fn test_dictionary_preservation_skip() {
        let data_type = utf8_dictionary();

        let data: Vec<_> = vec!["0", "1", "0", "1", "2", "1", "2"]
            .into_iter()
            .map(ByteArray::from)
            .collect();
        let (dict, encoded) = encode_dictionary(&data);

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);

        decoder
            .set_dict(dict, 3, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        decoder
            .set_data(Encoding::RLE_DICTIONARY, encoded, 7, Some(data.len()))
            .unwrap();

        let mut output = DictionaryBuffer::<i32, i32>::default();

        assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
        assert_eq!(decoder.skip_values(1).unwrap(), 1);

        assert!(matches!(output, DictionaryBuffer::Dict { .. }));

        assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
        assert_eq!(decoder.skip_values(1).unwrap(), 1);

        assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
        assert_eq!(decoder.skip_values(4).unwrap(), 0);

        let valid = [true, true, true, true, true];
        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
        output.pad_nulls(0, 5, 5, valid_buffer.as_slice());

        assert!(matches!(output, DictionaryBuffer::Dict { .. }));

        let array = output.into_array(Some(valid_buffer), &data_type).unwrap();
        assert_eq!(array.data_type(), &data_type);

        let array = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 5);

        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![Some("0"), Some("1"), Some("1"), Some("2"), Some("2")]
        )
    }

    #[test]
    fn test_dictionary_fallback() {
        let data_type = utf8_dictionary();
        let data = vec!["hello", "world", "a", "b"];

        let (pages, encoded_dictionary) = byte_array_all_encodings(data.clone());
        let num_encodings = pages.len();

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        let mut output = DictionaryBuffer::<i32, i32>::default();

        for (encoding, page) in pages {
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 4);
        }
        let array = output.into_array(None, &data_type).unwrap();
        assert_eq!(array.data_type(), &data_type);

        let array = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), data.len() * num_encodings);

        for i in 0..num_encodings {
            assert_eq!(
                strings
                    .iter()
                    .skip(i * data.len())
                    .take(data.len())
                    .map(|x| x.unwrap())
                    .collect::<Vec<_>>(),
                data
            )
        }
    }

    #[test]
    fn test_dictionary_skip_fallback() {
        let data_type = utf8_dictionary();
        let data = vec!["hello", "world", "a", "b"];

        let (pages, encoded_dictionary) = byte_array_all_encodings(data.clone());
        let num_encodings = pages.len();

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        let mut output = DictionaryBuffer::<i32, i32>::default();

        for (encoding, page) in pages {
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
            decoder.skip_values(2).expect("skipping two values");
            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 2);
        }
        let array = output.into_array(None, &data_type).unwrap();
        assert_eq!(array.data_type(), &data_type);

        let array = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), (data.len() - 2) * num_encodings);

        for i in 0..num_encodings {
            assert_eq!(
                &strings
                    .iter()
                    .skip(i * (data.len() - 2))
                    .take(data.len() - 2)
                    .map(|x| x.unwrap())
                    .collect::<Vec<_>>(),
                &data[2..]
            )
        }
    }

    #[test]
    fn test_too_large_dictionary() {
        let data: Vec<_> = (0..128)
            .map(|x| ByteArray::from(x.to_string().as_str()))
            .collect();
        let (dictionary, _) = encode_dictionary(&data);

        let column_desc = utf8_column();

        let mut decoder = DictionaryDecoder::<i8, i32>::new(&column_desc);
        let err = decoder
            .set_dict(dictionary.clone(), 128, Encoding::RLE_DICTIONARY, false)
            .unwrap_err()
            .to_string();

        assert!(err.contains("dictionary too large for index type"));

        let mut decoder = DictionaryDecoder::<i16, i32>::new(&column_desc);
        decoder
            .set_dict(dictionary, 128, Encoding::RLE_DICTIONARY, false)
            .unwrap();
    }

    #[test]
    fn test_nulls() {
        let data_type = utf8_dictionary();
        let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::PLAIN_DICTIONARY, false)
            .unwrap();

        for (encoding, page) in pages.clone() {
            let mut output = DictionaryBuffer::<i32, i32>::default();
            decoder.set_data(encoding, page, 8, None).unwrap();
            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);

            output.pad_nulls(0, 0, 8, &[0]);
            let array = output
                .into_array(Some(Buffer::from(&[0])), &data_type)
                .unwrap();

            assert_eq!(array.len(), 8);
            assert_eq!(array.null_count(), 8);
            assert_eq!(array.logical_null_count(), 8);
        }

        for (encoding, page) in pages {
            let mut output = DictionaryBuffer::<i32, i32>::default();
            decoder.set_data(encoding, page, 8, None).unwrap();
            assert_eq!(decoder.skip_values(1024).unwrap(), 0);

            output.pad_nulls(0, 0, 8, &[0]);
            let array = output
                .into_array(Some(Buffer::from(&[0])), &data_type)
                .unwrap();

            assert_eq!(array.len(), 8);
            assert_eq!(array.null_count(), 8);
            assert_eq!(array.logical_null_count(), 8);
        }
    }
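
    // An added illustrative test sketch (not part of the upstream suite): it exercises
    // the fallback path for a PLAIN encoded data page, which does not require a
    // dictionary page at all. It assumes the standard Parquet PLAIN layout for
    // BYTE_ARRAY values: a 4-byte little-endian length prefix followed by the bytes.
    #[test]
    fn test_plain_fallback_without_dictionary() {
        // Hand-encode two PLAIN byte array values: "a" and "bc"
        let mut page = Vec::new();
        for value in ["a", "bc"] {
            page.extend_from_slice(&(value.len() as u32).to_le_bytes());
            page.extend_from_slice(value.as_bytes());
        }

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);

        // No set_dict call: PLAIN pages are handled by MaybeDictionaryDecoder::Fallback
        decoder
            .set_data(Encoding::PLAIN, Bytes::from(page), 2, Some(2))
            .unwrap();

        let mut output = DictionaryBuffer::<i32, i32>::default();
        assert_eq!(decoder.read(&mut output, 1024).unwrap(), 2);

        let array = output.into_array(None, &utf8_dictionary()).unwrap();
        let array = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![Some("a"), Some("bc")]
        );
    }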
}