parquet/arrow/array_reader/map_array.rs

use crate::arrow::array_reader::{ArrayReader, ListArrayReader, StructArrayReader};
use crate::errors::Result;
use arrow_array::{Array, ArrayRef, MapArray};
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;

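/// [`ArrayReader`] for Parquet maps, implemented by reading the map as a
/// [`ListArrayReader`] of two-field (key, value) structs and re-labelling
/// the result as a [`MapArray`].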
pub struct MapArrayReader {
    data_type: ArrowType,
    reader: ListArrayReader<i32>,
}

impl MapArrayReader {
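    /// Creates a new [`MapArrayReader`] with the given `def_level`, `rep_level`
    /// and `nullable` as defined on the corresponding
    /// [`ParquetField`][crate::arrow::schema::ParquetField].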
    #[allow(rustdoc::private_intra_doc_links)]
    pub fn new(
        key_reader: Box<dyn ArrayReader>,
        value_reader: Box<dyn ArrayReader>,
        data_type: ArrowType,
        def_level: i16,
        rep_level: i16,
        nullable: bool,
    ) -> Self {
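        // The (key, value) entries are encoded one repetition level and one
        // definition level deeper than the map, plus an extra definition level
        // when the map itself is nullable.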
        let struct_def_level = match nullable {
            true => def_level + 2,
            false => def_level + 1,
        };
        let struct_rep_level = rep_level + 1;

        let element = match &data_type {
            ArrowType::Map(element, _) => match element.data_type() {
                ArrowType::Struct(fields) if fields.len() == 2 => {
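                    // The entries group is `repeated` in Parquet, so the
                    // (key, value) struct itself can never be null.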
                    assert!(!element.is_nullable(), "map struct cannot be nullable");
                    element
                }
                _ => unreachable!("expected struct with two fields"),
            },
            _ => unreachable!("expected map type"),
        };

        let struct_reader = StructArrayReader::new(
            element.data_type().clone(),
            vec![key_reader, value_reader],
            struct_def_level,
            struct_rep_level,
            false,
        );

        let reader = ListArrayReader::new(
            Box::new(struct_reader),
            ArrowType::List(element.clone()),
            def_level,
            rep_level,
            nullable,
        );

        Self { data_type, reader }
    }
}

impl ArrayReader for MapArrayReader {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        self.reader.read_records(batch_size)
    }

    fn consume_batch(&mut self) -> Result<ArrayRef> {
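        // A MapArray is just a ListArray with a StructArray child: take the
        // list reader's output and relabel its ArrayData with the map type.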
        let array = self.reader.consume_batch()?;
        let data = array.to_data();
        let builder = data.into_builder().data_type(self.data_type.clone());

        Ok(Arc::new(MapArray::from(unsafe {
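            // SAFETY: the inner ListArrayReader produced a valid ListArray of
            // (key, value) structs, whose layout matches MapArray; only the
            // data type label changes here.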
            builder.build_unchecked()
        })))
    }

    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        self.reader.skip_records(num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.reader.get_def_levels()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.reader.get_rep_levels()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use arrow::datatypes::{Field, Int32Type, Schema};
    use arrow_array::RecordBatch;
    use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
    use arrow_array::cast::*;
    use arrow_schema::Fields;
    use bytes::Bytes;

    #[test]
    fn read_map_array_column() {
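        // Writes a Parquet file with a single map<utf8, int32> column containing
        // three rows: two null maps followed by
        // {three -> 3, four -> 4, five -> 5, six -> 6, seven -> 7},
        // then reads it back and checks the nulls and the decoded keys.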
        let schema = Schema::new(vec![Field::new(
            "map",
            ArrowType::Map(
                Arc::new(Field::new(
                    "entries",
                    ArrowType::Struct(Fields::from(vec![
                        Field::new("keys", ArrowType::Utf8, false),
                        Field::new("values", ArrowType::Int32, true),
                    ])),
                    false,
                )),
                false, // keys are not sorted
            ),
            true,
        )]);

        let string_builder = StringBuilder::new();
        let ints_builder: PrimitiveBuilder<Int32Type> = PrimitiveBuilder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, ints_builder);

        map_builder.append(false).expect("adding null map entry");
        map_builder.append(false).expect("adding null map entry");
        map_builder.keys().append_value("three");
        map_builder.keys().append_value("four");
        map_builder.keys().append_value("five");
        map_builder.keys().append_value("six");
        map_builder.keys().append_value("seven");

        map_builder.values().append_value(3);
        map_builder.values().append_value(4);
        map_builder.values().append_value(5);
        map_builder.values().append_value(6);
        map_builder.values().append_value(7);
        map_builder.append(true).expect("adding map entry");

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map_builder.finish())])
            .expect("create record batch");

        let mut buffer = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buffer, batch.schema(), None).expect("create file writer");
        writer.write(&batch).expect("writing file");
        writer.close().expect("close writer");

        let reader = Bytes::from(buffer);
        let record_batch_reader = ParquetRecordBatchReader::try_new(reader, 1024).unwrap();
        for maybe_record_batch in record_batch_reader {
            let record_batch = maybe_record_batch.expect("Getting current batch");
            let col = record_batch.column(0);
            assert!(col.is_null(0));
            assert!(col.is_null(1));
            let map_entry = as_map_array(col).value(2);
            let struct_col = as_struct_array(&map_entry);
            let key_col = as_string_array(struct_col.column(0));
            assert_eq!(key_col.value(0), "three");
            assert_eq!(key_col.value(1), "four");
            assert_eq!(key_col.value(2), "five");
            assert_eq!(key_col.value(3), "six");
            assert_eq!(key_col.value(4), "seven");
        }
    }
}