parquet/arrow/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! High-level API for reading/writing Arrow
//! [RecordBatch](arrow_array::RecordBatch)es and
//! [Array](arrow_array::Array)s to/from Parquet Files.
//!
//! [Apache Arrow](http://arrow.apache.org/) is a cross-language development platform for
//! in-memory data.
//!
//! # Example of writing Arrow record batch to Parquet file
//!
//! ```rust
//! # use arrow_array::{Int32Array, ArrayRef};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! # use parquet::file::properties::WriterProperties;
//! # use tempfile::tempfile;
//! # use std::sync::Arc;
//! # use parquet::basic::Compression;
//! let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! let vals = Int32Array::from(vec![5, 6, 7, 8]);
//! let batch = RecordBatch::try_from_iter(vec![
//!   ("id", Arc::new(ids) as ArrayRef),
//!   ("val", Arc::new(vals) as ArrayRef),
//! ]).unwrap();
//!
//! let file = tempfile().unwrap();
//!
//! // WriterProperties can be used to set Parquet file options
//! let props = WriterProperties::builder()
//!     .set_compression(Compression::SNAPPY)
//!     .build();
//!
//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
//!
//! writer.write(&batch).expect("Writing batch");
//!
//! // writer must be closed to write footer
//! writer.close().unwrap();
//! ```
//!
//! # Example of reading parquet file into arrow record batch
//!
//! ```rust
//! # use std::fs::File;
//! # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
//! # use std::sync::Arc;
//! # use arrow_array::Int32Array;
//! # use arrow::datatypes::{DataType, Field, Schema};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! #
//! # let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! # let schema = Arc::new(Schema::new(vec![
//! #     Field::new("id", DataType::Int32, false),
//! # ]));
//! #
//! # let file = File::create("data.parquet").unwrap();
//! #
//! # let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids)]).unwrap();
//! # let batches = vec![batch];
//! #
//! # let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
//! #
//! # for batch in batches {
//! #     writer.write(&batch).expect("Writing batch");
//! # }
//! # writer.close().unwrap();
//! #
//! let file = File::open("data.parquet").unwrap();
//!
//! let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
//! println!("Converted arrow schema is: {}", builder.schema());
//!
//! let mut reader = builder.build().unwrap();
//!
//! let record_batch = reader.next().unwrap().unwrap();
//!
//! println!("Read {} records.", record_batch.num_rows());
//! ```

experimental!(mod array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod buffer;
mod decoder;

#[cfg(feature = "async")]
pub mod async_reader;
#[cfg(feature = "async")]
pub mod async_writer;

mod record_reader;
experimental!(mod schema);

pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
#[cfg(feature = "async")]
pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
use arrow_schema::{FieldRef, Schema};

pub use self::schema::{
    arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
    parquet_to_arrow_schema_by_columns, FieldLevels,
};

/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";

/// The value of this metadata key, if present on [`Field::metadata`], will be used
/// to populate [`BasicTypeInfo::id`]
///
/// [`Field::metadata`]: arrow_schema::Field::metadata
/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id
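///
/// # Example
///
/// A minimal sketch of attaching a field id through [`Field::metadata`]; the id value
/// is stored as a string, and the field name and id below are illustrative only:
///
/// ```ignore
/// use std::collections::HashMap;
/// use arrow_schema::{DataType, Field};
/// use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
///
/// // Hypothetical field carrying a Parquet field id of 42
/// let field = Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
///     PARQUET_FIELD_ID_META_KEY.to_string(),
///     "42".to_string(),
/// )]));
/// ```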
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

/// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project
///
/// In particular, a [`ProjectionMask`] can be constructed from a list of leaf column indices
/// or root column indices where:
///
/// * Root columns are the direct children of the root schema, enumerated in order
/// * Leaf columns are the child-less leaves of the schema as enumerated by a depth-first search
///
/// For example, the schema
///
/// ```ignore
/// message schema {
///   REQUIRED boolean         leaf_1;
///   REQUIRED GROUP group {
///     OPTIONAL int32 leaf_2;
///     OPTIONAL int64 leaf_3;
///   }
/// }
/// ```
///
/// Has roots `["leaf_1", "group"]` and leaves `["leaf_1", "leaf_2", "leaf_3"]`
///
/// For non-nested schemas, i.e. those containing only primitive columns, the root
/// and leaves are the same
///
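/// # Example
///
/// A rough sketch of selecting columns from the schema above, assuming a
/// [`SchemaDescriptor`] named `schema_descr` describing it is already in scope:
///
/// ```ignore
/// // Select `leaf_1` and `leaf_3` by leaf index (depth-first order)
/// let mask = ProjectionMask::leaves(&schema_descr, [0, 2]);
/// assert!(mask.leaf_included(0));
/// assert!(!mask.leaf_included(1));
/// assert!(mask.leaf_included(2));
///
/// // Select the `group` root column, which covers both `leaf_2` and `leaf_3`
/// let mask = ProjectionMask::roots(&schema_descr, [1]);
/// assert!(!mask.leaf_included(0));
/// assert!(mask.leaf_included(1));
/// assert!(mask.leaf_included(2));
/// ```
///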
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMask {
    /// If present a leaf column should be included if the value at
    /// the corresponding index is true
    ///
    /// If `None`, include all columns
    mask: Option<Vec<bool>>,
}

impl ProjectionMask {
    /// Create a [`ProjectionMask`] which selects all columns
    pub fn all() -> Self {
        Self { mask: None }
    }

    /// Create a [`ProjectionMask`] which selects only the specified leaf columns
    ///
    /// Note: repeated or out of order indices will not impact the final mask
    ///
    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let mut mask = vec![false; schema.num_columns()];
        for leaf_idx in indices {
            mask[leaf_idx] = true;
        }
        Self { mask: Some(mask) }
    }

    /// Create a [`ProjectionMask`] which selects only the specified root columns
    ///
    /// Note: repeated or out of order indices will not impact the final mask
    ///
    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let num_root_columns = schema.root_schema().get_fields().len();
        let mut root_mask = vec![false; num_root_columns];
        for root_idx in indices {
            root_mask[root_idx] = true;
        }

        let mask = (0..schema.num_columns())
            .map(|leaf_idx| {
                let root_idx = schema.get_column_root_idx(leaf_idx);
                root_mask[root_idx]
            })
            .collect();

        Self { mask: Some(mask) }
    }

    /// Returns true if the leaf column `leaf_idx` is included by the mask
    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
        self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
    }
}

/// Looks up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
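///
/// # Example
///
/// A minimal sketch, assuming a flat schema with a single `Int32` column named `"id"`
/// and a parquet schema derived from it with [`arrow_to_parquet_schema`]:
///
/// ```ignore
/// use arrow_schema::{DataType, Field, Schema};
/// use parquet::arrow::{arrow_to_parquet_schema, parquet_column};
///
/// let arrow_schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
/// let parquet_schema = arrow_to_parquet_schema(&arrow_schema).unwrap();
///
/// // Look up the leaf column index of "id" and its corresponding arrow field
/// let (parquet_idx, field) = parquet_column(&parquet_schema, &arrow_schema, "id").unwrap();
/// assert_eq!(parquet_idx, 0);
/// assert_eq!(field.name(), "id");
/// ```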
pub fn parquet_column<'a>(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &'a Schema,
    name: &str,
) -> Option<(usize, &'a FieldRef)> {
    let (root_idx, field) = arrow_schema.fields.find(name)?;
    if field.data_type().is_nested() {
        // Nested fields are not supported and require non-trivial logic
        // to correctly walk the parquet schema accounting for the
        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
        //
        // For example a ListArray could correspond to anything from 1 to 3 levels
        // in the parquet schema
        return None;
    }

    // This could be made more efficient (#TBD)
    let parquet_idx = (0..parquet_schema.columns().len())
        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
    Some((parquet_idx, field))
}

#[cfg(test)]
mod test {
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    #[test]
    // Reproducer for https://github.com/apache/arrow-rs/issues/6464
    fn test_metadata_read_write_partial_offset() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file WITHOUT the page index structures
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // this should error because the page indexes are not present, but have offsets specified
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let err = ParquetMetaDataReader::new()
            .with_page_indexes(true) // there are no page indexes in the metadata
            .parse_and_finish(&metadata_bytes)
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..341"
        );
    }

    #[test]
    fn test_metadata_read_write_roundtrip() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        assert_ne!(
            metadata_bytes.len(),
            parquet_bytes.len(),
            "metadata is subset of parquet"
        );

        let roundtrip_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        assert_eq!(original_metadata, roundtrip_metadata);
    }

    #[test]
    fn test_metadata_read_write_roundtrip_page_index() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file including the page index structures
        // (which are stored elsewhere in the footer)
        let original_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Need to normalize the metadata first to remove offsets in data
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    /// Sets the page index offset locations in the metadata to `None`
    ///
    /// This is because the offsets are used to find the relative location of the index
    /// structures, and thus differ depending on how the structures are stored.
    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
        let mut metadata_builder = metadata.into_builder();
        for rg in metadata_builder.take_row_groups() {
            let mut rg_builder = rg.into_builder();
            for col in rg_builder.take_columns() {
                rg_builder = rg_builder.add_column_metadata(
                    col.into_builder()
                        .set_offset_index_offset(None)
                        .set_index_page_offset(None)
                        .set_column_index_offset(None)
                        .build()
                        .unwrap(),
                );
            }
            let rg = rg_builder.build().unwrap();
            metadata_builder = metadata_builder.add_row_group(rg);
        }
        metadata_builder.build()
    }

    /// Writes a parquet file into an in-memory buffer
    fn create_parquet_file() -> Bytes {
        let mut buf = vec![];
        let data = vec![100, 200, 201, 300, 102, 33];
        let array: ArrayRef = Arc::new(Int32Array::from(data));
        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build();

        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        Bytes::from(buf)
    }

    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetaDataWriter`
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, metadata)
            .finish()
            .unwrap();
        Bytes::from(buf)
    }
}