parquet/arrow/array_reader/map_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{ArrayReader, ListArrayReader, StructArrayReader};
19use crate::errors::Result;
20use arrow_array::{Array, ArrayRef, MapArray};
21use arrow_schema::DataType as ArrowType;
22use std::any::Any;
23use std::sync::Arc;
24
25/// Implementation of a map array reader.
/// Implementation of a map array reader.
pub struct MapArrayReader {
    /// The `ArrowType::Map` data type returned by `get_data_type`
    data_type: ArrowType,
    /// Delegate list reader over the key/value struct entries; a `MapArray`
    /// is physically a `ListArray` with a two-field `StructArray` child, so
    /// all record reading/skipping is forwarded to this reader
    reader: ListArrayReader<i32>,
}
30
31impl MapArrayReader {
32    #[allow(rustdoc::private_intra_doc_links)]
33    /// Creates a new [`MapArrayReader`] with a `def_level`, `rep_level` and `nullable`
34    /// as defined on [`ParquetField`][crate::arrow::schema::ParquetField]
35    pub fn new(
36        key_reader: Box<dyn ArrayReader>,
37        value_reader: Box<dyn ArrayReader>,
38        data_type: ArrowType,
39        def_level: i16,
40        rep_level: i16,
41        nullable: bool,
42    ) -> Self {
43        let struct_def_level = match nullable {
44            true => def_level + 2,
45            false => def_level + 1,
46        };
47        let struct_rep_level = rep_level + 1;
48
49        let element = match &data_type {
50            ArrowType::Map(element, _) => match element.data_type() {
51                ArrowType::Struct(fields) if fields.len() == 2 => {
52                    // Parquet cannot represent nullability at this level (#1697)
53                    // and so encountering nullability here indicates some manner
54                    // of schema inconsistency / inference bug
55                    assert!(!element.is_nullable(), "map struct cannot be nullable");
56                    element
57                }
58                _ => unreachable!("expected struct with two fields"),
59            },
60            _ => unreachable!("expected map type"),
61        };
62
63        let struct_reader = StructArrayReader::new(
64            element.data_type().clone(),
65            vec![key_reader, value_reader],
66            struct_def_level,
67            struct_rep_level,
68            false,
69        );
70
71        let reader = ListArrayReader::new(
72            Box::new(struct_reader),
73            ArrowType::List(element.clone()),
74            def_level,
75            rep_level,
76            nullable,
77        );
78
79        Self { data_type, reader }
80    }
81}
82
83impl ArrayReader for MapArrayReader {
84    fn as_any(&self) -> &dyn Any {
85        self
86    }
87
88    fn get_data_type(&self) -> &ArrowType {
89        &self.data_type
90    }
91
92    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
93        self.reader.read_records(batch_size)
94    }
95
96    fn consume_batch(&mut self) -> Result<ArrayRef> {
97        // A MapArray is just a ListArray with a StructArray child
98        // we can therefore just alter the ArrayData
99        let array = self.reader.consume_batch().unwrap();
100        let data = array.to_data();
101        let builder = data.into_builder().data_type(self.data_type.clone());
102
103        // SAFETY - we can assume that ListArrayReader produces valid ListArray
104        // of the expected type, and as such its output can be reinterpreted as
105        // a MapArray without validation
106        Ok(Arc::new(MapArray::from(unsafe {
107            builder.build_unchecked()
108        })))
109    }
110
111    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
112        self.reader.skip_records(num_records)
113    }
114
115    fn get_def_levels(&self) -> Option<&[i16]> {
116        self.reader.get_def_levels()
117    }
118
119    fn get_rep_levels(&self) -> Option<&[i16]> {
120        self.reader.get_rep_levels()
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use arrow::datatypes::{Field, Int32Type, Schema};
    use arrow_array::RecordBatch;
    use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
    use arrow_array::cast::*;
    use arrow_schema::Fields;
    use bytes::Bytes;

    #[test]
    // Round-trips a single map<string, int32> column through parquet:
    // +--------------------------------------------------------+
    // |map                                                     |
    // +--------------------------------------------------------+
    // |null                                                    |
    // |null                                                    |
    // |{three -> 3, four -> 4, five -> 5, six -> 6, seven -> 7}|
    // +--------------------------------------------------------+
    //
    // It then reads the data back and checks that the third record
    // contains the expected keys.
    fn read_map_array_column() {
        // Schema for a single nullable map of string to int32
        let entries = Field::new(
            "entries",
            ArrowType::Struct(Fields::from(vec![
                Field::new("keys", ArrowType::Utf8, false),
                Field::new("values", ArrowType::Int32, true),
            ])),
            false,
        );
        let schema = Schema::new(vec![Field::new(
            "map",
            ArrowType::Map(Arc::new(entries), false), // Map field not sorted
            true,
        )]);

        // Build the column: two null maps followed by one map of five entries
        let mut map_builder = MapBuilder::new(
            None,
            StringBuilder::new(),
            PrimitiveBuilder::<Int32Type>::new(),
        );
        map_builder.append(false).expect("adding null map entry");
        map_builder.append(false).expect("adding null map entry");
        let expected_keys = ["three", "four", "five", "six", "seven"];
        for (offset, key) in expected_keys.iter().enumerate() {
            map_builder.keys().append_value(key);
            map_builder.values().append_value(3 + offset as i32);
        }
        map_builder.append(true).expect("adding map entry");

        // Assemble the record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map_builder.finish())])
            .expect("create record batch");

        // Serialize the batch to an in-memory parquet file
        let mut buffer = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buffer, batch.schema(), None).expect("creat file writer");
        writer.write(&batch).expect("writing file");
        writer.close().expect("close writer");

        // Read the file back and verify the third row's keys survived intact
        let record_batch_reader =
            ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
        for maybe_record_batch in record_batch_reader {
            let record_batch = maybe_record_batch.expect("Getting current batch");
            let col = record_batch.column(0);
            assert!(col.is_null(0));
            assert!(col.is_null(1));
            let map_entry = as_map_array(col).value(2);
            let struct_col = as_struct_array(&map_entry);
            let key_col = as_string_array(struct_col.column(0)); // Key column
            for (offset, expected) in expected_keys.iter().enumerate() {
                assert_eq!(key_col.value(offset), *expected);
            }
        }
    }
}
216}