// parquet/arrow/array_reader/list_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::ArrayReader;
19use crate::errors::ParquetError;
20use crate::errors::Result;
21use arrow_array::{
22    Array, ArrayRef, GenericListArray, OffsetSizeTrait, builder::BooleanBufferBuilder,
23    new_empty_array,
24};
25use arrow_buffer::Buffer;
26use arrow_buffer::ToByteSlice;
27use arrow_data::{ArrayData, transform::MutableArrayData};
28use arrow_schema::DataType as ArrowType;
29use std::any::Any;
30use std::cmp::Ordering;
31use std::marker::PhantomData;
32use std::sync::Arc;
33
/// Implementation of list array reader.
///
/// Reconstructs a [`GenericListArray`] from the flat child values produced by
/// `item_reader` together with their definition and repetition levels.
pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
    /// Reader producing the flattened child (item) values of this list
    item_reader: Box<dyn ArrayReader>,
    /// The arrow [`ArrowType`] returned by this reader (a List / LargeList type)
    data_type: ArrowType,
    /// The definition level at which this list is not null
    def_level: i16,
    /// The repetition level that corresponds to a new value in this array
    rep_level: i16,
    /// If this list is nullable
    nullable: bool,
    /// Binds the `OffsetSize` parameter (i32 vs i64 offsets) without storing one
    _marker: PhantomData<OffsetSize>,
}
46
47impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
48    /// Construct list array reader.
49    pub fn new(
50        item_reader: Box<dyn ArrayReader>,
51        data_type: ArrowType,
52        def_level: i16,
53        rep_level: i16,
54        nullable: bool,
55    ) -> Self {
56        Self {
57            item_reader,
58            data_type,
59            def_level,
60            rep_level,
61            nullable,
62            _marker: PhantomData,
63        }
64    }
65}
66
67/// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported.
/// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported.
impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns data type.
    /// This must be a List.
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Delegates to the child reader; the number of records read is determined
    /// by the child's record delimiting, not by list boundaries computed here.
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        let size = self.item_reader.read_records(batch_size)?;
        Ok(size)
    }

    /// Consumes the buffered child values and reassembles them into a list array
    /// using the child's definition and repetition levels.
    ///
    /// Returns an error if the child reader exposes no level information, if the
    /// child length would overflow `OffsetSize`, or if the levels are inconsistent.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let next_batch_array = self.item_reader.consume_batch()?;
        if next_batch_array.is_empty() {
            return Ok(new_empty_array(&self.data_type));
        }

        let def_levels = self
            .item_reader
            .get_def_levels()
            .ok_or_else(|| general_err!("item_reader def levels are None."))?;

        let rep_levels = self
            .item_reader
            .get_rep_levels()
            .ok_or_else(|| general_err!("item_reader rep levels are None."))?;

        // Check up-front so every later `OffsetSize::from_usize(..).unwrap()` is infallible
        if OffsetSize::from_usize(next_batch_array.len()).is_none() {
            return Err(general_err!(
                "offset of {} would overflow list array",
                next_batch_array.len()
            ));
        }

        if !rep_levels.is_empty() && rep_levels[0] != 0 {
            // This implies either the source data was invalid, or the leaf column
            // reader did not correctly delimit semantic records
            return Err(general_err!("first repetition level of batch must be 0"));
        }

        // A non-nullable list has a single definition level indicating if the list is empty
        //
        // A nullable list has two definition levels associated with it:
        //
        // The first identifies if the list is null
        // The second identifies if the list is empty
        //
        // The child data returned above is padded with a value for each not-fully defined level.
        // Therefore null and empty lists will correspond to a value in the child array.
        //
        // Whilst nulls may have a non-zero slice in the offsets array, empty lists must
        // be of zero length. As a result we MUST filter out values corresponding to empty
        // lists, and for consistency we do the same for nulls.

        // The output offsets for the computed ListArray
        let mut list_offsets: Vec<OffsetSize> = Vec::with_capacity(next_batch_array.len() + 1);

        // The validity mask of the computed ListArray if nullable
        let mut validity = self
            .nullable
            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));

        // The offset into the filtered child data of the current level being considered
        let mut cur_offset = 0;

        // Identifies the start of a run of values to copy from the source child data
        let mut filter_start = None;

        // The number of child values skipped due to empty lists or nulls
        let mut skipped = 0;

        // Builder used to construct the filtered child data, skipping empty lists and nulls
        let data = next_batch_array.to_data();
        let mut child_data_builder =
            MutableArrayData::new(vec![&data], false, next_batch_array.len());

        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
            match r.cmp(&self.rep_level) {
                Ordering::Greater => {
                    // Repetition level greater than current => already handled by inner array
                    if *d < self.def_level {
                        return Err(general_err!(
                            "Encountered repetition level too large for definition level"
                        ));
                    }
                }
                Ordering::Equal => {
                    // New value in the current list
                    cur_offset += 1;
                }
                Ordering::Less => {
                    // Create new array slice
                    // Already checked that this cannot overflow
                    list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

                    if *d >= self.def_level {
                        // Fully defined value

                        // Record current offset if it is None
                        // (cur_offset is filtered; adding `skipped` maps back to source index)
                        filter_start.get_or_insert(cur_offset + skipped);

                        cur_offset += 1;

                        if let Some(validity) = validity.as_mut() {
                            validity.append(true)
                        }
                    } else {
                        // Flush the current slice of child values if any
                        if let Some(start) = filter_start.take() {
                            child_data_builder.extend(0, start, cur_offset + skipped);
                        }

                        if let Some(validity) = validity.as_mut() {
                            // Valid if empty list
                            validity.append(*d + 1 == self.def_level)
                        }

                        skipped += 1;
                    }
                }
            }
            Ok(())
        })?;

        // Terminal offset closing the final list
        list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

        let child_data = if skipped == 0 {
            // No filtered values - can reuse original array
            next_batch_array.to_data()
        } else {
            // One or more filtered values - must build new array
            if let Some(start) = filter_start.take() {
                child_data_builder.extend(0, start, cur_offset + skipped)
            }

            child_data_builder.freeze()
        };

        if cur_offset != child_data.len() {
            return Err(general_err!("Failed to reconstruct list from level data"));
        }

        let value_offsets = Buffer::from(list_offsets.to_byte_slice());

        let mut data_builder = ArrayData::builder(self.get_data_type().clone())
            .len(list_offsets.len() - 1)
            .add_buffer(value_offsets)
            .add_child_data(child_data);

        if let Some(builder) = validity {
            assert_eq!(builder.len(), list_offsets.len() - 1);
            data_builder = data_builder.null_bit_buffer(Some(builder.into()))
        }

        // SAFETY: offsets were checked against OffsetSize overflow above, the final
        // offset was verified to equal child_data.len(), and the validity length is
        // asserted to match the list length. NOTE(review): offsets are appended
        // monotonically from cur_offset, which only increases — confirm this covers
        // all ArrayData invariants skipped by build_unchecked.
        let list_data = unsafe { data_builder.build_unchecked() };

        let result_array = GenericListArray::<OffsetSize>::from(list_data);
        Ok(Arc::new(result_array))
    }

    /// Skips records in the child reader; list boundaries are delegated to it.
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        self.item_reader.skip_records(num_records)
    }

    /// Exposes the child reader's definition levels for use by parent readers.
    fn get_def_levels(&self) -> Option<&[i16]> {
        self.item_reader.get_def_levels()
    }

    /// Exposes the child reader's repetition levels for use by parent readers.
    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.item_reader.get_rep_levels()
    }
}
245
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::ArrayReaderBuilder;
    use crate::arrow::array_reader::list_array::ListArrayReader;
    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
    use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::arrow::{ArrowWriter, ProjectionMask, parquet_to_arrow_schema};
    use crate::file::properties::WriterProperties;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use arrow::datatypes::{Field, Int32Type as ArrowInt32, Int32Type};
    use arrow_array::{Array, PrimitiveArray};
    use arrow_data::ArrayDataBuilder;
    use arrow_schema::Fields;
    use std::sync::Arc;

    // Builds a List (i32 offsets) or LargeList (i64 offsets) data type for
    // `data_type` items, chosen by the `OffsetSize` parameter
    fn list_type<OffsetSize: OffsetSizeTrait>(
        data_type: ArrowType,
        item_nullable: bool,
    ) -> ArrowType {
        let field = Arc::new(Field::new_list_field(data_type, item_nullable));
        GenericListArray::<OffsetSize>::DATA_TYPE_CONSTRUCTOR(field)
    }

    // Downcasts a dyn array to the concrete list type, panicking on mismatch
    fn downcast<OffsetSize: OffsetSizeTrait>(array: &ArrayRef) -> &'_ GenericListArray<OffsetSize> {
        array
            .as_any()
            .downcast_ref::<GenericListArray<OffsetSize>>()
            .unwrap()
    }

    // Converts usize offsets into an offsets Buffer of the native OffsetSize type
    fn to_offsets<OffsetSize: OffsetSizeTrait>(values: Vec<usize>) -> Buffer {
        Buffer::from_iter(
            values
                .into_iter()
                .map(|x| OffsetSize::from_usize(x).unwrap()),
        )
    }

    // Exercises a three-level nested list read against hand-built expected data,
    // including reading the batch in two pieces
    fn test_nested_list<OffsetSize: OffsetSizeTrait>() {
        // 3 lists, with first and third nullable
        // [
        //     [
        //         [[1, null], null, [4], []],
        //         [],
        //         [[7]],
        //         [[]],
        //         [[1, 2, 3], [4, null, 6], null]
        //     ],
        //     null,
        //     [],
        //     [[[11]]]
        // ]

        let l3_item_type = ArrowType::Int32;
        let l3_type = list_type::<OffsetSize>(l3_item_type, true);

        let l2_item_type = l3_type.clone();
        let l2_type = list_type::<OffsetSize>(l2_item_type, true);

        let l1_item_type = l2_type.clone();
        let l1_type = list_type::<OffsetSize>(l1_item_type, false);

        let leaf = PrimitiveArray::<Int32Type>::from_iter(vec![
            Some(1),
            None,
            Some(4),
            Some(7),
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            None,
            Some(6),
            Some(11),
        ]);

        // [[1, null], null, [4], [], [7], [], [1, 2, 3], [4, null, 6], null, [11]]
        let offsets = to_offsets::<OffsetSize>(vec![0, 2, 2, 3, 3, 4, 4, 7, 10, 10, 11]);
        let l3 = ArrayDataBuilder::new(l3_type.clone())
            .len(10)
            .add_buffer(offsets)
            .add_child_data(leaf.into_data())
            .null_bit_buffer(Some(Buffer::from([0b11111101, 0b00000010])))
            .build()
            .unwrap();

        // [[[1, null], null, [4], []], [], [[7]], [[]], [[1, 2, 3], [4, null, 6], null], [[11]]]
        let offsets = to_offsets::<OffsetSize>(vec![0, 4, 4, 5, 6, 9, 10]);
        let l2 = ArrayDataBuilder::new(l2_type.clone())
            .len(6)
            .add_buffer(offsets)
            .add_child_data(l3)
            .build()
            .unwrap();

        let offsets = to_offsets::<OffsetSize>(vec![0, 5, 5, 5, 6]);
        let l1 = ArrayDataBuilder::new(l1_type.clone())
            .len(4)
            .add_buffer(offsets)
            .add_child_data(l2)
            .null_bit_buffer(Some(Buffer::from([0b00001101])))
            .build()
            .unwrap();

        let expected = GenericListArray::<OffsetSize>::from(l1);

        // Flattened leaf values padded with a slot for every not-fully-defined level
        let values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            None,
            None,
            Some(4),
            None,
            None,
            Some(7),
            None,
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            None,
            Some(6),
            None,
            None,
            None,
            Some(11),
        ]));

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            values,
            Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 6, 6, 6, 6, 5, 6, 3, 0, 1, 6]),
            Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 1, 3, 3, 2, 3, 3, 2, 0, 0, 0]),
        );

        // Stack three list readers, innermost (l3) to outermost (l1)
        let l3 =
            ListArrayReader::<OffsetSize>::new(Box::new(item_array_reader), l3_type, 5, 3, true);

        let l2 = ListArrayReader::<OffsetSize>::new(Box::new(l3), l2_type, 3, 2, false);

        let mut l1 = ListArrayReader::<OffsetSize>::new(Box::new(l2), l1_type, 2, 1, true);

        let expected_1 = expected.slice(0, 2);
        let expected_2 = expected.slice(2, 2);

        let actual = l1.next_batch(2).unwrap();
        assert_eq!(actual.as_ref(), &expected_1);

        let actual = l1.next_batch(1024).unwrap();
        assert_eq!(actual.as_ref(), &expected_2);
    }

    // Reads a non-nullable list (single definition level distinguishes empty lists)
    fn test_required_list<OffsetSize: OffsetSizeTrait>() {
        // [[1, null, 2], [], [3, 4], [], [], [null, 1]]
        let expected =
            GenericListArray::<OffsetSize>::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            Some(1),
        ]));

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]),
            Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]),
        );

        let mut list_array_reader = ListArrayReader::<OffsetSize>::new(
            Box::new(item_array_reader),
            list_type::<OffsetSize>(ArrowType::Int32, true),
            1,
            1,
            false,
        );

        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = downcast::<OffsetSize>(&actual);

        assert_eq!(&expected, actual)
    }

    // Reads a nullable list (two definition levels: null vs empty)
    fn test_nullable_list<OffsetSize: OffsetSizeTrait>() {
        // [[1, null, 2], null, [], [3, 4], [], [], null, [], [null, 1]]
        let expected =
            GenericListArray::<OffsetSize>::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                None,
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                None,
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            None,
            None,
            Some(1),
        ]));

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]),
            Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]),
        );

        let mut list_array_reader = ListArrayReader::<OffsetSize>::new(
            Box::new(item_array_reader),
            list_type::<OffsetSize>(ArrowType::Int32, true),
            2,
            1,
            true,
        );

        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = downcast::<OffsetSize>(&actual);

        assert_eq!(&expected, actual)
    }

    // Runs the full suite for one offset width
    fn test_list_array<OffsetSize: OffsetSizeTrait>() {
        test_nullable_list::<OffsetSize>();
        test_required_list::<OffsetSize>();
        test_nested_list::<OffsetSize>();
    }

    #[test]
    fn test_list_array_reader() {
        test_list_array::<i32>();
    }

    #[test]
    fn test_large_list_array_reader() {
        test_list_array::<i64>()
    }

    // End-to-end: write an empty file with a repeated-group schema, then read it
    // back through the array reader with a leaf projection
    #[test]
    fn test_nested_lists() {
        // Construct column schema
        let message_type = "
        message table {
            REPEATED group table_info {
                REQUIRED BYTE_ARRAY name;
                REPEATED group cols {
                    REQUIRED BYTE_ARRAY name;
                    REQUIRED INT32 type;
                    OPTIONAL INT32 length;
                }
                REPEATED group tags {
                    REQUIRED BYTE_ARRAY name;
                    REQUIRED INT32 type;
                    OPTIONAL INT32 length;
                }
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap();

        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_size(200)
            .build();

        // Write no batches - the file contains only schema and metadata
        let writer = ArrowWriter::try_new(
            file.try_clone().unwrap(),
            Arc::new(arrow_schema),
            Some(props),
        )
        .unwrap();
        writer.close().unwrap();

        let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

        let file_metadata = file_reader.metadata().file_metadata();
        let schema = file_metadata.schema_descr();
        let mask = ProjectionMask::leaves(schema, vec![0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema,
            ProjectionMask::all(),
            file_metadata.key_value_metadata(),
            &[],
        )
        .unwrap();

        let metrics = ArrowReaderMetrics::disabled();
        let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
            .build_array_reader(fields.as_ref(), &mask)
            .unwrap();

        let batch = array_reader.next_batch(100).unwrap();
        assert_eq!(batch.data_type(), array_reader.get_data_type());
        assert_eq!(
            batch.data_type(),
            &ArrowType::Struct(Fields::from(vec![Field::new(
                "table_info",
                ArrowType::List(Arc::new(Field::new(
                    "table_info",
                    ArrowType::Struct(vec![Field::new("name", ArrowType::Binary, false)].into()),
                    false
                ))),
                false
            )]))
        );
        assert_eq!(batch.len(), 0);
    }
}
589}