1use std::collections::VecDeque;
79use std::fmt::Formatter;
80use std::io::SeekFrom;
81use std::ops::Range;
82use std::pin::Pin;
83use std::sync::Arc;
84use std::task::{Context, Poll};
85
86use bytes::{Buf, Bytes};
87use futures::future::{BoxFuture, FutureExt};
88use futures::ready;
89use futures::stream::Stream;
90use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
91
92use arrow_array::RecordBatch;
93use arrow_schema::{DataType, Fields, Schema, SchemaRef};
94
95use crate::arrow::array_reader::{build_array_reader, RowGroups};
96use crate::arrow::arrow_reader::{
97 apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
98 ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
99};
100use crate::arrow::ProjectionMask;
101
102use crate::bloom_filter::{
103 chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
104};
105use crate::column::page::{PageIterator, PageReader};
106use crate::errors::{ParquetError, Result};
107use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
108use crate::file::page_index::offset_index::OffsetIndexMetaData;
109use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
110use crate::file::FOOTER_SIZE;
111use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
112
113mod metadata;
114pub use metadata::*;
115
116#[cfg(feature = "object_store")]
117mod store;
118
119use crate::arrow::schema::ParquetField;
120#[cfg(feature = "object_store")]
121pub use store::*;
122
/// An asynchronous interface for reading bytes and metadata from a parquet
/// file, e.g. from object storage or a local file.
///
/// Implementors must supply random-access byte reads and the file metadata;
/// a default implementation covers multi-range fetches.
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation fetches them
    /// one at a time via [`Self::get_bytes`]; implementors may override this
    /// to coalesce or parallelise requests.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

    /// Provide the [`ParquetMetaData`] for this file, typically by reading
    /// and decoding the footer
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
}
160
161impl AsyncFileReader for Box<dyn AsyncFileReader> {
162 fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
163 self.as_mut().get_bytes(range)
164 }
165
166 fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
167 self.as_mut().get_byte_ranges(ranges)
168 }
169
170 fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
171 self.as_mut().get_metadata()
172 }
173}
174
/// Blanket implementation for any seekable async byte source, e.g.
/// `tokio::fs::File`
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start as u64)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read);
            // `take` bounds the read so `read_to_end` cannot read past the
            // requested range
            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
            if read != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
        async move {
            // Read the fixed-size footer at the end of the file to learn the
            // metadata length
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

            let mut buf = [0_u8; FOOTER_SIZE];
            self.read_exact(&mut buf).await?;

            let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
            // The metadata immediately precedes the footer; seek back to its
            // start and read exactly `metadata_len` bytes
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                .await?;

            let mut buf = Vec::with_capacity(metadata_len);
            self.take(metadata_len as _).read_to_end(&mut buf).await?;

            Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
        }
        .boxed()
    }
}
212
impl ArrowReaderMetadata {
    /// Loads [`ArrowReaderMetadata`] from the provided [`AsyncFileReader`].
    ///
    /// If `options.page_index` is set and the returned metadata carries
    /// neither a column index nor an offset index, the page indexes are
    /// fetched from `input` and attached before construction.
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let mut metadata = input.get_metadata().await?;

        if options.page_index
            && metadata.column_index().is_none()
            && metadata.offset_index().is_none()
        {
            // Unwrap the Arc if we hold the only reference, otherwise clone,
            // so the metadata can be mutated to attach the page index
            let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
            let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
            reader.load_page_index(input).await?;
            metadata = Arc::new(reader.finish()?)
        }
        Self::try_new(metadata, options)
    }
}
243
/// Newtype marking the input as asynchronous in [`ArrowReaderBuilder`]'s
/// type parameter, distinguishing it from the synchronous reader path
#[doc(hidden)]
pub struct AsyncReader<T>(T);

/// A builder for a [`ParquetRecordBatchStream`] reading from an
/// [`AsyncFileReader`]
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
259
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
    /// Create a new builder for the given [`AsyncFileReader`], loading the
    /// file metadata with default [`ArrowReaderOptions`]
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// Create a new builder, loading the file metadata according to
    /// `options` (e.g. whether to also fetch the page index)
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Create a builder from already-loaded [`ArrowReaderMetadata`], allowing
    /// metadata to be reused across multiple readers without re-fetching it
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Read the bloom filter for the given column chunk, returning
    /// `Ok(None)` if the column records no bloom filter offset.
    ///
    /// # Arguments
    /// * `row_group_idx` - index of the row group
    /// * `column_idx` - index of the column within that row group
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // If the metadata records the filter's length, one read suffices;
        // otherwise fetch an estimated header's worth of bytes first
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;

        // These enums currently have a single supported variant each;
        // matching exhaustively forces a compile error if new variants appear
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {}
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {}
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {}
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // Length was known: the bitset is already in `buffer`, after the header
            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
            None => {
                // Length was unknown up front: issue a second read for the
                // bitset now that the header tells us its size
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a new [`ParquetRecordBatchStream`], consuming this builder
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();

        let row_groups = match self.row_groups {
            Some(row_groups) => {
                // Validate any explicitly requested row group indices up front
                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
                    return Err(general_err!(
                        "row group {} out of bounds 0..{}",
                        col,
                        num_row_groups
                    ));
                }
                row_groups.into()
            }
            None => (0..self.metadata.row_groups().len()).collect(),
        };

        // No point allocating batches larger than the file's row count
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);
        let reader = ReaderFactory {
            input: self.input.0,
            filter: self.filter,
            metadata: self.metadata.clone(),
            fields: self.fields,
            limit: self.limit,
            offset: self.offset,
        };

        // Derive the output schema from the projected leaves of the root
        // struct type
        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
            Some(DataType::Struct(fields)) => {
                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
            }
            None => Fields::empty(),
            _ => unreachable!("Must be Struct for root type"),
        };
        let schema = Arc::new(Schema::new(projected_fields));

        Ok(ParquetRecordBatchStream {
            metadata: self.metadata,
            batch_size,
            row_groups,
            projection: self.projection,
            selection: self.selection,
            schema,
            reader: Some(reader),
            state: StreamState::Init,
        })
    }
}
470
/// The result of reading a row group: the [`ReaderFactory`] handed back for
/// reuse on the next row group, plus a reader over the surviving rows
/// (`None` if every row was filtered out or skipped)
type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;

