1use crate::arrow::array_reader::ArrayReader;
19use crate::errors::ParquetError;
20use crate::errors::Result;
21use arrow_array::{
22 Array, ArrayRef, GenericListArray, OffsetSizeTrait, builder::BooleanBufferBuilder,
23 new_empty_array,
24};
25use arrow_buffer::Buffer;
26use arrow_buffer::ToByteSlice;
27use arrow_data::{ArrayData, transform::MutableArrayData};
28use arrow_schema::DataType as ArrowType;
29use std::any::Any;
30use std::cmp::Ordering;
31use std::marker::PhantomData;
32use std::sync::Arc;
33
/// Reads a Parquet list column and reassembles it into an Arrow
/// `GenericListArray` (`ListArray` for `i32` offsets, `LargeListArray`
/// for `i64`), using the item reader's definition/repetition levels.
pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
    // Produces the flattened list items together with the definition and
    // repetition levels consumed by `consume_batch`.
    item_reader: Box<dyn ArrayReader>,
    // Arrow type of the list array this reader emits.
    data_type: ArrowType,
    // Definition level at or above which a level entry carries a real
    // item (compared with `>=`/`<` in `consume_batch`).
    def_level: i16,
    // Repetition level of this list: levels greater than this belong to
    // deeper nesting, equal means "next item in the current list", less
    // starts a new list entry.
    rep_level: i16,
    // Whether the list itself may be null; drives the validity builder.
    nullable: bool,
    // Zero-sized marker tying the reader to its offset width.
    _marker: PhantomData<OffsetSize>,
}
46
47impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
48 pub fn new(
50 item_reader: Box<dyn ArrayReader>,
51 data_type: ArrowType,
52 def_level: i16,
53 rep_level: i16,
54 nullable: bool,
55 ) -> Self {
56 Self {
57 item_reader,
58 data_type,
59 def_level,
60 rep_level,
61 nullable,
62 _marker: PhantomData,
63 }
64 }
65}
66
impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the Arrow list type produced by this reader.
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Buffers up to `batch_size` records by delegating to the item reader;
    /// the list structure is only materialized in [`Self::consume_batch`].
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        let size = self.item_reader.read_records(batch_size)?;
        Ok(size)
    }

    /// Consumes the buffered items and rebuilds the list structure from the
    /// item reader's definition and repetition levels, returning a
    /// `GenericListArray`.
    ///
    /// Level entries are classified against this reader's `rep_level`:
    /// greater means a value nested deeper than this list, equal continues
    /// the current list entry, and less starts a new list entry. Entries
    /// whose definition level is below `self.def_level` represent empty or
    /// null lists; their placeholder child values are filtered out via
    /// `MutableArrayData`.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let next_batch_array = self.item_reader.consume_batch()?;
        if next_batch_array.is_empty() {
            return Ok(new_empty_array(&self.data_type));
        }

        // Levels must be present; list reconstruction is impossible
        // without them.
        let def_levels = self
            .item_reader
            .get_def_levels()
            .ok_or_else(|| general_err!("item_reader def levels are None."))?;

        let rep_levels = self
            .item_reader
            .get_rep_levels()
            .ok_or_else(|| general_err!("item_reader rep levels are None."))?;

        // Reject batches whose child length cannot be represented in the
        // offset type (e.g. > i32::MAX items for a `ListArray`).
        if OffsetSize::from_usize(next_batch_array.len()).is_none() {
            return Err(general_err!(
                "offset of {} would overflow list array",
                next_batch_array.len()
            ));
        }

        // A batch must start at a record boundary.
        if !rep_levels.is_empty() && rep_levels[0] != 0 {
            return Err(general_err!("first repetition level of batch must be 0"));
        }

        // One offset per list entry plus the trailing offset.
        let mut list_offsets: Vec<OffsetSize> = Vec::with_capacity(next_batch_array.len() + 1);

        // Validity bitmap is only produced for nullable lists.
        let mut validity = self
            .nullable
            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));

        // Index into the *filtered* child array.
        let mut cur_offset = 0;

        // Start (in unfiltered child coordinates) of the current run of
        // values to keep; `None` while inside a run of filtered slots.
        let mut filter_start = None;

        // Number of placeholder slots (empty/null lists) dropped so far.
        let mut skipped = 0;

        let data = next_batch_array.to_data();
        let mut child_data_builder =
            MutableArrayData::new(vec![&data], false, next_batch_array.len());

        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
            match r.cmp(&self.rep_level) {
                Ordering::Greater => {
                    // Deeper nesting: nothing to do at this level, but the
                    // value must be defined at least down to this list.
                    if *d < self.def_level {
                        return Err(general_err!(
                            "Encountered repetition level too large for definition level"
                        ));
                    }
                }
                Ordering::Equal => {
                    // Another item in the current list entry.
                    cur_offset += 1;
                }
                Ordering::Less => {
                    // New list entry: close the previous one.
                    list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

                    if *d >= self.def_level {
                        // Non-empty list: keep this child slot, beginning a
                        // keep-run if one is not already open.
                        filter_start.get_or_insert(cur_offset + skipped);

                        cur_offset += 1;

                        if let Some(validity) = validity.as_mut() {
                            validity.append(true)
                        }
                    } else {
                        // Empty or null list: flush any open keep-run, then
                        // drop the placeholder child slot.
                        if let Some(start) = filter_start.take() {
                            child_data_builder.extend(0, start, cur_offset + skipped);
                        }

                        // `d + 1 == def_level` distinguishes an empty list
                        // (valid) from a null list.
                        if let Some(validity) = validity.as_mut() {
                            validity.append(*d + 1 == self.def_level)
                        }

                        skipped += 1;
                    }
                }
            }
            Ok(())
        })?;

        // Trailing offset closing the final list entry.
        list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

        let child_data = if skipped == 0 {
            // Nothing was filtered; reuse the child array as-is.
            next_batch_array.to_data()
        } else {
            // Flush the final keep-run before freezing the filtered child.
            if let Some(start) = filter_start.take() {
                child_data_builder.extend(0, start, cur_offset + skipped)
            }

            child_data_builder.freeze()
        };

        // Sanity check: every kept slot must be accounted for.
        if cur_offset != child_data.len() {
            return Err(general_err!("Failed to reconstruct list from level data"));
        }

        let value_offsets = Buffer::from(list_offsets.to_byte_slice());

        let mut data_builder = ArrayData::builder(self.get_data_type().clone())
            .len(list_offsets.len() - 1)
            .add_buffer(value_offsets)
            .add_child_data(child_data);

        if let Some(builder) = validity {
            assert_eq!(builder.len(), list_offsets.len() - 1);
            data_builder = data_builder.null_bit_buffer(Some(builder.into()))
        }

        // SAFETY: offsets are monotonically non-decreasing by construction
        // and bounded by the child length (checked above), so skipping
        // ArrayData validation is sound.
        let list_data = unsafe { data_builder.build_unchecked() };

        let result_array = GenericListArray::<OffsetSize>::from(list_data);
        Ok(Arc::new(result_array))
    }

    /// Skips `num_records` by delegating to the item reader.
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        self.item_reader.skip_records(num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.item_reader.get_def_levels()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.item_reader.get_rep_levels()
    }
}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use crate::arrow::array_reader::ArrayReaderBuilder;
250 use crate::arrow::array_reader::list_array::ListArrayReader;
251 use crate::arrow::array_reader::test_util::InMemoryArrayReader;
252 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
253 use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
254 use crate::arrow::{ArrowWriter, ProjectionMask, parquet_to_arrow_schema};
255 use crate::file::properties::WriterProperties;
256 use crate::file::reader::{FileReader, SerializedFileReader};
257 use crate::schema::parser::parse_message_type;
258 use crate::schema::types::SchemaDescriptor;
259 use arrow::datatypes::{Field, Int32Type as ArrowInt32, Int32Type};
260 use arrow_array::{Array, PrimitiveArray};
261 use arrow_data::ArrayDataBuilder;
262 use arrow_schema::Fields;
263 use std::sync::Arc;
264
265 fn list_type<OffsetSize: OffsetSizeTrait>(
266 data_type: ArrowType,
267 item_nullable: bool,
268 ) -> ArrowType {
269 let field = Arc::new(Field::new_list_field(data_type, item_nullable));
270 GenericListArray::<OffsetSize>::DATA_TYPE_CONSTRUCTOR(field)
271 }
272
273 fn downcast<OffsetSize: OffsetSizeTrait>(array: &ArrayRef) -> &'_ GenericListArray<OffsetSize> {
274 array
275 .as_any()
276 .downcast_ref::<GenericListArray<OffsetSize>>()
277 .unwrap()
278 }
279
280 fn to_offsets<OffsetSize: OffsetSizeTrait>(values: Vec<usize>) -> Buffer {
281 Buffer::from_iter(
282 values
283 .into_iter()
284 .map(|x| OffsetSize::from_usize(x).unwrap()),
285 )
286 }
287
    /// Exercises reconstruction of a three-level nested list
    /// (`List<List<List<i32>>>`) from definition/repetition levels.
    ///
    /// The expected array is built bottom-up (`l3` innermost, `l1`
    /// outermost) directly from offset/validity buffers, then compared
    /// against what three stacked `ListArrayReader`s reconstruct from an
    /// `InMemoryArrayReader` carrying equivalent level data.
    fn test_nested_list<OffsetSize: OffsetSizeTrait>() {
        // Innermost level: List<i32> with nullable items.
        let l3_item_type = ArrowType::Int32;
        let l3_type = list_type::<OffsetSize>(l3_item_type, true);

        // Middle level: List<List<i32>>.
        let l2_item_type = l3_type.clone();
        let l2_type = list_type::<OffsetSize>(l2_item_type, true);

        // Outermost level: List<List<List<i32>>> with a non-nullable item
        // field (nulls are carried on the l1 array itself below).
        let l1_item_type = l2_type.clone();
        let l1_type = list_type::<OffsetSize>(l1_item_type, false);

        // Flattened leaf values shared by all nesting levels.
        let leaf = PrimitiveArray::<Int32Type>::from_iter(vec![
            Some(1),
            None,
            Some(4),
            Some(7),
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            None,
            Some(6),
            Some(11),
        ]);

        // 10 innermost lists; validity bitmap marks entries 1 and 8 null.
        let offsets = to_offsets::<OffsetSize>(vec![0, 2, 2, 3, 3, 4, 4, 7, 10, 10, 11]);
        let l3 = ArrayDataBuilder::new(l3_type.clone())
            .len(10)
            .add_buffer(offsets)
            .add_child_data(leaf.into_data())
            .null_bit_buffer(Some(Buffer::from([0b11111101, 0b00000010])))
            .build()
            .unwrap();

        // 6 middle lists, all valid (no null bitmap).
        let offsets = to_offsets::<OffsetSize>(vec![0, 4, 4, 5, 6, 9, 10]);
        let l2 = ArrayDataBuilder::new(l2_type.clone())
            .len(6)
            .add_buffer(offsets)
            .add_child_data(l3)
            .build()
            .unwrap();

        // 4 outermost lists; bitmap 0b00001101 marks entry 1 null.
        let offsets = to_offsets::<OffsetSize>(vec![0, 5, 5, 5, 6]);
        let l1 = ArrayDataBuilder::new(l1_type.clone())
            .len(4)
            .add_buffer(offsets)
            .add_child_data(l2)
            .null_bit_buffer(Some(Buffer::from([0b00001101])))
            .build()
            .unwrap();

        let expected = GenericListArray::<OffsetSize>::from(l1);

        // Leaf values as a column reader would materialize them: one slot
        // per level entry, with placeholder nulls for empty/null lists.
        let values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            None,
            None,
            Some(4),
            None,
            None,
            Some(7),
            None,
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            None,
            Some(6),
            None,
            None,
            None,
            Some(11),
        ]));

        // Definition/repetition levels aligned slot-for-slot with `values`.
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            values,
            Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 6, 6, 6, 6, 5, 6, 3, 0, 1, 6]),
            Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 1, 3, 3, 2, 3, 3, 2, 0, 0, 0]),
        );

        // Stack readers innermost-to-outermost, mirroring the nesting:
        // each is given the def/rep levels of its own list node.
        let l3 =
            ListArrayReader::<OffsetSize>::new(Box::new(item_array_reader), l3_type, 5, 3, true);

        let l2 = ListArrayReader::<OffsetSize>::new(Box::new(l3), l2_type, 3, 2, false);

        let mut l1 = ListArrayReader::<OffsetSize>::new(Box::new(l2), l1_type, 2, 1, true);

        let expected_1 = expected.slice(0, 2);
        let expected_2 = expected.slice(2, 2);

        // Read in two batches to check that batching splits rows correctly.
        let actual = l1.next_batch(2).unwrap();
        assert_eq!(actual.as_ref(), &expected_1);

        let actual = l1.next_batch(1024).unwrap();
        assert_eq!(actual.as_ref(), &expected_2);
    }
