1use bytes::Bytes;
21
22use super::page::{Page, PageReader};
23use crate::basic::*;
24use crate::column::reader::decoder::{
25 ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
26 RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
27};
28use crate::data_type::*;
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::{ceil, num_required_bits, read_num_bytes};
32
33pub(crate) mod decoder;
34
/// Column reader for a single Parquet column, with one variant per physical type.
///
/// Every variant wraps a [`ColumnReaderImpl`] specialised for the matching data type.
pub enum ColumnReader {
    /// Reader for `BoolType` columns.
    BoolColumnReader(ColumnReaderImpl<BoolType>),
    /// Reader for `Int32Type` columns.
    Int32ColumnReader(ColumnReaderImpl<Int32Type>),
    /// Reader for `Int64Type` columns.
    Int64ColumnReader(ColumnReaderImpl<Int64Type>),
    /// Reader for `Int96Type` columns.
    Int96ColumnReader(ColumnReaderImpl<Int96Type>),
    /// Reader for `FloatType` columns.
    FloatColumnReader(ColumnReaderImpl<FloatType>),
    /// Reader for `DoubleType` columns.
    DoubleColumnReader(ColumnReaderImpl<DoubleType>),
    /// Reader for `ByteArrayType` columns.
    ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
    /// Reader for `FixedLenByteArrayType` columns.
    FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
54
55pub fn get_column_reader(
58 col_descr: ColumnDescPtr,
59 col_page_reader: Box<dyn PageReader>,
60) -> ColumnReader {
61 match col_descr.physical_type() {
62 Type::BOOLEAN => {
63 ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
64 }
65 Type::INT32 => {
66 ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
67 }
68 Type::INT64 => {
69 ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
70 }
71 Type::INT96 => {
72 ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
73 }
74 Type::FLOAT => {
75 ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
76 }
77 Type::DOUBLE => {
78 ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
79 }
80 Type::BYTE_ARRAY => {
81 ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
82 }
83 Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
84 ColumnReaderImpl::new(col_descr, col_page_reader),
85 ),
86 }
87}
88
89pub fn get_typed_column_reader<T: DataType>(col_reader: ColumnReader) -> ColumnReaderImpl<T> {
94 T::get_column_reader(col_reader).unwrap_or_else(|| {
95 panic!(
96 "Failed to convert column reader into a typed column reader for `{}` type",
97 T::get_physical_type()
98 )
99 })
100}
101
/// A [`GenericColumnReader`] wired up with the default repetition level,
/// definition level and value decoders for data type `T`.
pub type ColumnReaderImpl<T> = GenericColumnReader<
    RepetitionLevelDecoderImpl,
    DefinitionLevelDecoderImpl,
    ColumnValueDecoderImpl<T>,
>;
108
/// Reads the levels and values of a single column from a stream of pages.
///
/// `R`, `D` and `V` are the repetition level, definition level and value
/// decoders respectively.
pub struct GenericColumnReader<R, D, V> {
    /// Descriptor of the column being read; supplies the max definition and
    /// repetition levels.
    descr: ColumnDescPtr,

    /// Source of the dictionary and data pages of the column.
    page_reader: Box<dyn PageReader>,

    /// Number of values reported by the header of the currently loaded data page.
    num_buffered_values: usize,

    /// Number of levels of the current page that have already been read or
    /// skipped; reset to zero whenever a new data page is loaded.
    num_decoded_values: usize,

    /// Result of `page_reader.at_record_boundary()` taken when the current page
    /// was loaded (only refreshed for columns with repetition levels). When set,
    /// reaching the end of the page's levels flushes the partial record.
    has_record_delimiter: bool,

    /// Definition level decoder, if one was supplied for this column.
    def_level_decoder: Option<D>,

    /// Repetition level decoder, if one was supplied for this column.
    rep_level_decoder: Option<R>,