/// Owns the state required to read row groups one at a time: the input,
/// the row filter, and the running offset/limit accounting
struct ReaderFactory<T> {
    metadata: Arc<ParquetMetaData>,

    /// The parquet-to-arrow field mapping for the file's root schema
    fields: Option<Arc<ParquetField>>,

    input: T,

    /// Optional predicates applied before decoding the output projection
    filter: Option<RowFilter>,

    /// Rows remaining to emit; decremented as row groups are read
    limit: Option<usize>,

    /// Rows still to skip before emitting; decremented as row groups are read
    offset: Option<usize>,
}
488
impl<T> ReaderFactory<T>
where
    T: AsyncFileReader + Send,
{
    /// Reads the provided row group: evaluates any [`RowFilter`] predicates
    /// (fetching only the columns each predicate needs), applies the row
    /// `selection` plus this factory's offset/limit, then fetches the output
    /// `projection` and builds a [`ParquetRecordBatchReader`].
    ///
    /// Takes `self` by value so the resulting future owns all state (see the
    /// `'static` bound on [`StreamState::Reading`]); the factory is returned
    /// in the [`ReadResult`] for reuse.
    async fn read_row_group(
        mut self,
        row_group_idx: usize,
        mut selection: Option<RowSelection>,
        projection: ProjectionMask,
        batch_size: usize,
    ) -> ReadResult<T> {
        let meta = self.metadata.row_group(row_group_idx);
        let offset_index = self
            .metadata
            .offset_index()
            // Treat an empty offset index the same as a missing one
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        let mut row_group = InMemoryRowGroup {
            metadata: meta,
            row_count: meta.num_rows() as usize,
            column_chunks: vec![None; meta.columns().len()],
            offset_index,
        };

        // Evaluate each predicate in turn, narrowing `selection` as we go
        if let Some(filter) = self.filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !selects_any(selection.as_ref()) {
                    // Selection already excludes every row; skip this row group
                    return Ok((self, None));
                }

                let predicate_projection = predicate.projection();
                row_group
                    .fetch(&mut self.input, predicate_projection, selection.as_ref())
                    .await?;

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        // Row count after predicate evaluation, before offset/limit
        let rows_before = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        if rows_before == 0 {
            return Ok((self, None));
        }

        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);

        let rows_after = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        // The rows removed by `apply_range` count against the remaining
        // offset for subsequent row groups
        if let Some(offset) = &mut self.offset {
            *offset = offset.saturating_sub(rows_before - rows_after)
        }

        if rows_after == 0 {
            return Ok((self, None));
        }

        // The rows this group will emit count against the remaining limit
        if let Some(limit) = &mut self.limit {
            *limit -= rows_after;
        }

        // Fetch the data for the output projection and build the reader
        row_group
            .fetch(&mut self.input, &projection, selection.as_ref())
            .await?;

        let reader = ParquetRecordBatchReader::new(
            batch_size,
            build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
            selection,
        );

        Ok((self, Some(reader)))
    }
}
590
/// The state machine driving a [`ParquetRecordBatchStream`]
enum StreamState<T> {
    /// At the start of a new row group, or the end of the stream
    Init,
    /// Decoding batches from the current row group's reader
    Decoding(ParquetRecordBatchReader),
    /// Waiting on the future that fetches and prepares the next row group
    Reading(BoxFuture<'static, ReadResult<T>>),
    /// A terminal error occurred; the stream yields nothing further
    Error,
}
601
602impl<T> std::fmt::Debug for StreamState<T> {
603 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
604 match self {
605 StreamState::Init => write!(f, "StreamState::Init"),
606 StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
607 StreamState::Reading(_) => write!(f, "StreamState::Reading"),
608 StreamState::Error => write!(f, "StreamState::Error"),
609 }
610 }
611}
612
/// An asynchronous [`Stream`] of [`RecordBatch`] decoded from a parquet file
pub struct ParquetRecordBatchStream<T> {
    metadata: Arc<ParquetMetaData>,

    /// The projected output schema
    schema: SchemaRef,

    /// Row group indices still to be read, consumed front-to-back
    row_groups: VecDeque<usize>,

    projection: ProjectionMask,

    batch_size: usize,

