1use std::{io::Read, ops::Range, sync::Arc};
19
20use bytes::Bytes;
21
22use crate::basic::ColumnOrder;
23use crate::errors::{ParquetError, Result};
24use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
25use crate::file::page_index::index::Index;
26use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
27use crate::file::reader::ChunkReader;
28use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
29use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
30use crate::schema::types;
31use crate::schema::types::SchemaDescriptor;
32use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
33
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::MetadataFetch;
36
#[derive(Default)]
pub struct ParquetMetaDataReader {
    /// Decoded footer metadata, populated by `try_parse*` / `try_load` or
    /// supplied up front via `new_with_metadata`.
    metadata: Option<ParquetMetaData>,
    /// Whether to also read the column index (per-page statistics).
    column_index: bool,
    /// Whether to also read the offset index (per-page locations).
    offset_index: bool,
    /// Optional hint of how many trailing bytes to prefetch when loading
    /// asynchronously; values below the footer size are ignored.
    prefetch_hint: Option<usize>,
    /// Total size of the footer (Thrift metadata + 8-byte footer) once
    /// decoded; used to detect page-index ranges overlapping the metadata.
    metadata_size: Option<usize>,
}
72
73impl ParquetMetaDataReader {
74 pub fn new() -> Self {
76 Default::default()
77 }
78
79 pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
82 Self {
83 metadata: Some(metadata),
84 ..Default::default()
85 }
86 }
87
88 pub fn with_page_indexes(self, val: bool) -> Self {
94 self.with_column_indexes(val).with_offset_indexes(val)
95 }
96
97 pub fn with_column_indexes(mut self, val: bool) -> Self {
101 self.column_index = val;
102 self
103 }
104
105 pub fn with_offset_indexes(mut self, val: bool) -> Self {
109 self.offset_index = val;
110 self
111 }
112
113 pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
125 self.prefetch_hint = prefetch;
126 self
127 }
128
129 pub fn has_metadata(&self) -> bool {
131 self.metadata.is_some()
132 }
133
134 pub fn finish(&mut self) -> Result<ParquetMetaData> {
136 self.metadata
137 .take()
138 .ok_or_else(|| general_err!("could not parse parquet metadata"))
139 }
140
141 pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
160 self.try_parse(reader)?;
161 self.finish()
162 }
163
164 pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
170 self.try_parse_sized(reader, reader.len() as usize)
171 }
172
173 pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
208 self.metadata = match self.parse_metadata(reader) {
209 Ok(metadata) => Some(metadata),
210 Err(ParquetError::IndexOutOfBound(needed, _)) => {
216 if file_size == reader.len() as usize || needed > file_size {
219 return Err(eof_err!(
220 "Parquet file too small. Size is {} but need {}",
221 file_size,
222 needed
223 ));
224 } else {
225 return Err(ParquetError::IndexOutOfBound(needed, file_size));
227 }
228 }
229 Err(e) => return Err(e),
230 };
231
232 if !self.column_index && !self.offset_index {
234 return Ok(());
235 }
236
237 self.read_page_indexes_sized(reader, file_size)
238 }
239
240 pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
243 self.read_page_indexes_sized(reader, reader.len() as usize)
244 }
245
246 pub fn read_page_indexes_sized<R: ChunkReader>(
251 &mut self,
252 reader: &R,
253 file_size: usize,
254 ) -> Result<()> {
255 if self.metadata.is_none() {
256 return Err(general_err!(
257 "Tried to read page indexes without ParquetMetaData metadata"
258 ));
259 }
260
261 let Some(range) = self.range_for_page_index() else {
272 self.empty_page_indexes();
273 return Ok(());
274 };
275
276 let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
279 if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
280 if range.end > file_size {
282 return Err(eof_err!(
283 "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
284 range
285 ));
286 } else {
287 return Err(ParquetError::IndexOutOfBound(
289 file_size - range.start,
290 file_size,
291 ));
292 }
293 }
294
295 if let Some(metadata_size) = self.metadata_size {
298 let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
299 if range.end > metadata_range.start {
300 return Err(eof_err!(
301 "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
302 range,
303 metadata_range
304 ));
305 }
306 }
307
308 let bytes_needed = range.end - range.start;
309 let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
310 let offset = range.start;
311
312 self.parse_column_index(&bytes, offset)?;
313 self.parse_offset_index(&bytes, offset)?;
314
315 Ok(())
316 }
317
318 #[cfg(all(feature = "async", feature = "arrow"))]
325 pub async fn load_and_finish<F: MetadataFetch>(
326 mut self,
327 fetch: F,
328 file_size: usize,
329 ) -> Result<ParquetMetaData> {
330 self.try_load(fetch, file_size).await?;
331 self.finish()
332 }
333
334 #[cfg(all(feature = "async", feature = "arrow"))]
340 pub async fn try_load<F: MetadataFetch>(
341 &mut self,
342 mut fetch: F,
343 file_size: usize,
344 ) -> Result<()> {
345 let (metadata, remainder) =
346 Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;
347
348 self.metadata = Some(metadata);
349
350 if !self.column_index && !self.offset_index {
352 return Ok(());
353 }
354
355 self.load_page_index_with_remainder(fetch, remainder).await
356 }
357
358 #[cfg(all(feature = "async", feature = "arrow"))]
361 pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
362 self.load_page_index_with_remainder(fetch, None).await
363 }
364
365 #[cfg(all(feature = "async", feature = "arrow"))]
366 async fn load_page_index_with_remainder<F: MetadataFetch>(
367 &mut self,
368 mut fetch: F,
369 remainder: Option<(usize, Bytes)>,
370 ) -> Result<()> {
371 if self.metadata.is_none() {
372 return Err(general_err!("Footer metadata is not present"));
373 }
374
375 let range = self.range_for_page_index();
377 let range = match range {
378 Some(range) => range,
379 None => return Ok(()),
380 };
381
382 let bytes = match &remainder {
383 Some((remainder_start, remainder)) if *remainder_start <= range.start => {
384 let offset = range.start - *remainder_start;
385 remainder.slice(offset..range.end - *remainder_start + offset)
386 }
387 _ => fetch.fetch(range.start..range.end).await?,
389 };
390
391 assert_eq!(bytes.len(), range.end - range.start);
393 let offset = range.start;
394
395 self.parse_column_index(&bytes, offset)?;
396 self.parse_offset_index(&bytes, offset)?;
397
398 Ok(())
399 }
400
401 fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
402 let metadata = self.metadata.as_mut().unwrap();
403 if self.column_index {
404 let index = metadata
405 .row_groups()
406 .iter()
407 .map(|x| {
408 x.columns()
409 .iter()
410 .map(|c| match c.column_index_range() {
411 Some(r) => decode_column_index(
412 &bytes[r.start - start_offset..r.end - start_offset],
413 c.column_type(),
414 ),
415 None => Ok(Index::NONE),
416 })
417 .collect::<Result<Vec<_>>>()
418 })
419 .collect::<Result<Vec<_>>>()?;
420 metadata.set_column_index(Some(index));
421 }
422 Ok(())
423 }
424
425 fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
426 let metadata = self.metadata.as_mut().unwrap();
427 if self.offset_index {
428 let index = metadata
429 .row_groups()
430 .iter()
431 .map(|x| {
432 x.columns()
433 .iter()
434 .map(|c| match c.offset_index_range() {
435 Some(r) => decode_offset_index(
436 &bytes[r.start - start_offset..r.end - start_offset],
437 ),
438 None => Err(general_err!("missing offset index")),
439 })
440 .collect::<Result<Vec<_>>>()
441 })
442 .collect::<Result<Vec<_>>>()?;
443
444 metadata.set_offset_index(Some(index));
445 }
446 Ok(())
447 }
448
449 fn empty_page_indexes(&mut self) {
453 let metadata = self.metadata.as_mut().unwrap();
454 let num_row_groups = metadata.num_row_groups();
455 if self.column_index {
456 metadata.set_column_index(Some(vec![vec![]; num_row_groups]));
457 }
458 if self.offset_index {
459 metadata.set_offset_index(Some(vec![vec![]; num_row_groups]));
460 }
461 }
462
463 fn range_for_page_index(&self) -> Option<Range<usize>> {
464 self.metadata.as_ref()?;
466
467 let mut range = None;
469 let metadata = self.metadata.as_ref().unwrap();
470 for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
471 if self.column_index {
472 range = acc_range(range, c.column_index_range());
473 }
474 if self.offset_index {
475 range = acc_range(range, c.offset_index_range());
476 }
477 }
478 range
479 }
480
481 fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
484 let file_size = chunk_reader.len();
486 if file_size < (FOOTER_SIZE as u64) {
487 return Err(ParquetError::IndexOutOfBound(
488 FOOTER_SIZE,
489 file_size as usize,
490 ));
491 }
492
493 let mut footer = [0_u8; 8];
494 chunk_reader
495 .get_read(file_size - 8)?
496 .read_exact(&mut footer)?;
497
498 let metadata_len = Self::decode_footer(&footer)?;
499 let footer_metadata_len = FOOTER_SIZE + metadata_len;
500 self.metadata_size = Some(footer_metadata_len);
501
502 if footer_metadata_len > file_size as usize {
503 return Err(ParquetError::IndexOutOfBound(
504 footer_metadata_len,
505 file_size as usize,
506 ));
507 }
508
509 let start = file_size - footer_metadata_len as u64;
510 Self::decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref())
511 }
512
513 #[cfg(all(feature = "async", feature = "arrow"))]
517 fn get_prefetch_size(&self) -> usize {
518 if let Some(prefetch) = self.prefetch_hint {
519 if prefetch > FOOTER_SIZE {
520 return prefetch;
521 }
522 }
523 FOOTER_SIZE
524 }
525
526 #[cfg(all(feature = "async", feature = "arrow"))]
527 async fn load_metadata<F: MetadataFetch>(
528 fetch: &mut F,
529 file_size: usize,
530 prefetch: usize,
531 ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
532 if file_size < FOOTER_SIZE {
533 return Err(eof_err!("file size of {} is less than footer", file_size));
534 }
535
536 let footer_start = file_size.saturating_sub(prefetch);
540
541 let suffix = fetch.fetch(footer_start..file_size).await?;
542 let suffix_len = suffix.len();
543 let fetch_len = file_size - footer_start;
544 if suffix_len < fetch_len {
545 return Err(eof_err!(
546 "metadata requires {} bytes, but could only read {}",
547 fetch_len,
548 suffix_len
549 ));
550 }
551
552 let mut footer = [0; FOOTER_SIZE];
553 footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
554
555 let length = Self::decode_footer(&footer)?;
556
557 if file_size < length + FOOTER_SIZE {
558 return Err(eof_err!(
559 "file size of {} is less than footer + metadata {}",
560 file_size,
561 length + FOOTER_SIZE
562 ));
563 }
564
565 if length > suffix_len - FOOTER_SIZE {
567 let metadata_start = file_size - length - FOOTER_SIZE;
568 let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
569 Ok((Self::decode_metadata(&meta)?, None))
570 } else {
571 let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
572 let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
573 Ok((
574 Self::decode_metadata(slice)?,
575 Some((footer_start, suffix.slice(..metadata_start))),
576 ))
577 }
578 }
579
580 pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
592 if slice[4..] != PARQUET_MAGIC {
594 return Err(general_err!("Invalid Parquet file. Corrupt footer"));
595 }
596
597 let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
599 Ok(metadata_len as usize)
601 }
602
603 pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
611 let mut prot = TCompactSliceInputProtocol::new(buf);
612 let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
613 .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
614 let schema = types::from_thrift(&t_file_metadata.schema)?;
615 let schema_descr = Arc::new(SchemaDescriptor::new(schema));
616 let mut row_groups = Vec::new();
617 for rg in t_file_metadata.row_groups {
618 row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
619 }
620 let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);
621
622 let file_metadata = FileMetaData::new(
623 t_file_metadata.version,
624 t_file_metadata.num_rows,
625 t_file_metadata.created_by,
626 t_file_metadata.key_value_metadata,
627 schema_descr,
628 column_orders,
629 );
630 Ok(ParquetMetaData::new(file_metadata, row_groups))
631 }
632
633 fn parse_column_orders(
636 t_column_orders: Option<Vec<TColumnOrder>>,
637 schema_descr: &SchemaDescriptor,
638 ) -> Option<Vec<ColumnOrder>> {
639 match t_column_orders {
640 Some(orders) => {
641 assert_eq!(
643 orders.len(),
644 schema_descr.num_columns(),
645 "Column order length mismatch"
646 );
647 let mut res = Vec::new();
648 for (i, column) in schema_descr.columns().iter().enumerate() {
649 match orders[i] {
650 TColumnOrder::TYPEORDER(_) => {
651 let sort_order = ColumnOrder::get_sort_order(
652 column.logical_type(),
653 column.converted_type(),
654 column.physical_type(),
655 );
656 res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
657 }
658 }
659 }
660 Some(res)
661 }
662 None => None,
663 }
664 }
665}
666
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::basic::SortOrder;
    use crate::basic::Type;
    use crate::file::reader::Length;
    use crate::format::TypeDefinedOrder;
    use crate::schema::types::Type as SchemaType;
    use crate::util::test_common::file_util::get_test_file;

    // An empty file cannot even hold the 8-byte footer.
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::IndexOutOfBound(8, _)));
    }

    // Eight bytes, but the trailing magic is not "PAR1".
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    // Valid magic but the declared metadata length (255) + footer (8) exceeds
    // the 8-byte file, so 263 bytes are reported as needed.
    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::IndexOutOfBound(263, _)));
    }

    // Thrift TYPEORDER entries map to TYPE_DEFINED_ORDER with the sort order
    // derived from each column's type; None passes through as None.
    #[test]
    fn test_metadata_column_orders_parse() {
        // Schema with two leaf columns (INT32 and FLOAT, both signed order).
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // No orders recorded in the file -> None.
        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(None, &schema_descr),
            None
        );
    }

    // One order supplied for a zero-column schema must panic.
    #[test]
    #[should_panic(expected = "Column order length mismatch")]
    fn test_metadata_column_orders_len_mismatch() {
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
    }

    // Exercises the suffix-based retry protocol of try_parse_sized using
    // progressively shorter suffixes of a real test file. The magic offsets
    // below are fixture-specific: the page index region appears to start at
    // byte 323583 and the footer metadata around byte 452505 — confirmed by
    // the expected error messages asserted further down.
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<usize>| {
            file.get_bytes(range.start as u64, range.end - range.start)
                .unwrap()
        };

        // Whole file: everything parses.
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Suffix comfortably containing page indexes + footer.
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Suffix starting exactly at the page index region: still succeeds.
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // One byte short of the page index region: IndexOutOfBound tells us
        // how many trailing bytes are needed, and the retry succeeds.
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::IndexOutOfBound(needed, _) => {
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // Claimed file_size too small for the index range -> hard EOF error.
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // Without page indexes, a footer-only suffix retry also works.
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::IndexOutOfBound(needed, _) => {
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // try_parse treats the suffix as the whole file, so no retry is
        // possible and the shortfall becomes a hard EOF error.
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // A prefix of the file has no valid footer magic at its end.
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // Suffix shorter than the metadata, and the claimed file size shows a
        // retry cannot help -> hard EOF error.
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}
853
#[cfg(all(feature = "async", feature = "arrow", test))]
mod async_tests {
    use super::*;
    use bytes::Bytes;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use std::fs::File;
    use std::future::Future;
    use std::io::{Read, Seek, SeekFrom};
    use std::ops::Range;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::arrow::async_reader::MetadataFetch;
    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;

    // Adapter turning a closure into a MetadataFetch so tests can count and
    // service range requests.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<usize>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    // Synchronously read `range` from `file` and return it as `Bytes`.
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len);
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    // Verifies async metadata loading matches the sync path, and that the
    // prefetch hint controls the number of fetches: without a sufficient
    // hint the loader needs two (footer, then metadata); a hint covering the
    // whole footer region needs only one.
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        // Ground truth from the synchronous reader.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: footer fetch + metadata fetch = 2.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint below the footer size is ignored: still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint larger than the footer but smaller than the metadata: 2.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Generous hint covering the metadata: a single fetch suffices.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // 428 bytes appears to be exactly enough for this fixture: still 1.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // A claimed file size below the 8-byte footer fails up front.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // A wrong file size makes the footer land on non-magic bytes.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    // Fetch counts when page indexes are requested: 3 with no hint (footer,
    // metadata, indexes); 2 when the hint covers the metadata; 1 when it
    // covers the page indexes as well. The hint values 1729 and 130649/130650
    // are fixture-specific boundaries — see the sized-parse test above.
    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: footer + metadata + page indexes = 3 fetches.
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Hint covering the metadata: indexes still need one more fetch = 2.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // One byte short of covering the page indexes: still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Hint covering metadata and page indexes: a single fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
}