400
    /// Verifies reconstruction of a required (non-nullable) list column.
    ///
    /// Level encoding used here: def 2 = present item, def 1 = null item,
    /// def 0 = empty list; rep 0 starts a new list, rep 1 continues one.
    /// The reader is constructed with `nullable == false`, so no validity
    /// buffer is produced and every row is a (possibly empty) list.
    fn test_required_list<OffsetSize: OffsetSizeTrait>() {
        let expected =
            GenericListArray::<OffsetSize>::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        // Materialized items; `None` slots are either null items or
        // placeholders for empty lists, disambiguated by the def levels.
        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            Some(1),
        ]));

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]),
            Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]),
        );

        let mut list_array_reader = ListArrayReader::<OffsetSize>::new(
            Box::new(item_array_reader),
            list_type::<OffsetSize>(ArrowType::Int32, true),
            1,
            1,
            false,
        );

        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = downcast::<OffsetSize>(&actual);

        assert_eq!(&expected, actual)
    }
446
    /// Verifies reconstruction of a nullable list column.
    ///
    /// Level encoding used here: def 3 = present item, def 2 = null item,
    /// def 1 = empty list, def 0 = null list; rep 0 starts a new list,
    /// rep 1 continues one. `nullable == true` makes the reader emit a
    /// validity buffer distinguishing null lists from empty ones.
    fn test_nullable_list<OffsetSize: OffsetSizeTrait>() {
        let expected =
            GenericListArray::<OffsetSize>::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                None,
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                None,
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        // Materialized items; `None` slots are null items or placeholders
        // for empty/null lists, disambiguated by the def levels below.
        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            None,
            None,
            Some(1),
        ]));

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]),
            Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]),
        );

        let mut list_array_reader = ListArrayReader::<OffsetSize>::new(
            Box::new(item_array_reader),
            list_type::<OffsetSize>(ArrowType::Int32, true),
            2,
            1,
            true,
        );

        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = downcast::<OffsetSize>(&actual);

        assert_eq!(&expected, actual)
    }