    /// Remaining row selection; one row group's worth is split off per read
    selection: Option<RowSelection>,

    /// `Some` except while a row-group read future (which temporarily owns
    /// the factory) is in flight
    reader: Option<ReaderFactory<T>>,

    state: StreamState<T>,
}
633
634impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
635 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
636 f.debug_struct("ParquetRecordBatchStream")
637 .field("metadata", &self.metadata)
638 .field("schema", &self.schema)
639 .field("batch_size", &self.batch_size)
640 .field("projection", &self.projection)
641 .field("state", &self.state)
642 .finish()
643 }
644}
645
impl<T> ParquetRecordBatchStream<T> {
    /// Returns the projected [`SchemaRef`] shared by every [`RecordBatch`]
    /// this stream yields
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}
655
impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;

    /// Drives the state machine: `Init` -> `Reading` (fetch + prepare the
    /// next row group) -> `Decoding` (drain its reader) -> `Init`, until the
    /// row group queue is empty. Any error is terminal.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(batch_reader) => match batch_reader.next() {
                    Some(Ok(batch)) => {
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Some(Err(e)) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
                    }
                    // Current row group exhausted; move on to the next one
                    None => self.state = StreamState::Init,
                },
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        // No row groups left: end of stream
                        Some(idx) => idx,
                        None => return Poll::Ready(None),
                    };

                    // The factory is moved into the read future and handed
                    // back when it resolves; `expect` guards the invariant
                    // that it is always restored before re-entering Init
                    let reader = self.reader.take().expect("lost reader");

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    // Split off the portion of the selection that belongs to
                    // this row group
                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let fut = reader
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .boxed();

                    self.state = StreamState::Reading(fut)
                }
                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
                    Ok((reader_factory, maybe_reader)) => {
                        self.reader = Some(reader_factory);
                        match maybe_reader {
                            // Rows survived filtering: decode them next
                            Some(reader) => self.state = StreamState::Decoding(reader),
                            // All rows were filtered out or skipped; try the
                            // next row group
                            None => self.state = StreamState::Init,
                        }
                    }
                    Err(e) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(e)));
                    }
                },
                // A previous error ended the stream
                StreamState::Error => return Poll::Ready(None),
            }
        }
    }
}
718
/// A row group's column chunk data buffered in memory for decoding
struct InMemoryRowGroup<'a> {
    metadata: &'a RowGroupMetaData,
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// One entry per leaf column; `None` until fetched by [`Self::fetch`]
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    row_count: usize,
}
726
impl<'a> InMemoryRowGroup<'a> {
    /// Fetches the data for the columns in `projection` that have not yet
    /// been fetched, storing the results in `self.column_chunks`.
    ///
    /// When both a row `selection` and an offset index are available, only
    /// the byte ranges of pages the selection touches are fetched (sparse);
    /// otherwise each projected column chunk is fetched in full (dense).
    /// Already-fetched columns are never re-fetched.
    async fn fetch<T: AsyncFileReader + Send>(
        &mut self,
        input: &mut T,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            // Sparse path. Record the start offset of every fetched range so
            // the returned chunks can be matched back to their columns below.
            let mut page_start_offsets: Vec<Vec<usize>> = vec![];

            let fetch_ranges = self
                .column_chunks
                .iter()
                .zip(self.metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    let mut ranges = vec![];
                    // If the first page starts after the beginning of the
                    // column chunk, also fetch the leading bytes (e.g. a
                    // dictionary page preceding the first data page)
                    let (start, _len) = chunk_meta.byte_range();
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start as usize..first.offset as usize);
                        }
                        _ => (),
                    }

                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
            let mut page_start_offsets = page_start_offsets.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                // Consume this column's offsets and the matching number of
                // fetched byte ranges, in the same order they were requested
                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: self.metadata.column(idx).byte_range().1 as usize,
                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
                    }))
                }
            }
        } else {
            // Dense path: fetch each projected, not-yet-fetched column chunk
            // in its entirety
            let fetch_ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = self.metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start as usize..(start + length) as usize
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: self.metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }

        Ok(())
    }
}
819
impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    /// Returns a [`PageIterator`] over the previously fetched data for
    /// column `i`, or an error if that column was not fetched
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                // Treat an empty offset index the same as a missing one
                let page_locations = self
                    .offset_index
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
                    data.clone(),
                    self.metadata.column(i),
                    self.row_count,
                    page_locations,
                )?);

                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}
850
/// An in-memory column chunk: either the full chunk, or only the pages
/// required by a row selection
#[derive(Clone)]
enum ColumnChunkData {
    /// Data for only the pages a [`RowSelection`] touches
    Sparse {
        /// Total length of the column chunk in bytes
        length: usize,
        /// Fetched page data as `(file byte offset, bytes)` pairs; must be
        /// sorted by offset (see the binary search in [`Self::get`])
        data: Vec<(usize, Bytes)>,
    },
    /// The entire column chunk, starting at file byte `offset`
    Dense { offset: usize, data: Bytes },
}
865
impl ColumnChunkData {
    /// Returns the bytes starting at absolute file offset `start`.
    ///
    /// For `Sparse` data, `start` must exactly match the beginning of one of
    /// the fetched ranges; any other offset is an error. For `Dense` data
    /// the offset is translated into the buffered chunk.
    fn get(&self, start: u64) -> Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                // Translate the absolute file offset into an offset within
                // the buffered chunk
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}
884
885impl Length for ColumnChunkData {
886 fn len(&self) -> u64 {
887 match &self {
888 ColumnChunkData::Sparse { length, .. } => *length as u64,
889 ColumnChunkData::Dense { data, .. } => data.len() as u64,
890 }
891 }
892}
893
impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    /// Returns a reader over the bytes starting at file offset `start`
    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    /// Returns `length` bytes starting at file offset `start`
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}
905
/// Implements [`PageIterator`] for a single column chunk, yielding exactly
/// one [`PageReader`]
struct ColumnChunkIterator {
    // `take`n on the first call to `next`, so subsequent calls yield `None`
    reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}
