parquet/arrow/arrow_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains reader which reads parquet data into arrow [`RecordBatch`]
19
20use std::collections::VecDeque;
21use std::sync::Arc;
22
23use arrow_array::cast::AsArray;
24use arrow_array::Array;
25use arrow_array::{RecordBatch, RecordBatchReader};
26use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
27use arrow_select::filter::prep_null_mask_filter;
28pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
29pub use selection::{RowSelection, RowSelector};
30
31pub use crate::arrow::array_reader::RowGroups;
32use crate::arrow::array_reader::{build_array_reader, ArrayReader};
33use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
34use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
35use crate::column::page::{PageIterator, PageReader};
36use crate::errors::{ParquetError, Result};
37use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
38use crate::file::reader::{ChunkReader, SerializedPageReader};
39use crate::schema::types::SchemaDescriptor;
40
41mod filter;
42mod selection;
43pub mod statistics;
44
45/// Builder for constructing parquet readers into arrow.
46///
47/// Most users should use one of the following specializations:
48///
49/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
50/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
51///
52/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
pub struct ArrowReaderBuilder<T> {
    /// The input the reader will be built over
    pub(crate) input: T,

    /// Parquet metadata of the file being read
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// Arrow schema of the file
    pub(crate) schema: SchemaRef,

    /// Parquet field description handed to the array reader builder, if any
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// Number of rows per produced [`RecordBatch`], see [`Self::with_batch_size`]
    pub(crate) batch_size: usize,

    /// Row group indexes to read, see [`Self::with_row_groups`]; `None` if not set
    pub(crate) row_groups: Option<Vec<usize>>,

    /// Columns to read, see [`Self::with_projection`]
    pub(crate) projection: ProjectionMask,

    /// Predicates used to skip decoding rows, see [`Self::with_row_filter`]
    pub(crate) filter: Option<RowFilter>,

    /// Rows to decode within the row groups, see [`Self::with_row_selection`]
    pub(crate) selection: Option<RowSelection>,

    /// Maximum number of rows to read, see [`Self::with_limit`]
    pub(crate) limit: Option<usize>,

    /// Number of rows to skip, see [`Self::with_offset`]
    pub(crate) offset: Option<usize>,
}
76
77impl<T> ArrowReaderBuilder<T> {
78    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
79        Self {
80            input,
81            metadata: metadata.metadata,
82            schema: metadata.schema,
83            fields: metadata.fields,
84            batch_size: 1024,
85            row_groups: None,
86            projection: ProjectionMask::all(),
87            filter: None,
88            selection: None,
89            limit: None,
90            offset: None,
91        }
92    }
93
94    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
95    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
96        &self.metadata
97    }
98
99    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
100    pub fn parquet_schema(&self) -> &SchemaDescriptor {
101        self.metadata.file_metadata().schema_descr()
102    }
103
104    /// Returns the arrow [`SchemaRef`] for this parquet file
105    pub fn schema(&self) -> &SchemaRef {
106        &self.schema
107    }
108
109    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
110    /// If the batch_size more than the file row count, use the file row count.
111    pub fn with_batch_size(self, batch_size: usize) -> Self {
112        // Try to avoid allocate large buffer
113        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
114        Self { batch_size, ..self }
115    }
116
117    /// Only read data from the provided row group indexes
118    ///
119    /// This is also called row group filtering
120    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
121        Self {
122            row_groups: Some(row_groups),
123            ..self
124        }
125    }
126
127    /// Only read data from the provided column indexes
128    pub fn with_projection(self, mask: ProjectionMask) -> Self {
129        Self {
130            projection: mask,
131            ..self
132        }
133    }
134
135    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
136    /// data into memory.
137    ///
138    /// This feature is used to restrict which rows are decoded within row
139    /// groups, skipping ranges of rows that are not needed. Such selections
140    /// could be determined by evaluating predicates against the parquet page
141    /// [`Index`] or some other external information available to a query
142    /// engine.
143    ///
144    /// # Notes
145    ///
146    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
147    /// applying the row selection, and therefore rows from skipped row groups
148    /// should not be included in the [`RowSelection`] (see example below)
149    ///
150    /// It is recommended to enable writing the page index if using this
151    /// functionality, to allow more efficient skipping over data pages. See
152    /// [`ArrowReaderOptions::with_page_index`].
153    ///
154    /// # Example
155    ///
156    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
157    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
158    /// row group 3:
159    ///
160    /// ```text
161    ///   Row Group 0, 1000 rows (selected)
162    ///   Row Group 1, 1000 rows (skipped)
163    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
164    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
165    /// ```
166    ///
167    /// You could pass the following [`RowSelection`]:
168    ///
169    /// ```text
170    ///  Select 1000    (scan all rows in row group 0)
171    ///  Skip 50        (skip the first 50 rows in row group 2)
172    ///  Select 50      (scan rows 50-100 in row group 2)
173    ///  Skip 900       (skip the remaining rows in row group 2)
174    ///  Skip 200       (skip the first 200 rows in row group 3)
175    ///  Select 100     (scan rows 200-300 in row group 3)
176    ///  Skip 700       (skip the remaining rows in row group 3)
177    /// ```
178    /// Note there is no entry for the (entirely) skipped row group 1.
179    ///
180    /// Note you can represent the same selection with fewer entries. Instead of
181    ///
182    /// ```text
183    ///  Skip 900       (skip the remaining rows in row group 2)
184    ///  Skip 200       (skip the first 200 rows in row group 3)
185    /// ```
186    ///
187    /// you could use
188    ///
189    /// ```text
190    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
191    /// ```
192    ///
193    /// [`Index`]: crate::file::page_index::index::Index
194    pub fn with_row_selection(self, selection: RowSelection) -> Self {
195        Self {
196            selection: Some(selection),
197            ..self
198        }
199    }
200
201    /// Provide a [`RowFilter`] to skip decoding rows
202    ///
203    /// Row filters are applied after row group selection and row selection
204    ///
205    /// It is recommended to enable reading the page index if using this functionality, to allow
206    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
207    pub fn with_row_filter(self, filter: RowFilter) -> Self {
208        Self {
209            filter: Some(filter),
210            ..self
211        }
212    }
213
214    /// Provide a limit to the number of rows to be read
215    ///
216    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
217    /// allowing it to limit the final set of rows decoded after any pushed down predicates
218    ///
219    /// It is recommended to enable reading the page index if using this functionality, to allow
220    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
221    pub fn with_limit(self, limit: usize) -> Self {
222        Self {
223            limit: Some(limit),
224            ..self
225        }
226    }
227
228    /// Provide an offset to skip over the given number of rows
229    ///
230    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
231    /// allowing it to skip rows after any pushed down predicates
232    ///
233    /// It is recommended to enable reading the page index if using this functionality, to allow
234    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
235    pub fn with_offset(self, offset: usize) -> Self {
236        Self {
237            offset: Some(offset),
238            ..self
239        }
240    }
241}
242
243/// Options that control how metadata is read for a parquet file
244///
245/// See [`ArrowReaderBuilder`] for how to configure how the column data
246/// is then read from the file, including projection and filter pushdown
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the reader strip any user defined metadata from the Arrow schema
    ///
    /// Set by [`Self::with_skip_arrow_metadata`]; [`Self::with_schema`] also sets it to `true`
    skip_arrow_metadata: bool,
    /// If provided used as the schema for the file, otherwise the schema is read from the file
    ///
    /// Set by [`Self::with_schema`]
    supplied_schema: Option<SchemaRef>,
    /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
    ///
    /// Set by [`Self::with_page_index`]
    pub(crate) page_index: bool,
}
256
257impl ArrowReaderOptions {
258    /// Create a new [`ArrowReaderOptions`] with the default settings
259    pub fn new() -> Self {
260        Self::default()
261    }
262
263    /// Skip decoding the embedded arrow metadata (defaults to `false`)
264    ///
265    /// Parquet files generated by some writers may contain embedded arrow
266    /// schema and metadata.
267    /// This may not be correct or compatible with your system,
268    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
269    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
270        Self {
271            skip_arrow_metadata,
272            ..self
273        }
274    }
275
276    /// Provide a schema to use when reading the parquet file. If provided it
277    /// takes precedence over the schema inferred from the file or the schema defined
278    /// in the file's metadata. If the schema is not compatible with the file's
279    /// schema an error will be returned when constructing the builder.
280    ///
281    /// This option is only required if you want to cast columns to a different type.
282    /// For example, if you wanted to cast from an Int64 in the Parquet file to a Timestamp
283    /// in the Arrow schema.
284    ///
285    /// The supplied schema must have the same number of columns as the parquet schema and
286    /// the column names need to be the same.
287    ///
288    /// # Example
289    /// ```
290    /// use std::io::Bytes;
291    /// use std::sync::Arc;
292    /// use tempfile::tempfile;
293    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
294    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
295    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
296    /// use parquet::arrow::ArrowWriter;
297    ///
298    /// // Write data - schema is inferred from the data to be Int32
299    /// let file = tempfile().unwrap();
300    /// let batch = RecordBatch::try_from_iter(vec![
301    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
302    /// ]).unwrap();
303    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
304    /// writer.write(&batch).unwrap();
305    /// writer.close().unwrap();
306    ///
307    /// // Read the file back.
308    /// // Supply a schema that interprets the Int32 column as a Timestamp.
309    /// let supplied_schema = Arc::new(Schema::new(vec![
310    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
311    /// ]));
312    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
313    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
314    ///     file.try_clone().unwrap(),
315    ///     options
316    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
317    ///
318    /// // Create the reader and read the data using the supplied schema.
319    /// let mut reader = builder.build().unwrap();
320    /// let _batch = reader.next().unwrap().unwrap();   
321    /// ```
322    pub fn with_schema(self, schema: SchemaRef) -> Self {
323        Self {
324            supplied_schema: Some(schema),
325            skip_arrow_metadata: true,
326            ..self
327        }
328    }
329
330    /// Enable reading [`PageIndex`], if present (defaults to `false`)
331    ///
332    /// The `PageIndex` can be used to push down predicates to the parquet scan,
333    /// potentially eliminating unnecessary IO, by some query engines.
334    ///
335    /// If this is enabled, [`ParquetMetaData::column_index`] and
336    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
337    /// information is present in the file.
338    ///
339    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
340    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
341    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
342    pub fn with_page_index(self, page_index: bool) -> Self {
343        Self { page_index, ..self }
344    }
345}
346
347/// The metadata necessary to construct a [`ArrowReaderBuilder`]
348///
349/// Note this structure is cheaply clone-able as it consists of several arcs.
350///
351/// This structure allows
352///
353/// 1. Loading metadata for a file once and then using that same metadata to
354///    construct multiple separate readers, for example, to distribute readers
355///    across multiple threads
356///
357/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
358///    from the file each time a reader is constructed.
359///
360/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet metadata of the file
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow Schema
    pub(crate) schema: SchemaRef,

    /// Parquet field description produced together with the schema, if any
    pub(crate) fields: Option<Arc<ParquetField>>,
}
370
371impl ArrowReaderMetadata {
372    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
373    ///
374    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
375    /// example of how this can be used
376    ///
377    /// # Notes
378    ///
379    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
380    /// `Self::metadata` is missing the page index, this function will attempt
381    /// to load the page index by making an object store request.
382    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
383        let metadata = ParquetMetaDataReader::new()
384            .with_page_indexes(options.page_index)
385            .parse_and_finish(reader)?;
386        Self::try_new(Arc::new(metadata), options)
387    }
388
389    /// Create a new [`ArrowReaderMetadata`]
390    ///
391    /// # Notes
392    ///
393    /// This function does not attempt to load the PageIndex if not present in the metadata.
394    /// See [`Self::load`] for more details.
395    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
396        match options.supplied_schema {
397            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
398            None => {
399                let kv_metadata = match options.skip_arrow_metadata {
400                    true => None,
401                    false => metadata.file_metadata().key_value_metadata(),
402                };
403
404                let (schema, fields) = parquet_to_arrow_schema_and_fields(
405                    metadata.file_metadata().schema_descr(),
406                    ProjectionMask::all(),
407                    kv_metadata,
408                )?;
409
410                Ok(Self {
411                    metadata,
412                    schema: Arc::new(schema),
413                    fields: fields.map(Arc::new),
414                })
415            }
416        }
417    }
418
419    fn with_supplied_schema(
420        metadata: Arc<ParquetMetaData>,
421        supplied_schema: SchemaRef,
422    ) -> Result<Self> {
423        let parquet_schema = metadata.file_metadata().schema_descr();
424        let field_levels = parquet_to_arrow_field_levels(
425            parquet_schema,
426            ProjectionMask::all(),
427            Some(supplied_schema.fields()),
428        )?;
429        let fields = field_levels.fields;
430        let inferred_len = fields.len();
431        let supplied_len = supplied_schema.fields().len();
432        // Ensure the supplied schema has the same number of columns as the parquet schema.
433        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
434        // different lengths, but we check here to be safe.
435        if inferred_len != supplied_len {
436            Err(arrow_err!(format!(
437                "incompatible arrow schema, expected {} columns received {}",
438                inferred_len, supplied_len
439            )))
440        } else {
441            let diff_fields: Vec<_> = supplied_schema
442                .fields()
443                .iter()
444                .zip(fields.iter())
445                .filter_map(|(field1, field2)| {
446                    if field1 != field2 {
447                        Some(field1.name().clone())
448                    } else {
449                        None
450                    }
451                })
452                .collect();
453
454            if !diff_fields.is_empty() {
455                Err(ParquetError::ArrowError(format!(
456                    "incompatible arrow schema, the following fields could not be cast: [{}]",
457                    diff_fields.join(", ")
458                )))
459            } else {
460                Ok(Self {
461                    metadata,
462                    schema: supplied_schema,
463                    fields: field_levels.levels.map(Arc::new),
464                })
465            }
466        }
467    }
468
469    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
470    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
471        &self.metadata
472    }
473
474    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
475    pub fn parquet_schema(&self) -> &SchemaDescriptor {
476        self.metadata.file_metadata().schema_descr()
477    }
478
479    /// Returns the arrow [`SchemaRef`] for this parquet file
480    pub fn schema(&self) -> &SchemaRef {
481        &self.schema
482    }
483}
484
#[doc(hidden)]
/// A newtype around a [`ChunkReader`], used as the input type of
/// [`ArrowReaderBuilder`] to distinguish sync readers from async
pub struct SyncReader<T: ChunkReader>(T);
488
/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
///
/// This is an [`ArrowReaderBuilder`] whose input is a [`SyncReader`].
///
/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
495
496impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
497    /// Create a new [`ParquetRecordBatchReaderBuilder`]
498    ///
499    /// ```
500    /// # use std::sync::Arc;
501    /// # use bytes::Bytes;
502    /// # use arrow_array::{Int32Array, RecordBatch};
503    /// # use arrow_schema::{DataType, Field, Schema};
504    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
505    /// # use parquet::arrow::ArrowWriter;
506    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
507    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
508    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
509    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
510    /// # writer.write(&batch).unwrap();
511    /// # writer.close().unwrap();
512    /// # let file = Bytes::from(file);
513    /// #
514    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
515    ///
516    /// // Inspect metadata
517    /// assert_eq!(builder.metadata().num_row_groups(), 1);
518    ///
519    /// // Construct reader
520    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
521    ///
522    /// // Read data
523    /// let _batch = reader.next().unwrap().unwrap();
524    /// ```
525    pub fn try_new(reader: T) -> Result<Self> {
526        Self::try_new_with_options(reader, Default::default())
527    }
528
529    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
530    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
531        let metadata = ArrowReaderMetadata::load(&reader, options)?;
532        Ok(Self::new_with_metadata(reader, metadata))
533    }
534
535    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
536    ///
537    /// This interface allows:
538    ///
539    /// 1. Loading metadata once and using it to create multiple builders with
540    ///    potentially different settings or run on different threads
541    ///
542    /// 2. Using a cached copy of the metadata rather than re-reading it from the
543    ///    file each time a reader is constructed.
544    ///
545    /// See the docs on [`ArrowReaderMetadata`] for more details
546    ///
547    /// # Example
548    /// ```
549    /// # use std::fs::metadata;
550    /// # use std::sync::Arc;
551    /// # use bytes::Bytes;
552    /// # use arrow_array::{Int32Array, RecordBatch};
553    /// # use arrow_schema::{DataType, Field, Schema};
554    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
555    /// # use parquet::arrow::ArrowWriter;
556    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
557    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
558    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
559    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
560    /// # writer.write(&batch).unwrap();
561    /// # writer.close().unwrap();
562    /// # let file = Bytes::from(file);
563    /// #
564    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
565    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
566    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
567    ///
568    /// // Should be able to read from both in parallel
569    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
570    /// ```
571    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
572        Self::new_builder(SyncReader(input), metadata)
573    }
574
575    /// Build a [`ParquetRecordBatchReader`]
576    ///
577    /// Note: this will eagerly evaluate any `RowFilter` before returning
578    pub fn build(self) -> Result<ParquetRecordBatchReader> {
579        // Try to avoid allocate large buffer
580        let batch_size = self
581            .batch_size
582            .min(self.metadata.file_metadata().num_rows() as usize);
583
584        let row_groups = self
585            .row_groups
586            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
587
588        let reader = ReaderRowGroups {
589            reader: Arc::new(self.input.0),
590            metadata: self.metadata,
591            row_groups,
592        };
593
594        let mut filter = self.filter;
595        let mut selection = self.selection;
596
597        if let Some(filter) = filter.as_mut() {
598            for predicate in filter.predicates.iter_mut() {
599                if !selects_any(selection.as_ref()) {
600                    break;
601                }
602
603                let array_reader =
604                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
605
606                selection = Some(evaluate_predicate(
607                    batch_size,
608                    array_reader,
609                    selection,
610                    predicate.as_mut(),
611                )?);
612            }
613        }
614
615        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
616
617        // If selection is empty, truncate
618        if !selects_any(selection.as_ref()) {
619            selection = Some(RowSelection::from(vec![]));
620        }
621
622        Ok(ParquetRecordBatchReader::new(
623            batch_size,
624            array_reader,
625            apply_range(selection, reader.num_rows(), self.offset, self.limit),
626        ))
627    }
628}
629
/// [`RowGroups`] implementation over a [`ChunkReader`], restricted to a list of row groups
struct ReaderRowGroups<T: ChunkReader> {
    /// Shared handle to the underlying reader
    reader: Arc<T>,

