parquet/arrow/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! High-level API for reading/writing Arrow
//! [RecordBatch](arrow_array::RecordBatch)es and
//! [Array](arrow_array::Array)s to/from Parquet Files.
//!
//! [Apache Arrow](http://arrow.apache.org/) is a cross-language development platform for
//! in-memory data.
//!
//! # Example of writing Arrow record batch to Parquet file
//!
//! ```rust
//! # use arrow_array::{Int32Array, ArrayRef};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! # use parquet::file::properties::WriterProperties;
//! # use tempfile::tempfile;
//! # use std::sync::Arc;
//! # use parquet::basic::Compression;
//! let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! let vals = Int32Array::from(vec![5, 6, 7, 8]);
//! let batch = RecordBatch::try_from_iter(vec![
//!   ("id", Arc::new(ids) as ArrayRef),
//!   ("val", Arc::new(vals) as ArrayRef),
//! ]).unwrap();
//!
//! let file = tempfile().unwrap();
//!
//! // WriterProperties can be used to set Parquet file options
//! let props = WriterProperties::builder()
//!     .set_compression(Compression::SNAPPY)
//!     .build();
//!
//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
//!
//! writer.write(&batch).expect("Writing batch");
//!
//! // writer must be closed to write footer
//! writer.close().unwrap();
//! ```
//!
//! # Example of reading parquet file into arrow record batch
//!
//! ```rust
//! # use std::fs::File;
//! # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
//! # use std::sync::Arc;
//! # use arrow_array::Int32Array;
//! # use arrow::datatypes::{DataType, Field, Schema};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! #
//! # let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! # let schema = Arc::new(Schema::new(vec![
//! #     Field::new("id", DataType::Int32, false),
//! # ]));
//! #
//! # let file = File::create("data.parquet").unwrap();
//! #
//! # let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids)]).unwrap();
//! # let batches = vec![batch];
//! #
//! # let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
//! #
//! # for batch in batches {
//! #     writer.write(&batch).expect("Writing batch");
//! # }
//! # writer.close().unwrap();
//! #
//! let file = File::open("data.parquet").unwrap();
//!
//! let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
//! println!("Converted arrow schema is: {}", builder.schema());
//!
//! let mut reader = builder.build().unwrap();
//!
//! let record_batch = reader.next().unwrap().unwrap();
//!
//! println!("Read {} records.", record_batch.num_rows());
//! ```
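//!
//! # Example of reading a parquet file asynchronously
//!
//! The `async` feature enables the `async_reader` and `async_writer` modules.
//! A minimal sketch, assuming a Tokio runtime and an error-propagating context
//! (the file name is only illustrative):
//!
//! ```ignore
//! use futures::TryStreamExt;
//! use parquet::arrow::ParquetRecordBatchStreamBuilder;
//! use tokio::fs::File;
//!
//! // Open the file and inspect its schema before building the stream
//! let file = File::open("data.parquet").await?;
//! let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
//! let stream = builder.build()?;
//!
//! // Collect all record batches from the stream
//! let batches: Vec<_> = stream.try_collect().await?;
//! ```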

experimental!(mod array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod buffer;
mod decoder;

#[cfg(feature = "async")]
pub mod async_reader;
#[cfg(feature = "async")]
pub mod async_writer;

mod record_reader;
experimental!(mod schema);

pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
#[cfg(feature = "async")]
pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
use arrow_schema::{FieldRef, Schema};

pub use self::schema::{
    arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
    parquet_to_arrow_schema_by_columns, FieldLevels,
};

/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";

/// The value of this metadata key, if present on [`Field::metadata`], will be used
/// to populate [`BasicTypeInfo::id`]
///
/// [`Field::metadata`]: arrow_schema::Field::metadata
/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id
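///
/// A minimal sketch of attaching a field id via [`Field::metadata`] (the metadata
/// key is real; the field name and the id value `42` are only illustrative):
///
/// ```
/// # use std::collections::HashMap;
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
/// // The value is parsed as the parquet field id when the schema is converted
/// let field = Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
///     PARQUET_FIELD_ID_META_KEY.to_string(),
///     "42".to_string(),
/// )]));
/// ```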
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

/// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project
///
/// In particular, a [`ProjectionMask`] can be constructed from a list of leaf column indices
/// or root column indices where:
///
/// * Root columns are the direct children of the root schema, enumerated in order
/// * Leaf columns are the child-less leaves of the schema as enumerated by a depth-first search
///
/// For example, the schema
///
/// ```ignore
/// message schema {
///   REQUIRED boolean leaf_1;
///   REQUIRED GROUP group {
///     OPTIONAL int32 leaf_2;
///     OPTIONAL int64 leaf_3;
///   }
/// }
/// ```
///
/// Has roots `["leaf_1", "group"]` and leaves `["leaf_1", "leaf_2", "leaf_3"]`
///
/// For non-nested schemas, i.e. those containing only primitive columns, the root
/// and leaves are the same
///
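/// An illustrative sketch of both constructors, assuming `schema_descr` is the
/// [`SchemaDescriptor`] for the example schema above:
///
/// ```ignore
/// // Select the leaf columns `leaf_1` and `leaf_3`
/// let leaves = ProjectionMask::leaves(&schema_descr, [0, 2]);
///
/// // Select the root column `group`, i.e. the leaves `leaf_2` and `leaf_3`
/// let roots = ProjectionMask::roots(&schema_descr, [1]);
/// ```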
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMask {
    /// If present a leaf column should be included if the value at
    /// the corresponding index is true
    ///
    /// If `None`, include all columns
    mask: Option<Vec<bool>>,
}

impl ProjectionMask {
    /// Create a [`ProjectionMask`] which selects all columns
    pub fn all() -> Self {
        Self { mask: None }
    }

    /// Create a [`ProjectionMask`] which selects only the specified leaf columns
    ///
    /// Note: repeated or out of order indices will not impact the final mask
    ///
    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let mut mask = vec![false; schema.num_columns()];
        for leaf_idx in indices {
            mask[leaf_idx] = true;
        }
        Self { mask: Some(mask) }
    }

    /// Create a [`ProjectionMask`] which selects only the specified root columns
    ///
    /// Note: repeated or out of order indices will not impact the final mask
    ///
    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let num_root_columns = schema.root_schema().get_fields().len();
        let mut root_mask = vec![false; num_root_columns];
        for root_idx in indices {
            root_mask[root_idx] = true;
        }

        let mask = (0..schema.num_columns())
            .map(|leaf_idx| {
                let root_idx = schema.get_column_root_idx(leaf_idx);
                root_mask[root_idx]
            })
            .collect();

        Self { mask: Some(mask) }
    }

    /// Returns true if the leaf column `leaf_idx` is included by the mask
    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
        self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
    }
}

/// Looks up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
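///
/// An illustrative sketch, assuming `parquet_schema` and `arrow_schema` describe a
/// flat schema containing an `"id"` column:
///
/// ```ignore
/// if let Some((parquet_idx, field)) = parquet_column(parquet_schema, arrow_schema, "id") {
///     println!("column {parquet_idx} corresponds to arrow field {field:?}");
/// }
/// ```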
pub fn parquet_column<'a>(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &'a Schema,
    name: &str,
) -> Option<(usize, &'a FieldRef)> {
    let (root_idx, field) = arrow_schema.fields.find(name)?;
    if field.data_type().is_nested() {
        // Nested fields are not supported and require non-trivial logic
        // to correctly walk the parquet schema accounting for the
        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
        //
        // For example a ListArray could correspond to anything from 1 to 3 levels
        // in the parquet schema
        return None;
    }

    // This could be made more efficient (#TBD)
    let parquet_idx = (0..parquet_schema.columns().len())
        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
    Some((parquet_idx, field))
}

#[cfg(test)]
mod test {
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    #[test]
    // Reproducer for https://github.com/apache/arrow-rs/issues/6464
    fn test_metadata_read_write_partial_offset() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file WITHOUT the page index structures
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // this should error because the page indexes are not present, but have offsets specified
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let err = ParquetMetaDataReader::new()
            .with_page_indexes(true) // there are no page indexes in the metadata
            .parse_and_finish(&metadata_bytes)
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..341"
        );
    }

    #[test]
    fn test_metadata_read_write_roundtrip() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        assert_ne!(
            metadata_bytes.len(),
            parquet_bytes.len(),
            "metadata is subset of parquet"
        );

        let roundtrip_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        assert_eq!(original_metadata, roundtrip_metadata);
    }

    #[test]
    fn test_metadata_read_write_roundtrip_page_index() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file including the page index structures
        // (which are stored elsewhere in the footer)
        let original_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Need to normalize the metadata first to remove offsets in data
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    /// Sets the page index offset locations in the metadata to `None`
    ///
    /// This is because the offsets are used to find the relative location of the index
    /// structures, and thus differ depending on how the structures are stored.
    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
        let mut metadata_builder = metadata.into_builder();
        for rg in metadata_builder.take_row_groups() {
            let mut rg_builder = rg.into_builder();
            for col in rg_builder.take_columns() {
                rg_builder = rg_builder.add_column_metadata(
                    col.into_builder()
                        .set_offset_index_offset(None)
                        .set_index_page_offset(None)
                        .set_column_index_offset(None)
                        .build()
                        .unwrap(),
                );
            }
            let rg = rg_builder.build().unwrap();
            metadata_builder = metadata_builder.add_row_group(rg);
        }
        metadata_builder.build()
    }

    /// Write a parquet file into an in-memory buffer
    fn create_parquet_file() -> Bytes {
        let mut buf = vec![];
        let data = vec![100, 200, 201, 300, 102, 33];
        let array: ArrayRef = Arc::new(Int32Array::from(data));
        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build();

        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        Bytes::from(buf)
    }

    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetaDataWriter`
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, metadata)
            .finish()
            .unwrap();
        Bytes::from(buf)
    }
}