920
921#[cfg(test)]
922mod tests {
923 use super::*;
924 use crate::arrow::arrow_reader::{
925 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
926 };
927 use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
928 use crate::arrow::ArrowWriter;
929 use crate::file::metadata::ParquetMetaDataReader;
930 use crate::file::page_index::index_reader;
931 use crate::file::properties::WriterProperties;
932 use arrow::compute::kernels::cmp::eq;
933 use arrow::error::Result as ArrowResult;
934 use arrow_array::builder::{ListBuilder, StringBuilder};
935 use arrow_array::cast::AsArray;
936 use arrow_array::types::Int32Type;
937 use arrow_array::{
938 Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
939 StructArray, UInt64Array,
940 };
941 use arrow_schema::{DataType, Field, Schema};
942 use futures::{StreamExt, TryStreamExt};
943 use rand::{thread_rng, Rng};
944 use std::collections::HashMap;
945 use std::sync::{Arc, Mutex};
946 use tempfile::tempfile;
947
    /// An [`AsyncFileReader`] over in-memory bytes that records every byte
    /// range requested, so tests can assert on the reader's I/O behavior
    #[derive(Clone)]
    struct TestReader {
        data: Bytes,
        metadata: Arc<ParquetMetaData>,
        // Log of the ranges passed to `get_bytes`, in request order
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }

    impl AsyncFileReader for TestReader {
        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            self.requests.lock().unwrap().push(range.clone());
            futures::future::ready(Ok(self.data.slice(range))).boxed()
        }

        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
            futures::future::ready(Ok(self.metadata.clone())).boxed()
        }
    }
965
    /// Verifies the async reader produces the same batches as the sync
    /// reader and issues exactly one fetch per projected column chunk
    #[tokio::test]
    async fn test_async_reader() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            // NOTE(review): 104 here vs 1024 above — presumably both exceed
            // the file's row count so the batches still match; confirm
            // intentional
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        // With no page index, each projected column is fetched with a single
        // whole-chunk request
        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }
1022
    /// Reads with `with_page_index(true)` and checks the loaded column and
    /// offset indexes have one entry per row group and per column, and that
    /// decoded batches match the sync reader
    #[tokio::test]
    async fn test_async_reader_with_index() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let metadata_with_index = builder.metadata();

        // Both page indexes should have been loaded on demand
        let offset_index = metadata_with_index.offset_index().unwrap();
        let column_index = metadata_with_index.column_index().unwrap();

        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());

        let num_columns = metadata_with_index
            .file_metadata()
            .schema_descr()
            .num_columns();

        // Check each index has an entry for every column
        offset_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));
        column_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }
1090
    /// Checks that `with_limit` produces identical results on the async and
    /// sync readers
    #[tokio::test]
    async fn test_async_reader_with_limit() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }
1136
    /// Uses a fixed row selection with the page index enabled, so whole
    /// pages can be skipped, and compares against the sync reader
    #[tokio::test]
    async fn test_async_reader_skip_pages() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        // Alternating skips and selects, including a large skip that should
        // jump over many pages
        let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_row_selection(selection)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }
1194
    /// Fuzz test: generates random alternating skip/select selections over
    /// the whole file and a random projected column, then checks the stream
    /// yields exactly the expected number of rows
    #[tokio::test]
    async fn test_fuzz_async_reader_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let mut rand = thread_rng();

        for _ in 0..100 {
            let mut expected_rows = 0;
            let mut total_rows = 0;
            let mut skip = false;
            let mut selectors = vec![];

            // Build random selectors covering all 7300 rows, alternating
            // between skip and select
            while total_rows < 7300 {
                let row_count: usize = rand.gen_range(1..100);

                let row_count = row_count.min(7300 - total_rows);

                selectors.push(RowSelector { row_count, skip });

                total_rows += row_count;
                if !skip {
                    expected_rows += row_count;
                }

                skip = !skip;
            }

            let selection = RowSelection::from(selectors);

            let async_reader = TestReader {
                data: data.clone(),
                metadata: metadata.clone(),
                requests: Default::default(),
            };

            let options = ArrowReaderOptions::new().with_page_index(true);
            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
                .await
                .unwrap();

            let col_idx: usize = rand.gen_range(0..13);
            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

            let stream = builder
                .with_projection(mask.clone())
                .with_row_selection(selection.clone())
                .build()
                .expect("building stream");

            let async_batches: Vec<_> = stream.try_collect().await.unwrap();

            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

            assert_eq!(actual_rows, expected_rows);
        }
    }