    /// Metadata of the file, used to look up row groups and columns
    metadata: Arc<ParquetMetaData>,
    /// Indices of the row groups to scan
    row_groups: Vec<usize>,
}
637
638impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
639    fn num_rows(&self) -> usize {
640        let meta = self.metadata.row_groups();
641        self.row_groups
642            .iter()
643            .map(|x| meta[*x].num_rows() as usize)
644            .sum()
645    }
646
647    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
648        Ok(Box::new(ReaderPageIterator {
649            column_idx: i,
650            reader: self.reader.clone(),
651            metadata: self.metadata.clone(),
652            row_groups: self.row_groups.clone().into_iter(),
653        }))
654    }
655}
656
/// Iterator yielding one [`PageReader`] per row group for a single column
struct ReaderPageIterator<T: ChunkReader> {
    /// Shared handle to the underlying reader
    reader: Arc<T>,
    /// Index of the column whose pages are read
    column_idx: usize,
    /// Row group indices still to be visited
    row_groups: std::vec::IntoIter<usize>,
    /// Metadata of the file, used to look up row groups, columns and the offset index
    metadata: Arc<ParquetMetaData>,
}
663
664impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
665    type Item = Result<Box<dyn PageReader>>;
666
667    fn next(&mut self) -> Option<Self::Item> {
668        let rg_idx = self.row_groups.next()?;
669        let rg = self.metadata.row_group(rg_idx);
670        let meta = rg.column(self.column_idx);
671        let offset_index = self.metadata.offset_index();
672        // `offset_index` may not exist and `i[rg_idx]` will be empty.
673        // To avoid `i[rg_idx][self.oolumn_idx`] panic, we need to filter out empty `i[rg_idx]`.
674        let page_locations = offset_index
675            .filter(|i| !i[rg_idx].is_empty())
676            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
677        let total_rows = rg.num_rows() as usize;
678        let reader = self.reader.clone();
679
680        let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
681        Some(ret.map(|x| Box::new(x) as _))
682    }
683}
684
// Empty impl: no `PageIterator` methods are defined here for this type.
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
686
/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields [`RecordBatch`]
/// read from a parquet data source
pub struct ParquetRecordBatchReader {
    /// Maximum number of rows requested from the array reader per call to `next`
    batch_size: usize,
    /// Reader that decodes the records and produces the batch
    array_reader: Box<dyn ArrayReader>,
    /// Schema returned by [`RecordBatchReader::schema`]
    schema: SchemaRef,
    /// Pending row selectors, consumed from the front; `None` if no selection applies
    selection: Option<VecDeque<RowSelector>>,
}
695
696impl Iterator for ParquetRecordBatchReader {
697    type Item = Result<RecordBatch, ArrowError>;
698
699    fn next(&mut self) -> Option<Self::Item> {
700        let mut read_records = 0;
701        match self.selection.as_mut() {
702            Some(selection) => {
703                while read_records < self.batch_size && !selection.is_empty() {
704                    let front = selection.pop_front().unwrap();
705                    if front.skip {
706                        let skipped = match self.array_reader.skip_records(front.row_count) {
707                            Ok(skipped) => skipped,
708                            Err(e) => return Some(Err(e.into())),
709                        };
710
711                        if skipped != front.row_count {
712                            return Some(Err(general_err!(
713                                "failed to skip rows, expected {}, got {}",
714                                front.row_count,
715                                skipped
716                            )
717                            .into()));
718                        }
719                        continue;
720                    }
721
722                    //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
723                    //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
724                    if front.row_count == 0 {
725                        continue;
726                    }
727
728                    // try to read record
729                    let need_read = self.batch_size - read_records;
730                    let to_read = match front.row_count.checked_sub(need_read) {
731                        Some(remaining) if remaining != 0 => {
732                            // if page row count less than batch_size we must set batch size to page row count.
733                            // add check avoid dead loop
734                            selection.push_front(RowSelector::select(remaining));
735                            need_read
736                        }
737                        _ => front.row_count,
738                    };
739                    match self.array_reader.read_records(to_read) {
740                        Ok(0) => break,
741                        Ok(rec) => read_records += rec,
742                        Err(error) => return Some(Err(error.into())),
743                    }
744                }
745            }
746            None => {
747                if let Err(error) = self.array_reader.read_records(self.batch_size) {
748                    return Some(Err(error.into()));
749                }
750            }
751        };
752
753        match self.array_reader.consume_batch() {
754            Err(error) => Some(Err(error.into())),
755            Ok(array) => {
756                let struct_array = array.as_struct_opt().ok_or_else(|| {
757                    ArrowError::ParquetError(
758                        "Struct array reader should return struct array".to_string(),
759                    )
760                });
761
762                match struct_array {
763                    Err(err) => Some(Err(err)),
764                    Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
765                }
766            }
767        }
768    }
769}
770
771impl RecordBatchReader for ParquetRecordBatchReader {
772    /// Returns the projected [`SchemaRef`] for reading the parquet file.
773    ///
774    /// Note that the schema metadata will be stripped here. See
775    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
776    fn schema(&self) -> SchemaRef {
777        self.schema.clone()
778    }
779}
780
781impl ParquetRecordBatchReader {
782    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
783    ///
784    /// See [`ParquetRecordBatchReaderBuilder`] for more options
785    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
786        ParquetRecordBatchReaderBuilder::try_new(reader)?
787            .with_batch_size(batch_size)
788            .build()
789    }
790
791    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
792    ///
793    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
794    /// higher-level interface for reading parquet data from a file
795    pub fn try_new_with_row_groups(
796        levels: &FieldLevels,
797        row_groups: &dyn RowGroups,
798        batch_size: usize,
799        selection: Option<RowSelection>,
800    ) -> Result<Self> {
801        let array_reader =
802            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
803
804        Ok(Self {
805            batch_size,
806            array_reader,
807            schema: Arc::new(Schema::new(levels.fields.clone())),
808            selection: selection.map(|s| s.trim().into()),
809        })
810    }
811
812    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
813    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
814    /// all rows will be returned
815    pub(crate) fn new(
816        batch_size: usize,
817        array_reader: Box<dyn ArrayReader>,
818        selection: Option<RowSelection>,
819    ) -> Self {
820        let schema = match array_reader.get_data_type() {
821            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
822            _ => unreachable!("Struct array reader's data type is not struct!"),
823        };
824
825        Self {
826            batch_size,
827            array_reader,
828            schema: Arc::new(schema),
829            selection: selection.map(|s| s.trim().into()),
830        }
831    }
832}
833
834/// Returns `true` if `selection` is `None` or selects some rows
835pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
836    selection.map(|x| x.selects_any()).unwrap_or(true)
837}
838
839/// Applies an optional offset and limit to an optional [`RowSelection`]
840pub(crate) fn apply_range(
841    mut selection: Option<RowSelection>,
842    row_count: usize,
843    offset: Option<usize>,
844    limit: Option<usize>,
845) -> Option<RowSelection> {
846    // If an offset is defined, apply it to the `selection`
847    if let Some(offset) = offset {
848        selection = Some(match row_count.checked_sub(offset) {
849            None => RowSelection::from(vec![]),
850            Some(remaining) => selection
851                .map(|selection| selection.offset(offset))
852                .unwrap_or_else(|| {
853                    RowSelection::from(vec![
854                        RowSelector::skip(offset),
855                        RowSelector::select(remaining),
856                    ])
857                }),
858        });
859    }
860
861    // If a limit is defined, apply it to the final `selection`
862    if let Some(limit) = limit {
863        selection = Some(
864            selection
865                .map(|selection| selection.limit(limit))
866                .unwrap_or_else(|| {
867                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
868                }),
869        );
870    }
871    selection
872}
873
874/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
875/// which rows to return.
876///
877/// `input_selection`: Optional pre-existing selection. If `Some`, then the
878/// final [`RowSelection`] will be the conjunction of it and the rows selected
879/// by `predicate`.
880///
881/// Note: A pre-existing selection may come from evaluating a previous predicate
882/// or if the [`ParquetRecordBatchReader`] specified an explicit
883/// [`RowSelection`] in addition to one or more predicates.
884pub(crate) fn evaluate_predicate(
885    batch_size: usize,
886    array_reader: Box<dyn ArrayReader>,
887    input_selection: Option<RowSelection>,
888    predicate: &mut dyn ArrowPredicate,
889) -> Result<RowSelection> {
890    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
891    let mut filters = vec![];
892    for maybe_batch in reader {
893        let maybe_batch = maybe_batch?;
894        let input_rows = maybe_batch.num_rows();
895        let filter = predicate.evaluate(maybe_batch)?;
896        // Since user supplied predicate, check error here to catch bugs quickly
897        if filter.len() != input_rows {
898            return Err(arrow_err!(
899                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
900                filter.len()
901            ));
902        }
903        match filter.null_count() {
904            0 => filters.push(filter),
905            _ => filters.push(prep_null_mask_filter(&filter)),
906        };
907    }
908
909    let raw = RowSelection::from_filters(&filters);
910    Ok(match input_selection {
911        Some(selection) => selection.and_then(&raw),
912        None => raw,
913    })
914}
915
916#[cfg(test)]
917mod tests {
918    use std::cmp::min;
919    use std::collections::{HashMap, VecDeque};
920    use std::fmt::Formatter;
921    use std::fs::File;
922    use std::io::Seek;
923    use std::path::PathBuf;
924    use std::sync::Arc;
925
926    use bytes::Bytes;
927    use half::f16;
928    use num::PrimInt;
929    use rand::{thread_rng, Rng, RngCore};
930    use tempfile::tempfile;
931
932    use arrow_array::builder::*;
933    use arrow_array::cast::AsArray;
934    use arrow_array::types::{
935        Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
936        Time32MillisecondType, Time64MicrosecondType,
937    };
938    use arrow_array::*;
939    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
940    use arrow_data::ArrayDataBuilder;
941    use arrow_schema::{
942        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
943    };
944    use arrow_select::concat::concat_batches;
945
946    use crate::arrow::arrow_reader::{
947        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
948        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
949    };
950    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
951    use crate::arrow::{ArrowWriter, ProjectionMask};
952    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
953    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
954    use crate::data_type::{
955        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
956        FloatType, Int32Type, Int64Type, Int96Type,
957    };
958    use crate::errors::Result;
959    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
960    use crate::file::writer::SerializedFileWriter;
961    use crate::schema::parser::parse_message_type;
962    use crate::schema::types::{Type, TypePtr};
963    use crate::util::test_common::rand_gen::RandGen;
964
965    #[test]
966    fn test_arrow_reader_all_columns() {
967        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
968
969        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
970        let original_schema = Arc::clone(builder.schema());
971        let reader = builder.build().unwrap();
972
973        // Verify that the schema was correctly parsed
974        assert_eq!(original_schema.fields(), reader.schema().fields());
975    }
976
977    #[test]
978    fn test_arrow_reader_single_column() {
979        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
980
981        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
982        let original_schema = Arc::clone(builder.schema());
983
984        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
985        let reader = builder.with_projection(mask).build().unwrap();
986
987        // Verify that the schema was correctly parsed
988        assert_eq!(1, reader.schema().fields().len());
989        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
990    }
991
992    #[test]
993    fn test_null_column_reader_test() {
994        let mut file = tempfile::tempfile().unwrap();
995
996        let schema = "
997            message message {
998                OPTIONAL INT32 int32;
999            }
1000        ";
1001        let schema = Arc::new(parse_message_type(schema).unwrap());
1002
1003        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1004        generate_single_column_file_with_data::<Int32Type>(
1005            &[vec![], vec![]],
1006            Some(&def_levels),
1007            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1008            schema,
1009            Some(Field::new("int32", ArrowDataType::Null, true)),
1010            &Default::default(),
1011        )
1012        .unwrap();
1013
1014        file.rewind().unwrap();
1015
1016        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1017        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1018
1019        assert_eq!(batches.len(), 4);
1020        for batch in &batches[0..3] {
1021            assert_eq!(batch.num_rows(), 2);
1022            assert_eq!(batch.num_columns(), 1);
1023            assert_eq!(batch.column(0).null_count(), 2);
1024        }
1025
1026        assert_eq!(batches[3].num_rows(), 1);
1027        assert_eq!(batches[3].num_columns(), 1);
1028        assert_eq!(batches[3].column(0).null_count(), 1);
1029    }
1030
1031    #[test]
1032    fn test_primitive_single_column_reader_test() {
1033        run_single_column_reader_tests::<BoolType, _, BoolType>(
1034            2,
1035            ConvertedType::NONE,
1036            None,
1037            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1038            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1039        );
1040        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1041            2,
1042            ConvertedType::NONE,
1043            None,
1044            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1045            &[
1046                Encoding::PLAIN,
1047                Encoding::RLE_DICTIONARY,
1048                Encoding::DELTA_BINARY_PACKED,
1049                Encoding::BYTE_STREAM_SPLIT,
1050            ],
1051        );
1052        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1053            2,
1054            ConvertedType::NONE,
1055            None,
1056            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1057            &[
1058                Encoding::PLAIN,
1059                Encoding::RLE_DICTIONARY,
1060                Encoding::DELTA_BINARY_PACKED,
1061                Encoding::BYTE_STREAM_SPLIT,
1062            ],
1063        );
1064        run_single_column_reader_tests::<FloatType, _, FloatType>(
1065            2,
1066            ConvertedType::NONE,
1067            None,
1068            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1069            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1070        );
1071    }
1072
1073    #[test]
1074    fn test_unsigned_primitive_single_column_reader_test() {
1075        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1076            2,
1077            ConvertedType::UINT_32,
1078            Some(ArrowDataType::UInt32),
1079            |vals| {
1080                Arc::new(UInt32Array::from_iter(
1081                    vals.iter().map(|x| x.map(|x| x as u32)),
1082                ))
1083            },
1084            &[
1085                Encoding::PLAIN,
1086                Encoding::RLE_DICTIONARY,
1087                Encoding::DELTA_BINARY_PACKED,
1088            ],
1089        );
1090        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1091            2,
1092            ConvertedType::UINT_64,
1093            Some(ArrowDataType::UInt64),
1094            |vals| {
1095                Arc::new(UInt64Array::from_iter(
1096                    vals.iter().map(|x| x.map(|x| x as u64)),
1097                ))
1098            },
1099            &[
1100                Encoding::PLAIN,
1101                Encoding::RLE_DICTIONARY,
1102                Encoding::DELTA_BINARY_PACKED,
1103            ],
1104        );
1105    }
1106
1107    #[test]
1108    fn test_unsigned_roundtrip() {
1109        let schema = Arc::new(Schema::new(vec![
1110            Field::new("uint32", ArrowDataType::UInt32, true),
1111            Field::new("uint64", ArrowDataType::UInt64, true),
1112        ]));
1113
1114        let mut buf = Vec::with_capacity(1024);
1115        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1116
1117        let original = RecordBatch::try_new(
1118            schema,
1119            vec![
1120                Arc::new(UInt32Array::from_iter_values([
1121                    0,
1122                    i32::MAX as u32,
1123                    u32::MAX,
1124                ])),
1125                Arc::new(UInt64Array::from_iter_values([
1126                    0,
1127                    i64::MAX as u64,
1128                    u64::MAX,
1129                ])),
1130            ],
1131        )
1132        .unwrap();
1133
1134        writer.write(&original).unwrap();
1135        writer.close().unwrap();
1136
1137        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1138        let ret = reader.next().unwrap().unwrap();
1139        assert_eq!(ret, original);
1140
1141        // Check they can be downcast to the correct type
1142        ret.column(0)
1143            .as_any()
1144            .downcast_ref::<UInt32Array>()
1145            .unwrap();
1146
1147        ret.column(1)
1148            .as_any()
1149            .downcast_ref::<UInt64Array>()
1150            .unwrap();
1151    }
1152
1153    #[test]
1154    fn test_float16_roundtrip() -> Result<()> {
1155        let schema = Arc::new(Schema::new(vec![
1156            Field::new("float16", ArrowDataType::Float16, false),
1157            Field::new("float16-nullable", ArrowDataType::Float16, true),
1158        ]));
1159
1160        let mut buf = Vec::with_capacity(1024);
1161        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1162
1163        let original = RecordBatch::try_new(
1164            schema,
1165            vec![
1166                Arc::new(Float16Array::from_iter_values([
1167                    f16::EPSILON,
1168                    f16::MIN,
1169                    f16::MAX,
1170                    f16::NAN,
1171                    f16::INFINITY,
1172                    f16::NEG_INFINITY,
1173                    f16::ONE,
1174                    f16::NEG_ONE,
1175                    f16::ZERO,
1176                    f16::NEG_ZERO,
1177                    f16::E,
1178                    f16::PI,
1179                    f16::FRAC_1_PI,
1180                ])),
1181                Arc::new(Float16Array::from(vec![
1182                    None,
1183                    None,
1184                    None,
1185                    Some(f16::NAN),
1186                    Some(f16::INFINITY),
1187                    Some(f16::NEG_INFINITY),
1188                    None,
1189                    None,
1190                    None,
1191                    None,
1192                    None,
1193                    None,
1194                    Some(f16::FRAC_1_PI),
1195                ])),
1196            ],
1197        )?;
1198
1199        writer.write(&original)?;
1200        writer.close()?;
1201
1202        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1203        let ret = reader.next().unwrap()?;
1204        assert_eq!(ret, original);
1205
1206        // Ensure can be downcast to the correct type
1207        ret.column(0).as_primitive::<Float16Type>();
1208        ret.column(1).as_primitive::<Float16Type>();
1209
1210        Ok(())
1211    }
1212
1213    #[test]
1214    fn test_time_utc_roundtrip() -> Result<()> {
1215        let schema = Arc::new(Schema::new(vec![
1216            Field::new(
1217                "time_millis",
1218                ArrowDataType::Time32(TimeUnit::Millisecond),
1219                true,
1220            )
1221            .with_metadata(HashMap::from_iter(vec![(
1222                "adjusted_to_utc".to_string(),
1223                "".to_string(),
1224            )])),
1225            Field::new(
1226                "time_micros",
1227                ArrowDataType::Time64(TimeUnit::Microsecond),
1228                true,
1229            )
1230            .with_metadata(HashMap::from_iter(vec![(
1231                "adjusted_to_utc".to_string(),
1232                "".to_string(),
1233            )])),
1234        ]));
1235
1236        let mut buf = Vec::with_capacity(1024);
1237        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1238
1239        let original = RecordBatch::try_new(
1240            schema,
1241            vec![
1242                Arc::new(Time32MillisecondArray::from(vec![
1243                    Some(-1),
1244                    Some(0),
1245                    Some(86_399_000),
1246                    Some(86_400_000),
1247                    Some(86_401_000),
1248                    None,
1249                ])),
1250                Arc::new(Time64MicrosecondArray::from(vec![
1251                    Some(-1),
1252                    Some(0),
1253                    Some(86_399 * 1_000_000),
1254                    Some(86_400 * 1_000_000),
1255                    Some(86_401 * 1_000_000),
1256                    None,
1257                ])),
1258            ],
1259        )?;
1260
1261        writer.write(&original)?;
1262        writer.close()?;
1263
1264        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1265        let ret = reader.next().unwrap()?;
1266        assert_eq!(ret, original);
1267
1268        // Ensure can be downcast to the correct type
1269        ret.column(0).as_primitive::<Time32MillisecondType>();
1270        ret.column(1).as_primitive::<Time64MicrosecondType>();
1271
1272        Ok(())
1273    }
1274
1275    struct RandFixedLenGen {}
1276
1277    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1278        fn gen(len: i32) -> FixedLenByteArray {
1279            let mut v = vec![0u8; len as usize];
1280            thread_rng().fill_bytes(&mut v);
1281            ByteArray::from(v).into()
1282        }
1283    }
1284
1285    #[test]
1286    fn test_fixed_length_binary_column_reader() {
1287        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1288            20,
1289            ConvertedType::NONE,
1290            None,
1291            |vals| {
1292                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1293                for val in vals {
1294                    match val {
1295                        Some(b) => builder.append_value(b).unwrap(),
1296                        None => builder.append_null(),
1297                    }
1298                }
1299                Arc::new(builder.finish())
1300            },
1301            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1302        );
1303    }
1304
1305    #[test]
1306    fn test_interval_day_time_column_reader() {
1307        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1308            12,
1309            ConvertedType::INTERVAL,
1310            None,
1311            |vals| {
1312                Arc::new(
1313                    vals.iter()
1314                        .map(|x| {
1315                            x.as_ref().map(|b| IntervalDayTime {
1316                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1317                                milliseconds: i32::from_le_bytes(
1318                                    b.as_ref()[8..12].try_into().unwrap(),
1319                                ),
1320                            })
1321                        })
1322                        .collect::<IntervalDayTimeArray>(),
1323                )
1324            },
1325            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1326        );
1327    }
1328
1329    #[test]
1330    fn test_int96_single_column_reader_test() {
1331        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1332        run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1333            2,
1334            ConvertedType::NONE,
1335            None,
1336            |vals| {
1337                Arc::new(TimestampNanosecondArray::from_iter(
1338                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1339                )) as _
1340            },
1341            encodings,
1342        );
1343    }
1344
1345    struct RandUtf8Gen {}
1346
1347    impl RandGen<ByteArrayType> for RandUtf8Gen {
1348        fn gen(len: i32) -> ByteArray {
1349            Int32Type::gen(len).to_string().as_str().into()
1350        }
1351    }
1352
    /// Runs the single column reader tests for BYTE_ARRAY columns read as binary, Utf8,
    /// LargeUtf8 and dictionary-encoded Utf8 arrays, across four encodings.
    #[test]
    fn test_utf8_single_column_reader_test() {
        /// Builds the expected string array (with offset size `O`) from the generated values.
        /// Panics if a value is not valid UTF-8.
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        // Encodings shared by every case below
        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        // No converted type and no Arrow type hint: expect a binary array
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        // UTF8 converted type without an Arrow type hint: expect a string array with i32 offsets
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        // Explicit Utf8 Arrow type
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        // Explicit LargeUtf8 Arrow type: expect i64 offsets
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        // Dictionary arrays with 8-bit keys
        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                // Cannot run full test suite as keys overflow, run small test instead
                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    // The expected array is the plain string array cast to the dictionary type
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        // Dictionary arrays with wider keys go through the full test suite
        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                // The expected array is the plain string array cast to the dictionary type
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            // The LargeUtf8 dictionary case below is disabled; see the linked issue
            // https://github.com/apache/arrow-rs/issues/1179
            // let data_type = ArrowDataType::Dictionary(
            //     Box::new(key.clone()),
            //     Box::new(ArrowDataType::LargeUtf8),
            // );
            //
            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            //     2,
            //     ConvertedType::UTF8,
            //     Some(data_type.clone()),
            //     move |vals| {
            //         let vals = string_converter::<i64>(vals);
            //         arrow::compute::cast(&vals, &data_type).unwrap()
            //     },
            //     encodings,
            // );
        }
    }
