1use std::collections::VecDeque;
22use std::iter;
23use std::{fs::File, io::Read, path::Path, sync::Arc};
24
25use crate::basic::{Encoding, Type};
26use crate::bloom_filter::Sbbf;
27use crate::column::page::{Page, PageMetadata, PageReader};
28use crate::compression::{create_codec, Codec};
29use crate::errors::{ParquetError, Result};
30use crate::file::page_index::offset_index::OffsetIndexMetaData;
31use crate::file::{
32 metadata::*,
33 properties::{ReaderProperties, ReaderPropertiesPtr},
34 reader::*,
35 statistics,
36};
37use crate::format::{PageHeader, PageLocation, PageType};
38use crate::record::reader::RowIter;
39use crate::record::Row;
40use crate::schema::types::Type as SchemaType;
41use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
42use bytes::Bytes;
43use thrift::protocol::TCompactInputProtocol;
44
45impl TryFrom<File> for SerializedFileReader<File> {
46 type Error = ParquetError;
47
48 fn try_from(file: File) -> Result<Self> {
49 Self::new(file)
50 }
51}
52
53impl TryFrom<&Path> for SerializedFileReader<File> {
54 type Error = ParquetError;
55
56 fn try_from(path: &Path) -> Result<Self> {
57 let file = File::open(path)?;
58 Self::try_from(file)
59 }
60}
61
62impl TryFrom<String> for SerializedFileReader<File> {
63 type Error = ParquetError;
64
65 fn try_from(path: String) -> Result<Self> {
66 Self::try_from(Path::new(&path))
67 }
68}
69
70impl TryFrom<&str> for SerializedFileReader<File> {
71 type Error = ParquetError;
72
73 fn try_from(path: &str) -> Result<Self> {
74 Self::try_from(Path::new(&path))
75 }
76}
77
/// Consumes the reader and yields its rows, boxing the reader so the
/// iterator owns it for the `'static` lifetime.
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter::from_file_into(Box::new(self))
    }
}
88
/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    /// Shared handle to the underlying byte source (file, in-memory buffer, …).
    chunk_reader: Arc<R>,
    /// Parsed footer metadata, shared with derived row-group/page readers.
    metadata: Arc<ParquetMetaData>,
    /// Reader configuration (e.g. whether bloom filters are loaded).
    props: ReaderPropertiesPtr,
}
98
/// A predicate over a row group's metadata and its index within the file;
/// row groups for which it returns `false` are dropped from the metadata.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
103
/// Builder for [`ReadOptions`], configuring how a file is opened with
/// [`SerializedFileReader::new_with_options`].
#[derive(Default)]
pub struct ReadOptionsBuilder {
    /// Row-group filters; a row group is kept only if all of them accept it.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether to also load the page (column/offset) index.
    enable_page_index: bool,
    /// Optional reader properties; defaults are built when absent.
    props: Option<ReaderProperties>,
}
113
114impl ReadOptionsBuilder {
115 pub fn new() -> Self {
117 Self::default()
118 }
119
120 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
123 self.predicates.push(predicate);
124 self
125 }
126
127 pub fn with_range(mut self, start: i64, end: i64) -> Self {
130 assert!(start < end);
131 let predicate = move |rg: &RowGroupMetaData, _: usize| {
132 let mid = get_midpoint_offset(rg);
133 mid >= start && mid < end
134 };
135 self.predicates.push(Box::new(predicate));
136 self
137 }
138
139 pub fn with_page_index(mut self) -> Self {
144 self.enable_page_index = true;
145 self
146 }
147
148 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
150 self.props = Some(properties);
151 self
152 }
153
154 pub fn build(self) -> ReadOptions {
156 let props = self
157 .props
158 .unwrap_or_else(|| ReaderProperties::builder().build());
159 ReadOptions {
160 predicates: self.predicates,
161 enable_page_index: self.enable_page_index,
162 props,
163 }
164 }
165}
166
/// Options for reading a Parquet file, produced by [`ReadOptionsBuilder`].
pub struct ReadOptions {
    /// Row-group filters applied while parsing the footer metadata.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether the page (column/offset) index is loaded.
    enable_page_index: bool,
    /// Reader properties used when scanning.
    props: ReaderProperties,
}
176
177impl<R: 'static + ChunkReader> SerializedFileReader<R> {
178 pub fn new(chunk_reader: R) -> Result<Self> {
181 let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
182 let props = Arc::new(ReaderProperties::builder().build());
183 Ok(Self {
184 chunk_reader: Arc::new(chunk_reader),
185 metadata: Arc::new(metadata),
186 props,
187 })
188 }
189
190 pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
193 let mut metadata_builder = ParquetMetaDataReader::new()
194 .parse_and_finish(&chunk_reader)?
195 .into_builder();
196 let mut predicates = options.predicates;
197
198 for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
200 let mut keep = true;
201 for predicate in &mut predicates {
202 if !predicate(&rg_meta, i) {
203 keep = false;
204 break;
205 }
206 }
207 if keep {
208 metadata_builder = metadata_builder.add_row_group(rg_meta);
209 }
210 }
211
212 let mut metadata = metadata_builder.build();
213
214 if options.enable_page_index {
216 let mut reader =
217 ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
218 reader.read_page_indexes(&chunk_reader)?;
219 metadata = reader.finish()?;
220 }
221
222 Ok(Self {
223 chunk_reader: Arc::new(chunk_reader),
224 metadata: Arc::new(metadata),
225 props: Arc::new(options.props),
226 })
227 }
228}
229
230fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
232 let col = meta.column(0);
233 let mut offset = col.data_page_offset();
234 if let Some(dic_offset) = col.dictionary_page_offset() {
235 if offset > dic_offset {
236 offset = dic_offset
237 }
238 };
239 offset + meta.compressed_size() / 2
240}
241
242impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
243 fn metadata(&self) -> &ParquetMetaData {
244 &self.metadata
245 }
246
247 fn num_row_groups(&self) -> usize {
248 self.metadata.num_row_groups()
249 }
250
251 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
252 let row_group_metadata = self.metadata.row_group(i);
253 let props = Arc::clone(&self.props);
255 let f = Arc::clone(&self.chunk_reader);
256 Ok(Box::new(SerializedRowGroupReader::new(
257 f,
258 row_group_metadata,
259 self.metadata.offset_index().map(|x| x[i].as_slice()),
260 props,
261 )?))
262 }
263
264 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
265 RowIter::from_file(projection, self)
266 }
267}
268
/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    /// Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    /// Metadata for this row group, borrowed from the file metadata.
    metadata: &'a RowGroupMetaData,
    /// Per-column offset index entries for this row group, when loaded.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Reader configuration.
    props: ReaderPropertiesPtr,
    /// One optional bloom filter per column (all `None` when disabled).
    bloom_filters: Vec<Option<Sbbf>>,
}
277
278impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
279 pub fn new(
281 chunk_reader: Arc<R>,
282 metadata: &'a RowGroupMetaData,
283 offset_index: Option<&'a [OffsetIndexMetaData]>,
284 props: ReaderPropertiesPtr,
285 ) -> Result<Self> {
286 let bloom_filters = if props.read_bloom_filter() {
287 metadata
288 .columns()
289 .iter()
290 .map(|col| Sbbf::read_from_column_chunk(col, chunk_reader.clone()))
291 .collect::<Result<Vec<_>>>()?
292 } else {
293 iter::repeat(None).take(metadata.columns().len()).collect()
294 };
295 Ok(Self {
296 chunk_reader,
297 metadata,
298 offset_index,
299 props,
300 bloom_filters,
301 })
302 }
303}
304
305impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
306 fn metadata(&self) -> &RowGroupMetaData {
307 self.metadata
308 }
309
310 fn num_columns(&self) -> usize {
311 self.metadata.num_columns()
312 }
313
314 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
316 let col = self.metadata.column(i);
317
318 let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
319
320 let props = Arc::clone(&self.props);
321 Ok(Box::new(SerializedPageReader::new_with_properties(
322 Arc::clone(&self.chunk_reader),
323 col,
324 self.metadata.num_rows() as usize,
325 page_locations,
326 props,
327 )?))
328 }
329
330 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
332 self.bloom_filters[i].as_ref()
333 }
334
335 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
336 RowIter::from_row_group(projection, self)
337 }
338}
339
340pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
342 let mut prot = TCompactInputProtocol::new(input);
343 let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
344 Ok(page_header)
345}
346
347fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
349 struct TrackedRead<R> {
351 inner: R,
352 bytes_read: usize,
353 }
354
355 impl<R: Read> Read for TrackedRead<R> {
356 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
357 let v = self.inner.read(buf)?;
358 self.bytes_read += v;
359 Ok(v)
360 }
361 }
362
363 let mut tracked = TrackedRead {
364 inner: input,
365 bytes_read: 0,
366 };
367 let header = read_page_header(&mut tracked)?;
368 Ok((tracked.bytes_read, header))
369}
370
371pub(crate) fn decode_page(
373 page_header: PageHeader,
374 buffer: Bytes,
375 physical_type: Type,
376 decompressor: Option<&mut Box<dyn Codec>>,
377) -> Result<Page> {
378 #[cfg(feature = "crc")]
380 if let Some(expected_crc) = page_header.crc {
381 let crc = crc32fast::hash(&buffer);
382 if crc != expected_crc as u32 {
383 return Err(general_err!("Page CRC checksum mismatch"));
384 }
385 }
386
387 let mut offset: usize = 0;
394 let mut can_decompress = true;
395
396 if let Some(ref header_v2) = page_header.data_page_header_v2 {
397 offset = (header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length)
398 as usize;
399 can_decompress = header_v2.is_compressed.unwrap_or(true);
401 }
402
403 let buffer = match decompressor {
406 Some(decompressor) if can_decompress => {
407 let uncompressed_size = page_header.uncompressed_page_size as usize;
408 let mut decompressed = Vec::with_capacity(uncompressed_size);
409 let compressed = &buffer.as_ref()[offset..];
410 decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
411 decompressor.decompress(
412 compressed,
413 &mut decompressed,
414 Some(uncompressed_size - offset),
415 )?;
416
417 if decompressed.len() != uncompressed_size {
418 return Err(general_err!(
419 "Actual decompressed size doesn't match the expected one ({} vs {})",
420 decompressed.len(),
421 uncompressed_size
422 ));
423 }
424
425 Bytes::from(decompressed)
426 }
427 _ => buffer,
428 };
429
430 let result = match page_header.type_ {
431 PageType::DICTIONARY_PAGE => {
432 let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
433 ParquetError::General("Missing dictionary page header".to_string())
434 })?;
435 let is_sorted = dict_header.is_sorted.unwrap_or(false);
436 Page::DictionaryPage {
437 buf: buffer,
438 num_values: dict_header.num_values as u32,
439 encoding: Encoding::try_from(dict_header.encoding)?,
440 is_sorted,
441 }
442 }
443 PageType::DATA_PAGE => {
444 let header = page_header
445 .data_page_header
446 .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
447 Page::DataPage {
448 buf: buffer,
449 num_values: header.num_values as u32,
450 encoding: Encoding::try_from(header.encoding)?,
451 def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
452 rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
453 statistics: statistics::from_thrift(physical_type, header.statistics)?,
454 }
455 }
456 PageType::DATA_PAGE_V2 => {
457 let header = page_header
458 .data_page_header_v2
459 .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
460 let is_compressed = header.is_compressed.unwrap_or(true);
461 Page::DataPageV2 {
462 buf: buffer,
463 num_values: header.num_values as u32,
464 encoding: Encoding::try_from(header.encoding)?,
465 num_nulls: header.num_nulls as u32,
466 num_rows: header.num_rows as u32,
467 def_levels_byte_len: header.definition_levels_byte_length as u32,
468 rep_levels_byte_len: header.repetition_levels_byte_length as u32,
469 is_compressed,
470 statistics: statistics::from_thrift(physical_type, header.statistics)?,
471 }
472 }
473 _ => {
474 unimplemented!("Page type {:?} is not supported", page_header.type_)
476 }
477 };
478
479 Ok(result)
480}
481
/// The read state of a [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// Sequential scan through the column chunk's byte range; used when no
    /// offset index is available.
    Values {
        /// Current byte offset within the file.
        offset: usize,

        /// Bytes of the column chunk not yet consumed.
        remaining_bytes: usize,

        /// A page header read ahead by `peek_next_page`, consumed by the next
        /// `get_next_page`/`skip_next_page` call.
        next_page_header: Option<Box<PageHeader>>,
    },
    /// Direct page addressing driven by the offset index.
    Pages {
        /// Remaining data page locations, in file order.
        page_locations: VecDeque<PageLocation>,
        /// Synthesized dictionary page location, served before any entry in
        /// `page_locations`.
        dictionary_page: Option<PageLocation>,
        /// Total row count of the column chunk; bounds the last page's rows.
        total_rows: usize,
    },
}
502
/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader pages are fetched from.
    reader: Arc<R>,

    /// Decompressor for the column chunk's codec, when one is needed.
    decompressor: Option<Box<dyn Codec>>,

    /// Physical type of the column; needed to decode page statistics.
    physical_type: Type,

    /// Current read state (sequential scan or offset-index driven).
    state: SerializedPageReaderState,
}
516
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and column
    /// chunk metadata, using default reader properties.
    pub fn new(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props)
    }

    /// Creates a new serialized page reader with explicit reader properties.
    ///
    /// When `page_locations` (typically taken from the offset index) is
    /// provided the reader addresses pages directly (`Pages` state);
    /// otherwise it scans the chunk's byte range sequentially (`Values`
    /// state).
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the chunk's byte range starts before the first recorded
                // page location, the gap is taken to hold the dictionary page
                // and a synthetic location covering it is created.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start as usize,
                remaining_bytes: len as usize,
                next_page_header: None,
            },
        };

        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
        })
    }
}
572
573impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
574 type Item = Result<Page>;
575
576 fn next(&mut self) -> Option<Self::Item> {
577 self.get_next_page().transpose()
578 }
579}
580
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    /// Reads, decompresses and decodes the next page, returning `Ok(None)`
    /// once the column chunk is exhausted. Index pages are skipped.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset as u64)?;
                    // Use a header buffered by `peek_next_page` if present
                    // (offset already points past it); otherwise read one
                    // from the stream and advance past the header bytes.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining -= header_len;
                        header
                    };
                    let data_len = header.compressed_page_size as usize;
                    // NOTE(review): a corrupt `compressed_page_size` larger
                    // than the remaining chunk would underflow this usize
                    // subtraction — confirm inputs are trusted or bound-check.
                    *offset += data_len;
                    *remaining -= data_len;

                    // Index pages carry no row data; move on to the next page.
                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    ..
                } => {
                    // Serve the (synthesized) dictionary page first, then the
                    // data pages in file order.
                    let front = match dictionary_page
                        .take()
                        .or_else(|| page_locations.pop_front())
                    {
                        Some(front) => front,
                        None => return Ok(None),
                    };

                    let page_len = front.compressed_page_size as usize;

                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    // The thrift header sits at the front of the page buffer;
                    // the payload is whatever the protocol did not consume.
                    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
                    let header = PageHeader::read_from_in_protocol(&mut prot)?;
                    let offset = buffer.len() - prot.as_slice().len();

                    let bytes = buffer.slice(offset..);
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    /// Returns metadata for the next page without decoding it. In `Values`
    /// state the header just read is buffered so the following
    /// `get_next_page`/`skip_next_page` call can reuse it.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        // A header is already buffered; try to summarize it.
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Conversion failed (unsupported page type):
                            // discard the buffered header and read the next.
                            // NOTE(review): this path does not advance past
                            // the page's data bytes — confirm it cannot leave
                            // `offset` pointing into the middle of a page.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Unsupported page type — try the next header
                            // (see NOTE above regarding the page body).
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
            } => {
                if dictionary_page.is_some() {
                    // Dictionary pages carry values, not rows.
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // A page's row count is the distance to the next page's
                    // first row, or to the end of the chunk for the last page.
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Advances past the next page without decoding its contents.
    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    // The header bytes were already consumed by
                    // `peek_next_page`; only the page body remains to skip.
                    *offset += buffered_header.compressed_page_size as usize;
                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
                } else {
                    // Read the header to learn the page size, then skip both.
                    let mut read = self.reader.get_read(*offset as u64)?;
                    let (header_len, header) = read_page_header_len(&mut read)?;
                    let data_page_size = header.compressed_page_size as usize;
                    *offset += header_len + data_page_size;
                    *remaining_bytes -= header_len + data_page_size;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages { page_locations, .. } => {
                page_locations.pop_front();

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            // Without an offset index a boundary is only known for certain
            // once the chunk is exhausted.
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            // In `Pages` state the reader always reports a record boundary.
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
763
764#[cfg(test)]
765mod tests {
766 use bytes::Buf;
767
768 use crate::file::properties::{EnabledStatistics, WriterProperties};
769 use crate::format::BoundaryOrder;
770
771 use crate::basic::{self, ColumnOrder};
772 use crate::column::reader::ColumnReader;
773 use crate::data_type::private::ParquetValueType;
774 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
775 use crate::file::page_index::index::{Index, NativeIndex};
776 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
777 use crate::file::writer::SerializedFileWriter;
778 use crate::record::RowAccessor;
779 use crate::schema::parser::parse_message_type;
780 use crate::util::test_common::file_util::{get_test_file, get_test_path};
781
782 use super::*;
783
    // Reading the same parquet data through an in-memory `Bytes` cursor and
    // through a `File` must yield identical rows.
    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        // Compare the two readers row by row.
        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }
803
    // Exercises every `TryFrom` conversion (File, &Path, &str, String) for
    // both an existing parquet file and a nonexistent path.
    #[test]
    fn test_file_reader_try_from() {
        // Valid file.
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Invalid file: every path-based conversion must fail.
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }
837
    // `IntoIterator` over the reader yields all rows; column 0 ("id") holds
    // these known values in file order.
    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }
847
    // Same as `test_file_reader_into_iter`, but with a projection restricting
    // the iterator to the single `id` column.
    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }
859
    // Multiple page readers over different columns of the same row group must
    // be usable concurrently — each holds its own handle to the chunk reader.
    #[test]
    fn test_reuse_file_chunk() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        // All readers stay valid after creation; each can read its first page.
        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }
880
    // End-to-end read of `alltypes_plain.parquet` (v1 data pages): checks
    // file/row-group metadata and the exact dictionary/data page contents of
    // column 0.
    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Row-group metadata.
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // No column orders are recorded in this file.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Row-group reader agrees with the metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0 holds exactly one dictionary page and one v1 data page
        // with these known sizes and encodings.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
972
    // End-to-end read of `datapage_v2.snappy.parquet` (v2 data pages):
    // checks file metadata, key-value metadata presence, and the exact
    // dictionary/v2-data page contents of column 0.
    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // No column orders are recorded in this file.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Row-group reader agrees with the metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0 holds exactly one dictionary page and one v2 data page
        // with these known sizes, encodings and level byte lengths.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1070
    // `FilePageIterator` yields one page reader per row group — exactly one
    // here — both with and without an explicit row-group index range.
    #[test]
    fn test_page_iterator() {
        let file = get_test_file("alltypes_plain.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();

        // First (and only) row group.
        let page = page_iterator.next();
        assert!(page.is_some());
        assert!(page.unwrap().is_ok());

        // Iterator is exhausted after one row group.
        let page = page_iterator.next();
        assert!(page.is_none());

        let row_group_indices = Box::new(0..1);
        let mut page_iterator =
            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();

        // Same behaviour with an explicit index range.
        let page = page_iterator.next();
        assert!(page.is_some());
        assert!(page.unwrap().is_ok());

        let page = page_iterator.next();
        assert!(page.is_none());
    }
1100
    // The key-value metadata of `binary.parquet` contains these three known
    // protobuf-writer entries.
    #[test]
    fn test_file_reader_key_value_metadata() {
        let file = get_test_file("binary.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let metadata = file_reader
            .metadata
            .file_metadata()
            .key_value_metadata()
            .unwrap();

        assert_eq!(metadata.len(), 3);

        assert_eq!(metadata[0].key, "parquet.proto.descriptor");

        assert_eq!(metadata[1].key, "writer.model.name");
        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));

        assert_eq!(metadata[2].key, "parquet.proto.class");
        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
    }