1260
    /// Like the fuzz test, but the selection starts with a zero-row
    /// selector, checking that empty selectors are handled gracefully
    #[tokio::test]
    async fn test_async_reader_zero_row_selector() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let mut rand = thread_rng();

        let mut expected_rows = 0;
        let mut total_rows = 0;
        let mut skip = false;
        let mut selectors = vec![];

        // Leading zero-row selector: must be a no-op
        selectors.push(RowSelector {
            row_count: 0,
            skip: false,
        });

        while total_rows < 7300 {
            let row_count: usize = rand.gen_range(1..100);

            let row_count = row_count.min(7300 - total_rows);

            selectors.push(RowSelector { row_count, skip });

            total_rows += row_count;
            if !skip {
                expected_rows += row_count;
            }

            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let col_idx: usize = rand.gen_range(0..13);
        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

        assert_eq!(actual_rows, expected_rows);
    }
1330
    /// Applies two chained predicates (a == "b", then b == "4") and checks
    /// the single surviving row, plus that column fetches are not repeated
    #[tokio::test]
    async fn test_row_filter() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader {
            data,
            metadata: Arc::new(metadata),
            requests: Default::default(),
        };
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![1]),
            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        // Only the row (a="b", b="4", c=3) satisfies both predicates
        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        // 3 fetches: column a for predicate 1, column b for predicate 2,
        // and column c for the output projection (column a is reused from
        // the predicate fetch)
        assert_eq!(requests.lock().unwrap().len(), 3);
    }