1469
1470    #[test]
1471    fn test_decimal_nullable_struct() {
1472        let decimals = Decimal256Array::from_iter_values(
1473            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1474        );
1475
1476        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1477            "decimals",
1478            decimals.data_type().clone(),
1479            false,
1480        )])))
1481        .len(8)
1482        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1483        .child_data(vec![decimals.into_data()])
1484        .build()
1485        .unwrap();
1486
1487        let written =
1488            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1489                .unwrap();
1490
1491        let mut buffer = Vec::with_capacity(1024);
1492        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1493        writer.write(&written).unwrap();
1494        writer.close().unwrap();
1495
1496        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1497            .unwrap()
1498            .collect::<Result<Vec<_>, _>>()
1499            .unwrap();
1500
1501        assert_eq!(&written.slice(0, 3), &read[0]);
1502        assert_eq!(&written.slice(3, 3), &read[1]);
1503        assert_eq!(&written.slice(6, 2), &read[2]);
1504    }
1505
1506    #[test]
1507    fn test_int32_nullable_struct() {
1508        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1509        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1510            "int32",
1511            int32.data_type().clone(),
1512            false,
1513        )])))
1514        .len(8)
1515        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1516        .child_data(vec![int32.into_data()])
1517        .build()
1518        .unwrap();
1519
1520        let written =
1521            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1522                .unwrap();
1523
1524        let mut buffer = Vec::with_capacity(1024);
1525        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1526        writer.write(&written).unwrap();
1527        writer.close().unwrap();
1528
1529        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1530            .unwrap()
1531            .collect::<Result<Vec<_>, _>>()
1532            .unwrap();
1533
1534        assert_eq!(&written.slice(0, 3), &read[0]);
1535        assert_eq!(&written.slice(3, 3), &read[1]);
1536        assert_eq!(&written.slice(6, 2), &read[2]);
1537    }
1538
1539    #[test]
1540    #[ignore] // https://github.com/apache/arrow-rs/issues/2253
1541    fn test_decimal_list() {
1542        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1543
1544        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
1545        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new(
1546            "item",
1547            decimals.data_type().clone(),
1548            false,
1549        ))))
1550        .len(7)
1551        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
1552        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
1553        .child_data(vec![decimals.into_data()])
1554        .build()
1555        .unwrap();
1556
1557        let written =
1558            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
1559                .unwrap();
1560
1561        let mut buffer = Vec::with_capacity(1024);
1562        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1563        writer.write(&written).unwrap();
1564        writer.close().unwrap();
1565
1566        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1567            .unwrap()
1568            .collect::<Result<Vec<_>, _>>()
1569            .unwrap();
1570
1571        assert_eq!(&written.slice(0, 3), &read[0]);
1572        assert_eq!(&written.slice(3, 3), &read[1]);
1573        assert_eq!(&written.slice(6, 1), &read[2]);
1574    }
1575
1576    #[test]
1577    fn test_read_decimal_file() {
1578        use arrow_array::Decimal128Array;
1579        let testdata = arrow::util::test_util::parquet_test_data();
1580        let file_variants = vec![
1581            ("byte_array", 4),
1582            ("fixed_length", 25),
1583            ("int32", 4),
1584            ("int64", 10),
1585        ];
1586        for (prefix, target_precision) in file_variants {
1587            let path = format!("{testdata}/{prefix}_decimal.parquet");
1588            let file = File::open(path).unwrap();
1589            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1590
1591            let batch = record_reader.next().unwrap().unwrap();
1592            assert_eq!(batch.num_rows(), 24);
1593            let col = batch
1594                .column(0)
1595                .as_any()
1596                .downcast_ref::<Decimal128Array>()
1597                .unwrap();
1598
1599            let expected = 1..25;
1600
1601            assert_eq!(col.precision(), target_precision);
1602            assert_eq!(col.scale(), 2);
1603
1604            for (i, v) in expected.enumerate() {
1605                assert_eq!(col.value(i), v * 100_i128);
1606            }
1607        }
1608    }
1609
1610    #[test]
1611    fn test_read_float16_nonzeros_file() {
1612        use arrow_array::Float16Array;
1613        let testdata = arrow::util::test_util::parquet_test_data();
1614        // see https://github.com/apache/parquet-testing/pull/40
1615        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
1616        let file = File::open(path).unwrap();
1617        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1618
1619        let batch = record_reader.next().unwrap().unwrap();
1620        assert_eq!(batch.num_rows(), 8);
1621        let col = batch
1622            .column(0)
1623            .as_any()
1624            .downcast_ref::<Float16Array>()
1625            .unwrap();
1626
1627        let f16_two = f16::ONE + f16::ONE;
1628
1629        assert_eq!(col.null_count(), 1);
1630        assert!(col.is_null(0));
1631        assert_eq!(col.value(1), f16::ONE);
1632        assert_eq!(col.value(2), -f16_two);
1633        assert!(col.value(3).is_nan());
1634        assert_eq!(col.value(4), f16::ZERO);
1635        assert!(col.value(4).is_sign_positive());
1636        assert_eq!(col.value(5), f16::NEG_ONE);
1637        assert_eq!(col.value(6), f16::NEG_ZERO);
1638        assert!(col.value(6).is_sign_negative());
1639        assert_eq!(col.value(7), f16_two);
1640    }
1641
1642    #[test]
1643    fn test_read_float16_zeros_file() {
1644        use arrow_array::Float16Array;
1645        let testdata = arrow::util::test_util::parquet_test_data();
1646        // see https://github.com/apache/parquet-testing/pull/40
1647        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
1648        let file = File::open(path).unwrap();
1649        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1650
1651        let batch = record_reader.next().unwrap().unwrap();
1652        assert_eq!(batch.num_rows(), 3);
1653        let col = batch
1654            .column(0)
1655            .as_any()
1656            .downcast_ref::<Float16Array>()
1657            .unwrap();
1658
1659        assert_eq!(col.null_count(), 1);
1660        assert!(col.is_null(0));
1661        assert_eq!(col.value(1), f16::ZERO);
1662        assert!(col.value(1).is_sign_positive());
1663        assert!(col.value(2).is_nan());
1664    }
1665
1666    #[test]
1667    fn test_read_float32_float64_byte_stream_split() {
1668        let path = format!(
1669            "{}/byte_stream_split.zstd.parquet",
1670            arrow::util::test_util::parquet_test_data(),
1671        );
1672        let file = File::open(path).unwrap();
1673        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
1674
1675        let mut row_count = 0;
1676        for batch in record_reader {
1677            let batch = batch.unwrap();
1678            row_count += batch.num_rows();
1679            let f32_col = batch.column(0).as_primitive::<Float32Type>();
1680            let f64_col = batch.column(1).as_primitive::<Float64Type>();
1681
1682            // This file contains floats from a standard normal distribution
1683            for &x in f32_col.values() {
1684                assert!(x > -10.0);
1685                assert!(x < 10.0);
1686            }
1687            for &x in f64_col.values() {
1688                assert!(x > -10.0);
1689                assert!(x < 10.0);
1690            }
1691        }
1692        assert_eq!(row_count, 300);
1693    }
1694
1695    #[test]
1696    fn test_read_extended_byte_stream_split() {
1697        let path = format!(
1698            "{}/byte_stream_split_extended.gzip.parquet",
1699            arrow::util::test_util::parquet_test_data(),
1700        );
1701        let file = File::open(path).unwrap();
1702        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
1703
1704        let mut row_count = 0;
1705        for batch in record_reader {
1706            let batch = batch.unwrap();
1707            row_count += batch.num_rows();
1708
1709            // 0,1 are f16
1710            let f16_col = batch.column(0).as_primitive::<Float16Type>();
1711            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
1712            assert_eq!(f16_col.len(), f16_bss.len());
1713            f16_col
1714                .iter()
1715                .zip(f16_bss.iter())
1716                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1717
1718            // 2,3 are f32
1719            let f32_col = batch.column(2).as_primitive::<Float32Type>();
1720            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
1721            assert_eq!(f32_col.len(), f32_bss.len());
1722            f32_col
1723                .iter()
1724                .zip(f32_bss.iter())
1725                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1726
1727            // 4,5 are f64
1728            let f64_col = batch.column(4).as_primitive::<Float64Type>();
1729            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
1730            assert_eq!(f64_col.len(), f64_bss.len());
1731            f64_col
1732                .iter()
1733                .zip(f64_bss.iter())
1734                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1735
1736            // 6,7 are i32
1737            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
1738            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
1739            assert_eq!(i32_col.len(), i32_bss.len());
1740            i32_col
1741                .iter()
1742                .zip(i32_bss.iter())
1743                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1744
1745            // 8,9 are i64
1746            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
1747            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
1748            assert_eq!(i64_col.len(), i64_bss.len());
1749            i64_col
1750                .iter()
1751                .zip(i64_bss.iter())
1752                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1753
1754            // 10,11 are FLBA(5)
1755            let flba_col = batch.column(10).as_fixed_size_binary();
1756            let flba_bss = batch.column(11).as_fixed_size_binary();
1757            assert_eq!(flba_col.len(), flba_bss.len());
1758            flba_col
1759                .iter()
1760                .zip(flba_bss.iter())
1761                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1762
1763            // 12,13 are FLBA(4) (decimal(7,3))
1764            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
1765            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
1766            assert_eq!(dec_col.len(), dec_bss.len());
1767            dec_col
1768                .iter()
1769                .zip(dec_bss.iter())
1770                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1771        }
1772        assert_eq!(row_count, 200);
1773    }
1774
1775    #[test]
1776    fn test_read_incorrect_map_schema_file() {
1777        let testdata = arrow::util::test_util::parquet_test_data();
1778        // see https://github.com/apache/parquet-testing/pull/47
1779        let path = format!("{testdata}/incorrect_map_schema.parquet");
1780        let file = File::open(path).unwrap();
1781        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1782
1783        let batch = record_reader.next().unwrap().unwrap();
1784        assert_eq!(batch.num_rows(), 1);
1785
1786        let expected_schema = Schema::new(Fields::from(vec![Field::new(
1787            "my_map",
1788            ArrowDataType::Map(
1789                Arc::new(Field::new(
1790                    "key_value",
1791                    ArrowDataType::Struct(Fields::from(vec![
1792                        Field::new("key", ArrowDataType::Utf8, false),
1793                        Field::new("value", ArrowDataType::Utf8, true),
1794                    ])),
1795                    false,
1796                )),
1797                false,
1798            ),
1799            true,
1800        )]));
1801        assert_eq!(batch.schema().as_ref(), &expected_schema);
1802
1803        assert_eq!(batch.num_rows(), 1);
1804        assert_eq!(batch.column(0).null_count(), 0);
1805        assert_eq!(
1806            batch.column(0).as_map().keys().as_ref(),
1807            &StringArray::from(vec!["parent", "name"])
1808        );
1809        assert_eq!(
1810            batch.column(0).as_map().values().as_ref(),
1811            &StringArray::from(vec!["another", "report"])
1812        );
1813    }
1814
    /// Parameters for `single_column_reader_test`.
    ///
    /// The first group of fields controls how the test file is written, the
    /// remaining ones (`row_selections`, `row_filter`, `limit`, `offset` and
    /// `record_batch_size`) control how it is read back.
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write to the parquet file
        num_row_groups: usize,
        /// Number of rows written to each row group, so the file holds
        /// `num_row_groups * num_rows` rows in total
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in the column, or `None` if the column is
        /// required (written without definition levels)
        null_percent: Option<usize>,
        /// Set write batch size
        ///
        /// This is the number of rows that are written at once to a page and
        /// therefore acts as a bound on the page granularity of a row group
        write_batch_size: usize,
        /// Maximum size of a data page in bytes
        max_data_page_size: usize,
        /// Maximum size of a dictionary page in bytes (only applied by
        /// `writer_props` when `encoding` is a dictionary encoding)
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Enabled statistics. The page index is only requested from the reader
        /// when this is `EnabledStatistics::Page`
        enabled_statistics: EnabledStatistics,
        /// Encoding
        encoding: Encoding,
        /// Row selection to read with, paired with the total number of rows it
        /// selects
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter: one flag per row that remains after `row_selections` is
        /// applied; `true` keeps the row
        row_filter: Option<Vec<bool>>,
        /// Maximum number of rows to read, applied after `offset`
        limit: Option<usize>,
        /// Number of leading rows to skip
        offset: Option<usize>,
    }