    /// Decoder for the column values (and dictionary pages).
    values_decoder: V,
}
138
139impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
140where
141 V: ColumnValueDecoder,
142{
143 pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
145 let values_decoder = V::new(&descr);
146
147 let def_level_decoder = (descr.max_def_level() != 0)
148 .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
149
150 let rep_level_decoder = (descr.max_rep_level() != 0)
151 .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
152
153 Self::new_with_decoders(
154 descr,
155 page_reader,
156 values_decoder,
157 def_level_decoder,
158 rep_level_decoder,
159 )
160 }
161}
162
163impl<R, D, V> GenericColumnReader<R, D, V>
164where
165 R: RepetitionLevelDecoder,
166 D: DefinitionLevelDecoder,
167 V: ColumnValueDecoder,
168{
169 pub(crate) fn new_with_decoders(
170 descr: ColumnDescPtr,
171 page_reader: Box<dyn PageReader>,
172 values_decoder: V,
173 def_level_decoder: Option<D>,
174 rep_level_decoder: Option<R>,
175 ) -> Self {
176 Self {
177 descr,
178 def_level_decoder,
179 rep_level_decoder,
180 page_reader,
181 num_buffered_values: 0,
182 num_decoded_values: 0,
183 values_decoder,
184 has_record_delimiter: false,
185 }
186 }
187
188 #[deprecated(note = "Use read_records")]
201 pub fn read_batch(
202 &mut self,
203 batch_size: usize,
204 def_levels: Option<&mut D::Buffer>,
205 rep_levels: Option<&mut R::Buffer>,
206 values: &mut V::Buffer,
207 ) -> Result<(usize, usize)> {
208 let (_, values, levels) = self.read_records(batch_size, def_levels, rep_levels, values)?;
209
210 Ok((values, levels))
211 }
212
213 pub fn read_records(
228 &mut self,
229 max_records: usize,
230 mut def_levels: Option<&mut D::Buffer>,
231 mut rep_levels: Option<&mut R::Buffer>,
232 values: &mut V::Buffer,
233 ) -> Result<(usize, usize, usize)> {
234 let mut total_records_read = 0;
235 let mut total_levels_read = 0;
236 let mut total_values_read = 0;
237
238 while total_records_read < max_records && self.has_next()? {
239 let remaining_records = max_records - total_records_read;
240 let remaining_levels = self.num_buffered_values - self.num_decoded_values;
241
242 let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() {
243 Some(reader) => {
244 let out = rep_levels
245 .as_mut()
246 .ok_or_else(|| general_err!("must specify repetition levels"))?;
247
248 let (mut records_read, levels_read) =
249 reader.read_rep_levels(out, remaining_records, remaining_levels)?;
250
251 if records_read == 0 && levels_read == 0 {
252 return Err(general_err!(
254 "Insufficient repetition levels read from column"
255 ));
256 }
257 if levels_read == remaining_levels && self.has_record_delimiter {
258 assert!(records_read < remaining_records); records_read += reader.flush_partial() as usize;
262 }
263 (records_read, levels_read)
264 }
265 None => {
266 let min = remaining_records.min(remaining_levels);
267 (min, min)
268 }
269 };
270
271 let values_to_read = match self.def_level_decoder.as_mut() {
272 Some(reader) => {
273 let out = def_levels
274 .as_mut()
275 .ok_or_else(|| general_err!("must specify definition levels"))?;
276
277 let (values_read, levels_read) = reader.read_def_levels(out, levels_to_read)?;
278
279 if levels_read != levels_to_read {
280 return Err(general_err!("insufficient definition levels read from column - expected {rep_levels}, got {read}"));
281 }
282
283 values_read
284 }
285 None => levels_to_read,
286 };
287
288 let values_read = self.values_decoder.read(values, values_to_read)?;
289
290 if values_read != values_to_read {
291 return Err(general_err!(
292 "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
293 ));
294 }
295
296 self.num_decoded_values += levels_to_read;
297 total_records_read += records_read;
298 total_levels_read += levels_to_read;
299 total_values_read += values_read;
300 }
301
302 Ok((total_records_read, total_values_read, total_levels_read))
303 }
304
305 pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
311 let mut remaining_records = num_records;
312 while remaining_records != 0 {
313 if self.num_buffered_values == self.num_decoded_values {
314 let metadata = match self.page_reader.peek_next_page()? {
315 None => return Ok(num_records - remaining_records),
316 Some(metadata) => metadata,
317 };
318
319 if metadata.is_dict {
321 self.read_dictionary_page()?;
322 continue;
323 }
324
325 let rows = metadata.num_rows.or_else(|| {
328 self.rep_level_decoder
330 .is_none()
331 .then_some(metadata.num_levels)?
332 });
333
334 if let Some(rows) = rows {
335 if rows <= remaining_records {
336 self.page_reader.skip_next_page()?;
337 remaining_records -= rows;
338 continue;
339 }
340 }
341 if !self.read_new_page()? {
344 return Ok(num_records - remaining_records);
345 }
346 }
347
348 let remaining_levels = self.num_buffered_values - self.num_decoded_values;
352
353 let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
354 Some(decoder) => {
355 let (mut records_read, levels_read) =
356 decoder.skip_rep_levels(remaining_records, remaining_levels)?;
357
358 if levels_read == remaining_levels && self.has_record_delimiter {
359 assert!(records_read < remaining_records); records_read += decoder.flush_partial() as usize;
363 }
364
365 (records_read, levels_read)
366 }
367 None => {
368 let levels = remaining_levels.min(remaining_records);
370 (levels, levels)
371 }
372 };
373
374 self.num_decoded_values += rep_levels_read;
375 remaining_records -= records_read;
376
377 if self.num_buffered_values == self.num_decoded_values {
378 continue;
380 }
381
382 let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
383 Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
384 None => (rep_levels_read, rep_levels_read),
385 };
386
387 if rep_levels_read != def_levels_read {
388 return Err(general_err!(
389 "levels mismatch, read {} repetition levels and {} definition levels",
390 rep_levels_read,
391 def_levels_read
392 ));
393 }
394
395 let values = self.values_decoder.skip_values(values_read)?;
396 if values != values_read {
397 return Err(general_err!(
398 "skipped {} values, expected {}",
399 values,
400 values_read
401 ));
402 }
403 }
404 Ok(num_records - remaining_records)
405 }
406
407 fn read_dictionary_page(&mut self) -> Result<()> {
410 match self.page_reader.get_next_page()? {
411 Some(Page::DictionaryPage {
412 buf,
413 num_values,
414 encoding,
415 is_sorted,
416 }) => self
417 .values_decoder
418 .set_dict(buf, num_values, encoding, is_sorted),
419 _ => Err(ParquetError::General(
420 "Invalid page. Expecting dictionary page".to_string(),
421 )),
422 }
423 }
424
425 fn read_new_page(&mut self) -> Result<bool> {
428 loop {
429 match self.page_reader.get_next_page()? {
430 None => return Ok(false),
432 Some(current_page) => {
433 match current_page {
434 Page::DictionaryPage {
436 buf,
437 num_values,
438 encoding,
439 is_sorted,
440 } => {
441 self.values_decoder
442 .set_dict(buf, num_values, encoding, is_sorted)?;
443 continue;
444 }
445 Page::DataPage {
447 buf,
448 num_values,
449 encoding,
450 def_level_encoding,
451 rep_level_encoding,
452 statistics: _,
453 } => {
454 self.num_buffered_values = num_values as _;
455 self.num_decoded_values = 0;
456
457 let max_rep_level = self.descr.max_rep_level();
458 let max_def_level = self.descr.max_def_level();
459
460 let mut offset = 0;
461
462 if max_rep_level > 0 {
463 let (bytes_read, level_data) = parse_v1_level(
464 max_rep_level,
465 num_values,
466 rep_level_encoding,
467 buf.slice(offset..),
468 )?;
469 offset += bytes_read;
470
471 self.has_record_delimiter =
472 self.page_reader.at_record_boundary()?;
473
474 self.rep_level_decoder
475 .as_mut()
476 .unwrap()
477 .set_data(rep_level_encoding, level_data);
478 }
479
480 if max_def_level > 0 {
481 let (bytes_read, level_data) = parse_v1_level(
482 max_def_level,
483 num_values,
484 def_level_encoding,
485 buf.slice(offset..),
486 )?;
487 offset += bytes_read;
488
489 self.def_level_decoder
490 .as_mut()
491 .unwrap()
492 .set_data(def_level_encoding, level_data);
493 }
494
495 self.values_decoder.set_data(
496 encoding,
497 buf.slice(offset..),
498 num_values as usize,
499 None,
500 )?;
501 return Ok(true);
502 }
503 Page::DataPageV2 {
505 buf,
506 num_values,
507 encoding,
508 num_nulls,
509 num_rows: _,
510 def_levels_byte_len,
511 rep_levels_byte_len,
512 is_compressed: _,
513 statistics: _,
514 } => {
515 if num_nulls > num_values {
516 return Err(general_err!("more nulls than values in page, contained {} values and {} nulls", num_values, num_nulls));
517 }
518
519 self.num_buffered_values = num_values as _;
520 self.num_decoded_values = 0;
521
522 if self.descr.max_rep_level() > 0 {
525 self.has_record_delimiter =
529 self.page_reader.at_record_boundary()?;
530
531 self.rep_level_decoder.as_mut().unwrap().set_data(
532 Encoding::RLE,
533 buf.slice(..rep_levels_byte_len as usize),
534 );
535 }
536
537 if self.descr.max_def_level() > 0 {
540 self.def_level_decoder.as_mut().unwrap().set_data(
541 Encoding::RLE,
542 buf.slice(
543 rep_levels_byte_len as usize
544 ..(rep_levels_byte_len + def_levels_byte_len) as usize,
545 ),
546 );
547 }
548
549 self.values_decoder.set_data(
550 encoding,
551 buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..),
552 num_values as usize,
553 Some((num_values - num_nulls) as usize),
554 )?;
555 return Ok(true);
556 }
557 };
558 }
559 }
560 }
561 }
562
563 #[inline]
567 pub(crate) fn has_next(&mut self) -> Result<bool> {
568 if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
569 if !self.read_new_page()? {
572 Ok(false)
573 } else {
574 Ok(self.num_buffered_values != 0)
575 }
576 } else {
577 Ok(true)
578 }
579 }
580}
581
582fn parse_v1_level(
583 max_level: i16,
584 num_buffered_values: u32,
585 encoding: Encoding,
586 buf: Bytes,
587) -> Result<(usize, Bytes)> {
588 match encoding {
589 Encoding::RLE => {
590 let i32_size = std::mem::size_of::<i32>();
591 let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
592 Ok((
593 i32_size + data_size,
594 buf.slice(i32_size..i32_size + data_size),
595 ))
596 }
597 #[allow(deprecated)]
598 Encoding::BIT_PACKED => {
599 let bit_width = num_required_bits(max_level as u64);
600 let num_bytes = ceil(num_buffered_values as usize * bit_width as usize, 8);
601 Ok((num_bytes, buf.slice(..num_bytes)))
602 }
603 _ => Err(general_err!("invalid level encoding: {}", encoding)),
604 }
605}
606
607#[cfg(test)]
608mod tests {
609 use super::*;
610
611 use rand::distributions::uniform::SampleUniform;
612 use std::{collections::VecDeque, sync::Arc};
613
614 use crate::basic::Type as PhysicalType;
615 use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
616 use crate::util::test_common::page_util::InMemoryPageReader;
617 use crate::util::test_common::rand_gen::make_pages;
618
    // Default `num_levels` argument passed to the tester by most tests.
    const NUM_LEVELS: usize = 128;
    // Default `num_pages` argument passed to the tester by most tests.
    const NUM_PAGES: usize = 2;
    // Max definition level used for the column descriptors of most tests.
    const MAX_DEF_LEVEL: i16 = 5;
    // Max repetition level used for the column descriptors of most tests.
    const MAX_REP_LEVEL: i16 = 5;
623
    // Declares a test function for a physical type selected by a literal
    // `i32` / `i64` token. Each arm forwards to `test_internal!`, filling in
    // the matching data type and schema type constructor.
    macro_rules! test {
        // `i32`: `Int32Type` with the schema built by `get_test_int32_type`.
        ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
         $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int32Type,
                get_test_int32_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
        // `i64`: `Int64Type` with the schema built by `get_test_int64_type`.
        ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
         $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int64Type,
                get_test_int64_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
    }
661
    // Expands to a `#[test]` function named `$test_func`. The test builds a
    // column descriptor from the schema type returned by `$pty()` with the
    // given definition/repetition levels, then runs the `ColumnReaderTester`
    // method `$func` with the remaining arguments.
    macro_rules! test_internal {
        ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
         $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
         $min:expr, $max:expr) => {
            #[test]
            fn $test_func() {
                let desc = Arc::new(ColumnDescriptor::new(
                    Arc::new($pty()),
                    $def_level,
                    $rep_level,
                    ColumnPath::new(Vec::new()),
                ));
                let mut tester = ColumnReaderTester::<$ty>::new();
                tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
            }
        };
    }