1403
1404 #[tokio::test]
1405 async fn test_limit_multiple_row_groups() {
1406 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1407 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1408 let c = Int32Array::from_iter(0..6);
1409 let data = RecordBatch::try_from_iter([
1410 ("a", Arc::new(a) as ArrayRef),
1411 ("b", Arc::new(b) as ArrayRef),
1412 ("c", Arc::new(c) as ArrayRef),
1413 ])
1414 .unwrap();
1415
1416 let mut buf = Vec::with_capacity(1024);
1417 let props = WriterProperties::builder()
1418 .set_max_row_group_size(3)
1419 .build();
1420 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1421 writer.write(&data).unwrap();
1422 writer.close().unwrap();
1423
1424 let data: Bytes = buf.into();
1425 let metadata = ParquetMetaDataReader::new()
1426 .parse_and_finish(&data)
1427 .unwrap();
1428
1429 assert_eq!(metadata.num_row_groups(), 2);
1430
1431 let test = TestReader {
1432 data,
1433 metadata: Arc::new(metadata),
1434 requests: Default::default(),
1435 };
1436
1437 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1438 .await
1439 .unwrap()
1440 .with_batch_size(1024)
1441 .with_limit(4)
1442 .build()
1443 .unwrap();
1444
1445 let batches: Vec<_> = stream.try_collect().await.unwrap();
1446 assert_eq!(batches.len(), 2);
1448
1449 let batch = &batches[0];
1450 assert_eq!(batch.num_rows(), 3);
1452 assert_eq!(batch.num_columns(), 3);
1453 let col2 = batch.column(2).as_primitive::<Int32Type>();
1454 assert_eq!(col2.values(), &[0, 1, 2]);
1455
1456 let batch = &batches[1];
1457 assert_eq!(batch.num_rows(), 1);
1459 assert_eq!(batch.num_columns(), 3);
1460 let col2 = batch.column(2).as_primitive::<Int32Type>();
1461 assert_eq!(col2.values(), &[3]);
1462
1463 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1464 .await
1465 .unwrap()
1466 .with_offset(2)
1467 .with_limit(3)
1468 .build()
1469 .unwrap();
1470
1471 let batches: Vec<_> = stream.try_collect().await.unwrap();
1472 assert_eq!(batches.len(), 2);
1474
1475 let batch = &batches[0];
1476 assert_eq!(batch.num_rows(), 1);
1478 assert_eq!(batch.num_columns(), 3);
1479 let col2 = batch.column(2).as_primitive::<Int32Type>();
1480 assert_eq!(col2.values(), &[2]);
1481
1482 let batch = &batches[1];
1483 assert_eq!(batch.num_rows(), 2);
1485 assert_eq!(batch.num_columns(), 3);
1486 let col2 = batch.column(2).as_primitive::<Int32Type>();
1487 assert_eq!(col2.values(), &[3, 4]);
1488
1489 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1490 .await
1491 .unwrap()
1492 .with_offset(4)
1493 .with_limit(20)
1494 .build()
1495 .unwrap();
1496
1497 let batches: Vec<_> = stream.try_collect().await.unwrap();
1498 assert_eq!(batches.len(), 1);
1500
1501 let batch = &batches[0];
1502 assert_eq!(batch.num_rows(), 2);
1504 assert_eq!(batch.num_columns(), 3);
1505 let col2 = batch.column(2).as_primitive::<Int32Type>();
1506 assert_eq!(col2.values(), &[4, 5]);
1507 }
1508
1509 #[tokio::test]
1510 async fn test_row_filter_with_index() {
1511 let testdata = arrow::util::test_util::parquet_test_data();
1512 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1513 let data = Bytes::from(std::fs::read(path).unwrap());
1514
1515 let metadata = ParquetMetaDataReader::new()
1516 .parse_and_finish(&data)
1517 .unwrap();
1518 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1519 let metadata = Arc::new(metadata);
1520
1521 assert_eq!(metadata.num_row_groups(), 1);
1522
1523 let async_reader = TestReader {
1524 data: data.clone(),
1525 metadata: metadata.clone(),
1526 requests: Default::default(),
1527 };
1528
1529 let a_filter =
1530 ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1531 Ok(batch.column(0).as_boolean().clone())
1532 });
1533
1534 let b_scalar = Int8Array::from(vec![2]);
1535 let b_filter = ArrowPredicateFn::new(
1536 ProjectionMask::leaves(&parquet_schema, vec![2]),
1537 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1538 );
1539
1540 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1541
1542 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1543
1544 let options = ArrowReaderOptions::new().with_page_index(true);
1545 let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1546 .await
1547 .unwrap()
1548 .with_projection(mask.clone())
1549 .with_batch_size(1024)
1550 .with_row_filter(filter)
1551 .build()
1552 .unwrap();
1553
1554 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1555
1556 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1557
1558 assert_eq!(total_rows, 730);
1559 }
1560
    #[tokio::test]
    async fn test_in_memory_row_group_sparse() {
        // Verifies that `ReaderFactory::read_row_group` fetches only the pages
        // required by a sparse `RowSelection`, by comparing the byte ranges
        // requested through the reader against the offset-index page locations.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();

        // Read the offset index for row group 0 directly from the file bytes.
        let offset_index =
            index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
                .expect("reading offset index");

        // Rebuild the metadata so it contains only the first row group, the
        // offset index just read, and no column index.
        let mut metadata_builder = metadata.into_builder();
        let mut row_groups = metadata_builder.take_row_groups();
        row_groups.truncate(1);
        let row_group_meta = row_groups.pop().unwrap();

        let metadata = metadata_builder
            .add_row_group(row_group_meta)
            .set_column_index(None)
            .set_offset_index(Some(vec![offset_index.clone()]))
            .build();

        let metadata = Arc::new(metadata);

        let num_rows = metadata.row_group(0).num_rows();

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        // Keep a handle on the request log so the fetched byte ranges can be
        // inspected after the read completes.
        let requests = async_reader.requests.clone();
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            metadata.file_metadata().schema_descr(),
            ProjectionMask::all(),
            None,
        )
        .unwrap();

        let _schema_desc = metadata.file_metadata().schema_descr();

        // Project only the first leaf column.
        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);

        let reader_factory = ReaderFactory {
            metadata,
            fields: fields.map(Arc::new),
            input: async_reader,
            filter: None,
            limit: None,
            offset: None,
        };

        // Alternate skip/select page by page, starting with a skip. Selected
        // pages should be fetched; skipped pages should never be requested.
        let mut skip = true;
        let mut pages = offset_index[0].page_locations.iter().peekable();

        let mut selectors = vec![];
        let mut expected_page_requests: Vec<Range<usize>> = vec![];
        while let Some(page) = pages.next() {
            // Row count of this page: distance to the next page's first row,
            // or to the end of the row group for the final page.
            let num_rows = if let Some(next_page) = pages.peek() {
                next_page.first_row_index - page.first_row_index
            } else {
                num_rows - page.first_row_index
            };

            if skip {
                selectors.push(RowSelector::skip(num_rows as usize));
            } else {
                selectors.push(RowSelector::select(num_rows as usize));
                // Selected page -> expect a fetch of exactly its byte range.
                let start = page.offset as usize;
                let end = start + page.compressed_page_size as usize;
                expected_page_requests.push(start..end);
            }
            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let (_factory, _reader) = reader_factory
            .read_row_group(0, Some(selection), projection.clone(), 48)
            .await
            .expect("reading row group");

        // The reader must have requested exactly the selected pages' ranges,
        // in order, and nothing else.
        let requests = requests.lock().unwrap();

        assert_eq!(&requests[..], &expected_page_requests)
    }