1851
1852    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
1853    impl std::fmt::Debug for TestOptions {
1854        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1855            f.debug_struct("TestOptions")
1856                .field("num_row_groups", &self.num_row_groups)
1857                .field("num_rows", &self.num_rows)
1858                .field("record_batch_size", &self.record_batch_size)
1859                .field("null_percent", &self.null_percent)
1860                .field("write_batch_size", &self.write_batch_size)
1861                .field("max_data_page_size", &self.max_data_page_size)
1862                .field("max_dict_page_size", &self.max_dict_page_size)
1863                .field("writer_version", &self.writer_version)
1864                .field("enabled_statistics", &self.enabled_statistics)
1865                .field("encoding", &self.encoding)
1866                .field("row_selections", &self.row_selections.is_some())
1867                .field("row_filter", &self.row_filter.is_some())
1868                .field("limit", &self.limit)
1869                .field("offset", &self.offset)
1870                .finish()
1871        }
1872    }
1873
1874    impl Default for TestOptions {
1875        fn default() -> Self {
1876            Self {
1877                num_row_groups: 2,
1878                num_rows: 100,
1879                record_batch_size: 15,
1880                null_percent: None,
1881                write_batch_size: 64,
1882                max_data_page_size: 1024 * 1024,
1883                max_dict_page_size: 1024 * 1024,
1884                writer_version: WriterVersion::PARQUET_1_0,
1885                enabled_statistics: EnabledStatistics::Page,
1886                encoding: Encoding::PLAIN,
1887                row_selections: None,
1888                row_filter: None,
1889                limit: None,
1890                offset: None,
1891            }
1892        }
1893    }
1894
1895    impl TestOptions {
1896        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
1897            Self {
1898                num_row_groups,
1899                num_rows,
1900                record_batch_size,
1901                ..Default::default()
1902            }
1903        }
1904
1905        fn with_null_percent(self, null_percent: usize) -> Self {
1906            Self {
1907                null_percent: Some(null_percent),
1908                ..self
1909            }
1910        }
1911
1912        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
1913            Self {
1914                max_data_page_size,
1915                ..self
1916            }
1917        }
1918
1919        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
1920            Self {
1921                max_dict_page_size,
1922                ..self
1923            }
1924        }
1925
1926        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
1927            Self {
1928                enabled_statistics,
1929                ..self
1930            }
1931        }
1932
1933        fn with_row_selections(self) -> Self {
1934            assert!(self.row_filter.is_none(), "Must set row selection first");
1935
1936            let mut rng = thread_rng();
1937            let step = rng.gen_range(self.record_batch_size..self.num_rows);
1938            let row_selections =
1939                create_test_selection(step, self.num_row_groups * self.num_rows, rng.gen::<bool>());
1940            Self {
1941                row_selections: Some(row_selections),
1942                ..self
1943            }
1944        }
1945
1946        fn with_row_filter(self) -> Self {
1947            let row_count = match &self.row_selections {
1948                Some((_, count)) => *count,
1949                None => self.num_row_groups * self.num_rows,
1950            };
1951
1952            let mut rng = thread_rng();
1953            Self {
1954                row_filter: Some((0..row_count).map(|_| rng.gen_bool(0.9)).collect()),
1955                ..self
1956            }
1957        }
1958
1959        fn with_limit(self, limit: usize) -> Self {
1960            Self {
1961                limit: Some(limit),
1962                ..self
1963            }
1964        }
1965
1966        fn with_offset(self, offset: usize) -> Self {
1967            Self {
1968                offset: Some(offset),
1969                ..self
1970            }
1971        }
1972
1973        fn writer_props(&self) -> WriterProperties {
1974            let builder = WriterProperties::builder()
1975                .set_data_page_size_limit(self.max_data_page_size)
1976                .set_write_batch_size(self.write_batch_size)
1977                .set_writer_version(self.writer_version)
1978                .set_statistics_enabled(self.enabled_statistics);
1979
1980            let builder = match self.encoding {
1981                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
1982                    .set_dictionary_enabled(true)
1983                    .set_dictionary_page_size_limit(self.max_dict_page_size),
1984                _ => builder
1985                    .set_dictionary_enabled(false)
1986                    .set_encoding(self.encoding),
1987            };
1988
1989            builder.build()
1990        }
1991    }
1992
1993    /// Create a parquet file and then read it using
1994    /// `ParquetFileArrowReader` using a standard set of parameters
1995    /// `opts`.
1996    ///
1997    /// `rand_max` represents the maximum size of value to pass to to
1998    /// value generator
1999    fn run_single_column_reader_tests<T, F, G>(
2000        rand_max: i32,
2001        converted_type: ConvertedType,
2002        arrow_type: Option<ArrowDataType>,
2003        converter: F,
2004        encodings: &[Encoding],
2005    ) where
2006        T: DataType,
2007        G: RandGen<T>,
2008        F: Fn(&[Option<T::T>]) -> ArrayRef,
2009    {
2010        let all_options = vec![
2011            // choose record_batch_batch (15) so batches cross row
2012            // group boundaries (50 rows in 2 row groups) cases.
2013            TestOptions::new(2, 100, 15),
2014            // choose record_batch_batch (5) so batches sometime fall
2015            // on row group boundaries and (25 rows in 3 row groups
2016            // --> row groups of 10, 10, and 5). Tests buffer
2017            // refilling edge cases.
2018            TestOptions::new(3, 25, 5),
2019            // Choose record_batch_size (25) so all batches fall
2020            // exactly on row group boundary (25). Tests buffer
2021            // refilling edge cases.
2022            TestOptions::new(4, 100, 25),
2023            // Set maximum page size so row groups have multiple pages
2024            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2025            // Set small dictionary page size to test dictionary fallback
2026            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2027            // Test optional but with no nulls
2028            TestOptions::new(2, 256, 127).with_null_percent(0),
2029            // Test optional with nulls
2030            TestOptions::new(2, 256, 93).with_null_percent(25),
2031            // Test with limit of 0
2032            TestOptions::new(4, 100, 25).with_limit(0),
2033            // Test with limit of 50
2034            TestOptions::new(4, 100, 25).with_limit(50),
2035            // Test with limit equal to number of rows
2036            TestOptions::new(4, 100, 25).with_limit(10),
2037            // Test with limit larger than number of rows
2038            TestOptions::new(4, 100, 25).with_limit(101),
2039            // Test with limit + offset equal to number of rows
2040            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2041            // Test with limit + offset equal to number of rows
2042            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2043            // Test with limit + offset larger than number of rows
2044            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2045            // Test with no page-level statistics
2046            TestOptions::new(2, 256, 91)
2047                .with_null_percent(25)
2048                .with_enabled_statistics(EnabledStatistics::Chunk),
2049            // Test with no statistics
2050            TestOptions::new(2, 256, 91)
2051                .with_null_percent(25)
2052                .with_enabled_statistics(EnabledStatistics::None),
2053            // Test with all null
2054            TestOptions::new(2, 128, 91)
2055                .with_null_percent(100)
2056                .with_enabled_statistics(EnabledStatistics::None),
2057            // Test skip
2058
2059            // choose record_batch_batch (15) so batches cross row
2060            // group boundaries (50 rows in 2 row groups) cases.
2061            TestOptions::new(2, 100, 15).with_row_selections(),
2062            // choose record_batch_batch (5) so batches sometime fall
2063            // on row group boundaries and (25 rows in 3 row groups
2064            // --> row groups of 10, 10, and 5). Tests buffer
2065            // refilling edge cases.
2066            TestOptions::new(3, 25, 5).with_row_selections(),
2067            // Choose record_batch_size (25) so all batches fall
2068            // exactly on row group boundary (25). Tests buffer
2069            // refilling edge cases.
2070            TestOptions::new(4, 100, 25).with_row_selections(),
2071            // Set maximum page size so row groups have multiple pages
2072            TestOptions::new(3, 256, 73)
2073                .with_max_data_page_size(128)
2074                .with_row_selections(),
2075            // Set small dictionary page size to test dictionary fallback
2076            TestOptions::new(3, 256, 57)
2077                .with_max_dict_page_size(128)
2078                .with_row_selections(),
2079            // Test optional but with no nulls
2080            TestOptions::new(2, 256, 127)
2081                .with_null_percent(0)
2082                .with_row_selections(),
2083            // Test optional with nulls
2084            TestOptions::new(2, 256, 93)
2085                .with_null_percent(25)
2086                .with_row_selections(),
2087            // Test optional with nulls
2088            TestOptions::new(2, 256, 93)
2089                .with_null_percent(25)
2090                .with_row_selections()
2091                .with_limit(10),
2092            // Test optional with nulls
2093            TestOptions::new(2, 256, 93)
2094                .with_null_percent(25)
2095                .with_row_selections()
2096                .with_offset(20)
2097                .with_limit(10),
2098            // Test filter
2099
2100            // Test with row filter
2101            TestOptions::new(4, 100, 25).with_row_filter(),
2102            // Test with row selection and row filter
2103            TestOptions::new(4, 100, 25)
2104                .with_row_selections()
2105                .with_row_filter(),
2106            // Test with nulls and row filter
2107            TestOptions::new(2, 256, 93)
2108                .with_null_percent(25)
2109                .with_max_data_page_size(10)
2110                .with_row_filter(),
2111            // Test with nulls and row filter and small pages
2112            TestOptions::new(2, 256, 93)
2113                .with_null_percent(25)
2114                .with_max_data_page_size(10)
2115                .with_row_selections()
2116                .with_row_filter(),
2117            // Test with row selection and no offset index and small pages
2118            TestOptions::new(2, 256, 93)
2119                .with_enabled_statistics(EnabledStatistics::None)
2120                .with_max_data_page_size(10)
2121                .with_row_selections(),
2122        ];
2123
2124        all_options.into_iter().for_each(|opts| {
2125            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2126                for encoding in encodings {
2127                    let opts = TestOptions {
2128                        writer_version,
2129                        encoding: *encoding,
2130                        ..opts.clone()
2131                    };
2132
2133                    single_column_reader_test::<T, _, G>(
2134                        opts,
2135                        rand_max,
2136                        converted_type,
2137                        arrow_type.clone(),
2138                        &converter,
2139                    )
2140                }
2141            }
2142        });
2143    }
2144
2145    /// Create a parquet file and then read it using
2146    /// `ParquetFileArrowReader` using the parameters described in
2147    /// `opts`.
2148    fn single_column_reader_test<T, F, G>(
2149        opts: TestOptions,
2150        rand_max: i32,
2151        converted_type: ConvertedType,
2152        arrow_type: Option<ArrowDataType>,
2153        converter: F,
2154    ) where
2155        T: DataType,
2156        G: RandGen<T>,
2157        F: Fn(&[Option<T::T>]) -> ArrayRef,
2158    {
2159        // Print out options to facilitate debugging failures on CI
2160        println!(
2161            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2162            T::get_physical_type(), converted_type, arrow_type, opts
2163        );
2164
2165        //according to null_percent generate def_levels
2166        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2167            Some(null_percent) => {
2168                let mut rng = thread_rng();
2169
2170                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2171                    .map(|_| {
2172                        std::iter::from_fn(|| {
2173                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2174                        })
2175                        .take(opts.num_rows)
2176                        .collect()
2177                    })
2178                    .collect();
2179                (Repetition::OPTIONAL, Some(def_levels))
2180            }
2181            None => (Repetition::REQUIRED, None),
2182        };
2183
2184        //generate random table data
2185        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2186            .map(|idx| {
2187                let null_count = match def_levels.as_ref() {
2188                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2189                    None => 0,
2190                };
2191                G::gen_vec(rand_max, opts.num_rows - null_count)
2192            })
2193            .collect();
2194
2195        let len = match T::get_physical_type() {
2196            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2197            crate::basic::Type::INT96 => 12,
2198            _ => -1,
2199        };
2200
2201        let fields = vec![Arc::new(
2202            Type::primitive_type_builder("leaf", T::get_physical_type())
2203                .with_repetition(repetition)
2204                .with_converted_type(converted_type)
2205                .with_length(len)
2206                .build()
2207                .unwrap(),
2208        )];
2209
2210        let schema = Arc::new(
2211            Type::group_type_builder("test_schema")
2212                .with_fields(fields)
2213                .build()
2214                .unwrap(),
2215        );
2216
2217        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2218
2219        let mut file = tempfile::tempfile().unwrap();
2220
2221        generate_single_column_file_with_data::<T>(
2222            &values,
2223            def_levels.as_ref(),
2224            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2225            schema,
2226            arrow_field,
2227            &opts,
2228        )
2229        .unwrap();
2230
2231        file.rewind().unwrap();
2232
2233        let options = ArrowReaderOptions::new()
2234            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2235
2236        let mut builder =
2237            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2238
2239        let expected_data = match opts.row_selections {
2240            Some((selections, row_count)) => {
2241                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2242
2243                let mut skip_data: Vec<Option<T::T>> = vec![];
2244                let dequeue: VecDeque<RowSelector> = selections.clone().into();
2245                for select in dequeue {
2246                    if select.skip {
2247                        without_skip_data.drain(0..select.row_count);
2248                    } else {
2249                        skip_data.extend(without_skip_data.drain(0..select.row_count));
2250                    }
2251                }
2252                builder = builder.with_row_selection(selections);
2253
2254                assert_eq!(skip_data.len(), row_count);
2255                skip_data
2256            }
2257            None => {
2258                //get flatten table data
2259                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2260                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2261                expected_data
2262            }
2263        };
2264
2265        let mut expected_data = match opts.row_filter {
2266            Some(filter) => {
2267                let expected_data = expected_data
2268                    .into_iter()
2269                    .zip(filter.iter())
2270                    .filter_map(|(d, f)| f.then(|| d))
2271                    .collect();
2272
2273                let mut filter_offset = 0;
2274                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2275                    ProjectionMask::all(),
2276                    move |b| {
2277                        let array = BooleanArray::from_iter(
2278                            filter
2279                                .iter()
2280                                .skip(filter_offset)
2281                                .take(b.num_rows())
2282                                .map(|x| Some(*x)),
2283                        );
2284                        filter_offset += b.num_rows();
2285                        Ok(array)
2286                    },
2287                ))]);
2288
2289                builder = builder.with_row_filter(filter);
2290                expected_data
2291            }
2292            None => expected_data,
2293        };
2294
2295        if let Some(offset) = opts.offset {
2296            builder = builder.with_offset(offset);
2297            expected_data = expected_data.into_iter().skip(offset).collect();
2298        }
2299
2300        if let Some(limit) = opts.limit {
2301            builder = builder.with_limit(limit);
2302            expected_data = expected_data.into_iter().take(limit).collect();
2303        }
2304
2305        let mut record_reader = builder
2306            .with_batch_size(opts.record_batch_size)
2307            .build()
2308            .unwrap();
2309
2310        let mut total_read = 0;
2311        loop {
2312            let maybe_batch = record_reader.next();
2313            if total_read < expected_data.len() {
2314                let end = min(total_read + opts.record_batch_size, expected_data.len());
2315                let batch = maybe_batch.unwrap().unwrap();
2316                assert_eq!(end - total_read, batch.num_rows());
2317
2318                let a = converter(&expected_data[total_read..end]);
2319                let b = Arc::clone(batch.column(0));
2320
2321                assert_eq!(a.data_type(), b.data_type());
2322                assert_eq!(a.to_data(), b.to_data());
2323                assert_eq!(
2324                    a.as_any().type_id(),
2325                    b.as_any().type_id(),
2326                    "incorrect type ids"
2327                );
2328
2329                total_read = end;
2330            } else {
2331                assert!(maybe_batch.is_none());
2332                break;
2333            }
2334        }
2335    }
2336
2337    fn gen_expected_data<T: DataType>(
2338        def_levels: Option<&Vec<Vec<i16>>>,
2339        values: &[Vec<T::T>],
2340    ) -> Vec<Option<T::T>> {
2341        let data: Vec<Option<T::T>> = match def_levels {
2342            Some(levels) => {
2343                let mut values_iter = values.iter().flatten();
2344                levels
2345                    .iter()
2346                    .flatten()
2347                    .map(|d| match d {
2348                        1 => Some(values_iter.next().cloned().unwrap()),
2349                        0 => None,
2350                        _ => unreachable!(),
2351                    })
2352                    .collect()
2353            }
2354            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2355        };
2356        data
2357    }
2358
2359    fn generate_single_column_file_with_data<T: DataType>(
2360        values: &[Vec<T::T>],
2361        def_levels: Option<&Vec<Vec<i16>>>,
2362        file: File,
2363        schema: TypePtr,
2364        field: Option<Field>,
2365        opts: &TestOptions,
2366    ) -> Result<crate::format::FileMetaData> {
2367        let mut writer_props = opts.writer_props();
2368        if let Some(field) = field {
2369            let arrow_schema = Schema::new(vec![field]);
2370            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2371        }
2372
2373        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2374
2375        for (idx, v) in values.iter().enumerate() {
2376            let def_levels = def_levels.map(|d| d[idx].as_slice());
2377            let mut row_group_writer = writer.next_row_group()?;
2378            {
2379                let mut column_writer = row_group_writer
2380                    .next_column()?
2381                    .expect("Column writer is none!");
2382
2383                column_writer
2384                    .typed::<T>()
2385                    .write_batch(v, def_levels, None)?;
2386
2387                column_writer.close()?;
2388            }
2389            row_group_writer.close()?;
2390        }
2391
2392        writer.close()
2393    }
2394
2395    fn get_test_file(file_name: &str) -> File {
2396        let mut path = PathBuf::new();
2397        path.push(arrow::util::test_util::arrow_test_data());
2398        path.push(file_name);
2399
2400        File::open(path.as_path()).expect("File not found!")
2401    }
2402
2403    #[test]
2404    fn test_read_structs() {
2405        // This particular test file has columns of struct types where there is
2406        // a column that has the same name as one of the struct fields
2407        // (see: ARROW-11452)
2408        let testdata = arrow::util::test_util::parquet_test_data();
2409        let path = format!("{testdata}/nested_structs.rust.parquet");
2410        let file = File::open(&path).unwrap();
2411        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2412
2413        for batch in record_batch_reader {
2414            batch.unwrap();
2415        }
2416
2417        let file = File::open(&path).unwrap();
2418        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2419
2420        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2421        let projected_reader = builder
2422            .with_projection(mask)
2423            .with_batch_size(60)
2424            .build()
2425            .unwrap();
2426
2427        let expected_schema = Schema::new(vec![
2428            Field::new(
2429                "roll_num",
2430                ArrowDataType::Struct(Fields::from(vec![Field::new(
2431                    "count",
2432                    ArrowDataType::UInt64,
2433                    false,
2434                )])),
2435                false,
2436            ),
2437            Field::new(
2438                "PC_CUR",
2439                ArrowDataType::Struct(Fields::from(vec![
2440                    Field::new("mean", ArrowDataType::Int64, false),
2441                    Field::new("sum", ArrowDataType::Int64, false),
2442                ])),
2443                false,
2444            ),
2445        ]);
2446
2447        // Tests for #1652 and #1654
2448        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2449
2450        for batch in projected_reader {
2451            let batch = batch.unwrap();
2452            assert_eq!(batch.schema().as_ref(), &expected_schema);
2453        }
2454    }
2455
2456    #[test]
2457    fn test_read_maps() {
2458        let testdata = arrow::util::test_util::parquet_test_data();
2459        let path = format!("{testdata}/nested_maps.snappy.parquet");
2460        let file = File::open(path).unwrap();
2461        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2462
2463        for batch in record_batch_reader {
2464            batch.unwrap();
2465        }
2466    }
2467
2468    #[test]
2469    fn test_nested_nullability() {
2470        let message_type = "message nested {
2471          OPTIONAL Group group {
2472            REQUIRED INT32 leaf;
2473          }
2474        }";
2475
2476        let file = tempfile::tempfile().unwrap();
2477        let schema = Arc::new(parse_message_type(message_type).unwrap());
2478
2479        {
2480            // Write using low-level parquet API (#1167)
2481            let mut writer =
2482                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2483                    .unwrap();
2484
2485            {
2486                let mut row_group_writer = writer.next_row_group().unwrap();
2487                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2488
2489                column_writer
2490                    .typed::<Int32Type>()
2491                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2492                    .unwrap();
2493
2494                column_writer.close().unwrap();
2495                row_group_writer.close().unwrap();
2496            }
2497
2498            writer.close().unwrap();
2499        }
2500
2501        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2502        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2503
2504        let reader = builder.with_projection(mask).build().unwrap();
2505
2506        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2507            "group",
2508            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2509            true,
2510        )]));
2511
2512        let batch = reader.into_iter().next().unwrap().unwrap();
2513        assert_eq!(batch.schema().as_ref(), &expected_schema);
2514        assert_eq!(batch.num_rows(), 4);
2515        assert_eq!(batch.column(0).null_count(), 2);
2516    }
2517
2518    #[test]
2519    fn test_invalid_utf8() {
2520        // a parquet file with 1 column with invalid utf8
2521        let data = vec![
2522            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
2523            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
2524            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
2525            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
2526            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
2527            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
2528            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
2529            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
2530            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
2531            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
2532            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
2533            82, 49,
2534        ];
2535
2536        let file = Bytes::from(data);
2537        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
2538
2539        let error = record_batch_reader.next().unwrap().unwrap_err();
2540
2541        assert!(
2542            error.to_string().contains("invalid utf-8 sequence"),
2543            "{}",
2544            error
2545        );
2546    }
2547
    /// Runs the shared invalid UTF-8 string array checks with `i32` offsets.
    #[test]
    fn test_invalid_utf8_string_array() {
        test_invalid_utf8_string_array_inner::<i32>();
    }
