1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
1718//! Logic for reading into arrow arrays
1920use crate::errors::Result;
21use arrow_array::ArrayRef;
22use arrow_schema::DataType as ArrowType;
23use std::any::Any;
24use std::sync::Arc;
2526use crate::arrow::record_reader::buffer::ValuesBuffer;
27use crate::arrow::record_reader::GenericRecordReader;
28use crate::column::page::PageIterator;
29use crate::column::reader::decoder::ColumnValueDecoder;
30use crate::file::reader::{FilePageIterator, FileReader};
3132mod builder;
33mod byte_array;
34mod byte_array_dictionary;
35mod byte_view_array;
36mod empty_array;
37mod fixed_len_byte_array;
38mod fixed_size_list_array;
39mod list_array;
40mod map_array;
41mod null_array;
42mod primitive_array;
43mod struct_array;
4445#[cfg(test)]
46mod test_util;
4748pub use builder::build_array_reader;
49pub use byte_array::make_byte_array_reader;
50pub use byte_array_dictionary::make_byte_array_dictionary_reader;
51#[allow(unused_imports)] // Only used for benchmarks
52pub use byte_view_array::make_byte_view_array_reader;
53#[allow(unused_imports)] // Only used for benchmarks
54pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
55pub use fixed_size_list_array::FixedSizeListArrayReader;
56pub use list_array::ListArrayReader;
57pub use map_array::MapArrayReader;
58pub use null_array::NullArrayReader;
59pub use primitive_array::PrimitiveArrayReader;
60pub use struct_array::StructArrayReader;
/// Array reader reads parquet data into arrow array.
pub trait ArrayReader: Send {
    /// Returns this reader as `&dyn Any` so callers can downcast to a
    /// concrete reader type.
    // TODO: this function is never used, and the trait is not public. Perhaps this should be
    // removed.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;

    /// Returns the arrow type of this array reader.
    fn get_data_type(&self) -> &ArrowType;

    /// Reads at most `batch_size` records into an arrow array and return it.
    ///
    /// Default implementation: buffers up to `batch_size` records via
    /// [`Self::read_records`], then materializes them with
    /// [`Self::consume_batch`].
    #[cfg(any(feature = "experimental", test))]
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        self.read_records(batch_size)?;
        self.consume_batch()
    }

    /// Reads at most `batch_size` records' bytes into buffer
    ///
    /// Returns the number of records read, which can be less than `batch_size` if
    /// pages is exhausted.
    fn read_records(&mut self, batch_size: usize) -> Result<usize>;

    /// Consume all currently stored buffer data
    /// into an arrow array and return it.
    fn consume_batch(&mut self) -> Result<ArrayRef>;

    /// Skips over `num_records` records, returning the number of rows skipped
    fn skip_records(&mut self, num_records: usize) -> Result<usize>;

    /// If this array has a non-zero definition level, i.e. has a nullable parent
    /// array, returns the definition levels of data from the last call of `next_batch`
    ///
    /// Otherwise returns None
    ///
    /// This is used by parent [`ArrayReader`] to compute their null bitmaps
    fn get_def_levels(&self) -> Option<&[i16]>;

    /// If this array has a non-zero repetition level, i.e. has a repeated parent
    /// array, returns the repetition levels of data from the last call of `next_batch`
    ///
    /// Otherwise returns None
    ///
    /// This is used by parent [`ArrayReader`] to compute their array offsets
    fn get_rep_levels(&self) -> Option<&[i16]>;
}
/// A collection of row groups
pub trait RowGroups {
    /// Get the number of rows in this collection
    fn num_rows(&self) -> usize;

    /// Returns a [`PageIterator`] for the column chunks with the given leaf column index
    ///
    /// The iterator yields one page reader per column chunk, i.e. one per row
    /// group in this collection.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}
117118impl RowGroups for Arc<dyn FileReader> {
119fn num_rows(&self) -> usize {
120self.metadata().file_metadata().num_rows() as usize
121 }
122123fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
124let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
125Ok(Box::new(iterator))
126 }
127}
128129/// Uses `record_reader` to read up to `batch_size` records from `pages`
130///
131/// Returns the number of records read, which can be less than `batch_size` if
132/// pages is exhausted.
133fn read_records<V, CV>(
134 record_reader: &mut GenericRecordReader<V, CV>,
135 pages: &mut dyn PageIterator,
136 batch_size: usize,
137) -> Result<usize>
138where
139V: ValuesBuffer,
140 CV: ColumnValueDecoder<Buffer = V>,
141{
142let mut records_read = 0usize;
143while records_read < batch_size {
144let records_to_read = batch_size - records_read;
145146let records_read_once = record_reader.read_records(records_to_read)?;
147 records_read += records_read_once;
148149// Record reader exhausted
150if records_read_once < records_to_read {
151if let Some(page_reader) = pages.next() {
152// Read from new page reader (i.e. column chunk)
153record_reader.set_page_reader(page_reader?)?;
154 } else {
155// Page reader also exhausted
156break;
157 }
158 }
159 }
160Ok(records_read)
161}
162163/// Uses `record_reader` to skip up to `batch_size` records from `pages`
164///
165/// Returns the number of records skipped, which can be less than `batch_size` if
166/// pages is exhausted
167fn skip_records<V, CV>(
168 record_reader: &mut GenericRecordReader<V, CV>,
169 pages: &mut dyn PageIterator,
170 batch_size: usize,
171) -> Result<usize>
172where
173V: ValuesBuffer,
174 CV: ColumnValueDecoder<Buffer = V>,
175{
176let mut records_skipped = 0usize;
177while records_skipped < batch_size {
178let records_to_read = batch_size - records_skipped;
179180let records_skipped_once = record_reader.skip_records(records_to_read)?;
181 records_skipped += records_skipped_once;
182183// Record reader exhausted
184if records_skipped_once < records_to_read {
185if let Some(page_reader) = pages.next() {
186// Read from new page reader (i.e. column chunk)
187record_reader.set_page_reader(page_reader?)?;
188 } else {
189// Page reader also exhausted
190break;
191 }
192 }
193 }
194Ok(records_skipped)
195}