1654
1655 #[tokio::test]
1656 async fn test_batch_size_overallocate() {
1657 let testdata = arrow::util::test_util::parquet_test_data();
1658 let path = format!("{testdata}/alltypes_plain.parquet");
1660 let data = Bytes::from(std::fs::read(path).unwrap());
1661
1662 let metadata = ParquetMetaDataReader::new()
1663 .parse_and_finish(&data)
1664 .unwrap();
1665 let file_rows = metadata.file_metadata().num_rows() as usize;
1666 let metadata = Arc::new(metadata);
1667
1668 let async_reader = TestReader {
1669 data: data.clone(),
1670 metadata: metadata.clone(),
1671 requests: Default::default(),
1672 };
1673
1674 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1675 .await
1676 .unwrap();
1677
1678 let stream = builder
1679 .with_projection(ProjectionMask::all())
1680 .with_batch_size(1024)
1681 .build()
1682 .unwrap();
1683 assert_ne!(1024, file_rows);
1684 assert_eq!(stream.batch_size, file_rows);
1685 }
1686
1687 #[tokio::test]
1688 async fn test_get_row_group_column_bloom_filter_without_length() {
1689 let testdata = arrow::util::test_util::parquet_test_data();
1690 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1691 let data = Bytes::from(std::fs::read(path).unwrap());
1692 test_get_row_group_column_bloom_filter(data, false).await;
1693 }
1694
    #[tokio::test]
    async fn test_parquet_record_batch_stream_schema() {
        // Checks what schema each API surface reports under projection, for
        // both the sync and async readers: the builder exposes the full file
        // schema (including file metadata), while the reader and the batches
        // expose only the projected fields, with no file metadata attached.

        // Flattens a schema (including nested struct children) to field names.
        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
            schema.flattened_fields().iter().map(|f| f.name()).collect()
        }

        // File-level key/value metadata; must survive on the builder schema.
        let mut metadata = HashMap::with_capacity(1);
        metadata.insert("key".to_string(), "value".to_string());

        // Schema: a (int32), b (uint64), c (struct { d: utf8, e: utf8 }),
        // giving leaf columns 0=a, 1=b, 2=c.d, 3=c.e.
        let nested_struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("d", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("e", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
            ),
        ]);
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::UInt64, true)),
                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c",
                    nested_struct_array.data_type().clone(),
                    true,
                )),
                Arc::new(nested_struct_array) as ArrayRef,
            ),
        ]);

        let schema =
            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
        let record_batch = RecordBatch::from(struct_array)
            .with_schema(schema.clone())
            .unwrap();

        // Write the batch to a temp file that is re-read in each iteration.
        let mut file = tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
        writer.write(&record_batch).unwrap();
        writer.close().unwrap();

        let all_fields = ["a", "b", "c", "d", "e"];
        // (leaf indices to project, expected flattened projected field names).
        // Selecting leaf 2 (c.d) pulls in parent struct c with only child d.
        let projections = [
            (vec![], vec![]),
            (vec![0], vec!["a"]),
            (vec![0, 1], vec!["a", "b"]),
            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
        ];

        for (indices, expected_projected_names) in projections {
            // Shared assertions run against both the sync and async paths.
            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
                assert_eq!(get_all_field_names(&builder), all_fields);
                assert_eq!(builder.metadata, metadata);
                assert_eq!(get_all_field_names(&reader), expected_projected_names);
                assert_eq!(reader.metadata, HashMap::default());
                assert_eq!(get_all_field_names(&batch), expected_projected_names);
                assert_eq!(batch.metadata, HashMap::default());
            };

            // Sync reader path.
            let builder =
                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
            let sync_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
            let mut reader = builder.with_projection(mask).build().unwrap();
            let sync_reader_schema = reader.schema();
            let batch = reader.next().unwrap().unwrap();
            let sync_batch_schema = batch.schema();
            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);

            // Async reader path over the same underlying file.
            let file = tokio::fs::File::from(file.try_clone().unwrap());
            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
            let async_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
            let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
            let batch = reader.next().await.unwrap().unwrap();
            let async_batch_schema = batch.schema();
            assert_schemas(
                async_builder_schema,
                async_reader_schema,
                async_batch_schema,
            );
        }
    }