1122
    // Optional column-chunk metadata (bloom filter offset, page encoding
    // stats, column/offset index offsets and lengths) is parsed from a file
    // known to contain all of it; the expected values are fixed by the file.
    #[test]
    fn test_file_reader_optional_metadata() {
        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let row_group_metadata = file_reader.metadata.row_group(0);
        let col0_metadata = row_group_metadata.column(0);

        // Bloom filter offset.
        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);

        // Page encoding stats for the first (only) data page.
        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];

        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
        assert_eq!(page_encoding_stats.count, 1);

        // Column index location.
        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);

        // Offset index location.
        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
    }
1150
    // Baseline: without any read options the single row group is retained.
    #[test]
    fn test_file_reader_with_no_filter() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        let metadata = origin_reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        Ok(())
    }
1160
    // An always-false predicate must filter out every row group.
    #[test]
    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_predicate(Box::new(|_, _| false))
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        Ok(())
    }
1172
    // Range filtering keeps a row group only when its midpoint offset lies
    // inside `[start, end)`: a range just past the midpoint keeps it, a range
    // ending exactly at the midpoint drops it.
    #[test]
    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        let metadata = origin_reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let mid = get_midpoint_offset(metadata.row_group(0));

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        Ok(())
    }
1195
1196 #[test]
1197 fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1198 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1199 let origin_reader = SerializedFileReader::new(test_file)?;
1200 let metadata = origin_reader.metadata();
1201 let mid = get_midpoint_offset(metadata.row_group(0));
1202
1203 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1205 let read_options = ReadOptionsBuilder::new()
1206 .with_page_index()
1207 .with_predicate(Box::new(|_, _| true))
1208 .with_range(mid, mid + 1)
1209 .build();
1210 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1211 let metadata = reader.metadata();
1212 assert_eq!(metadata.num_row_groups(), 1);
1213 assert_eq!(metadata.column_index().unwrap().len(), 1);
1214 assert_eq!(metadata.offset_index().unwrap().len(), 1);
1215
1216 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1218 let read_options = ReadOptionsBuilder::new()
1219 .with_page_index()
1220 .with_predicate(Box::new(|_, _| true))
1221 .with_range(0, mid)
1222 .build();
1223 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1224 let metadata = reader.metadata();
1225 assert_eq!(metadata.num_row_groups(), 0);
1226 assert_eq!(metadata.column_index().unwrap().len(), 0);
1227 assert_eq!(metadata.offset_index().unwrap().len(), 0);
1228
1229 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1231 let read_options = ReadOptionsBuilder::new()
1232 .with_page_index()
1233 .with_predicate(Box::new(|_, _| false))
1234 .with_range(mid, mid + 1)
1235 .build();
1236 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1237 let metadata = reader.metadata();
1238 assert_eq!(metadata.num_row_groups(), 0);
1239 assert_eq!(metadata.column_index().unwrap().len(), 0);
1240 assert_eq!(metadata.offset_index().unwrap().len(), 0);
1241
1242 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1244 let read_options = ReadOptionsBuilder::new()
1245 .with_page_index()
1246 .with_predicate(Box::new(|_, _| false))
1247 .with_range(0, mid)
1248 .build();
1249 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1250 let metadata = reader.metadata();
1251 assert_eq!(metadata.num_row_groups(), 0);
1252 assert_eq!(metadata.column_index().unwrap().len(), 0);
1253 assert_eq!(metadata.offset_index().unwrap().len(), 0);
1254 Ok(())
1255 }
1256
1257 #[test]
1258 fn test_file_reader_invalid_metadata() {
1259 let data = [
1260 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1261 80, 65, 82, 49,
1262 ];
1263 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1264 assert_eq!(
1265 ret.err().unwrap().to_string(),
1266 "Parquet error: Could not parse metadata: bad data"
1267 );
1268 }
1269
1270 #[test]
1271 fn test_page_index_reader() {
1288 let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1289 let builder = ReadOptionsBuilder::new();
1290 let options = builder.with_page_index().build();
1292 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1293 let reader = reader_result.unwrap();
1294
1295 let metadata = reader.metadata();
1297 assert_eq!(metadata.num_row_groups(), 1);
1298
1299 let column_index = metadata.column_index().unwrap();
1300
1301 assert_eq!(column_index.len(), 1);
1303 let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1304 index
1305 } else {
1306 unreachable!()
1307 };
1308
1309 assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
1310 let index_in_pages = &index.indexes;
1311
1312 assert_eq!(index_in_pages.len(), 1);
1314
1315 let page0 = &index_in_pages[0];
1316 let min = page0.min.as_ref().unwrap();
1317 let max = page0.max.as_ref().unwrap();
1318 assert_eq!(b"Hello", min.as_bytes());
1319 assert_eq!(b"today", max.as_bytes());
1320
1321 let offset_indexes = metadata.offset_index().unwrap();
1322 assert_eq!(offset_indexes.len(), 1);
1324 let offset_index = &offset_indexes[0];
1325 let page_offset = &offset_index[0].page_locations()[0];
1326
1327 assert_eq!(4, page_offset.offset);
1328 assert_eq!(152, page_offset.compressed_page_size);
1329 assert_eq!(0, page_offset.first_row_index);
1330 }
1331
1332 #[test]
1333 fn test_page_index_reader_out_of_order() {
1334 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1335 let options = ReadOptionsBuilder::new().with_page_index().build();
1336 let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1337 let metadata = reader.metadata();
1338
1339 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1340 let columns = metadata.row_group(0).columns();
1341 let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1342
1343 let a = read_columns_indexes(&test_file, columns).unwrap();
1344 let mut b = read_columns_indexes(&test_file, &reversed).unwrap();
1345 b.reverse();
1346 assert_eq!(a, b);
1347
1348 let a = read_offset_indexes(&test_file, columns).unwrap();
1349 let mut b = read_offset_indexes(&test_file, &reversed).unwrap();
1350 b.reverse();
1351 assert_eq!(a, b);
1352 }
1353
1354 #[test]
1355 fn test_page_index_reader_all_type() {
1356 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1357 let builder = ReadOptionsBuilder::new();
1358 let options = builder.with_page_index().build();
1360 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1361 let reader = reader_result.unwrap();
1362
1363 let metadata = reader.metadata();
1365 assert_eq!(metadata.num_row_groups(), 1);
1366
1367 let column_index = metadata.column_index().unwrap();
1368 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1369
1370 assert_eq!(column_index.len(), 1);
1372 let row_group_metadata = metadata.row_group(0);
1373
1374 assert!(!&column_index[0][0].is_sorted());
1376 let boundary_order = &column_index[0][0].get_boundary_order();
1377 assert!(boundary_order.is_some());
1378 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
1379 if let Index::INT32(index) = &column_index[0][0] {
1380 check_native_page_index(
1381 index,
1382 325,
1383 get_row_group_min_max_bytes(row_group_metadata, 0),
1384 BoundaryOrder::UNORDERED,
1385 );
1386 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
1387 } else {
1388 unreachable!()
1389 };
1390 assert!(&column_index[0][1].is_sorted());
1392 if let Index::BOOLEAN(index) = &column_index[0][1] {
1393 assert_eq!(index.indexes.len(), 82);
1394 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
1395 } else {
1396 unreachable!()
1397 };
1398 assert!(&column_index[0][2].is_sorted());
1400 if let Index::INT32(index) = &column_index[0][2] {
1401 check_native_page_index(
1402 index,
1403 325,
1404 get_row_group_min_max_bytes(row_group_metadata, 2),
1405 BoundaryOrder::ASCENDING,
1406 );
1407 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
1408 } else {
1409 unreachable!()
1410 };
1411 assert!(&column_index[0][3].is_sorted());
1413 if let Index::INT32(index) = &column_index[0][3] {
1414 check_native_page_index(
1415 index,
1416 325,
1417 get_row_group_min_max_bytes(row_group_metadata, 3),
1418 BoundaryOrder::ASCENDING,
1419 );
1420 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
1421 } else {
1422 unreachable!()
1423 };
1424 assert!(&column_index[0][4].is_sorted());
1426 if let Index::INT32(index) = &column_index[0][4] {
1427 check_native_page_index(
1428 index,
1429 325,
1430 get_row_group_min_max_bytes(row_group_metadata, 4),
1431 BoundaryOrder::ASCENDING,
1432 );
1433 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
1434 } else {
1435 unreachable!()
1436 };
1437 assert!(!&column_index[0][5].is_sorted());
1439 if let Index::INT64(index) = &column_index[0][5] {
1440 check_native_page_index(
1441 index,
1442 528,
1443 get_row_group_min_max_bytes(row_group_metadata, 5),
1444 BoundaryOrder::UNORDERED,
1445 );
1446 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
1447 } else {
1448 unreachable!()
1449 };
1450 assert!(&column_index[0][6].is_sorted());
1452 if let Index::FLOAT(index) = &column_index[0][6] {
1453 check_native_page_index(
1454 index,
1455 325,
1456 get_row_group_min_max_bytes(row_group_metadata, 6),
1457 BoundaryOrder::ASCENDING,
1458 );
1459 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
1460 } else {
1461 unreachable!()
1462 };
1463 assert!(!&column_index[0][7].is_sorted());
1465 if let Index::DOUBLE(index) = &column_index[0][7] {
1466 check_native_page_index(
1467 index,
1468 528,
1469 get_row_group_min_max_bytes(row_group_metadata, 7),
1470 BoundaryOrder::UNORDERED,
1471 );
1472 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
1473 } else {
1474 unreachable!()
1475 };
1476 assert!(!&column_index[0][8].is_sorted());
1478 if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
1479 check_native_page_index(
1480 index,
1481 974,
1482 get_row_group_min_max_bytes(row_group_metadata, 8),
1483 BoundaryOrder::UNORDERED,
1484 );
1485 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
1486 } else {
1487 unreachable!()
1488 };
1489 assert!(&column_index[0][9].is_sorted());
1491 if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
1492 check_native_page_index(
1493 index,
1494 352,
1495 get_row_group_min_max_bytes(row_group_metadata, 9),
1496 BoundaryOrder::ASCENDING,
1497 );
1498 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
1499 } else {
1500 unreachable!()
1501 };
1502 assert!(!&column_index[0][10].is_sorted());
1505 if let Index::NONE = &column_index[0][10] {
1506 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
1507 } else {
1508 unreachable!()
1509 };
1510 assert!(&column_index[0][11].is_sorted());
1512 if let Index::INT32(index) = &column_index[0][11] {
1513 check_native_page_index(
1514 index,
1515 325,
1516 get_row_group_min_max_bytes(row_group_metadata, 11),
1517 BoundaryOrder::ASCENDING,
1518 );
1519 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
1520 } else {
1521 unreachable!()
1522 };
1523 assert!(!&column_index[0][12].is_sorted());
1525 if let Index::INT32(index) = &column_index[0][12] {
1526 check_native_page_index(
1527 index,
1528 325,
1529 get_row_group_min_max_bytes(row_group_metadata, 12),
1530 BoundaryOrder::UNORDERED,
1531 );
1532 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
1533 } else {
1534 unreachable!()
1535 };
1536 }
1537
1538 fn check_native_page_index<T: ParquetValueType>(
1539 row_group_index: &NativeIndex<T>,
1540 page_size: usize,
1541 min_max: (&[u8], &[u8]),
1542 boundary_order: BoundaryOrder,
1543 ) {
1544 assert_eq!(row_group_index.indexes.len(), page_size);
1545 assert_eq!(row_group_index.boundary_order, boundary_order);
1546 row_group_index.indexes.iter().all(|x| {
1547 x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
1548 && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
1549 });
1550 }
1551
1552 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
1553 let statistics = r.column(col_num).statistics().unwrap();
1554 (
1555 statistics.min_bytes_opt().unwrap_or_default(),
1556 statistics.max_bytes_opt().unwrap_or_default(),
1557 )
1558 }
1559
1560 #[test]
1561 fn test_skip_page_with_offset_index() {
1562 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1563 let builder = ReadOptionsBuilder::new();
1564 let options = builder.with_page_index().build();
1566 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1567 let reader = reader_result.unwrap();
1568
1569 let row_group_reader = reader.get_row_group(0).unwrap();
1570
1571 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1573
1574 let mut vec = vec![];
1575
1576 for i in 0..325 {
1577 if i % 2 == 0 {
1578 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1579 } else {
1580 column_page_reader.skip_next_page().unwrap();
1581 }
1582 }
1583 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1585 assert!(column_page_reader.get_next_page().unwrap().is_none());
1586
1587 assert_eq!(vec.len(), 163);
1588 }
1589
1590 #[test]
1591 fn test_skip_page_without_offset_index() {
1592 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1593
1594 let reader_result = SerializedFileReader::new(test_file);
1596 let reader = reader_result.unwrap();
1597
1598 let row_group_reader = reader.get_row_group(0).unwrap();
1599
1600 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1602
1603 let mut vec = vec![];
1604
1605 for i in 0..325 {
1606 if i % 2 == 0 {
1607 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1608 } else {
1609 column_page_reader.peek_next_page().unwrap().unwrap();
1610 column_page_reader.skip_next_page().unwrap();
1611 }
1612 }
1613 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1615 assert!(column_page_reader.get_next_page().unwrap().is_none());
1616
1617 assert_eq!(vec.len(), 163);
1618 }
1619
1620 #[test]
1621 fn test_peek_page_with_dictionary_page() {
1622 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1623 let builder = ReadOptionsBuilder::new();
1624 let options = builder.with_page_index().build();
1626 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1627 let reader = reader_result.unwrap();
1628 let row_group_reader = reader.get_row_group(0).unwrap();
1629
1630 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
1632
1633 let mut vec = vec![];
1634
1635 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1636 assert!(meta.is_dict);
1637 let page = column_page_reader.get_next_page().unwrap().unwrap();
1638 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
1639
1640 for i in 0..352 {
1641 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1642 if i != 351 {
1645 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
1646 } else {
1647 assert_eq!(meta.num_rows, Some(10));
1650 }
1651 assert!(!meta.is_dict);
1652 vec.push(meta);
1653 let page = column_page_reader.get_next_page().unwrap().unwrap();
1654 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
1655 }
1656
1657 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1659 assert!(column_page_reader.get_next_page().unwrap().is_none());
1660
1661 assert_eq!(vec.len(), 352);
1662 }
1663
1664 #[test]
1665 fn test_peek_page_with_dictionary_page_without_offset_index() {
1666 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1667
1668 let reader_result = SerializedFileReader::new(test_file);
1669 let reader = reader_result.unwrap();
1670 let row_group_reader = reader.get_row_group(0).unwrap();
1671
1672 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
1674
1675 let mut vec = vec![];
1676
1677 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1678 assert!(meta.is_dict);
1679 let page = column_page_reader.get_next_page().unwrap().unwrap();
1680 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
1681
1682 for i in 0..352 {
1683 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1684 if i != 351 {
1687 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
1688 } else {
1689 assert_eq!(meta.num_levels, Some(10));
1692 }
1693 assert!(!meta.is_dict);
1694 vec.push(meta);
1695 let page = column_page_reader.get_next_page().unwrap().unwrap();
1696 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
1697 }
1698
1699 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1701 assert!(column_page_reader.get_next_page().unwrap().is_none());
1702
1703 assert_eq!(vec.len(), 352);
1704 }
1705
1706 #[test]
1707 fn test_fixed_length_index() {
1708 let message_type = "
1709 message test_schema {
1710 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
1711 }
1712 ";
1713
1714 let schema = parse_message_type(message_type).unwrap();
1715 let mut out = Vec::with_capacity(1024);
1716 let mut writer =
1717 SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
1718
1719 let mut r = writer.next_row_group().unwrap();
1720 let mut c = r.next_column().unwrap().unwrap();
1721 c.typed::<FixedLenByteArrayType>()
1722 .write_batch(
1723 &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
1724 Some(&[1, 1, 0, 1]),
1725 None,
1726 )
1727 .unwrap();
1728 c.close().unwrap();
1729 r.close().unwrap();
1730 writer.close().unwrap();
1731
1732 let b = Bytes::from(out);
1733 let options = ReadOptionsBuilder::new().with_page_index().build();
1734 let reader = SerializedFileReader::new_with_options(b, options).unwrap();
1735 let index = reader.metadata().column_index().unwrap();
1736
1737 assert_eq!(index.len(), 1);
1739 let c = &index[0];
1740 assert_eq!(c.len(), 1);
1742
1743 match &c[0] {
1744 Index::FIXED_LEN_BYTE_ARRAY(v) => {
1745 assert_eq!(v.indexes.len(), 1);
1746 let page_idx = &v.indexes[0];
1747 assert_eq!(page_idx.null_count.unwrap(), 1);
1748 assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
1749 assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
1750 }
1751 _ => unreachable!(),
1752 }
1753 }
1754
1755 #[test]
1756 fn test_multi_gz() {
1757 let file = get_test_file("concatenated_gzip_members.parquet");
1758 let reader = SerializedFileReader::new(file).unwrap();
1759 let row_group_reader = reader.get_row_group(0).unwrap();
1760 match row_group_reader.get_column_reader(0).unwrap() {
1761 ColumnReader::Int64ColumnReader(mut reader) => {
1762 let mut buffer = Vec::with_capacity(1024);
1763 let mut def_levels = Vec::with_capacity(1024);
1764 let (num_records, num_values, num_levels) = reader
1765 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
1766 .unwrap();
1767
1768 assert_eq!(num_records, 513);
1769 assert_eq!(num_values, 513);
1770 assert_eq!(num_levels, 513);
1771
1772 let expected: Vec<i64> = (1..514).collect();
1773 assert_eq!(&buffer, &expected);
1774 }
1775 _ => unreachable!(),
1776 }
1777 }
1778
1779 #[test]
1780 fn test_byte_stream_split_extended() {
1781 let path = format!(
1782 "{}/byte_stream_split_extended.gzip.parquet",
1783 arrow::util::test_util::parquet_test_data(),
1784 );
1785 let file = File::open(path).unwrap();
1786 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
1787
1788 let mut iter = reader
1790 .get_row_iter(None)
1791 .expect("Failed to create row iterator");
1792
1793 let mut start = 0;
1794 let end = reader.metadata().file_metadata().num_rows();
1795
1796 let check_row = |row: Result<Row, ParquetError>| {
1797 assert!(row.is_ok());
1798 let r = row.unwrap();
1799 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
1800 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
1801 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
1802 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
1803 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
1804 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
1805 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
1806 };
1807
1808 while start < end {
1809 match iter.next() {
1810 Some(row) => check_row(row),
1811 None => break,
1812 };
1813 start += 1;
1814 }
1815 }
1816
1817 #[test]
1818 fn test_filtered_rowgroup_metadata() {
1819 let message_type = "
1820 message test_schema {
1821 REQUIRED INT32 a;
1822 }
1823 ";
1824 let schema = Arc::new(parse_message_type(message_type).unwrap());
1825 let props = Arc::new(
1826 WriterProperties::builder()
1827 .set_statistics_enabled(EnabledStatistics::Page)
1828 .build(),
1829 );
1830 let mut file: File = tempfile::tempfile().unwrap();
1831 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1832 let data = [1, 2, 3, 4, 5];
1833
1834 for idx in 0..5 {
1836 let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
1837 let mut row_group_writer = file_writer.next_row_group().unwrap();
1838 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1839 writer
1840 .typed::<Int32Type>()
1841 .write_batch(data_i.as_slice(), None, None)
1842 .unwrap();
1843 writer.close().unwrap();
1844 }
1845 row_group_writer.close().unwrap();
1846 file_writer.flushed_row_groups();
1847 }
1848 let file_metadata = file_writer.close().unwrap();
1849
1850 assert_eq!(file_metadata.num_rows, 25);
1851 assert_eq!(file_metadata.row_groups.len(), 5);
1852
1853 let read_options = ReadOptionsBuilder::new()
1855 .with_page_index()
1856 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
1857 .build();
1858 let reader =
1859 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
1860 .unwrap();
1861 let metadata = reader.metadata();
1862
1863 assert_eq!(metadata.num_row_groups(), 1);
1865 assert_eq!(metadata.row_group(0).ordinal(), Some(2));
1866
1867 assert!(metadata.column_index().is_some());
1869 assert!(metadata.offset_index().is_some());
1870 assert_eq!(metadata.column_index().unwrap().len(), 1);
1871 assert_eq!(metadata.offset_index().unwrap().len(), 1);
1872 let col_idx = metadata.column_index().unwrap();
1873 let off_idx = metadata.offset_index().unwrap();
1874 let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
1875 let pg_idx = &col_idx[0][0];
1876 let off_idx_i = &off_idx[0][0];
1877
1878 match pg_idx {
1880 Index::INT32(int_idx) => {
1881 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
1882 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
1883 assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
1884 assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
1885 }
1886 _ => panic!("wrong stats type"),
1887 }
1888
1889 assert_eq!(
1891 off_idx_i.page_locations[0].offset,
1892 metadata.row_group(0).column(0).data_page_offset()
1893 );
1894
1895 let read_options = ReadOptionsBuilder::new()
1897 .with_page_index()
1898 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
1899 .build();
1900 let reader =
1901 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
1902 .unwrap();
1903 let metadata = reader.metadata();
1904
1905 assert_eq!(metadata.num_row_groups(), 2);
1907 assert_eq!(metadata.row_group(0).ordinal(), Some(1));
1908 assert_eq!(metadata.row_group(1).ordinal(), Some(3));
1909
1910 assert!(metadata.column_index().is_some());
1912 assert!(metadata.offset_index().is_some());
1913 assert_eq!(metadata.column_index().unwrap().len(), 2);
1914 assert_eq!(metadata.offset_index().unwrap().len(), 2);
1915 let col_idx = metadata.column_index().unwrap();
1916 let off_idx = metadata.offset_index().unwrap();
1917
1918 for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
1919 let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
1920 let pg_idx = &col_idx_i[0];
1921 let off_idx_i = &off_idx[i][0];
1922
1923 match pg_idx {
1925 Index::INT32(int_idx) => {
1926 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
1927 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
1928 assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
1929 assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
1930 }
1931 _ => panic!("wrong stats type"),
1932 }
1933
1934 assert_eq!(
1936 off_idx_i.page_locations[0].offset,
1937 metadata.row_group(i).column(0).data_page_offset()
1938 );
1939 }
1940 }
1941}