679
    // PLAIN encoding, INT32, batch size of 16.
    test!(
        test_read_plain_v1_int32,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );

    // PLAIN encoding, INT32, batch size (17) that does not divide NUM_LEVELS.
    test!(
        test_read_plain_v1_int32_uneven,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_uneven,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );

    // PLAIN encoding, INT32, batch size (512) larger than NUM_LEVELS.
    test!(
        test_read_plain_v1_int32_multi_page,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_multi_page,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );

    // PLAIN encoding, INT32, descriptor with max definition and repetition
    // levels of 0.
    test!(
        test_read_plain_v1_int32_required_non_repeated,
        i32,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_required_non_repeated,
        i32,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );

    // PLAIN encoding, INT64, max definition and repetition levels of 1.
    test!(
        test_read_plain_v1_int64,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );

    // PLAIN encoding, INT64, batch size (17) that does not divide NUM_LEVELS.
    test!(
        test_read_plain_v1_int64_uneven,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_uneven,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );

    // PLAIN encoding, INT64, batch size (512) larger than NUM_LEVELS.
    test!(
        test_read_plain_v1_int64_multi_page,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_multi_page,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );

    // PLAIN encoding, INT64, descriptor with max definition and repetition
    // levels of 0.
    test!(
        test_read_plain_v1_int64_required_non_repeated,
        i64,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_required_non_repeated,
        i64,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
881
    // RLE_DICTIONARY encoding, INT32, tiny input: 2 pages and 2 levels.
    test!(
        test_read_dict_v1_int32_small,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_small,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );

    // RLE_DICTIONARY encoding, INT32, batch size of 16.
    test!(
        test_read_dict_v1_int32,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );

    // RLE_DICTIONARY encoding, INT32, batch size (17) that does not divide
    // NUM_LEVELS.
    test!(
        test_read_dict_v1_int32_uneven,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_uneven,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );

    // RLE_DICTIONARY encoding, INT32, batch size (512) larger than NUM_LEVELS.
    test!(
        test_read_dict_v1_int32_multi_page,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_multi_page,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );

    // RLE_DICTIONARY encoding, INT64, batch size of 16.
    test!(
        test_read_dict_v1_int64,
        i64,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int64,
        i64,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
1006
    // Column with neither definition nor repetition levels.
    #[test]
    fn test_read_batch_values_only() {
        test_read_batch_int32(16, 0, 0);
    }
1011
    // Column with definition levels only.
    #[test]
    fn test_read_batch_values_def_levels() {
        test_read_batch_int32(16, MAX_DEF_LEVEL, 0);
    }
1016
    // Column with repetition levels only.
    #[test]
    fn test_read_batch_values_rep_levels() {
        test_read_batch_int32(16, 0, MAX_REP_LEVEL);
    }
1021
    // Column with both definition and repetition levels, batch size of 128.
    #[test]
    fn test_read_batch_values_def_rep_levels() {
        test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL);
    }
1026
1027 #[test]
1028 fn test_read_batch_adjust_after_buffering_page() {
1029 let primitive_type = get_test_int32_type();
1036 let desc = Arc::new(ColumnDescriptor::new(
1037 Arc::new(primitive_type),
1038 1,
1039 1,
1040 ColumnPath::new(Vec::new()),
1041 ));
1042
1043 let num_pages = 2;
1044 let num_levels = 4;
1045 let batch_size = 5;
1046
1047 let mut tester = ColumnReaderTester::<Int32Type>::new();
1048 tester.test_read_batch(
1049 desc,
1050 Encoding::RLE_DICTIONARY,
1051 num_pages,
1052 num_levels,
1053 batch_size,
1054 i32::MIN,
1055 i32::MAX,
1056 false,
1057 );
1058 }
1059
1060 fn get_test_int32_type() -> SchemaType {
1106 SchemaType::primitive_type_builder("a", PhysicalType::INT32)
1107 .with_repetition(Repetition::REQUIRED)
1108 .with_converted_type(ConvertedType::INT_32)
1109 .with_length(-1)
1110 .build()
1111 .expect("build() should be OK")
1112 }
1113
1114 fn get_test_int64_type() -> SchemaType {
1116 SchemaType::primitive_type_builder("a", PhysicalType::INT64)
1117 .with_repetition(Repetition::REQUIRED)
1118 .with_converted_type(ConvertedType::INT_64)
1119 .with_length(-1)
1120 .build()
1121 .expect("build() should be OK")
1122 }
1123
1124 fn test_read_batch_int32(batch_size: usize, max_def_level: i16, max_rep_level: i16) {
1129 let primitive_type = get_test_int32_type();
1130
1131 let desc = Arc::new(ColumnDescriptor::new(
1132 Arc::new(primitive_type),
1133 max_def_level,
1134 max_rep_level,
1135 ColumnPath::new(Vec::new()),
1136 ));
1137
1138 let mut tester = ColumnReaderTester::<Int32Type>::new();
1139 tester.test_read_batch(
1140 desc,
1141 Encoding::RLE_DICTIONARY,
1142 NUM_PAGES,
1143 NUM_LEVELS,
1144 batch_size,
1145 i32::MIN,
1146 i32::MAX,
1147 false,
1148 );
1149 }
1150
    // Test harness holding the expected levels and values; `make_pages` fills
    // these in and the data read back from the column reader is compared
    // against them.
    struct ColumnReaderTester<T: DataType>
    where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        // Expected repetition levels.
        rep_levels: Vec<i16>,
        // Expected definition levels.
        def_levels: Vec<i16>,
        // Expected values.
        values: Vec<T::T>,
    }