2552
    /// Runs the shared invalid UTF-8 string array checks with `i64` offsets.
    #[test]
    fn test_invalid_utf8_large_string_array() {
        test_invalid_utf8_string_array_inner::<i64>();
    }
2557
2558    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
2559        let cases = [
2560            invalid_utf8_first_char::<O>(),
2561            invalid_utf8_first_char_long_strings::<O>(),
2562            invalid_utf8_later_char::<O>(),
2563            invalid_utf8_later_char_long_strings::<O>(),
2564            invalid_utf8_later_char_really_long_strings::<O>(),
2565            invalid_utf8_later_char_really_long_strings2::<O>(),
2566        ];
2567        for array in &cases {
2568            for encoding in STRING_ENCODINGS {
2569                // data is not valid utf8 we can not construct a correct StringArray
2570                // safely, so purposely create an invalid StringArray
2571                let array = unsafe {
2572                    GenericStringArray::<O>::new_unchecked(
2573                        array.offsets().clone(),
2574                        array.values().clone(),
2575                        array.nulls().cloned(),
2576                    )
2577                };
2578                let data_type = array.data_type().clone();
2579                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
2580                let err = read_from_parquet(data).unwrap_err();
2581                let expected_err =
2582                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
2583                assert!(
2584                    err.to_string().contains(expected_err),
2585                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
2586                );
2587            }
2588        }
2589    }
2590
2591    #[test]
2592    fn test_invalid_utf8_string_view_array() {
2593        let cases = [
2594            invalid_utf8_first_char::<i32>(),
2595            invalid_utf8_first_char_long_strings::<i32>(),
2596            invalid_utf8_later_char::<i32>(),
2597            invalid_utf8_later_char_long_strings::<i32>(),
2598            invalid_utf8_later_char_really_long_strings::<i32>(),
2599            invalid_utf8_later_char_really_long_strings2::<i32>(),
2600        ];
2601
2602        for encoding in STRING_ENCODINGS {
2603            for array in &cases {
2604                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
2605                let array = array.as_binary_view();
2606
2607                // data is not valid utf8 we can not construct a correct StringArray
2608                // safely, so purposely create an invalid StringViewArray
2609                let array = unsafe {
2610                    StringViewArray::new_unchecked(
2611                        array.views().clone(),
2612                        array.data_buffers().to_vec(),
2613                        array.nulls().cloned(),
2614                    )
2615                };
2616
2617                let data_type = array.data_type().clone();
2618                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
2619                let err = read_from_parquet(data).unwrap_err();
2620                let expected_err =
2621                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
2622                assert!(
2623                    err.to_string().contains(expected_err),
2624                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
2625                );
2626            }
2627        }
2628    }
2629
    /// Encodings suitable for string data.
    ///
    /// Each entry is passed to `write_to_parquet_with_encoding`; a `None`
    /// entry makes that helper write without custom writer properties.
    const STRING_ENCODINGS: &[Option<Encoding>] = &[
        None,
        Some(Encoding::PLAIN),
        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        Some(Encoding::DELTA_BYTE_ARRAY),
    ];
