parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Provides an `async` API for reading parquet files as
19//! [`RecordBatch`]es
20//!
21//! ```
22//! # #[tokio::main(flavor="current_thread")]
23//! # async fn main() {
24//! #
25//! # use arrow_array::RecordBatch;
26//! # use arrow::util::pretty::pretty_format_batches;
27//! # use futures::TryStreamExt;
28//! # use tokio::fs::File;
29//! #
30//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
31//! #
32//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
33//! #     let formatted = pretty_format_batches(batches).unwrap().to_string();
34//! #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
35//! #     assert_eq!(
36//! #          &actual_lines, expected_lines,
37//! #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
38//! #          expected_lines, actual_lines
39//! #      );
40//! #  }
41//! #
42//! let testdata = arrow::util::test_util::parquet_test_data();
43//! let path = format!("{}/alltypes_plain.parquet", testdata);
44//! let file = File::open(path).await.unwrap();
45//!
46//! let builder = ParquetRecordBatchStreamBuilder::new(file)
47//!     .await
48//!     .unwrap()
49//!     .with_batch_size(3);
50//!
51//! let file_metadata = builder.metadata().file_metadata();
52//! let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
53//!
54//! let stream = builder.with_projection(mask).build().unwrap();
55//! let results = stream.try_collect::<Vec<_>>().await.unwrap();
56//! assert_eq!(results.len(), 3);
57//!
58//! assert_batches_eq(
59//!     &results,
60//!     &[
61//!         "+----------+-------------+-----------+",
62//!         "| bool_col | tinyint_col | float_col |",
63//!         "+----------+-------------+-----------+",
64//!         "| true     | 0           | 0.0       |",
65//!         "| false    | 1           | 1.1       |",
66//!         "| true     | 0           | 0.0       |",
67//!         "| false    | 1           | 1.1       |",
68//!         "| true     | 0           | 0.0       |",
69//!         "| false    | 1           | 1.1       |",
70//!         "| true     | 0           | 0.0       |",
71//!         "| false    | 1           | 1.1       |",
72//!         "+----------+-------------+-----------+",
73//!      ],
74//!  );
75//! # }
76//! ```
77
78use std::collections::VecDeque;
79use std::fmt::Formatter;
80use std::io::SeekFrom;
81use std::ops::Range;
82use std::pin::Pin;
83use std::sync::Arc;
84use std::task::{Context, Poll};
85
86use bytes::{Buf, Bytes};
87use futures::future::{BoxFuture, FutureExt};
88use futures::ready;
89use futures::stream::Stream;
90use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
91
92use arrow_array::RecordBatch;
93use arrow_schema::{DataType, Fields, Schema, SchemaRef};
94
95use crate::arrow::array_reader::{build_array_reader, RowGroups};
96use crate::arrow::arrow_reader::{
97    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
98    ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
99};
100use crate::arrow::ProjectionMask;
101
102use crate::bloom_filter::{
103    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
104};
105use crate::column::page::{PageIterator, PageReader};
106use crate::errors::{ParquetError, Result};
107use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
108use crate::file::page_index::offset_index::OffsetIndexMetaData;
109use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
110use crate::file::FOOTER_SIZE;
111use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
112
113mod metadata;
114pub use metadata::*;
115
116#[cfg(feature = "object_store")]
117mod store;
118
119use crate::arrow::schema::ParquetField;
120#[cfg(feature = "object_store")]
121pub use store::*;
122
123/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
124///
125/// Notes:
126///
127/// 1. There is a default implementation for types that implement [`AsyncRead`]
128///    and [`AsyncSeek`], for example [`tokio::fs::File`].
129///
130/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
131///    is enabled, implements this interface for [`ObjectStore`].
132///
133/// [`ObjectStore`]: object_store::ObjectStore
134///
135/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
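///
/// # Example
///
/// A minimal sketch of a custom implementation backed by in-memory [`Bytes`]
/// with pre-parsed metadata, similar to the `TestReader` used in this module's
/// tests (the `InMemoryReader` name is illustrative only):
///
/// ```no_run
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use futures::future::{BoxFuture, FutureExt};
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # use parquet::errors::Result;
/// # use parquet::file::metadata::ParquetMetaData;
/// struct InMemoryReader {
///     data: Bytes,
///     metadata: Arc<ParquetMetaData>,
/// }
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
///         // The data is already in memory, so simply slice out the requested range
///         futures::future::ready(Ok(self.data.slice(range))).boxed()
///     }
///
///     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
///         // Metadata was parsed up front and can be returned immediately
///         futures::future::ready(Ok(self.metadata.clone())).boxed()
///     }
/// }
/// ```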
136pub trait AsyncFileReader: Send {
137    /// Retrieve the bytes in `range`
138    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
139
140    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
141    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
142        async move {
143            let mut result = Vec::with_capacity(ranges.len());
144
145            for range in ranges.into_iter() {
146                let data = self.get_bytes(range).await?;
147                result.push(data);
148            }
149
150            Ok(result)
151        }
152        .boxed()
153    }
154
155    /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
156    /// allowing fine-grained control over how metadata is sourced, for example enabling
157    /// caching, pre-fetching, or retrieving it from a catalog
158    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
159}
160
161impl AsyncFileReader for Box<dyn AsyncFileReader> {
162    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
163        self.as_mut().get_bytes(range)
164    }
165
166    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
167        self.as_mut().get_byte_ranges(ranges)
168    }
169
170    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
171        self.as_mut().get_metadata()
172    }
173}
174
175impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
176    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
177        async move {
178            self.seek(SeekFrom::Start(range.start as u64)).await?;
179
180            let to_read = range.end - range.start;
181            let mut buffer = Vec::with_capacity(to_read);
182            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
183            if read != to_read {
184                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
185            }
186
187            Ok(buffer.into())
188        }
189        .boxed()
190    }
191
192    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
193        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
194        async move {
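            // The parquet footer is the final FOOTER_SIZE (8) bytes of the file: a
            // little-endian metadata length followed by the `PAR1` magic. Read it first
            // to learn where the metadata starts.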
195            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
196
197            let mut buf = [0_u8; FOOTER_SIZE];
198            self.read_exact(&mut buf).await?;
199
200            let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
201            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
202                .await?;
203
204            let mut buf = Vec::with_capacity(metadata_len);
205            self.take(metadata_len as _).read_to_end(&mut buf).await?;
206
207            Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
208        }
209        .boxed()
210    }
211}
212
213impl ArrowReaderMetadata {
214    /// Returns a new [`ArrowReaderMetadata`] loaded asynchronously from the provided [`AsyncFileReader`]
215    ///
216    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
217    ///
218    /// # Notes
219    ///
220    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
221    /// `Self::metadata` is missing the page index, this function will attempt
222    /// to load the page index by issuing additional reads against `input`.
223    pub async fn load_async<T: AsyncFileReader>(
224        input: &mut T,
225        options: ArrowReaderOptions,
226    ) -> Result<Self> {
227        // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
228        // took an argument to fetch the page indexes.
229        let mut metadata = input.get_metadata().await?;
230
231        if options.page_index
232            && metadata.column_index().is_none()
233            && metadata.offset_index().is_none()
234        {
235            let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
236            let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
237            reader.load_page_index(input).await?;
238            metadata = Arc::new(reader.finish()?)
239        }
240        Self::try_new(metadata, options)
241    }
242}
243
244#[doc(hidden)]
245/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
246///
247/// Allows sharing the same builder for both the sync and async versions, whilst also not
248/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
249pub struct AsyncReader<T>(T);
250
251/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
252///
253/// In particular, this handles reading the parquet file metadata, allowing consumers
254/// to use this information to select what specific columns, row groups, etc...
255/// they wish to be read by the resulting stream
256///
257/// See [`ArrowReaderBuilder`] for additional member functions
258pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
259
260impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
261    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
262    ///
263    /// # Example
264    ///
265    /// ```
266    /// # use std::fs::metadata;
267    /// # use std::sync::Arc;
268    /// # use bytes::Bytes;
269    /// # use arrow_array::{Int32Array, RecordBatch};
270    /// # use arrow_schema::{DataType, Field, Schema};
271    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
272    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
273    /// # use tempfile::tempfile;
274    /// # use futures::StreamExt;
275    /// # #[tokio::main(flavor="current_thread")]
276    /// # async fn main() {
277    /// #
278    /// # let mut file = tempfile().unwrap();
279    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
280    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
281    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
282    /// # writer.write(&batch).unwrap();
283    /// # writer.close().unwrap();
284    /// // Open async file containing parquet data
285    /// let mut file = tokio::fs::File::from_std(file);
286    /// // construct the reader
287    /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
288    ///   .await.unwrap().build().unwrap();
289    /// // Read a batch
290    /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
291    /// # }
292    /// ```
293    pub async fn new(input: T) -> Result<Self> {
294        Self::new_with_options(input, Default::default()).await
295    }
296
297    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
298    /// and [`ArrowReaderOptions`]
299    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
300        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
301        Ok(Self::new_with_metadata(input, metadata))
302    }
303
304    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
305    ///
306    /// This allows loading metadata once and using it to create multiple builders with
307    /// potentially different settings, which can then be read in parallel.
308    ///
309    /// # Example of reading from multiple streams in parallel
310    ///
311    /// ```
312    /// # use std::fs::metadata;
313    /// # use std::sync::Arc;
314    /// # use bytes::Bytes;
315    /// # use arrow_array::{Int32Array, RecordBatch};
316    /// # use arrow_schema::{DataType, Field, Schema};
317    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
318    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
319    /// # use tempfile::tempfile;
320    /// # use futures::StreamExt;
321    /// # #[tokio::main(flavor="current_thread")]
322    /// # async fn main() {
323    /// #
324    /// # let mut file = tempfile().unwrap();
325    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
326    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
327    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
328    /// # writer.write(&batch).unwrap();
329    /// # writer.close().unwrap();
330    /// // open file with parquet data
331    /// let mut file = tokio::fs::File::from_std(file);
332    /// // load metadata once
333    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
334    /// // create two readers, a and b, from the same underlying file
335    /// // without reading the metadata again
336    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
337    ///     file.try_clone().await.unwrap(),
338    ///     meta.clone()
339    /// ).build().unwrap();
340    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
341    ///
342    /// // Can read batches from both readers in parallel
343    /// assert_eq!(
344    ///   a.next().await.unwrap().unwrap(),
345    ///   b.next().await.unwrap().unwrap(),
346    /// );
347    /// # }
348    /// ```
349    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
350        Self::new_builder(AsyncReader(input), metadata)
351    }
352
353    /// Read the bloom filter for a column in a row group
354    /// Returns `None` if the column does not have a bloom filter
355    ///
356    /// This should be called after other forms of pruning, such as projection and predicate pushdown, have been applied.
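    ///
    /// # Example
    ///
    /// A minimal sketch of using a bloom filter to decide whether a row group can be
    /// skipped (the file name, the `Int64` column index and the probed value are
    /// illustrative assumptions):
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    ///
    /// // Probe the bloom filter of column 0 in row group 0 for the value 42
    /// if let Some(sbbf) = builder
    ///     .get_row_group_column_bloom_filter(0, 0)
    ///     .await
    ///     .unwrap()
    /// {
    ///     // A negative answer is definitive: the row group cannot contain 42
    ///     if !sbbf.check(&42_i64) {
    ///         println!("row group 0 can be skipped");
    ///     }
    /// }
    /// # }
    /// ```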
357    pub async fn get_row_group_column_bloom_filter(
358        &mut self,
359        row_group_idx: usize,
360        column_idx: usize,
361    ) -> Result<Option<Sbbf>> {
362        let metadata = self.metadata.row_group(row_group_idx);
363        let column_metadata = metadata.column(column_idx);
364
365        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
366            offset
367                .try_into()
368                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
369        } else {
370            return Ok(None);
371        };
372
373        let buffer = match column_metadata.bloom_filter_length() {
374            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
375            None => self
376                .input
377                .0
378                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
379        }
380        .await?;
381
382        let (header, bitset_offset) =
383            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;
384
385        match header.algorithm {
386            BloomFilterAlgorithm::BLOCK(_) => {
387                // this match exists to future proof the singleton algorithm enum
388            }
389        }
390        match header.compression {
391            BloomFilterCompression::UNCOMPRESSED(_) => {
392                // this match exists to future proof the singleton compression enum
393            }
394        }
395        match header.hash {
396            BloomFilterHash::XXHASH(_) => {
397                // this match exists to future proof the singleton hash enum
398            }
399        }
400
401        let bitset = match column_metadata.bloom_filter_length() {
402            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
403            None => {
404                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
405                    ParquetError::General("Bloom filter length is invalid".to_string())
406                })?;
407                self.input
408                    .0
409                    .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
410                    .await?
411            }
412        };
413        Ok(Some(Sbbf::new(&bitset)))
414    }
415
416    /// Build a new [`ParquetRecordBatchStream`]
417    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
418        let num_row_groups = self.metadata.row_groups().len();
419
420        let row_groups = match self.row_groups {
421            Some(row_groups) => {
422                if let Some(idx) = row_groups.iter().find(|x| **x >= num_row_groups) {
423                    return Err(general_err!(
424                        "row group {} out of bounds 0..{}",
425                        idx,
426                        num_row_groups
427                    ));
428                }
429                row_groups.into()
430            }
431            None => (0..self.metadata.row_groups().len()).collect(),
432        };
433
434        // Try to avoid allocating a large buffer
435        let batch_size = self
436            .batch_size
437            .min(self.metadata.file_metadata().num_rows() as usize);
438        let reader = ReaderFactory {
439            input: self.input.0,
440            filter: self.filter,
441            metadata: self.metadata.clone(),
442            fields: self.fields,
443            limit: self.limit,
444            offset: self.offset,
445        };
446
447        // Ensure schema of ParquetRecordBatchStream respects projection, and does
448        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
449        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
450            Some(DataType::Struct(fields)) => {
451                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
452            }
453            None => Fields::empty(),
454            _ => unreachable!("Must be Struct for root type"),
455        };
456        let schema = Arc::new(Schema::new(projected_fields));
457
458        Ok(ParquetRecordBatchStream {
459            metadata: self.metadata,
460            batch_size,
461            row_groups,
462            projection: self.projection,
463            selection: self.selection,
464            schema,
465            reader: Some(reader),
466            state: StreamState::Init,
467        })
468    }
469}
470
471type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
472
473/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
474/// [`ParquetRecordBatchReader`]
475struct ReaderFactory<T> {
476    metadata: Arc<ParquetMetaData>,
477
478    fields: Option<Arc<ParquetField>>,
479
480    input: T,
481
482    filter: Option<RowFilter>,
483
484    limit: Option<usize>,
485
486    offset: Option<usize>,
487}
488
489impl<T> ReaderFactory<T>
490where
491    T: AsyncFileReader + Send,
492{
493    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
494    ///
495    /// Note: this consumes `self` so that the resulting future has a `'static` lifetime
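    ///
    /// Any [`RowFilter`] predicates are evaluated first, fetching only the columns each
    /// predicate projects; the resulting [`RowSelection`] is then trimmed by the remaining
    /// `offset`/`limit`, and finally the projected columns are fetched and wrapped in a
    /// [`ParquetRecordBatchReader`].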
496    async fn read_row_group(
497        mut self,
498        row_group_idx: usize,
499        mut selection: Option<RowSelection>,
500        projection: ProjectionMask,
501        batch_size: usize,
502    ) -> ReadResult<T> {
503        // TODO: calling build_array multiple times is wasteful
504
505        let meta = self.metadata.row_group(row_group_idx);
506        let offset_index = self
507            .metadata
508            .offset_index()
509            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
510            .filter(|index| !index.is_empty())
511            .map(|x| x[row_group_idx].as_slice());
512
513        let mut row_group = InMemoryRowGroup {
514            metadata: meta,
515            // schema: meta.schema_descr_ptr(),
516            row_count: meta.num_rows() as usize,
517            column_chunks: vec![None; meta.columns().len()],
518            offset_index,
519        };
520
521        if let Some(filter) = self.filter.as_mut() {
522            for predicate in filter.predicates.iter_mut() {
523                if !selects_any(selection.as_ref()) {
524                    return Ok((self, None));
525                }
526
527                let predicate_projection = predicate.projection();
528                row_group
529                    .fetch(&mut self.input, predicate_projection, selection.as_ref())
530                    .await?;
531
532                let array_reader =
533                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;
534
535                selection = Some(evaluate_predicate(
536                    batch_size,
537                    array_reader,
538                    selection,
539                    predicate.as_mut(),
540                )?);
541            }
542        }
543
544        // Compute the number of rows in the selection before applying limit and offset
545        let rows_before = selection
546            .as_ref()
547            .map(|s| s.row_count())
548            .unwrap_or(row_group.row_count);
549
550        if rows_before == 0 {
551            return Ok((self, None));
552        }
553
554        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);
555
556        // Compute the number of rows in the selection after applying limit and offset
557        let rows_after = selection
558            .as_ref()
559            .map(|s| s.row_count())
560            .unwrap_or(row_group.row_count);
561
562        // Update offset if necessary
563        if let Some(offset) = &mut self.offset {
564            // The reduction is due to the offset and/or the limit; since the limit is only
565            // applied after the offset has been "exhausted", a saturating subtraction suffices here
566            *offset = offset.saturating_sub(rows_before - rows_after)
567        }
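        // For example (illustrative numbers): if rows_before = 100, offset = 30 and
        // limit = 50, then rows_after = 50 and the remaining offset saturates to 0;
        // if rows_before = 20 and offset = 30, then rows_after = 0 and an offset of
        // 10 carries over to the next row group.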
568
569        if rows_after == 0 {
570            return Ok((self, None));
571        }
572
573        if let Some(limit) = &mut self.limit {
574            *limit -= rows_after;
575        }
576
577        row_group
578            .fetch(&mut self.input, &projection, selection.as_ref())
579            .await?;
580
581        let reader = ParquetRecordBatchReader::new(
582            batch_size,
583            build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
584            selection,
585        );
586
587        Ok((self, Some(reader)))
588    }
589}
590
591enum StreamState<T> {
592    /// At the start of a new row group, or the end of the parquet stream
593    Init,
594    /// Decoding a batch
595    Decoding(ParquetRecordBatchReader),
596    /// Reading data from input
597    Reading(BoxFuture<'static, ReadResult<T>>),
598    /// Error
599    Error,
600}
601
602impl<T> std::fmt::Debug for StreamState<T> {
603    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
604        match self {
605            StreamState::Init => write!(f, "StreamState::Init"),
606            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
607            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
608            StreamState::Error => write!(f, "StreamState::Error"),
609        }
610    }
611}
612
613/// An asynchronous [`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html) of [`RecordBatch`]
614/// for a parquet file that can be constructed using [`ParquetRecordBatchStreamBuilder`].
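///
/// Batches can be pulled with `StreamExt::next` or collected with
/// `TryStreamExt::try_collect`; a minimal sketch (the file name is an
/// illustrative assumption):
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// let file = tokio::fs::File::open("data.parquet").await.unwrap();
/// let stream = ParquetRecordBatchStreamBuilder::new(file)
///     .await
///     .unwrap()
///     .build()
///     .unwrap();
/// let batches: Vec<_> = stream.try_collect().await.unwrap();
/// # }
/// ```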
615pub struct ParquetRecordBatchStream<T> {
616    metadata: Arc<ParquetMetaData>,
617
618    schema: SchemaRef,
619
620    row_groups: VecDeque<usize>,
621
622    projection: ProjectionMask,
623
624    batch_size: usize,
625
626    selection: Option<RowSelection>,
627
628    /// This is an option so it can be moved into a future
629    reader: Option<ReaderFactory<T>>,
630
631    state: StreamState<T>,
632}
633
634impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
635    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
636        f.debug_struct("ParquetRecordBatchStream")
637            .field("metadata", &self.metadata)
638            .field("schema", &self.schema)
639            .field("batch_size", &self.batch_size)
640            .field("projection", &self.projection)
641            .field("state", &self.state)
642            .finish()
643    }
644}
645
646impl<T> ParquetRecordBatchStream<T> {
647    /// Returns the projected [`SchemaRef`] for reading the parquet file.
648    ///
649    /// Note that the schema metadata will be stripped here. See
650    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
651    pub fn schema(&self) -> &SchemaRef {
652        &self.schema
653    }
654}
655
656impl<T> Stream for ParquetRecordBatchStream<T>
657where
658    T: AsyncFileReader + Unpin + Send + 'static,
659{
660    type Item = Result<RecordBatch>;
661
662    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
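        // Drive a simple state machine: `Init` pops the next row group and starts a
        // `Reading` future, a successful read yields a `Decoding` reader that is drained
        // batch by batch before returning to `Init`, and any error moves to `Error`,
        // which terminates the stream.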
663        loop {
664            match &mut self.state {
665                StreamState::Decoding(batch_reader) => match batch_reader.next() {
666                    Some(Ok(batch)) => {
667                        return Poll::Ready(Some(Ok(batch)));
668                    }
669                    Some(Err(e)) => {
670                        self.state = StreamState::Error;
671                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
672                    }
673                    None => self.state = StreamState::Init,
674                },
675                StreamState::Init => {
676                    let row_group_idx = match self.row_groups.pop_front() {
677                        Some(idx) => idx,
678                        None => return Poll::Ready(None),
679                    };
680
681                    let reader = self.reader.take().expect("lost reader");
682
683                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
684
685                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
686
687                    let fut = reader
688                        .read_row_group(
689                            row_group_idx,
690                            selection,
691                            self.projection.clone(),
692                            self.batch_size,
693                        )
694                        .boxed();
695
696                    self.state = StreamState::Reading(fut)
697                }
698                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
699                    Ok((reader_factory, maybe_reader)) => {
700                        self.reader = Some(reader_factory);
701                        match maybe_reader {
702                            // Read records from [`ParquetRecordBatchReader`]
703                            Some(reader) => self.state = StreamState::Decoding(reader),
704                            // All rows skipped, read next row group
705                            None => self.state = StreamState::Init,
706                        }
707                    }
708                    Err(e) => {
709                        self.state = StreamState::Error;
710                        return Poll::Ready(Some(Err(e)));
711                    }
712                },
713                StreamState::Error => return Poll::Ready(None), // Ends the stream, as an error has occurred.
714            }
715        }
716    }
717}
718
719/// An in-memory collection of column chunks
720struct InMemoryRowGroup<'a> {
721    metadata: &'a RowGroupMetaData,
722    offset_index: Option<&'a [OffsetIndexMetaData]>,
723    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
724    row_count: usize,
725}
726
727impl<'a> InMemoryRowGroup<'a> {
728    /// Fetches the necessary column data into memory
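    ///
    /// If a [`RowSelection`] and an offset index are available, only the byte ranges of
    /// the pages needed by the selection (plus any leading dictionary page) are fetched
    /// and stored as [`ColumnChunkData::Sparse`]; otherwise each projected column chunk
    /// is fetched in full and stored as [`ColumnChunkData::Dense`].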
729    async fn fetch<T: AsyncFileReader + Send>(
730        &mut self,
731        input: &mut T,
732        projection: &ProjectionMask,
733        selection: Option<&RowSelection>,
734    ) -> Result<()> {
735        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
736            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
737            // `RowSelection`
738            let mut page_start_offsets: Vec<Vec<usize>> = vec![];
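            // For each projected column, record the file offset at which every fetched
            // range starts, so the returned buffers can be matched back to their pages
            // when building the sparse column chunks below.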
739
740            let fetch_ranges = self
741                .column_chunks
742                .iter()
743                .zip(self.metadata.columns())
744                .enumerate()
745                .filter(|&(idx, (chunk, _chunk_meta))| {
746                    chunk.is_none() && projection.leaf_included(idx)
747                })
748                .flat_map(|(idx, (_chunk, chunk_meta))| {
749                    // If the first page does not start at the beginning of the column,
750                    // then we need to also fetch a dictionary page.
751                    let mut ranges = vec![];
752                    let (start, _len) = chunk_meta.byte_range();
753                    match offset_index[idx].page_locations.first() {
754                        Some(first) if first.offset as u64 != start => {
755                            ranges.push(start as usize..first.offset as usize);
756                        }
757                        _ => (),
758                    }
759
760                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
761                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
762
763                    ranges
764                })
765                .collect();
766
767            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
768            let mut page_start_offsets = page_start_offsets.into_iter();
769
770            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
771                if chunk.is_some() || !projection.leaf_included(idx) {
772                    continue;
773                }
774
775                if let Some(offsets) = page_start_offsets.next() {
776                    let mut chunks = Vec::with_capacity(offsets.len());
777                    for _ in 0..offsets.len() {
778                        chunks.push(chunk_data.next().unwrap());
779                    }
780
781                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
782                        length: self.metadata.column(idx).byte_range().1 as usize,
783                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
784                    }))
785                }
786            }
787        } else {
788            let fetch_ranges = self
789                .column_chunks
790                .iter()
791                .enumerate()
792                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
793                .map(|(idx, _chunk)| {
794                    let column = self.metadata.column(idx);
795                    let (start, length) = column.byte_range();
796                    start as usize..(start + length) as usize
797                })
798                .collect();
799
800            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
801
802            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
803                if chunk.is_some() || !projection.leaf_included(idx) {
804                    continue;
805                }
806
807                if let Some(data) = chunk_data.next() {
808                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
809                        offset: self.metadata.column(idx).byte_range().0 as usize,
810                        data,
811                    }));
812                }
813            }
814        }
815
816        Ok(())
817    }
818}
819
820impl RowGroups for InMemoryRowGroup<'_> {
821    fn num_rows(&self) -> usize {
822        self.row_count
823    }
824
825    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
826        match &self.column_chunks[i] {
827            None => Err(ParquetError::General(format!(
828                "Invalid column index {i}, column was not fetched"
829            ))),
830            Some(data) => {
831                let page_locations = self
832                    .offset_index
833                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
834                    .filter(|index| !index.is_empty())
835                    .map(|index| index[i].page_locations.clone());
836                let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
837                    data.clone(),
838                    self.metadata.column(i),
839                    self.row_count,
840                    page_locations,
841                )?);
842
843                Ok(Box::new(ColumnChunkIterator {
844                    reader: Some(Ok(page_reader)),
845                }))
846            }
847        }
848    }
849}
850
851/// An in-memory column chunk
852#[derive(Clone)]
853enum ColumnChunkData {
854    /// Column chunk data representing only a subset of data pages
855    Sparse {
856        /// Length of the full column chunk
857        length: usize,
858        /// Set of data pages included in this sparse chunk. Each element is a tuple
859        /// of (page offset, page data)
860        data: Vec<(usize, Bytes)>,
861    },
862    /// Full column chunk and its offset
863    Dense { offset: usize, data: Bytes },
864}
865
866impl ColumnChunkData {
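    /// Returns the bytes of this column chunk starting at file offset `start`
    ///
    /// For `Sparse` chunks the offset must be the exact start of a fetched page
    /// (pages are stored sorted by offset, so a binary search is used); for
    /// `Dense` chunks any offset within the chunk is accepted.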
867    fn get(&self, start: u64) -> Result<Bytes> {
868        match &self {
869            ColumnChunkData::Sparse { data, .. } => data
870                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
871                .map(|idx| data[idx].1.clone())
872                .map_err(|_| {
873                    ParquetError::General(format!(
874                        "Invalid offset in sparse column chunk data: {start}"
875                    ))
876                }),
877            ColumnChunkData::Dense { offset, data } => {
878                let start = start as usize - *offset;
879                Ok(data.slice(start..))
880            }
881        }
882    }
883}
884
885impl Length for ColumnChunkData {
886    fn len(&self) -> u64 {
887        match &self {
888            ColumnChunkData::Sparse { length, .. } => *length as u64,
889            ColumnChunkData::Dense { data, .. } => data.len() as u64,
890        }
891    }
892}
893
894impl ChunkReader for ColumnChunkData {
895    type T = bytes::buf::Reader<Bytes>;
896
897    fn get_read(&self, start: u64) -> Result<Self::T> {
898        Ok(self.get(start)?.reader())
899    }
900
901    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
902        Ok(self.get(start)?.slice(..length))
903    }
904}
905
906/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
907struct ColumnChunkIterator {
908    reader: Option<Result<Box<dyn PageReader>>>,
909}
910
911impl Iterator for ColumnChunkIterator {
912    type Item = Result<Box<dyn PageReader>>;
913
914    fn next(&mut self) -> Option<Self::Item> {
915        self.reader.take()
916    }
917}
918
919impl PageIterator for ColumnChunkIterator {}
920
921#[cfg(test)]
922mod tests {
923    use super::*;
924    use crate::arrow::arrow_reader::{
925        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
926    };
927    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
928    use crate::arrow::ArrowWriter;
929    use crate::file::metadata::ParquetMetaDataReader;
930    use crate::file::page_index::index_reader;
931    use crate::file::properties::WriterProperties;
932    use arrow::compute::kernels::cmp::eq;
933    use arrow::error::Result as ArrowResult;
934    use arrow_array::builder::{ListBuilder, StringBuilder};
935    use arrow_array::cast::AsArray;
936    use arrow_array::types::Int32Type;
937    use arrow_array::{
938        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
939        StructArray, UInt64Array,
940    };
941    use arrow_schema::{DataType, Field, Schema};
942    use futures::{StreamExt, TryStreamExt};
943    use rand::{thread_rng, Rng};
944    use std::collections::HashMap;
945    use std::sync::{Arc, Mutex};
946    use tempfile::tempfile;
947
948    #[derive(Clone)]
949    struct TestReader {
950        data: Bytes,
951        metadata: Arc<ParquetMetaData>,
952        requests: Arc<Mutex<Vec<Range<usize>>>>,
953    }
954
955    impl AsyncFileReader for TestReader {
956        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
957            self.requests.lock().unwrap().push(range.clone());
958            futures::future::ready(Ok(self.data.slice(range))).boxed()
959        }
960
961        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
962            futures::future::ready(Ok(self.metadata.clone())).boxed()
963        }
964    }
965
966    #[tokio::test]
967    async fn test_async_reader() {
968        let testdata = arrow::util::test_util::parquet_test_data();
969        let path = format!("{testdata}/alltypes_plain.parquet");
970        let data = Bytes::from(std::fs::read(path).unwrap());
971
972        let metadata = ParquetMetaDataReader::new()
973            .parse_and_finish(&data)
974            .unwrap();
975        let metadata = Arc::new(metadata);
976
977        assert_eq!(metadata.num_row_groups(), 1);
978
979        let async_reader = TestReader {
980            data: data.clone(),
981            metadata: metadata.clone(),
982            requests: Default::default(),
983        };
984
985        let requests = async_reader.requests.clone();
986        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
987            .await
988            .unwrap();
989
990        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
991        let stream = builder
992            .with_projection(mask.clone())
993            .with_batch_size(1024)
994            .build()
995            .unwrap();
996
997        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
998
999        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1000            .unwrap()
1001            .with_projection(mask)
1002            .with_batch_size(1024)
1003            .build()
1004            .unwrap()
1005            .collect::<ArrowResult<Vec<_>>>()
1006            .unwrap();
1007
1008        assert_eq!(async_batches, sync_batches);
1009
1010        let requests = requests.lock().unwrap();
1011        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1012        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1013
1014        assert_eq!(
1015            &requests[..],
1016            &[
1017                offset_1 as usize..(offset_1 + length_1) as usize,
1018                offset_2 as usize..(offset_2 + length_2) as usize
1019            ]
1020        );
1021    }
1022
1023    #[tokio::test]
1024    async fn test_async_reader_with_index() {
1025        let testdata = arrow::util::test_util::parquet_test_data();
1026        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1027        let data = Bytes::from(std::fs::read(path).unwrap());
1028
1029        let metadata = ParquetMetaDataReader::new()
1030            .parse_and_finish(&data)
1031            .unwrap();
1032        let metadata = Arc::new(metadata);
1033
1034        assert_eq!(metadata.num_row_groups(), 1);
1035
1036        let async_reader = TestReader {
1037            data: data.clone(),
1038            metadata: metadata.clone(),
1039            requests: Default::default(),
1040        };
1041
1042        let options = ArrowReaderOptions::new().with_page_index(true);
1043        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1044            .await
1045            .unwrap();
1046
1047        // The builder should have page and offset indexes loaded now
1048        let metadata_with_index = builder.metadata();
1049
1050        // Check offset indexes are present for all columns
1051        let offset_index = metadata_with_index.offset_index().unwrap();
1052        let column_index = metadata_with_index.column_index().unwrap();
1053
1054        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1055        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1056
1057        let num_columns = metadata_with_index
1058            .file_metadata()
1059            .schema_descr()
1060            .num_columns();
1061
1062        // Check page indexes are present for all columns
1063        offset_index
1064            .iter()
1065            .for_each(|x| assert_eq!(x.len(), num_columns));
1066        column_index
1067            .iter()
1068            .for_each(|x| assert_eq!(x.len(), num_columns));
1069
1070        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1071        let stream = builder
1072            .with_projection(mask.clone())
1073            .with_batch_size(1024)
1074            .build()
1075            .unwrap();
1076
1077        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1078
1079        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1080            .unwrap()
1081            .with_projection(mask)
1082            .with_batch_size(1024)
1083            .build()
1084            .unwrap()
1085            .collect::<ArrowResult<Vec<_>>>()
1086            .unwrap();
1087
1088        assert_eq!(async_batches, sync_batches);
1089    }
1090
1091    #[tokio::test]
1092    async fn test_async_reader_with_limit() {
1093        let testdata = arrow::util::test_util::parquet_test_data();
1094        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1095        let data = Bytes::from(std::fs::read(path).unwrap());
1096
1097        let metadata = ParquetMetaDataReader::new()
1098            .parse_and_finish(&data)
1099            .unwrap();
1100        let metadata = Arc::new(metadata);
1101
1102        assert_eq!(metadata.num_row_groups(), 1);
1103
1104        let async_reader = TestReader {
1105            data: data.clone(),
1106            metadata: metadata.clone(),
1107            requests: Default::default(),
1108        };
1109
1110        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1111            .await
1112            .unwrap();
1113
1114        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1115        let stream = builder
1116            .with_projection(mask.clone())
1117            .with_batch_size(1024)
1118            .with_limit(1)
1119            .build()
1120            .unwrap();
1121
1122        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1123
1124        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1125            .unwrap()
1126            .with_projection(mask)
1127            .with_batch_size(1024)
1128            .with_limit(1)
1129            .build()
1130            .unwrap()
1131            .collect::<ArrowResult<Vec<_>>>()
1132            .unwrap();
1133
1134        assert_eq!(async_batches, sync_batches);
1135    }
1136
1137    #[tokio::test]
1138    async fn test_async_reader_skip_pages() {
1139        let testdata = arrow::util::test_util::parquet_test_data();
1140        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1141        let data = Bytes::from(std::fs::read(path).unwrap());
1142
1143        let metadata = ParquetMetaDataReader::new()
1144            .parse_and_finish(&data)
1145            .unwrap();
1146        let metadata = Arc::new(metadata);
1147
1148        assert_eq!(metadata.num_row_groups(), 1);
1149
1150        let async_reader = TestReader {
1151            data: data.clone(),
1152            metadata: metadata.clone(),
1153            requests: Default::default(),
1154        };
1155
1156        let options = ArrowReaderOptions::new().with_page_index(true);
1157        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1158            .await
1159            .unwrap();
1160
1161        let selection = RowSelection::from(vec![
1162            RowSelector::skip(21),   // Skip first page
1163            RowSelector::select(21), // Select page to boundary
1164            RowSelector::skip(41),   // Skip multiple pages
1165            RowSelector::select(41), // Select multiple pages
1166            RowSelector::skip(25),   // Skip page across boundary
1167            RowSelector::select(25), // Select across page boundary
1168            RowSelector::skip(7116), // Skip to final page boundary
1169            RowSelector::select(10), // Select final page
1170        ]);
1171
1172        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1173
1174        let stream = builder
1175            .with_projection(mask.clone())
1176            .with_row_selection(selection.clone())
1177            .build()
1178            .expect("building stream");
1179
1180        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1181
1182        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1183            .unwrap()
1184            .with_projection(mask)
1185            .with_batch_size(1024)
1186            .with_row_selection(selection)
1187            .build()
1188            .unwrap()
1189            .collect::<ArrowResult<Vec<_>>>()
1190            .unwrap();
1191
1192        assert_eq!(async_batches, sync_batches);
1193    }
1194
1195    #[tokio::test]
1196    async fn test_fuzz_async_reader_selection() {
1197        let testdata = arrow::util::test_util::parquet_test_data();
1198        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1199        let data = Bytes::from(std::fs::read(path).unwrap());
1200
1201        let metadata = ParquetMetaDataReader::new()
1202            .parse_and_finish(&data)
1203            .unwrap();
1204        let metadata = Arc::new(metadata);
1205
1206        assert_eq!(metadata.num_row_groups(), 1);
1207
1208        let mut rand = thread_rng();
1209
1210        for _ in 0..100 {
1211            let mut expected_rows = 0;
1212            let mut total_rows = 0;
1213            let mut skip = false;
1214            let mut selectors = vec![];
1215
1216            while total_rows < 7300 {
1217                let row_count: usize = rand.gen_range(1..100);
1218
1219                let row_count = row_count.min(7300 - total_rows);
1220
1221                selectors.push(RowSelector { row_count, skip });
1222
1223                total_rows += row_count;
1224                if !skip {
1225                    expected_rows += row_count;
1226                }
1227
1228                skip = !skip;
1229            }
1230
1231            let selection = RowSelection::from(selectors);
1232
1233            let async_reader = TestReader {
1234                data: data.clone(),
1235                metadata: metadata.clone(),
1236                requests: Default::default(),
1237            };
1238
1239            let options = ArrowReaderOptions::new().with_page_index(true);
1240            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1241                .await
1242                .unwrap();
1243
1244            let col_idx: usize = rand.gen_range(0..13);
1245            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1246
1247            let stream = builder
1248                .with_projection(mask.clone())
1249                .with_row_selection(selection.clone())
1250                .build()
1251                .expect("building stream");
1252
1253            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1254
1255            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1256
1257            assert_eq!(actual_rows, expected_rows);
1258        }
1259    }
1260
1261    #[tokio::test]
1262    async fn test_async_reader_zero_row_selector() {
1263        // See https://github.com/apache/arrow-rs/issues/2669
1264        let testdata = arrow::util::test_util::parquet_test_data();
1265        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1266        let data = Bytes::from(std::fs::read(path).unwrap());
1267
1268        let metadata = ParquetMetaDataReader::new()
1269            .parse_and_finish(&data)
1270            .unwrap();
1271        let metadata = Arc::new(metadata);
1272
1273        assert_eq!(metadata.num_row_groups(), 1);
1274
1275        let mut rand = thread_rng();
1276
1277        let mut expected_rows = 0;
1278        let mut total_rows = 0;
1279        let mut skip = false;
1280        let mut selectors = vec![];
1281
1282        selectors.push(RowSelector {
1283            row_count: 0,
1284            skip: false,
1285        });
1286
1287        while total_rows < 7300 {
1288            let row_count: usize = rand.gen_range(1..100);
1289
1290            let row_count = row_count.min(7300 - total_rows);
1291
1292            selectors.push(RowSelector { row_count, skip });
1293
1294            total_rows += row_count;
1295            if !skip {
1296                expected_rows += row_count;
1297            }
1298
1299            skip = !skip;
1300        }
1301
1302        let selection = RowSelection::from(selectors);
1303
1304        let async_reader = TestReader {
1305            data: data.clone(),
1306            metadata: metadata.clone(),
1307            requests: Default::default(),
1308        };
1309
1310        let options = ArrowReaderOptions::new().with_page_index(true);
1311        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1312            .await
1313            .unwrap();
1314
1315        let col_idx: usize = rand.gen_range(0..13);
1316        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1317
1318        let stream = builder
1319            .with_projection(mask.clone())
1320            .with_row_selection(selection.clone())
1321            .build()
1322            .expect("building stream");
1323
1324        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1325
1326        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1327
1328        assert_eq!(actual_rows, expected_rows);
1329    }
1330
1331    #[tokio::test]
1332    async fn test_row_filter() {
1333        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1334        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1335        let c = Int32Array::from_iter(0..6);
1336        let data = RecordBatch::try_from_iter([
1337            ("a", Arc::new(a) as ArrayRef),
1338            ("b", Arc::new(b) as ArrayRef),
1339            ("c", Arc::new(c) as ArrayRef),
1340        ])
1341        .unwrap();
1342
1343        let mut buf = Vec::with_capacity(1024);
1344        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1345        writer.write(&data).unwrap();
1346        writer.close().unwrap();
1347
1348        let data: Bytes = buf.into();
1349        let metadata = ParquetMetaDataReader::new()
1350            .parse_and_finish(&data)
1351            .unwrap();
1352        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1353
1354        let test = TestReader {
1355            data,
1356            metadata: Arc::new(metadata),
1357            requests: Default::default(),
1358        };
1359        let requests = test.requests.clone();
1360
1361        let a_scalar = StringArray::from_iter_values(["b"]);
1362        let a_filter = ArrowPredicateFn::new(
1363            ProjectionMask::leaves(&parquet_schema, vec![0]),
1364            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1365        );
1366
1367        let b_scalar = StringArray::from_iter_values(["4"]);
1368        let b_filter = ArrowPredicateFn::new(
1369            ProjectionMask::leaves(&parquet_schema, vec![1]),
1370            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1371        );
1372
1373        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1374
1375        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1376        let stream = ParquetRecordBatchStreamBuilder::new(test)
1377            .await
1378            .unwrap()
1379            .with_projection(mask.clone())
1380            .with_batch_size(1024)
1381            .with_row_filter(filter)
1382            .build()
1383            .unwrap();
1384
1385        let batches: Vec<_> = stream.try_collect().await.unwrap();
1386        assert_eq!(batches.len(), 1);
1387
1388        let batch = &batches[0];
1389        assert_eq!(batch.num_rows(), 1);
1390        assert_eq!(batch.num_columns(), 2);
1391
1392        let col = batch.column(0);
1393        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1394        assert_eq!(val, "b");
1395
1396        let col = batch.column(1);
1397        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1398        assert_eq!(val, 3);
1399
1400        // Should only have made 3 requests
1401        assert_eq!(requests.lock().unwrap().len(), 3);
1402    }
1403
1404    #[tokio::test]
1405    async fn test_limit_multiple_row_groups() {
1406        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1407        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1408        let c = Int32Array::from_iter(0..6);
1409        let data = RecordBatch::try_from_iter([
1410            ("a", Arc::new(a) as ArrayRef),
1411            ("b", Arc::new(b) as ArrayRef),
1412            ("c", Arc::new(c) as ArrayRef),
1413        ])
1414        .unwrap();
1415
1416        let mut buf = Vec::with_capacity(1024);
1417        let props = WriterProperties::builder()
1418            .set_max_row_group_size(3)
1419            .build();
1420        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1421        writer.write(&data).unwrap();
1422        writer.close().unwrap();
1423
1424        let data: Bytes = buf.into();
1425        let metadata = ParquetMetaDataReader::new()
1426            .parse_and_finish(&data)
1427            .unwrap();
1428
1429        assert_eq!(metadata.num_row_groups(), 2);
1430
1431        let test = TestReader {
1432            data,
1433            metadata: Arc::new(metadata),
1434            requests: Default::default(),
1435        };
1436
1437        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1438            .await
1439            .unwrap()
1440            .with_batch_size(1024)
1441            .with_limit(4)
1442            .build()
1443            .unwrap();
1444
1445        let batches: Vec<_> = stream.try_collect().await.unwrap();
1446        // Expect one batch for each row group
1447        assert_eq!(batches.len(), 2);
1448
1449        let batch = &batches[0];
1450        // First batch should contain all rows
1451        assert_eq!(batch.num_rows(), 3);
1452        assert_eq!(batch.num_columns(), 3);
1453        let col2 = batch.column(2).as_primitive::<Int32Type>();
1454        assert_eq!(col2.values(), &[0, 1, 2]);
1455
1456        let batch = &batches[1];
1457        // Second batch should trigger the limit and only have one row
1458        assert_eq!(batch.num_rows(), 1);
1459        assert_eq!(batch.num_columns(), 3);
1460        let col2 = batch.column(2).as_primitive::<Int32Type>();
1461        assert_eq!(col2.values(), &[3]);
1462
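        // Offset 2 with limit 3: skip the first two rows, then read the next three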
1463        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1464            .await
1465            .unwrap()
1466            .with_offset(2)
1467            .with_limit(3)
1468            .build()
1469            .unwrap();
1470
1471        let batches: Vec<_> = stream.try_collect().await.unwrap();
1472        // Expect one batch for each row group
1473        assert_eq!(batches.len(), 2);
1474
1475        let batch = &batches[0];
1476        // First batch should contain one row
1477        assert_eq!(batch.num_rows(), 1);
1478        assert_eq!(batch.num_columns(), 3);
1479        let col2 = batch.column(2).as_primitive::<Int32Type>();
1480        assert_eq!(col2.values(), &[2]);
1481
1482        let batch = &batches[1];
1483        // Second batch should contain two rows
1484        assert_eq!(batch.num_rows(), 2);
1485        assert_eq!(batch.num_columns(), 3);
1486        let col2 = batch.column(2).as_primitive::<Int32Type>();
1487        assert_eq!(col2.values(), &[3, 4]);
1488
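        // Offset 4 skips the whole first row group; the limit of 20 exceeds the 2 remaining rows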
1489        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1490            .await
1491            .unwrap()
1492            .with_offset(4)
1493            .with_limit(20)
1494            .build()
1495            .unwrap();
1496
1497        let batches: Vec<_> = stream.try_collect().await.unwrap();
1498        // Should skip the first row group entirely
1499        assert_eq!(batches.len(), 1);
1500
1501        let batch = &batches[0];
1502        // The only batch should contain the final two rows
1503        assert_eq!(batch.num_rows(), 2);
1504        assert_eq!(batch.num_columns(), 3);
1505        let col2 = batch.column(2).as_primitive::<Int32Type>();
1506        assert_eq!(col2.values(), &[4, 5]);
1507    }
1508
1509    #[tokio::test]
1510    async fn test_row_filter_with_index() {
1511        let testdata = arrow::util::test_util::parquet_test_data();
1512        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1513        let data = Bytes::from(std::fs::read(path).unwrap());
1514
1515        let metadata = ParquetMetaDataReader::new()
1516            .parse_and_finish(&data)
1517            .unwrap();
1518        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1519        let metadata = Arc::new(metadata);
1520
1521        assert_eq!(metadata.num_row_groups(), 1);
1522
1523        let async_reader = TestReader {
1524            data: data.clone(),
1525            metadata: metadata.clone(),
1526            requests: Default::default(),
1527        };
1528
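        // First predicate: use the boolean column (leaf 1) directly as the filter mask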
1529        let a_filter =
1530            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1531                Ok(batch.column(0).as_boolean().clone())
1532            });
1533
1534        let b_scalar = Int8Array::from(vec![2]);
1535        let b_filter = ArrowPredicateFn::new(
1536            ProjectionMask::leaves(&parquet_schema, vec![2]),
1537            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1538        );
1539
1540        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1541
1542        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1543
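        // Load the page index so the reader can prune pages using the filter's row selection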
1544        let options = ArrowReaderOptions::new().with_page_index(true);
1545        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1546            .await
1547            .unwrap()
1548            .with_projection(mask.clone())
1549            .with_batch_size(1024)
1550            .with_row_filter(filter)
1551            .build()
1552            .unwrap();
1553
1554        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1555
1556        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1557
1558        assert_eq!(total_rows, 730);
1559    }
1560
1561    #[tokio::test]
1562    async fn test_in_memory_row_group_sparse() {
1563        let testdata = arrow::util::test_util::parquet_test_data();
1564        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1565        let data = Bytes::from(std::fs::read(path).unwrap());
1566
1567        let metadata = ParquetMetaDataReader::new()
1568            .parse_and_finish(&data)
1569            .unwrap();
1570
1571        let offset_index =
1572            index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
1573                .expect("reading offset index");
1574
1575        let mut metadata_builder = metadata.into_builder();
1576        let mut row_groups = metadata_builder.take_row_groups();
1577        row_groups.truncate(1);
1578        let row_group_meta = row_groups.pop().unwrap();
1579
1580        let metadata = metadata_builder
1581            .add_row_group(row_group_meta)
1582            .set_column_index(None)
1583            .set_offset_index(Some(vec![offset_index.clone()]))
1584            .build();
1585
1586        let metadata = Arc::new(metadata);
1587
1588        let num_rows = metadata.row_group(0).num_rows();
1589
1590        assert_eq!(metadata.num_row_groups(), 1);
1591
1592        let async_reader = TestReader {
1593            data: data.clone(),
1594            metadata: metadata.clone(),
1595            requests: Default::default(),
1596        };
1597
1598        let requests = async_reader.requests.clone();
1599        let (_, fields) = parquet_to_arrow_schema_and_fields(
1600            metadata.file_metadata().schema_descr(),
1601            ProjectionMask::all(),
1602            None,
1603        )
1604        .unwrap();
1605
1606        let _schema_desc = metadata.file_metadata().schema_descr();
1607
1608        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1609
1610        let reader_factory = ReaderFactory {
1611            metadata,
1612            fields: fields.map(Arc::new),
1613            input: async_reader,
1614            filter: None,
1615            limit: None,
1616            offset: None,
1617        };
1618
1619        let mut skip = true;
1620        let mut pages = offset_index[0].page_locations.iter().peekable();
1621
1622        // Set up `RowSelection` so that we skip every other page, selecting the last page
1623        let mut selectors = vec![];
1624        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1625        while let Some(page) = pages.next() {
1626            let num_rows = if let Some(next_page) = pages.peek() {
1627                next_page.first_row_index - page.first_row_index
1628            } else {
1629                num_rows - page.first_row_index
1630            };
1631
1632            if skip {
1633                selectors.push(RowSelector::skip(num_rows as usize));
1634            } else {
1635                selectors.push(RowSelector::select(num_rows as usize));
1636                let start = page.offset as usize;
1637                let end = start + page.compressed_page_size as usize;
1638                expected_page_requests.push(start..end);
1639            }
1640            skip = !skip;
1641        }
1642
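        // Only the byte ranges of the selected pages should be fetched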
1643        let selection = RowSelection::from(selectors);
1644
1645        let (_factory, _reader) = reader_factory
1646            .read_row_group(0, Some(selection), projection.clone(), 48)
1647            .await
1648            .expect("reading row group");
1649
1650        let requests = requests.lock().unwrap();
1651
1652        assert_eq!(&requests[..], &expected_page_requests)
1653    }
1654
1655    #[tokio::test]
1656    async fn test_batch_size_overallocate() {
1657        let testdata = arrow::util::test_util::parquet_test_data();
1658        // `alltypes_plain.parquet` only has 8 rows
1659        let path = format!("{testdata}/alltypes_plain.parquet");
1660        let data = Bytes::from(std::fs::read(path).unwrap());
1661
1662        let metadata = ParquetMetaDataReader::new()
1663            .parse_and_finish(&data)
1664            .unwrap();
1665        let file_rows = metadata.file_metadata().num_rows() as usize;
1666        let metadata = Arc::new(metadata);
1667
1668        let async_reader = TestReader {
1669            data: data.clone(),
1670            metadata: metadata.clone(),
1671            requests: Default::default(),
1672        };
1673
1674        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1675            .await
1676            .unwrap();
1677
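        // Ask for a batch size larger than the file's row count; the builder should clamp it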
1678        let stream = builder
1679            .with_projection(ProjectionMask::all())
1680            .with_batch_size(1024)
1681            .build()
1682            .unwrap();
1683        assert_ne!(1024, file_rows);
1684        assert_eq!(stream.batch_size, file_rows);
1685    }
1686
1687    #[tokio::test]
1688    async fn test_get_row_group_column_bloom_filter_without_length() {
1689        let testdata = arrow::util::test_util::parquet_test_data();
1690        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1691        let data = Bytes::from(std::fs::read(path).unwrap());
1692        test_get_row_group_column_bloom_filter(data, false).await;
1693    }
1694
1695    #[tokio::test]
1696    async fn test_parquet_record_batch_stream_schema() {
1697        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1698            schema.flattened_fields().iter().map(|f| f.name()).collect()
1699        }
1700
1701        // ParquetRecordBatchReaderBuilder::schema differs from ParquetRecordBatchReader::schema
1702        // and RecordBatch::schema: the builder exposes the full file schema, including any
1703        // custom metadata, while the reader and batch schemas contain only the projected
1704        // fields and no metadata. Test to ensure this behaviour remains consistent.
1705        //
1706        // The asynchronous versions are checked for the same behaviour.
1707
1708        // Prepare data for a schema with nested fields and custom metadata
1709        let mut metadata = HashMap::with_capacity(1);
1710        metadata.insert("key".to_string(), "value".to_string());
1711
1712        let nested_struct_array = StructArray::from(vec![
1713            (
1714                Arc::new(Field::new("d", DataType::Utf8, true)),
1715                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1716            ),
1717            (
1718                Arc::new(Field::new("e", DataType::Utf8, true)),
1719                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1720            ),
1721        ]);
1722        let struct_array = StructArray::from(vec![
1723            (
1724                Arc::new(Field::new("a", DataType::Int32, true)),
1725                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1726            ),
1727            (
1728                Arc::new(Field::new("b", DataType::UInt64, true)),
1729                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1730            ),
1731            (
1732                Arc::new(Field::new(
1733                    "c",
1734                    nested_struct_array.data_type().clone(),
1735                    true,
1736                )),
1737                Arc::new(nested_struct_array) as ArrayRef,
1738            ),
1739        ]);
1740
1741        let schema =
1742            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1743        let record_batch = RecordBatch::from(struct_array)
1744            .with_schema(schema.clone())
1745            .unwrap();
1746
1747        // Write parquet with custom metadata in schema
1748        let mut file = tempfile().unwrap();
1749        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1750        writer.write(&record_batch).unwrap();
1751        writer.close().unwrap();
1752
1753        let all_fields = ["a", "b", "c", "d", "e"];
1754        // (leaf indices in the projection mask, expected flattened field names in the output schema)
1755        let projections = [
1756            (vec![], vec![]),
1757            (vec![0], vec!["a"]),
1758            (vec![0, 1], vec!["a", "b"]),
1759            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1760            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1761        ];
1762
1763        // Ensure we're consistent for each of these projections
1764        for (indices, expected_projected_names) in projections {
1765            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1766                // Builder schema should preserve all fields and metadata
1767                assert_eq!(get_all_field_names(&builder), all_fields);
1768                assert_eq!(builder.metadata, metadata);
1769                // Reader & batch schema should show only projected fields, and no metadata
1770                assert_eq!(get_all_field_names(&reader), expected_projected_names);
1771                assert_eq!(reader.metadata, HashMap::default());
1772                assert_eq!(get_all_field_names(&batch), expected_projected_names);
1773                assert_eq!(batch.metadata, HashMap::default());
1774            };
1775
1776            let builder =
1777                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1778            let sync_builder_schema = builder.schema().clone();
1779            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1780            let mut reader = builder.with_projection(mask).build().unwrap();
1781            let sync_reader_schema = reader.schema();
1782            let batch = reader.next().unwrap().unwrap();
1783            let sync_batch_schema = batch.schema();
1784            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1785
1786            // The asynchronous API should behave the same
1787            let file = tokio::fs::File::from(file.try_clone().unwrap());
1788            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1789            let async_builder_schema = builder.schema().clone();
1790            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1791            let mut reader = builder.with_projection(mask).build().unwrap();
1792            let async_reader_schema = reader.schema().clone();
1793            let batch = reader.next().await.unwrap().unwrap();
1794            let async_batch_schema = batch.schema();
1795            assert_schemas(
1796                async_builder_schema,
1797                async_reader_schema,
1798                async_batch_schema,
1799            );
1800        }
1801    }
1802
1803    #[tokio::test]
1804    async fn test_get_row_group_column_bloom_filter_with_length() {
1805        // Rewrite the data into a new parquet file whose metadata includes bloom_filter_length
1806        let testdata = arrow::util::test_util::parquet_test_data();
1807        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1808        let data = Bytes::from(std::fs::read(path).unwrap());
1809        let metadata = ParquetMetaDataReader::new()
1810            .parse_and_finish(&data)
1811            .unwrap();
1812        let metadata = Arc::new(metadata);
1813        let async_reader = TestReader {
1814            data: data.clone(),
1815            metadata: metadata.clone(),
1816            requests: Default::default(),
1817        };
1818        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1819            .await
1820            .unwrap();
1821        let schema = builder.schema().clone();
1822        let stream = builder.build().unwrap();
1823        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1824
1825        let mut parquet_data = Vec::new();
1826        let props = WriterProperties::builder()
1827            .set_bloom_filter_enabled(true)
1828            .build();
1829        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
1830        for batch in batches {
1831            writer.write(&batch).unwrap();
1832        }
1833        writer.close().unwrap();
1834
1835        // test the new parquet file
1836        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
1837    }
1838
1839    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
1840        let metadata = ParquetMetaDataReader::new()
1841            .parse_and_finish(&data)
1842            .unwrap();
1843        let metadata = Arc::new(metadata);
1844
1845        assert_eq!(metadata.num_row_groups(), 1);
1846        let row_group = metadata.row_group(0);
1847        let column = row_group.column(0);
1848        assert_eq!(column.bloom_filter_length().is_some(), with_length);
1849
1850        let async_reader = TestReader {
1851            data: data.clone(),
1852            metadata: metadata.clone(),
1853            requests: Default::default(),
1854        };
1855
1856        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1857            .await
1858            .unwrap();
1859
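        // Fetch the bloom filter for row group 0, column 0 and probe for a present and an absent value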
1860        let sbbf = builder
1861            .get_row_group_column_bloom_filter(0, 0)
1862            .await
1863            .unwrap()
1864            .unwrap();
1865        assert!(sbbf.check(&"Hello"));
1866        assert!(!sbbf.check(&"Hello_Not_Exists"));
1867    }
1868
1869    #[tokio::test]
1870    async fn test_nested_skip() {
1871        let schema = Arc::new(Schema::new(vec![
1872            Field::new("col_1", DataType::UInt64, false),
1873            Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
1874        ]));
1875
1876        // Writer properties forcing small data pages (256 rows) within 1024-row row groups
1877        let props = WriterProperties::builder()
1878            .set_data_page_row_count_limit(256)
1879            .set_write_batch_size(256)
1880            .set_max_row_group_size(1024);
1881
1882        // Write data
1883        let mut file = tempfile().unwrap();
1884        let mut writer =
1885            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
1886
1887        let mut builder = ListBuilder::new(StringBuilder::new());
1888        for id in 0..1024 {
1889            match id % 3 {
1890                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1891                1 => builder.append_value([Some(format!("id_{id}"))]),
1892                _ => builder.append_null(),
1893            }
1894        }
1895        let refs = vec![
1896            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
1897            Arc::new(builder.finish()) as ArrayRef,
1898        ];
1899
1900        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
1901        writer.write(&batch).unwrap();
1902        writer.close().unwrap();
1903
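        // Row selections mixing skips and single-row selects around the 256-row page boundaries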
1904        let selections = [
1905            RowSelection::from(vec![
1906                RowSelector::skip(313),
1907                RowSelector::select(1),
1908                RowSelector::skip(709),
1909                RowSelector::select(1),
1910            ]),
1911            RowSelection::from(vec![
1912                RowSelector::skip(255),
1913                RowSelector::select(1),
1914                RowSelector::skip(767),
1915                RowSelector::select(1),
1916            ]),
1917            RowSelection::from(vec![
1918                RowSelector::select(255),
1919                RowSelector::skip(1),
1920                RowSelector::select(767),
1921                RowSelector::skip(1),
1922            ]),
1923            RowSelection::from(vec![
1924                RowSelector::skip(254),
1925                RowSelector::select(1),
1926                RowSelector::select(1),
1927                RowSelector::skip(767),
1928                RowSelector::select(1),
1929            ]),
1930        ];
1931
1932        for selection in selections {
1933            let expected = selection.row_count();
1934            // Read data
1935            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
1936                tokio::fs::File::from_std(file.try_clone().unwrap()),
1937                ArrowReaderOptions::new().with_page_index(true),
1938            )
1939            .await
1940            .unwrap();
1941
1942            reader = reader.with_row_selection(selection);
1943
1944            let mut stream = reader.build().unwrap();
1945
1946            let mut total_rows = 0;
1947            while let Some(rb) = stream.next().await {
1948                let rb = rb.unwrap();
1949                total_rows += rb.num_rows();
1950            }
1951            assert_eq!(total_rows, expected);
1952        }
1953    }
1954
1955    #[tokio::test]
1956    async fn test_row_filter_nested() {
1957        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1958        let b = StructArray::from(vec![
1959            (
1960                Arc::new(Field::new("aa", DataType::Utf8, true)),
1961                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
1962            ),
1963            (
1964                Arc::new(Field::new("bb", DataType::Utf8, true)),
1965                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
1966            ),
1967        ]);
1968        let c = Int32Array::from_iter(0..6);
1969        let data = RecordBatch::try_from_iter([
1970            ("a", Arc::new(a) as ArrayRef),
1971            ("b", Arc::new(b) as ArrayRef),
1972            ("c", Arc::new(c) as ArrayRef),
1973        ])
1974        .unwrap();
1975
1976        let mut buf = Vec::with_capacity(1024);
1977        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1978        writer.write(&data).unwrap();
1979        writer.close().unwrap();
1980
1981        let data: Bytes = buf.into();
1982        let metadata = ParquetMetaDataReader::new()
1983            .parse_and_finish(&data)
1984            .unwrap();
1985        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1986
1987        let test = TestReader {
1988            data,
1989            metadata: Arc::new(metadata),
1990            requests: Default::default(),
1991        };
1992        let requests = test.requests.clone();
1993
1994        let a_scalar = StringArray::from_iter_values(["b"]);
1995        let a_filter = ArrowPredicateFn::new(
1996            ProjectionMask::leaves(&parquet_schema, vec![0]),
1997            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1998        );
1999
2000        let b_scalar = StringArray::from_iter_values(["4"]);
2001        let b_filter = ArrowPredicateFn::new(
2002            ProjectionMask::leaves(&parquet_schema, vec![2]),
2003            move |batch| {
2004                // Filter on the struct's `bb` child; with only leaf 2 projected it is the sole child here.
2005                let struct_array = batch
2006                    .column(0)
2007                    .as_any()
2008                    .downcast_ref::<StructArray>()
2009                    .unwrap();
2010                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2011            },
2012        );
2013
2014        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2015
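        // Project column `a` (leaf 0) and column `c` (leaf 3); the struct column `b` is read only by the filter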
2016        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2017        let stream = ParquetRecordBatchStreamBuilder::new(test)
2018            .await
2019            .unwrap()
2020            .with_projection(mask.clone())
2021            .with_batch_size(1024)
2022            .with_row_filter(filter)
2023            .build()
2024            .unwrap();
2025
2026        let batches: Vec<_> = stream.try_collect().await.unwrap();
2027        assert_eq!(batches.len(), 1);
2028
2029        let batch = &batches[0];
2030        assert_eq!(batch.num_rows(), 1);
2031        assert_eq!(batch.num_columns(), 2);
2032
2033        let col = batch.column(0);
2034        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2035        assert_eq!(val, "b");
2036
2037        let col = batch.column(1);
2038        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2039        assert_eq!(val, 3);
2040
2041        // Should only have made 3 requests
2042        assert_eq!(requests.lock().unwrap().len(), 3);
2043    }
2044
2045    #[tokio::test]
2046    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2047        use tokio::fs::File;
2048        let testdata = arrow::util::test_util::parquet_test_data();
2049        let path = format!("{testdata}/alltypes_plain.parquet");
2050        let mut file = File::open(&path).await.unwrap();
2051        let file_size = file.metadata().await.unwrap().len();
2052        let mut metadata = ParquetMetaDataReader::new()
2053            .with_page_indexes(true)
2054            .load_and_finish(&mut file, file_size as usize)
2055            .await
2056            .unwrap();
2057
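        // Replace the offset index with an empty vec; building and reading should still succeed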
2058        metadata.set_offset_index(Some(vec![]));
2059        let options = ArrowReaderOptions::new().with_page_index(true);
2060        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2061        let reader =
2062            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2063                .build()
2064                .unwrap();
2065
2066        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2067        assert_eq!(result.len(), 1);
2068    }
2069
2070    #[tokio::test]
2071    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2072        use tokio::fs::File;
2073        let testdata = arrow::util::test_util::parquet_test_data();
2074        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2075        let mut file = File::open(&path).await.unwrap();
2076        let file_size = file.metadata().await.unwrap().len();
2077        let metadata = ParquetMetaDataReader::new()
2078            .with_page_indexes(true)
2079            .load_and_finish(&mut file, file_size as usize)
2080            .await
2081            .unwrap();
2082
2083        let options = ArrowReaderOptions::new().with_page_index(true);
2084        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2085        let reader =
2086            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2087                .build()
2088                .unwrap();
2089
2090        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2091        assert_eq!(result.len(), 8);
2092    }
2093
2094    #[tokio::test]
2095    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2096        use tempfile::TempDir;
2097        use tokio::fs::File;
2098        fn write_metadata_to_local_file(
2099            metadata: ParquetMetaData,
2100            file: impl AsRef<std::path::Path>,
2101        ) {
2102            use crate::file::metadata::ParquetMetaDataWriter;
2103            use std::fs::File;
2104            let file = File::create(file).unwrap();
2105            ParquetMetaDataWriter::new(file, &metadata)
2106                .finish()
2107                .unwrap()
2108        }
2109
2110        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2111            use std::fs::File;
2112            let file = File::open(file).unwrap();
2113            ParquetMetaDataReader::new()
2114                .with_page_indexes(true)
2115                .parse_and_finish(&file)
2116                .unwrap()
2117        }
2118
2119        let testdata = arrow::util::test_util::parquet_test_data();
2120        let path = format!("{testdata}/alltypes_plain.parquet");
2121        let mut file = File::open(&path).await.unwrap();
2122        let file_size = file.metadata().await.unwrap().len();
2123        let metadata = ParquetMetaDataReader::new()
2124            .with_page_indexes(true)
2125            .load_and_finish(&mut file, file_size as usize)
2126            .await
2127            .unwrap();
2128
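        // Round-trip the metadata through a standalone metadata file before building the reader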
2129        let tempdir = TempDir::new().unwrap();
2130        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2131        write_metadata_to_local_file(metadata, &metadata_path);
2132        let metadata = read_metadata_from_local_file(&metadata_path);
2133
2134        let options = ArrowReaderOptions::new().with_page_index(true);
2135        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2136        let reader =
2137            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2138                .build()
2139                .unwrap();
2140
2141        // Without the fix this would panic while reading the column chunks
2142        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2143        assert_eq!(result.len(), 1);
2144    }
2145}