1159
1160 impl<T: DataType> ColumnReaderTester<T>
1161 where
1162 T::T: PartialOrd + SampleUniform + Copy,
1163 {
1164 pub fn new() -> Self {
1165 Self {
1166 rep_levels: Vec::new(),
1167 def_levels: Vec::new(),
1168 values: Vec::new(),
1169 }
1170 }
1171
1172 fn plain_v1(
1174 &mut self,
1175 desc: ColumnDescPtr,
1176 num_pages: usize,
1177 num_levels: usize,
1178 batch_size: usize,
1179 min: T::T,
1180 max: T::T,
1181 ) {
1182 self.test_read_batch_general(
1183 desc,
1184 Encoding::PLAIN,
1185 num_pages,
1186 num_levels,
1187 batch_size,
1188 min,
1189 max,
1190 false,
1191 );
1192 }
1193
1194 fn plain_v2(
1196 &mut self,
1197 desc: ColumnDescPtr,
1198 num_pages: usize,
1199 num_levels: usize,
1200 batch_size: usize,
1201 min: T::T,
1202 max: T::T,
1203 ) {
1204 self.test_read_batch_general(
1205 desc,
1206 Encoding::PLAIN,
1207 num_pages,
1208 num_levels,
1209 batch_size,
1210 min,
1211 max,
1212 true,
1213 );
1214 }
1215
1216 fn dict_v1(
1218 &mut self,
1219 desc: ColumnDescPtr,
1220 num_pages: usize,
1221 num_levels: usize,
1222 batch_size: usize,
1223 min: T::T,
1224 max: T::T,
1225 ) {
1226 self.test_read_batch_general(
1227 desc,
1228 Encoding::RLE_DICTIONARY,
1229 num_pages,
1230 num_levels,
1231 batch_size,
1232 min,
1233 max,
1234 false,
1235 );
1236 }
1237
1238 fn dict_v2(
1240 &mut self,
1241 desc: ColumnDescPtr,
1242 num_pages: usize,
1243 num_levels: usize,
1244 batch_size: usize,
1245 min: T::T,
1246 max: T::T,
1247 ) {
1248 self.test_read_batch_general(
1249 desc,
1250 Encoding::RLE_DICTIONARY,
1251 num_pages,
1252 num_levels,
1253 batch_size,
1254 min,
1255 max,
1256 true,
1257 );
1258 }
1259
1260 #[allow(clippy::too_many_arguments)]
1263 fn test_read_batch_general(
1264 &mut self,
1265 desc: ColumnDescPtr,
1266 encoding: Encoding,
1267 num_pages: usize,
1268 num_levels: usize,
1269 batch_size: usize,
1270 min: T::T,
1271 max: T::T,
1272 use_v2: bool,
1273 ) {
1274 self.test_read_batch(
1275 desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
1276 );
1277 }
1278
1279 #[allow(clippy::too_many_arguments)]
1282 fn test_read_batch(
1283 &mut self,
1284 desc: ColumnDescPtr,
1285 encoding: Encoding,
1286 num_pages: usize,
1287 num_levels: usize,
1288 batch_size: usize,
1289 min: T::T,
1290 max: T::T,
1291 use_v2: bool,
1292 ) {
1293 let mut pages = VecDeque::new();
1294 make_pages::<T>(
1295 desc.clone(),
1296 encoding,
1297 num_pages,
1298 num_levels,
1299 min,
1300 max,
1301 &mut self.def_levels,
1302 &mut self.rep_levels,
1303 &mut self.values,
1304 &mut pages,
1305 use_v2,
1306 );
1307 let max_def_level = desc.max_def_level();
1308 let max_rep_level = desc.max_rep_level();
1309 let page_reader = InMemoryPageReader::new(pages);
1310 let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
1311 let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);
1312
1313 let mut values = Vec::new();
1314 let mut def_levels = Vec::new();
1315 let mut rep_levels = Vec::new();
1316
1317 let mut curr_values_read = 0;
1318 let mut curr_levels_read = 0;
1319 loop {
1320 let (_, values_read, levels_read) = typed_column_reader
1321 .read_records(
1322 batch_size,
1323 Some(&mut def_levels),
1324 Some(&mut rep_levels),
1325 &mut values,
1326 )
1327 .expect("read_batch() should be OK");
1328
1329 curr_values_read += values_read;
1330 curr_levels_read += levels_read;
1331
1332 if values_read == 0 && levels_read == 0 {
1333 break;
1334 }
1335 }
1336
1337 assert_eq!(values, self.values, "values content doesn't match");
1338
1339 if max_def_level > 0 {
1340 assert_eq!(
1341 def_levels, self.def_levels,
1342 "definition levels content doesn't match"
1343 );
1344 }
1345
1346 if max_rep_level > 0 {
1347 assert_eq!(
1348 rep_levels, self.rep_levels,
1349 "repetition levels content doesn't match"
1350 );
1351 }
1352
1353 assert!(
1354 curr_levels_read >= curr_values_read,
1355 "expected levels read to be greater than values read"
1356 );
1357 }
1358 }
1359}