1use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::Decimal256Array;
27use arrow_array::{
28 builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, Decimal128Array,
29 Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, UInt32Array,
30 UInt64Array,
31};
32use arrow_buffer::{i256, BooleanBuffer, Buffer};
33use arrow_data::ArrayDataBuilder;
34use arrow_schema::{DataType as ArrowType, TimeUnit};
35use std::any::Any;
36use std::sync::Arc;
37
38pub trait IntoBuffer {
40 fn into_buffer(self) -> Buffer;
41}
42
43macro_rules! native_buffer {
44 ($($t:ty),*) => {
45 $(impl IntoBuffer for Vec<$t> {
46 fn into_buffer(self) -> Buffer {
47 Buffer::from_vec(self)
48 }
49 })*
50 };
51}
52native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
53
54impl IntoBuffer for Vec<bool> {
55 fn into_buffer(self) -> Buffer {
56 BooleanBuffer::from_iter(self).into_inner()
57 }
58}
59
60impl IntoBuffer for Vec<Int96> {
61 fn into_buffer(self) -> Buffer {
62 let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
63 for v in self {
64 builder.append(v.to_nanos())
65 }
66 builder.finish()
67 }
68}
69
70pub struct PrimitiveArrayReader<T>
73where
74 T: DataType,
75 T::T: Copy + Default,
76 Vec<T::T>: IntoBuffer,
77{
78 data_type: ArrowType,
79 pages: Box<dyn PageIterator>,
80 def_levels_buffer: Option<Vec<i16>>,
81 rep_levels_buffer: Option<Vec<i16>>,
82 record_reader: RecordReader<T>,
83}
84
85impl<T> PrimitiveArrayReader<T>
86where
87 T: DataType,
88 T::T: Copy + Default,
89 Vec<T::T>: IntoBuffer,
90{
91 pub fn new(
93 pages: Box<dyn PageIterator>,
94 column_desc: ColumnDescPtr,
95 arrow_type: Option<ArrowType>,
96 ) -> Result<Self> {
97 let data_type = match arrow_type {
99 Some(t) => t,
100 None => parquet_to_arrow_field(column_desc.as_ref())?
101 .data_type()
102 .clone(),
103 };
104
105 let record_reader = RecordReader::<T>::new(column_desc);
106
107 Ok(Self {
108 data_type,
109 pages,
110 def_levels_buffer: None,
111 rep_levels_buffer: None,
112 record_reader,
113 })
114 }
115}
116
117impl<T> ArrayReader for PrimitiveArrayReader<T>
119where
120 T: DataType,
121 T::T: Copy + Default,
122 Vec<T::T>: IntoBuffer,
123{
124 fn as_any(&self) -> &dyn Any {
125 self
126 }
127
128 fn get_data_type(&self) -> &ArrowType {
130 &self.data_type
131 }
132
133 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
134 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
135 }
136
137 fn consume_batch(&mut self) -> Result<ArrayRef> {
138 let target_type = &self.data_type;
139 let arrow_data_type = match T::get_physical_type() {
140 PhysicalType::BOOLEAN => ArrowType::Boolean,
141 PhysicalType::INT32 => {
142 match target_type {
143 ArrowType::UInt32 => {
144 ArrowType::UInt32
147 }
148 _ => ArrowType::Int32,
149 }
150 }
151 PhysicalType::INT64 => {
152 match target_type {
153 ArrowType::UInt64 => {
154 ArrowType::UInt64
157 }
158 _ => ArrowType::Int64,
159 }
160 }
161 PhysicalType::FLOAT => ArrowType::Float32,
162 PhysicalType::DOUBLE => ArrowType::Float64,
163 PhysicalType::INT96 => match target_type {
164 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
165 _ => unreachable!("INT96 must be timestamp nanosecond"),
166 },
167 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
168 unreachable!("PrimitiveArrayReaders don't support complex physical types");
169 }
170 };
171
172 let record_data = self.record_reader.consume_record_data().into_buffer();
176
177 let array_data = ArrayDataBuilder::new(arrow_data_type)
178 .len(self.record_reader.num_values())
179 .add_buffer(record_data)
180 .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
181
182 let array_data = unsafe { array_data.build_unchecked() };
183 let array: ArrayRef = match T::get_physical_type() {
184 PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
185 PhysicalType::INT32 => match array_data.data_type() {
186 ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
187 ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
188 _ => unreachable!(),
189 },
190 PhysicalType::INT64 => match array_data.data_type() {
191 ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
192 ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
193 _ => unreachable!(),
194 },
195 PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
196 PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
197 PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)),
198 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
199 unreachable!("PrimitiveArrayReaders don't support complex physical types");
200 }
201 };
202
203 let array = match target_type {
214 ArrowType::Date64 => {
215 let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
217 arrow_cast::cast(&a, target_type)?
218 }
219 ArrowType::Decimal128(p, s) => {
220 let array = match array.data_type() {
224 ArrowType::Int32 => array
225 .as_any()
226 .downcast_ref::<Int32Array>()
227 .unwrap()
228 .unary(|i| i as i128)
229 as Decimal128Array,
230 ArrowType::Int64 => array
231 .as_any()
232 .downcast_ref::<Int64Array>()
233 .unwrap()
234 .unary(|i| i as i128)
235 as Decimal128Array,
236 _ => {
237 return Err(arrow_err!(
238 "Cannot convert {:?} to decimal",
239 array.data_type()
240 ));
241 }
242 }
243 .with_precision_and_scale(*p, *s)?;
244
245 Arc::new(array) as ArrayRef
246 }
247 ArrowType::Decimal256(p, s) => {
248 let array = match array.data_type() {
250 ArrowType::Int32 => array
251 .as_any()
252 .downcast_ref::<Int32Array>()
253 .unwrap()
254 .unary(|i| i256::from_i128(i as i128))
255 as Decimal256Array,
256 ArrowType::Int64 => array
257 .as_any()
258 .downcast_ref::<Int64Array>()
259 .unwrap()
260 .unary(|i| i256::from_i128(i as i128))
261 as Decimal256Array,
262 _ => {
263 return Err(arrow_err!(
264 "Cannot convert {:?} to decimal",
265 array.data_type()
266 ));
267 }
268 }
269 .with_precision_and_scale(*p, *s)?;
270
271 Arc::new(array) as ArrayRef
272 }
273 _ => arrow_cast::cast(&array, target_type)?,
274 };
275
276 self.def_levels_buffer = self.record_reader.consume_def_levels();
278 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
279 self.record_reader.reset();
280 Ok(array)
281 }
282
283 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
284 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
285 }
286
287 fn get_def_levels(&self) -> Option<&[i16]> {
288 self.def_levels_buffer.as_deref()
289 }
290
291 fn get_rep_levels(&self) -> Option<&[i16]> {
292 self.rep_levels_buffer.as_deref()
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::EmptyPageIterator;
    use crate::basic::Encoding;
    use crate::column::page::Page;
    use crate::data_type::{Int32Type, Int64Type};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use crate::util::test_common::rand_gen::make_pages;
    use crate::util::InMemoryPageIterator;
    use arrow::datatypes::ArrowPrimitiveType;
    use arrow_array::{Array, PrimitiveArray};

    use arrow::datatypes::DataType::Decimal128;
    use rand::distributions::uniform::SampleUniform;
    use std::collections::VecDeque;

    /// Builds `num_chunks` column chunks with `make_pages`, appending the generated
    /// levels and values to the output vectors and each chunk's pages to `page_lists`.
    // The argument list mirrors `make_pages`, hence the allow.
    #[allow(clippy::too_many_arguments)]
    fn make_column_chunks<T: DataType>(
        column_desc: ColumnDescPtr,
        encoding: Encoding,
        num_levels: usize,
        min_value: T::T,
        max_value: T::T,
        def_levels: &mut Vec<i16>,
        rep_levels: &mut Vec<i16>,
        values: &mut Vec<T::T>,
        page_lists: &mut Vec<Vec<Page>>,
        use_v2: bool,
        num_chunks: usize,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        for _i in 0..num_chunks {
            let mut pages = VecDeque::new();
            let mut data = Vec::new();
            let mut page_def_levels = Vec::new();
            let mut page_rep_levels = Vec::new();

            make_pages::<T>(
                column_desc.clone(),
                encoding,
                1,
                num_levels,
                min_value,
                max_value,
                &mut page_def_levels,
                &mut page_rep_levels,
                &mut data,
                &mut pages,
                use_v2,
            );

            def_levels.append(&mut page_def_levels);
            rep_levels.append(&mut page_rep_levels);
            values.append(&mut data);
            page_lists.push(Vec::from(pages));
        }
    }

    #[test]
    fn test_primitive_array_reader_empty_pages() {
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
            Box::<EmptyPageIterator>::default(),
            schema.column(0),
            None,
        )
        .unwrap();

        // With no pages there is nothing to read.
        let array = array_reader.next_batch(50).unwrap();
        assert!(array.is_empty());
    }

    #[test]
    fn test_primitive_array_reader_data() {
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Two chunks of 100 values each, read back in batches of 50/100/100.
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                1,
                200,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let array = array_reader.next_batch(50).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);

            // This batch spans the boundary between the two chunks.
            let array = array_reader.next_batch(100).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);

            // Only 50 values remain, so the batch is short.
            let array = array_reader.next_batch(100).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
        }
    }

    /// Reads 50 values of a single REQUIRED column annotated with `$converted_type_str`
    /// and checks that they match the generated data after casting to
    /// `$result_arrow_type` (optionally re-tagged with `$timezone`).
    macro_rules! test_primitive_array_reader_one_type {
        (
            $arrow_parquet_type:ty,
            $physical_type:expr,
            $converted_type_str:expr,
            $result_arrow_type:ty,
            $result_arrow_cast_type:ty,
            $result_primitive_type:ty
            $(, $timezone:expr)?
        ) => {{
            let message_type = format!(
                "
            message test_schema {{
              REQUIRED {:?} leaf ({});
            }}
            ",
                $physical_type, $converted_type_str
            );
            let schema = parse_message_type(&message_type)
                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
                .unwrap();

            let column_desc = schema.column(0);

            {
                let mut data = Vec::new();
                let mut page_lists = Vec::new();
                make_column_chunks::<$arrow_parquet_type>(
                    column_desc.clone(),
                    Encoding::PLAIN,
                    100,
                    1,
                    200,
                    &mut Vec::new(),
                    &mut Vec::new(),
                    &mut data,
                    &mut page_lists,
                    true,
                    2,
                );
                let page_iterator = InMemoryPageIterator::new(page_lists);
                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
                    Box::new(page_iterator),
                    column_desc.clone(),
                    None,
                )
                .expect("Unable to get array reader");

                let array = array_reader
                    .next_batch(50)
                    .expect("Unable to get batch from reader");

                let result_data_type = <$result_arrow_type>::DATA_TYPE;
                // The panic message is only formatted on failure.
                let array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast {:?} to {:?}",
                            array.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;

                // Build the expected array from the raw values, then cast it the same
                // way the reader is expected to.
                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
                    data[0..50]
                        .iter()
                        .map(|x| *x as $result_primitive_type)
                        .collect::<Vec<$result_primitive_type>>(),
                );
                let expected = Arc::new(expected) as ArrayRef;
                let expected = arrow::compute::cast(&expected, &result_data_type)
                    .expect("Unable to cast expected array");
                assert_eq!(expected.data_type(), &result_data_type);
                let expected = expected
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast expected {:?} to {:?}",
                            expected.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;
                assert_eq!(expected, array);
            }
        }};
    }

    #[test]
    fn test_primitive_array_reader_temporal_types() {
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "DATE",
            arrow::datatypes::Date32Type,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "TIME_MILLIS",
            arrow::datatypes::Time32MillisecondType,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIME_MICROS",
            arrow::datatypes::Time64MicrosecondType,
            arrow::datatypes::Int64Type,
            i64
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MILLIS",
            arrow::datatypes::TimestampMillisecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MICROS",
            arrow::datatypes::TimestampMicrosecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
    }

    #[test]
    fn test_primitive_array_reader_def_and_rep_levels() {
        let message_type = "
        message test_schema {
            REPEATED Group test_mid {
                OPTIONAL INT32 leaf;
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Each batch's levels must be the matching slice of all generated levels.
        {
            let mut def_levels = Vec::new();
            let mut rep_levels = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                1,
                200,
                &mut def_levels,
                &mut rep_levels,
                &mut Vec::new(),
                &mut page_lists,
                true,
                2,
            );

            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let mut accu_len: usize = 0;

            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();

            // This batch spans the boundary between the two chunks.
            let array = array_reader.next_batch(100).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();

            let array = array_reader.next_batch(100).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
        }
    }

    #[test]
    fn test_primitive_array_reader_decimal_types() {
        // DECIMAL backed by INT32
        let message_type = "
        message test_schema {
            REQUIRED INT32 decimal1 (DECIMAL(8,2));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -99999999,
                99999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(8, 2));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(8, 2)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // Same values with a different precision/scale must not compare equal.
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(9, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array)
        }

        // DECIMAL backed by INT64
        let message_type = "
        message test_schema {
            REQUIRED INT64 decimal1 (DECIMAL(18,4));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int64Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -999999999999999999,
                999999999999999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(18, 4));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(18, 4)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // Same values with a different precision/scale must not compare equal.
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(34, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array)
        }
    }
}