1802
1803 #[tokio::test]
1804 async fn test_get_row_group_column_bloom_filter_with_length() {
1805 let testdata = arrow::util::test_util::parquet_test_data();
1807 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1808 let data = Bytes::from(std::fs::read(path).unwrap());
1809 let metadata = ParquetMetaDataReader::new()
1810 .parse_and_finish(&data)
1811 .unwrap();
1812 let metadata = Arc::new(metadata);
1813 let async_reader = TestReader {
1814 data: data.clone(),
1815 metadata: metadata.clone(),
1816 requests: Default::default(),
1817 };
1818 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1819 .await
1820 .unwrap();
1821 let schema = builder.schema().clone();
1822 let stream = builder.build().unwrap();
1823 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1824
1825 let mut parquet_data = Vec::new();
1826 let props = WriterProperties::builder()
1827 .set_bloom_filter_enabled(true)
1828 .build();
1829 let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
1830 for batch in batches {
1831 writer.write(&batch).unwrap();
1832 }
1833 writer.close().unwrap();
1834
1835 test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
1837 }
1838
1839 async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
1840 let metadata = ParquetMetaDataReader::new()
1841 .parse_and_finish(&data)
1842 .unwrap();
1843 let metadata = Arc::new(metadata);
1844
1845 assert_eq!(metadata.num_row_groups(), 1);
1846 let row_group = metadata.row_group(0);
1847 let column = row_group.column(0);
1848 assert_eq!(column.bloom_filter_length().is_some(), with_length);
1849
1850 let async_reader = TestReader {
1851 data: data.clone(),
1852 metadata: metadata.clone(),
1853 requests: Default::default(),
1854 };
1855
1856 let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1857 .await
1858 .unwrap();
1859
1860 let sbbf = builder
1861 .get_row_group_column_bloom_filter(0, 0)
1862 .await
1863 .unwrap()
1864 .unwrap();
1865 assert!(sbbf.check(&"Hello"));
1866 assert!(!sbbf.check(&"Hello_Not_Exists"));
1867 }
1868
    #[tokio::test]
    async fn test_nested_skip() {
        // One 1024-row row group with a flat u64 column and a nullable list
        // column, written with small data pages so that row selections
        // interact with page boundaries in the nested (repeated) column.
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
        ]));

        // 256-row pages force multiple pages per column chunk.
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        // Cycle the list column through three shapes: two items, one item,
        // null — so list offsets vary across rows.
        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // Selections chosen to start/stop on and around the 256-row page
        // boundaries (255/256/313 etc.), exercising skips within and across
        // pages of the nested column.
        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        // Each selection must yield exactly its own selected row count.
        for selection in selections {
            let expected = selection.row_count();
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }
1954
1955 #[tokio::test]
1956 async fn test_row_filter_nested() {
1957 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1958 let b = StructArray::from(vec![
1959 (
1960 Arc::new(Field::new("aa", DataType::Utf8, true)),
1961 Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
1962 ),
1963 (
1964 Arc::new(Field::new("bb", DataType::Utf8, true)),
1965 Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
1966 ),
1967 ]);
1968 let c = Int32Array::from_iter(0..6);
1969 let data = RecordBatch::try_from_iter([
1970 ("a", Arc::new(a) as ArrayRef),
1971 ("b", Arc::new(b) as ArrayRef),
1972 ("c", Arc::new(c) as ArrayRef),
1973 ])
1974 .unwrap();
1975
1976 let mut buf = Vec::with_capacity(1024);
1977 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1978 writer.write(&data).unwrap();
1979 writer.close().unwrap();
1980
1981 let data: Bytes = buf.into();
1982 let metadata = ParquetMetaDataReader::new()
1983 .parse_and_finish(&data)
1984 .unwrap();
1985 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1986
1987 let test = TestReader {
1988 data,
1989 metadata: Arc::new(metadata),
1990 requests: Default::default(),
1991 };
1992 let requests = test.requests.clone();
1993
1994 let a_scalar = StringArray::from_iter_values(["b"]);
1995 let a_filter = ArrowPredicateFn::new(
1996 ProjectionMask::leaves(&parquet_schema, vec![0]),
1997 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1998 );
1999
2000 let b_scalar = StringArray::from_iter_values(["4"]);
2001 let b_filter = ArrowPredicateFn::new(
2002 ProjectionMask::leaves(&parquet_schema, vec![2]),
2003 move |batch| {
2004 let struct_array = batch
2006 .column(0)
2007 .as_any()
2008 .downcast_ref::<StructArray>()
2009 .unwrap();
2010 eq(struct_array.column(0), &Scalar::new(&b_scalar))
2011 },
2012 );
2013
2014 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2015
2016 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2017 let stream = ParquetRecordBatchStreamBuilder::new(test)
2018 .await
2019 .unwrap()
2020 .with_projection(mask.clone())
2021 .with_batch_size(1024)
2022 .with_row_filter(filter)
2023 .build()
2024 .unwrap();
2025
2026 let batches: Vec<_> = stream.try_collect().await.unwrap();
2027 assert_eq!(batches.len(), 1);
2028
2029 let batch = &batches[0];
2030 assert_eq!(batch.num_rows(), 1);
2031 assert_eq!(batch.num_columns(), 2);
2032
2033 let col = batch.column(0);
2034 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2035 assert_eq!(val, "b");
2036
2037 let col = batch.column(1);
2038 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2039 assert_eq!(val, 3);
2040
2041 assert_eq!(requests.lock().unwrap().len(), 3);
2043 }
2044
2045 #[tokio::test]
2046 async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2047 use tokio::fs::File;
2048 let testdata = arrow::util::test_util::parquet_test_data();
2049 let path = format!("{testdata}/alltypes_plain.parquet");
2050 let mut file = File::open(&path).await.unwrap();
2051 let file_size = file.metadata().await.unwrap().len();
2052 let mut metadata = ParquetMetaDataReader::new()
2053 .with_page_indexes(true)
2054 .load_and_finish(&mut file, file_size as usize)
2055 .await
2056 .unwrap();
2057
2058 metadata.set_offset_index(Some(vec![]));
2059 let options = ArrowReaderOptions::new().with_page_index(true);
2060 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2061 let reader =
2062 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2063 .build()
2064 .unwrap();
2065
2066 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2067 assert_eq!(result.len(), 1);
2068 }
2069
2070 #[tokio::test]
2071 async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2072 use tokio::fs::File;
2073 let testdata = arrow::util::test_util::parquet_test_data();
2074 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2075 let mut file = File::open(&path).await.unwrap();
2076 let file_size = file.metadata().await.unwrap().len();
2077 let metadata = ParquetMetaDataReader::new()
2078 .with_page_indexes(true)
2079 .load_and_finish(&mut file, file_size as usize)
2080 .await
2081 .unwrap();
2082
2083 let options = ArrowReaderOptions::new().with_page_index(true);
2084 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2085 let reader =
2086 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2087 .build()
2088 .unwrap();
2089
2090 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2091 assert_eq!(result.len(), 8);
2092 }
2093
    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        use tempfile::TempDir;
        use tokio::fs::File;
        // Round-trips the parquet metadata (with page indexes) through a
        // standalone metadata file on disk, then reads the data file using the
        // round-tripped metadata. Guards against panics in column-chunk reads
        // when the round-trip changes how the offset index is represented.

        // Serializes `metadata` to its own file using ParquetMetaDataWriter.
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        // Parses metadata (including page indexes) back from that file.
        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size as usize)
            .await
            .unwrap();

        // Round-trip the metadata through a separate file on disk.
        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        // The data file must still read cleanly end-to-end.
        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }
2145}