498
499 fn test_list_array<OffsetSize: OffsetSizeTrait>() {
500 test_nullable_list::<OffsetSize>();
501 test_required_list::<OffsetSize>();
502 test_nested_list::<OffsetSize>();
503 }
504
    /// Full suite with `i32` offsets (`ListArray`).
    #[test]
    fn test_list_array_reader() {
        test_list_array::<i32>();
    }
509
    /// Full suite with `i64` offsets (`LargeListArray`).
    #[test]
    fn test_large_list_array_reader() {
        test_list_array::<i64>()
    }
514
    /// End-to-end regression test: writes an *empty* Parquet file whose
    /// schema uses legacy repeated groups (no LIST annotation), reads it
    /// back through `ArrayReaderBuilder` with a projection selecting only
    /// the first leaf, and checks both the reconstructed Arrow type and
    /// that the resulting batch has zero rows.
    #[test]
    fn test_nested_lists() {
        // Legacy repeated-group schema (three levels of repetition).
        let message_type = "
        message table {
            REPEATED group table_info {
                REQUIRED BYTE_ARRAY name;
                REPEATED group cols {
                    REQUIRED BYTE_ARRAY name;
                    REQUIRED INT32 type;
                    OPTIONAL INT32 length;
                }
                REPEATED group tags {
                    REQUIRED BYTE_ARRAY name;
                    REQUIRED INT32 type;
                    OPTIONAL INT32 length;
                }
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap();

        // Write a file containing no record batches at all.
        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_size(200)
            .build();

        let writer = ArrowWriter::try_new(
            file.try_clone().unwrap(),
            Arc::new(arrow_schema),
            Some(props),
        )
        .unwrap();
        writer.close().unwrap();

        let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

        let file_metadata = file_reader.metadata().file_metadata();
        let schema = file_metadata.schema_descr();
        // Project only leaf column 0 (the first leaf in schema order,
        // i.e. table_info.name).
        let mask = ProjectionMask::leaves(schema, vec![0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema,
            ProjectionMask::all(),
            file_metadata.key_value_metadata(),
            &[],
        )
        .unwrap();

        let metrics = ArrowReaderMetrics::disabled();
        let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
            .build_array_reader(fields.as_ref(), &mask)
            .unwrap();

        // Even with zero rows, the batch must carry the projected nested
        // struct-of-list type.
        let batch = array_reader.next_batch(100).unwrap();
        assert_eq!(batch.data_type(), array_reader.get_data_type());
        assert_eq!(
            batch.data_type(),
            &ArrowType::Struct(Fields::from(vec![Field::new(
                "table_info",
                ArrowType::List(Arc::new(Field::new(
                    "table_info",
                    ArrowType::Struct(vec![Field::new("name", ArrowType::Binary, false)].into()),
                    false
                ))),
                false
            )]))
        );
        assert_eq!(batch.len(), 0);
    }
589}