2637
2638    /// Invalid Utf-8 sequence in the first character
2639    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
2640    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
2641
2642    /// Invalid Utf=8 sequence in NOT the first character
2643    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
2644    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
2645
2646    /// returns a BinaryArray with invalid UTF8 data in the first character
2647    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2648        let valid: &[u8] = b"   ";
2649        let invalid = INVALID_UTF8_FIRST_CHAR;
2650        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2651    }
2652
2653    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
2654    /// string larger than 12 bytes which is handled specially when reading
2655    /// `ByteViewArray`s
2656    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2657        let valid: &[u8] = b"   ";
2658        let mut invalid = vec![];
2659        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2660        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2661        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2662    }
2663
2664    /// returns a BinaryArray with invalid UTF8 data in a character other than
2665    /// the first (this is checked in a special codepath)
2666    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2667        let valid: &[u8] = b"   ";
2668        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
2669        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2670    }
2671
2672    /// returns a BinaryArray with invalid UTF8 data in a character other than
2673    /// the first in a string larger than 12 bytes which is handled specially
2674    /// when reading `ByteViewArray`s (this is checked in a special codepath)
2675    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2676        let valid: &[u8] = b"   ";
2677        let mut invalid = vec![];
2678        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2679        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2680        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2681    }
2682
2683    /// returns a BinaryArray with invalid UTF8 data in a character other than
2684    /// the first in a string larger than 128 bytes which is handled specially
2685    /// when reading `ByteViewArray`s (this is checked in a special codepath)
2686    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2687        let valid: &[u8] = b"   ";
2688        let mut invalid = vec![];
2689        for _ in 0..10 {
2690            // each instance is 38 bytes
2691            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2692        }
2693        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2694        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2695    }
2696
2697    /// returns a BinaryArray with small invalid UTF8 data followed by a large
2698    /// invalid UTF8 data in a character other than the first in a string larger
2699    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2700        let valid: &[u8] = b"   ";
2701        let mut valid_long = vec![];
2702        for _ in 0..10 {
2703            // each instance is 38 bytes
2704            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2705        }
2706        let invalid = INVALID_UTF8_LATER_CHAR;
2707        GenericBinaryArray::<O>::from_iter(vec![
2708            None,
2709            Some(valid),
2710            Some(invalid),
2711            None,
2712            Some(&valid_long),
2713            Some(valid),
2714        ])
2715    }
2716
2717    /// writes the array into a single column parquet file with the specified
2718    /// encoding.
2719    ///
2720    /// If no encoding is specified, use default (dictionary) encoding
2721    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
2722        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
2723        let mut data = vec![];
2724        let schema = batch.schema();
2725        let props = encoding.map(|encoding| {
2726            WriterProperties::builder()
2727                // must disable dictionary encoding to actually use encoding
2728                .set_dictionary_enabled(false)
2729                .set_encoding(encoding)
2730                .build()
2731        });
2732
2733        {
2734            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
2735            writer.write(&batch).unwrap();
2736            writer.flush().unwrap();
2737            writer.close().unwrap();
2738        };
2739        data
2740    }
2741
2742    /// read the parquet file into a record batch
2743    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
2744        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
2745            .unwrap()
2746            .build()
2747            .unwrap();
2748
2749        reader.collect()
2750    }
2751
2752    #[test]
2753    fn test_dictionary_preservation() {
2754        let fields = vec![Arc::new(
2755            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
2756                .with_repetition(Repetition::OPTIONAL)
2757                .with_converted_type(ConvertedType::UTF8)
2758                .build()
2759                .unwrap(),
2760        )];
2761
2762        let schema = Arc::new(
2763            Type::group_type_builder("test_schema")
2764                .with_fields(fields)
2765                .build()
2766                .unwrap(),
2767        );
2768
2769        let dict_type = ArrowDataType::Dictionary(
2770            Box::new(ArrowDataType::Int32),
2771            Box::new(ArrowDataType::Utf8),
2772        );
2773
2774        let arrow_field = Field::new("leaf", dict_type, true);
2775
2776        let mut file = tempfile::tempfile().unwrap();
2777
2778        let values = vec![
2779            vec![
2780                ByteArray::from("hello"),
2781                ByteArray::from("a"),
2782                ByteArray::from("b"),
2783                ByteArray::from("d"),
2784            ],
2785            vec![
2786                ByteArray::from("c"),
2787                ByteArray::from("a"),
2788                ByteArray::from("b"),
2789            ],
2790        ];
2791
2792        let def_levels = vec![
2793            vec![1, 0, 0, 1, 0, 0, 1, 1],
2794            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
2795        ];
2796
2797        let opts = TestOptions {
2798            encoding: Encoding::RLE_DICTIONARY,
2799            ..Default::default()
2800        };
2801
2802        generate_single_column_file_with_data::<ByteArrayType>(
2803            &values,
2804            Some(&def_levels),
2805            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2806            schema,
2807            Some(arrow_field),
2808            &opts,
2809        )
2810        .unwrap();
2811
2812        file.rewind().unwrap();
2813
2814        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
2815
2816        let batches = record_reader
2817            .collect::<Result<Vec<RecordBatch>, _>>()
2818            .unwrap();
2819
2820        assert_eq!(batches.len(), 6);
2821        assert!(batches.iter().all(|x| x.num_columns() == 1));
2822
2823        let row_counts = batches
2824            .iter()
2825            .map(|x| (x.num_rows(), x.column(0).null_count()))
2826            .collect::<Vec<_>>();
2827
2828        assert_eq!(
2829            row_counts,
2830            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
2831        );
2832
2833        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
2834
2835        // First and second batch in same row group -> same dictionary
2836        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
2837        // Third batch spans row group -> computed dictionary
2838        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
2839        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
2840        // Fourth, fifth and sixth from same row group -> same dictionary
2841        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
2842        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
2843    }
2844
2845    #[test]
2846    fn test_read_null_list() {
2847        let testdata = arrow::util::test_util::parquet_test_data();
2848        let path = format!("{testdata}/null_list.parquet");
2849        let file = File::open(path).unwrap();
2850        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2851
2852        let batch = record_batch_reader.next().unwrap().unwrap();
2853        assert_eq!(batch.num_rows(), 1);
2854        assert_eq!(batch.num_columns(), 1);
2855        assert_eq!(batch.column(0).len(), 1);
2856
2857        let list = batch
2858            .column(0)
2859            .as_any()
2860            .downcast_ref::<ListArray>()
2861            .unwrap();
2862        assert_eq!(list.len(), 1);
2863        assert!(list.is_valid(0));
2864
2865        let val = list.value(0);
2866        assert_eq!(val.len(), 0);
2867    }
2868
2869    #[test]
2870    fn test_null_schema_inference() {
2871        let testdata = arrow::util::test_util::parquet_test_data();
2872        let path = format!("{testdata}/null_list.parquet");
2873        let file = File::open(path).unwrap();
2874
2875        let arrow_field = Field::new(
2876            "emptylist",
2877            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Null, true))),
2878            true,
2879        );
2880
2881        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
2882        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2883        let schema = builder.schema();
2884        assert_eq!(schema.fields().len(), 1);
2885        assert_eq!(schema.field(0), &arrow_field);
2886    }
2887
2888    #[test]
2889    fn test_skip_metadata() {
2890        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
2891        let field = Field::new("col", col.data_type().clone(), true);
2892
2893        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
2894
2895        let metadata = [("key".to_string(), "value".to_string())]
2896            .into_iter()
2897            .collect();
2898
2899        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
2900
2901        assert_ne!(schema_with_metadata, schema_without_metadata);
2902
2903        let batch =
2904            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
2905
2906        let file = |version: WriterVersion| {
2907            let props = WriterProperties::builder()
2908                .set_writer_version(version)
2909                .build();
2910
2911            let file = tempfile().unwrap();
2912            let mut writer =
2913                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2914                    .unwrap();
2915            writer.write(&batch).unwrap();
2916            writer.close().unwrap();
2917            file
2918        };
2919
2920        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
2921
2922        let v1_reader = file(WriterVersion::PARQUET_1_0);
2923        let v2_reader = file(WriterVersion::PARQUET_2_0);
2924
2925        let arrow_reader =
2926            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
2927        assert_eq!(arrow_reader.schema(), schema_with_metadata);
2928
2929        let reader =
2930            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
2931                .unwrap()
2932                .build()
2933                .unwrap();
2934        assert_eq!(reader.schema(), schema_without_metadata);
2935
2936        let arrow_reader =
2937            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
2938        assert_eq!(arrow_reader.schema(), schema_with_metadata);
2939
2940        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
2941            .unwrap()
2942            .build()
2943            .unwrap();
2944        assert_eq!(reader.schema(), schema_without_metadata);
2945    }
2946
2947    fn write_parquet_from_iter<I, F>(value: I) -> File
2948    where
2949        I: IntoIterator<Item = (F, ArrayRef)>,
2950        F: AsRef<str>,
2951    {
2952        let batch = RecordBatch::try_from_iter(value).unwrap();
2953        let file = tempfile().unwrap();
2954        let mut writer =
2955            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
2956        writer.write(&batch).unwrap();
2957        writer.close().unwrap();
2958        file
2959    }
2960
2961    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
2962    where
2963        I: IntoIterator<Item = (F, ArrayRef)>,
2964        F: AsRef<str>,
2965    {
2966        let file = write_parquet_from_iter(value);
2967        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
2968        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
2969            file.try_clone().unwrap(),
2970            options_with_schema,
2971        );
2972        assert_eq!(builder.err().unwrap().to_string(), expected_error);
2973    }
2974
2975    #[test]
2976    fn test_schema_too_few_columns() {
2977        run_schema_test_with_error(
2978            vec![
2979                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
2980                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
2981            ],
2982            Arc::new(Schema::new(vec![Field::new(
2983                "int64",
2984                ArrowDataType::Int64,
2985                false,
2986            )])),
2987            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
2988        );
2989    }
2990
2991    #[test]
2992    fn test_schema_too_many_columns() {
2993        run_schema_test_with_error(
2994            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
2995            Arc::new(Schema::new(vec![
2996                Field::new("int64", ArrowDataType::Int64, false),
2997                Field::new("int32", ArrowDataType::Int32, false),
2998            ])),
2999            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3000        );
3001    }
3002
3003    #[test]
3004    fn test_schema_mismatched_column_names() {
3005        run_schema_test_with_error(
3006            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3007            Arc::new(Schema::new(vec![Field::new(
3008                "other",
3009                ArrowDataType::Int64,
3010                false,
3011            )])),
3012            "Arrow: incompatible arrow schema, expected field named int64 got other",
3013        );
3014    }
3015
3016    #[test]
3017    fn test_schema_incompatible_columns() {
3018        run_schema_test_with_error(
3019            vec![
3020                (
3021                    "col1_invalid",
3022                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3023                ),
3024                (
3025                    "col2_valid",
3026                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3027                ),
3028                (
3029                    "col3_invalid",
3030                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3031                ),
3032            ],
3033            Arc::new(Schema::new(vec![
3034                Field::new("col1_invalid", ArrowDataType::Int32, false),
3035                Field::new("col2_valid", ArrowDataType::Int32, false),
3036                Field::new("col3_invalid", ArrowDataType::Int32, false),
3037            ])),
3038            "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
3039        );
3040    }
3041
3042    #[test]
3043    fn test_one_incompatible_nested_column() {
3044        let nested_fields = Fields::from(vec![
3045            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3046            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3047        ]);
3048        let nested = StructArray::try_new(
3049            nested_fields,
3050            vec![
3051                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3052                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3053            ],
3054            None,
3055        )
3056        .expect("struct array");
3057        let supplied_nested_fields = Fields::from(vec![
3058            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3059            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3060        ]);
3061        run_schema_test_with_error(
3062            vec![
3063                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3064                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3065                ("nested", Arc::new(nested) as ArrayRef),
3066            ],
3067            Arc::new(Schema::new(vec![
3068                Field::new("col1", ArrowDataType::Int64, false),
3069                Field::new("col2", ArrowDataType::Int32, false),
3070                Field::new(
3071                    "nested",
3072                    ArrowDataType::Struct(supplied_nested_fields),
3073                    false,
3074                ),
3075            ])),
3076            "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
3077        );
3078    }
3079
3080    #[test]
3081    fn test_read_binary_as_utf8() {
3082        let file = write_parquet_from_iter(vec![
3083            (
3084                "binary_to_utf8",
3085                Arc::new(BinaryArray::from(vec![
3086                    b"one".as_ref(),
3087                    b"two".as_ref(),
3088                    b"three".as_ref(),
3089                ])) as ArrayRef,
3090            ),
3091            (
3092                "large_binary_to_large_utf8",
3093                Arc::new(LargeBinaryArray::from(vec![
3094                    b"one".as_ref(),
3095                    b"two".as_ref(),
3096                    b"three".as_ref(),
3097                ])) as ArrayRef,
3098            ),
3099            (
3100                "binary_view_to_utf8_view",
3101                Arc::new(BinaryViewArray::from(vec![
3102                    b"one".as_ref(),
3103                    b"two".as_ref(),
3104                    b"three".as_ref(),
3105                ])) as ArrayRef,
3106            ),
3107        ]);
3108        let supplied_fields = Fields::from(vec![
3109            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3110            Field::new(
3111                "large_binary_to_large_utf8",
3112                ArrowDataType::LargeUtf8,
3113                false,
3114            ),
3115            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3116        ]);
3117
3118        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3119        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3120            file.try_clone().unwrap(),
3121            options,
3122        )
3123        .expect("reader builder with schema")
3124        .build()
3125        .expect("reader with schema");
3126
3127        let batch = arrow_reader.next().unwrap().unwrap();
3128        assert_eq!(batch.num_columns(), 3);
3129        assert_eq!(batch.num_rows(), 3);
3130        assert_eq!(
3131            batch
3132                .column(0)
3133                .as_string::<i32>()
3134                .iter()
3135                .collect::<Vec<_>>(),
3136            vec![Some("one"), Some("two"), Some("three")]
3137        );
3138
3139        assert_eq!(
3140            batch
3141                .column(1)
3142                .as_string::<i64>()
3143                .iter()
3144                .collect::<Vec<_>>(),
3145            vec![Some("one"), Some("two"), Some("three")]
3146        );
3147
3148        assert_eq!(
3149            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3150            vec![Some("one"), Some("two"), Some("three")]
3151        );
3152    }
3153
/// A `Binary` column holding bytes that are not valid UTF-8 is read back through a supplied
/// `Utf8` schema; the test is expected to panic with an "Invalid UTF8 sequence at" message.
#[test]
#[should_panic(expected = "Invalid UTF8 sequence at")]
fn test_read_non_utf8_binary_as_utf8() {
    // Each value starts with 0xDE, which is never followed by a UTF-8 continuation byte here.
    let invalid_utf8 = BinaryArray::from(vec![
        b"\xDE\x00\xFF".as_ref(),
        b"\xDE\x01\xAA".as_ref(),
        b"\xDE\x02\xFF".as_ref(),
    ]);
    let file = write_parquet_from_iter(vec![(
        "non_utf8_binary",
        Arc::new(invalid_utf8) as ArrayRef,
    )]);

    // Request that the binary column be surfaced as a string column.
    let utf8_field = Field::new("non_utf8_binary", ArrowDataType::Utf8, false);
    let supplied_fields = Fields::from(vec![utf8_field]);
    let supplied_schema = Arc::new(Schema::new(supplied_fields));
    let options = ArrowReaderOptions::new().with_schema(supplied_schema);

    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema");
    let mut arrow_reader = builder.build().expect("reader with schema");

    arrow_reader.next().unwrap().unwrap_err();
}
3181
/// Reads primitive and nested columns back through a caller-supplied schema.
///
/// The file is written with `Int32`, `Date32` and a struct of (`Utf8`, `Int64`) columns. The
/// supplied schema instead asks for second-resolution timestamps, `Date64`, and a struct of
/// (dictionary-encoded `Utf8`, nanosecond timestamps). The test checks that the reader reports
/// the supplied schema and that the values are returned in the requested types.
#[test]
fn test_with_schema() {
    // Struct column exactly as it is written to the file.
    let written_struct_fields = Fields::from(vec![
        Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
        Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
    ]);
    let written_struct_columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
        Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
    ];
    let written_struct =
        StructArray::try_new(written_struct_fields, written_struct_columns, None).unwrap();

    let int32_column = Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef;
    let date32_column = Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef;
    let file = write_parquet_from_iter(vec![
        ("int32_to_ts_second", int32_column),
        ("date32_to_date64", date32_column),
        ("nested", Arc::new(written_struct) as ArrayRef),
    ]);

    // Types requested from the reader in place of the written ones.
    let dictionary_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::Int32),
        Box::new(ArrowDataType::Utf8),
    );
    let ts_nano_type = ArrowDataType::Timestamp(
        arrow::datatypes::TimeUnit::Nanosecond,
        Some("+10:00".into()),
    );
    let ts_second_type =
        ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into()));
    let supplied_nested_fields = Fields::from(vec![
        Field::new("utf8_to_dict", dictionary_type, false),
        Field::new("int64_to_ts_nano", ts_nano_type, false),
    ]);
    let supplied_schema = Arc::new(Schema::new(vec![
        Field::new("int32_to_ts_second", ts_second_type, false),
        Field::new("date32_to_date64", ArrowDataType::Date64, false),
        Field::new(
            "nested",
            ArrowDataType::Struct(supplied_nested_fields),
            false,
        ),
    ]));

    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema");
    let mut arrow_reader = builder.build().expect("reader with schema");

    // The reader must advertise exactly the schema it was given.
    assert_eq!(arrow_reader.schema(), supplied_schema);
    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);

    // Column 0: int32 surfaced as a second-resolution timestamp.
    let first_ts_second = batch
        .column(0)
        .as_any()
        .downcast_ref::<TimestampSecondArray>()
        .expect("downcast to timestamp second")
        .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
        .map(|v| v.to_string())
        .expect("value as datetime");
    assert_eq!(first_ts_second, "1970-01-01 01:00:00 +01:00");

    // Column 1: date32 surfaced as date64.
    let first_date = batch
        .column(1)
        .as_any()
        .downcast_ref::<Date64Array>()
        .expect("downcast to date64")
        .value_as_date(0)
        .map(|v| v.to_string())
        .expect("value as date");
    assert_eq!(first_date, "1970-01-01");

    // Column 2: the struct, whose children were re-typed as well.
    let nested = batch
        .column(2)
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("downcast to struct");

    let nested_dict = nested
        .column(0)
        .as_any()
        .downcast_ref::<Int32DictionaryArray>()
        .expect("downcast to dictionary");

    let dict_values: Vec<_> = nested_dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("downcast to string")
        .iter()
        .collect();
    assert_eq!(dict_values, vec![Some("a"), Some("b")]);

    let dict_keys: Vec<_> = nested_dict.keys().iter().collect();
    assert_eq!(dict_keys, vec![Some(0), Some(0), Some(0), Some(1)]);

    let first_ts_nano = nested
        .column(1)
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("downcast to timestamp nanosecond")
        .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
        .map(|v| v.to_string())
        .expect("value as datetime");
    assert_eq!(first_ts_nano, "1970-01-01 10:00:00.000000001 +10:00");
}
3317
/// Projecting no leaf columns must still account for every row of the file, delivered as
/// column-less batches that respect the configured batch size of 2.
#[test]
fn test_empty_projection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/alltypes_plain.parquet")).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let expected_rows = builder.metadata().file_metadata().num_rows() as usize;

    // An empty leaf list selects no columns at all.
    let no_columns = ProjectionMask::leaves(builder.parquet_schema(), []);
    let batch_reader = builder
        .with_projection(no_columns)
        .with_batch_size(2)
        .build()
        .unwrap();

    let total_rows: usize = batch_reader
        .map(|maybe_batch| {
            let batch = maybe_batch.unwrap();
            let rows = batch.num_rows();
            assert_eq!(batch.num_columns(), 0);
            assert!(rows <= 2);
            rows
        })
        .sum();

    assert_eq!(total_rows, expected_rows);
}
3345
3346    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3347        let schema = Arc::new(Schema::new(vec![Field::new(
3348            "list",
3349            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Int32, true))),
3350            true,
3351        )]));
3352
3353        let mut buf = Vec::with_capacity(1024);
3354
3355        let mut writer = ArrowWriter::try_new(
3356            &mut buf,
3357            schema.clone(),
3358            Some(
3359                WriterProperties::builder()
3360                    .set_max_row_group_size(row_group_size)
3361                    .build(),
3362            ),
3363        )
3364        .unwrap();
3365        for _ in 0..2 {
3366            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3367            for _ in 0..(batch_size) {
3368                list_builder.append(true);
3369            }
3370            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3371                .unwrap();
3372            writer.write(&batch).unwrap();
3373        }
3374        writer.close().unwrap();
3375
3376        let mut record_reader =
3377            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3378        assert_eq!(
3379            batch_size,
3380            record_reader.next().unwrap().unwrap().num_rows()
3381        );
3382        assert_eq!(
3383            batch_size,
3384            record_reader.next().unwrap().unwrap().num_rows()
3385        );
3386    }
3387
/// Runs `test_row_group_batch` over row group / batch size pairs that are equal, slightly
/// apart, and positioned around `REPETITION_LEVELS_BATCH_SIZE`.
#[test]
fn test_row_group_exact_multiple() {
    const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;

    // Each entry is (row_group_size, batch_size).
    let cases = [
        (8, 8),
        (10, 8),
        (8, 10),
        (BATCH_SIZE, BATCH_SIZE),
        (BATCH_SIZE + 1, BATCH_SIZE),
        (BATCH_SIZE, BATCH_SIZE + 1),
        (BATCH_SIZE, BATCH_SIZE - 1),
        (BATCH_SIZE - 1, BATCH_SIZE),
    ];
    for (row_group_size, batch_size) in cases {
        test_row_group_batch(row_group_size, batch_size);
    }
}
3400
3401    /// Given a RecordBatch containing all the column data, return the expected batches given
3402    /// a `batch_size` and `selection`
3403    fn get_expected_batches(
3404        column: &RecordBatch,
3405        selection: &RowSelection,
3406        batch_size: usize,
3407    ) -> Vec<RecordBatch> {
3408        let mut expected_batches = vec![];
3409
3410        let mut selection: VecDeque<_> = selection.clone().into();
3411        let mut row_offset = 0;
3412        let mut last_start = None;
3413        while row_offset < column.num_rows() && !selection.is_empty() {
3414            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3415            while batch_remaining > 0 && !selection.is_empty() {
3416                let (to_read, skip) = match selection.front_mut() {
3417                    Some(selection) if selection.row_count > batch_remaining => {
3418                        selection.row_count -= batch_remaining;
3419                        (batch_remaining, selection.skip)
3420                    }
3421                    Some(_) => {
3422                        let select = selection.pop_front().unwrap();
3423                        (select.row_count, select.skip)
3424                    }
3425                    None => break,
3426                };
3427
3428                batch_remaining -= to_read;
3429
3430                match skip {
3431                    true => {
3432                        if let Some(last_start) = last_start.take() {
3433                            expected_batches.push(column.slice(last_start, row_offset - last_start))
3434                        }
3435                        row_offset += to_read
3436                    }
3437                    false => {
3438                        last_start.get_or_insert(row_offset);
3439                        row_offset += to_read
3440                    }
3441                }
3442            }
3443        }
3444
3445        if let Some(last_start) = last_start.take() {
3446            expected_batches.push(column.slice(last_start, row_offset - last_start))
3447        }
3448
3449        // Sanity check, all batches except the final should be the batch size
3450        for batch in &expected_batches[..expected_batches.len() - 1] {
3451            assert_eq!(batch.num_rows(), batch_size);
3452        }
3453
3454        expected_batches
3455    }
3456
3457    fn create_test_selection(
3458        step_len: usize,
3459        total_len: usize,
3460        skip_first: bool,
3461    ) -> (RowSelection, usize) {
3462        let mut remaining = total_len;
3463        let mut skip = skip_first;
3464        let mut vec = vec![];
3465        let mut selected_count = 0;
3466        while remaining != 0 {
3467            let step = if remaining > step_len {
3468                step_len
3469            } else {
3470                remaining
3471            };
3472            vec.push(RowSelector {
3473                row_count: step,
3474                skip,
3475            });
3476            remaining -= step;
3477            if !skip {
3478                selected_count += step;
3479            }
3480            skip = !skip;
3481        }
3482        (vec.into(), selected_count)
3483    }
3484
/// Reads `alltypes_tiny_pages_plain.parquet` with the page index enabled and alternating
/// select/skip row selections, comparing the result against batches sliced out of a full,
/// unselected read of the same file.
#[test]
fn test_scan_row_with_selection() {
    /// Builds a reader over a clone of `test_file` with the page index enabled, configured
    /// with `batch_size` and restricted to `selections`.
    fn create_skip_reader(
        test_file: &File,
        batch_size: usize,
        selections: RowSelection,
    ) -> ParquetRecordBatchReader {
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            test_file.try_clone().unwrap(),
            ArrowReaderOptions::new().with_page_index(true),
        )
        .unwrap();
        builder
            .with_batch_size(batch_size)
            .with_row_selection(selections)
            .build()
            .unwrap()
    }

    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
    let test_file = File::open(&path).unwrap();

    // Reference data: the first batch of a plain read with a batch size of 7300.
    let data = ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();

    let do_test = |batch_size: usize, selection_len: usize| {
        for skip_first in [false, true] {
            let (selections, _) = create_test_selection(batch_size, data.num_rows(), skip_first);

            let expected = get_expected_batches(&data, &selections, batch_size);
            let actual = create_skip_reader(&test_file, batch_size, selections)
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(
                actual,
                expected,
                "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
            );
        }
    };

    // total row count 7300
    //
    // Cases, as (batch_size, selection_len):
    // 1. selection len more than one page row count
    // 2. selection len less than one page row count
    // 3. selection_len less than batch_size
    // 4. selection_len more than batch_size
    //
    // NOTE(review): `selection_len` is only interpolated into the failure message; the
    // selection itself is stepped by `batch_size`, and cases 3 and 4 are identical calls.
    // Presumably they were meant to exercise selection_len != batch_size — confirm before
    // changing, as `get_expected_batches` asserts every non-final batch is `batch_size` rows.
    for (batch_size, selection_len) in [(1000, 1000), (20, 20), (20, 5), (20, 5)] {
        do_test(batch_size, selection_len);
    }
}
3538
/// Requesting a batch size of 1024 on a file with fewer rows should leave the built reader
/// with a `batch_size` equal to the file's row count.
#[test]
fn test_batch_size_overallocate() {
    let testdata = arrow::util::test_util::parquet_test_data();
    // `alltypes_plain.parquet` only has 8 rows
    let test_file = File::open(format!("{testdata}/alltypes_plain.parquet")).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
    let num_rows = builder.metadata.file_metadata().num_rows();

    let everything = ProjectionMask::all();
    let reader = builder
        .with_batch_size(1024)
        .with_projection(everything)
        .build()
        .unwrap();

    // Guard the premise: the requested size really differs from the row count.
    assert_ne!(1024, num_rows);
    assert_eq!(reader.batch_size, num_rows as usize);
}
3556
/// Enabling the page index must work both for a file that has one and for a file that
/// does not.
#[test]
fn test_read_with_page_index_enabled() {
    let testdata = arrow::util::test_util::parquet_test_data();

    // Opens `name` from the test data directory with the page index option switched on.
    let open_with_page_index = |name: &str| {
        let test_file = File::open(format!("{testdata}/{name}")).unwrap();
        ParquetRecordBatchReaderBuilder::try_new_with_options(
            test_file,
            ArrowReaderOptions::new().with_page_index(true),
        )
        .unwrap()
    };

    // `alltypes_tiny_pages.parquet` has page index
    let builder = open_with_page_index("alltypes_tiny_pages.parquet");
    assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
    let batches = builder
        .build()
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 8);

    // `alltypes_plain.parquet` doesn't have page index
    let builder = open_with_page_index("alltypes_plain.parquet");
    // Although `Vec<Vec<PageLocation>>` of each row group is empty,
    // we should read the file successfully.
    // FIXME: this test will fail when metadata parsing returns `None` for missing page
    // indexes. https://github.com/apache/arrow-rs/issues/6447
    assert!(builder.metadata().offset_index().unwrap()[0].is_empty());
    let batches = builder
        .build()
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 1);
}
3595
/// Writes one row through the low-level column writers for a schema with bare `REPEATED`
/// fields, then checks that projecting each leaf on its own yields the same column as an
/// unprojected read.
#[test]
fn test_raw_repetition() {
    const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
    let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());

    let mut buf = Vec::with_capacity(1024);
    let mut writer = SerializedFileWriter::new(&mut buf, schema, Default::default()).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    // column 0: eventType
    let mut event_type_writer = row_group_writer.next_column().unwrap().unwrap();
    event_type_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), None)
        .unwrap();
    event_type_writer.close().unwrap();

    // column 1: category
    let mut category_writer = row_group_writer.next_column().unwrap().unwrap();
    category_writer
        .typed::<Int32Type>()
        .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
        .unwrap();
    category_writer.close().unwrap();

    // column 2: filter.error
    let mut filter_error_writer = row_group_writer.next_column().unwrap().unwrap();
    filter_error_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), Some(&[0]))
        .unwrap();
    filter_error_writer.close().unwrap();

    let row_group_metadata = row_group_writer.close().unwrap();
    assert_eq!(row_group_metadata.num_rows(), 1);
    writer.close().unwrap();

    let bytes = Bytes::from(buf);

    // Unprojected read used as the reference for every single-leaf projection below.
    let full = ParquetRecordBatchReader::try_new(bytes.clone(), 1024)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(full.num_columns(), 3);

    for leaf_idx in 0..3 {
        let builder = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
        let single_leaf = ProjectionMask::leaves(builder.parquet_schema(), [leaf_idx]);
        let projected = builder
            .with_projection(single_leaf)
            .build()
            .unwrap()
            .next()
            .unwrap()
            .unwrap();

        assert_eq!(projected.num_columns(), 1);
        assert_eq!(full.column(leaf_idx), projected.column(0));
    }
}
3657
/// Reads `lz4_raw_compressed.parquet` and checks the contents of its single four-row batch.
#[test]
fn test_read_lz4_raw() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/lz4_raw_compressed.parquet")).unwrap();

    let reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
    let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];

    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);

    // Expected values: https://github.com/apache/parquet-testing/pull/18
    let int_column: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
    assert_eq!(
        int_column.values(),
        &[1593604800, 1593604800, 1593604801, 1593604801]
    );

    let binary_column: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
    let binary_values: Vec<_> = binary_column.iter().flatten().collect();
    assert_eq!(binary_values, &[b"abc", b"def", b"abc", b"def"]);

    let float_column: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
    assert_eq!(
        float_column.values(),
        &[42.000000, 7.700000, 42.125000, 7.700000]
    );
}
3688
// Backward-compatibility test covering two files that both declare the LZ4 CompressionCodec
// but were compressed with different algorithms: LZ4_HADOOP and LZ4_RAW.
// 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
//    LZ4_HADOOP algorithm for compression.
// 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
//    LZ4_RAW algorithm for compression. This fallback is kept for backward compatibility with
//    older parquet-cpp versions.
//
// For more information, check: https://github.com/apache/arrow-rs/issues/2988
#[test]
fn test_read_lz4_hadoop_fallback() {
    let file_names = [
        "hadoop_lz4_compressed.parquet",
        "non_hadoop_lz4_compressed.parquet",
    ];

    for file_name in file_names {
        let testdata = arrow::util::test_util::parquet_test_data();
        let file = File::open(format!("{testdata}/{file_name}")).unwrap();
        let expected_rows = 4;

        let reader = ParquetRecordBatchReader::try_new(file, expected_rows).unwrap();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), expected_rows);

        // Both files are expected to decode to the same contents.
        let int_column: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            int_column.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let binary_column: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let binary_values: Vec<_> = binary_column.iter().flatten().collect();
        assert_eq!(binary_values, &[b"abc", b"def", b"abc", b"def"]);

        let float_column: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(float_column.values(), &[42.0, 7.7, 42.125, 7.7]);
    }
}
3733
/// Reads `hadoop_lz4_compressed_larger.parquet` as one 10000-row batch and spot-checks the
/// first two and last two strings of its only column.
#[test]
fn test_read_lz4_hadoop_large() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/hadoop_lz4_compressed_larger.parquet")).unwrap();
    let expected_rows = 10000;

    let reader = ParquetRecordBatchReader::try_new(file, expected_rows).unwrap();
    let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];

    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), expected_rows);

    let string_column: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
    let strings: Vec<_> = string_column.iter().flatten().collect();
    assert_eq!(strings[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
    assert_eq!(strings[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
    assert_eq!(
        strings[expected_rows - 2],
        "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c"
    );
    assert_eq!(
        strings[expected_rows - 1],
        "85440778-460a-41ac-aa2e-ac3ee41696bf"
    );
}
3758
/// Selecting only the middle row of `nested_lists.snappy.parquet` must return the same data
/// as slicing that row out of a full read.
#[test]
#[cfg(feature = "snap")]
fn test_read_nested_lists() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/nested_lists.snappy.parquet")).unwrap();

    // Reference: the whole file read without a selection.
    let expected = ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 60)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(expected.num_rows(), 3);

    let middle_row_only = RowSelection::from(vec![
        RowSelector::skip(1),
        RowSelector::select(1),
        RowSelector::skip(1),
    ]);
    let actual = ParquetRecordBatchReaderBuilder::try_new(file)
        .unwrap()
        .with_row_selection(middle_row_only)
        .build()
        .unwrap()
        .next()
        .unwrap()
        .unwrap();

    assert_eq!(actual.num_rows(), 1);
    assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
}
3786
/// Round-trips three `Decimal128` columns with precision/scale (19, 0), (12, 0) and (17, 10)
/// through the writer and reader and expects the first batch read to equal what was written.
#[test]
fn test_arbitrary_decimal() {
    let values = [1, 2, 3, 4, 5, 6, 7, 8];
    // Builds one decimal column over the same raw values with the given precision and scale.
    let decimal_column = |precision, scale| -> ArrayRef {
        Arc::new(
            Decimal128Array::from_iter_values(values)
                .with_precision_and_scale(precision, scale)
                .unwrap(),
        )
    };

    let written = RecordBatch::try_from_iter([
        ("decimal_values_19_0", decimal_column(19, 0)),
        ("decimal_values_12_0", decimal_column(12, 0)),
        ("decimal_values_17_10", decimal_column(17, 10)),
    ])
    .unwrap();

    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();

    let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8).unwrap();
    let read = reader.collect::<Result<Vec<_>, _>>().unwrap();

    assert_eq!(&written.slice(0, 8), &read[0]);
}
3819
/// Skips the first two rows of a three-row list column written with a one-row page limit and
/// expects the single remaining row to equal the last row of the written batch.
#[test]
fn test_list_skip() {
    let mut list_builder = ListBuilder::new(Int32Builder::new());
    for row in [vec![Some(1), Some(2)], vec![Some(3)], vec![Some(4)]] {
        list_builder.append_value(row);
    }
    let batch =
        RecordBatch::try_from_iter([("l", Arc::new(list_builder.finish()) as _)]).unwrap();

    // First page contains 2 values but only 1 row
    let props = WriterProperties::builder()
        .set_data_page_row_count_limit(1)
        .set_write_batch_size(2)
        .build();

    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let last_row_only = vec![RowSelector::skip(2), RowSelector::select(1)];
    let out = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
        .unwrap()
        .with_row_selection(last_row_only.into())
        .build()
        .unwrap()
        .next()
        .unwrap()
        .unwrap();

    assert_eq!(out.num_rows(), 1);
    assert_eq!(out, batch.slice(2, 1));
}
3850
3851    fn test_decimal_roundtrip<T: DecimalType>() {
3852        // Precision <= 9 -> INT32
3853        // Precision <= 18 -> INT64
3854        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
3855
3856        let d = |values: Vec<usize>, p: u8| {
3857            let iter = values.into_iter().map(T::Native::usize_as);
3858            PrimitiveArray::<T>::from_iter_values(iter)
3859                .with_precision_and_scale(p, 2)
3860                .unwrap()
3861        };
3862
3863        let d1 = d(vec![1, 2, 3, 4, 5], 9);
3864        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
3865        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
3866        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
3867
3868        let batch = RecordBatch::try_from_iter([
3869            ("d1", Arc::new(d1) as ArrayRef),
3870            ("d2", Arc::new(d2) as ArrayRef),
3871            ("d3", Arc::new(d3) as ArrayRef),
3872            ("d4", Arc::new(d4) as ArrayRef),
3873        ])
3874        .unwrap();
3875
3876        let mut buffer = Vec::with_capacity(1024);
3877        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
3878        writer.write(&batch).unwrap();
3879        writer.close().unwrap();
3880
3881        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
3882        let t1 = builder.parquet_schema().columns()[0].physical_type();
3883        assert_eq!(t1, PhysicalType::INT32);
3884        let t2 = builder.parquet_schema().columns()[1].physical_type();
3885        assert_eq!(t2, PhysicalType::INT64);
3886        let t3 = builder.parquet_schema().columns()[2].physical_type();
3887        assert_eq!(t3, PhysicalType::INT64);
3888        let t4 = builder.parquet_schema().columns()[3].physical_type();
3889        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
3890
3891        let mut reader = builder.build().unwrap();
3892        assert_eq!(batch.schema(), reader.schema());
3893
3894        let out = reader.next().unwrap().unwrap();
3895        assert_eq!(batch, out);
3896    }
3897
/// Runs the decimal round-trip check for both decimal widths, 128-bit first and then 256-bit.
#[test]
fn test_decimal() {
    // 128-bit backed decimals.
    test_decimal_roundtrip::<Decimal128Type>();

    // 256-bit backed decimals.
    test_decimal_roundtrip::<Decimal256Type>();
}
3903
/// Writes two batches of 1024 single-element string lists, reads them back while skipping the
/// first 100 rows of each batch, and checks that exactly the remaining values come back in
/// order.
#[test]
fn test_list_selection() {
    let item_field = Field::new("item", ArrowDataType::Utf8, true);
    let schema = Arc::new(Schema::new(vec![Field::new_list(
        "list", item_field, false,
    )]));

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

    for i in 0..2 {
        let mut list_a_builder = ListBuilder::new(StringBuilder::new());
        for j in 0..1024 {
            // Each list holds the single string "<batch> <row>".
            list_a_builder.values().append_value(format!("{i} {j}"));
            list_a_builder.append(true);
        }
        let lists = list_a_builder.finish();
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(lists)]).unwrap();
        writer.write(&batch).unwrap();
    }
    let _metadata = writer.close().unwrap();

    // Skip 100 rows, then take the remaining 924, for each of the two written batches.
    let selection = RowSelection::from(vec![
        RowSelector::skip(100),
        RowSelector::select(924),
        RowSelector::skip(100),
        RowSelector::select(924),
    ]);
    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))
        .unwrap()
        .with_row_selection(selection)
        .build()
        .unwrap();

    let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    let batch = concat_batches(&schema, &batches).unwrap();

    assert_eq!(batch.num_rows(), 924 * 2);
    let list = batch.column(0).as_list::<i32>();

    // Every list must still contain exactly one element.
    for pair in list.value_offsets().windows(2) {
        assert_eq!(pair[0] + 1, pair[1])
    }

    let mut values = list.values().as_string::<i32>().iter();
    for (i, j) in (0..2).flat_map(|i| (100..1024).map(move |j| (i, j))) {
        let expected = format!("{i} {j}");
        assert_eq!(values.next().unwrap().unwrap(), &expected);
    }
}
3958
3959    #[test]
3960    fn test_list_selection_fuzz() {
3961        let mut rng = thread_rng();
3962        let schema = Arc::new(Schema::new(vec![Field::new_list(
3963            "list",
3964            Field::new_list("item", Field::new("item", ArrowDataType::Int32, true), true),
3965            true,
3966        )]));
3967        let mut buf = Vec::with_capacity(1024);
3968        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
3969
3970        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
3971
3972        for _ in 0..2048 {
3973            if rng.gen_bool(0.2) {
3974                list_a_builder.append(false);
3975                continue;
3976            }
3977
3978            let list_a_len = rng.gen_range(0..10);
3979            let list_b_builder = list_a_builder.values();
3980
3981            for _ in 0..list_a_len {
3982                if rng.gen_bool(0.2) {
3983                    list_b_builder.append(false);
3984                    continue;
3985                }
3986
3987                let list_b_len = rng.gen_range(0..10);
3988                let int_builder = list_b_builder.values();
3989                for _ in 0..list_b_len {
3990                    match rng.gen_bool(0.2) {
3991                        true => int_builder.append_null(),
3992                        false => int_builder.append_value(rng.gen()),
3993                    }
3994                }
3995                list_b_builder.append(true)
3996            }
3997            list_a_builder.append(true);
3998        }
3999
4000        let array = Arc::new(list_a_builder.finish());
4001        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4002
4003        writer.write(&batch).unwrap();
4004        let _metadata = writer.close().unwrap();
4005
4006        let buf = Bytes::from(buf);
4007
4008        let cases = [
4009            vec![
4010                RowSelector::skip(100),
4011                RowSelector::select(924),
4012                RowSelector::skip(100),
4013                RowSelector::select(924),
4014            ],
4015            vec![
4016                RowSelector::select(924),
4017                RowSelector::skip(100),
4018                RowSelector::select(924),
4019                RowSelector::skip(100),
4020            ],
4021            vec![
4022                RowSelector::skip(1023),
4023                RowSelector::select(1),
4024                RowSelector::skip(1023),
4025                RowSelector::select(1),
4026            ],
4027            vec![
4028                RowSelector::select(1),
4029                RowSelector::skip(1023),
4030                RowSelector::select(1),
4031                RowSelector::skip(1023),
4032            ],
4033        ];
4034
4035        for batch_size in [100, 1024, 2048] {
4036            for selection in &cases {
4037                let selection = RowSelection::from(selection.clone());
4038                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4039                    .unwrap()
4040                    .with_row_selection(selection.clone())
4041                    .with_batch_size(batch_size)
4042                    .build()
4043                    .unwrap();
4044
4045                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4046                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4047                assert_eq!(actual.num_rows(), selection.row_count());
4048
4049                let mut batch_offset = 0;
4050                let mut actual_offset = 0;
4051                for selector in selection.iter() {
4052                    if selector.skip {
4053                        batch_offset += selector.row_count;
4054                        continue;
4055                    }
4056
4057                    assert_eq!(
4058                        batch.slice(batch_offset, selector.row_count),
4059                        actual.slice(actual_offset, selector.row_count)
4060                    );
4061
4062                    batch_offset += selector.row_count;
4063                    actual_offset += selector.row_count;
4064                }
4065            }
4066        }
4067    }
4068}