parquet/record/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains implementation of record assembly and converting Parquet types into
19//! [`Row`]s.
20
21use std::{collections::HashMap, fmt, sync::Arc};
22
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::{ParquetError, Result};
25use crate::file::reader::{FileReader, RowGroupReader};
26use crate::record::{
27    api::{make_list, make_map, make_row, Field, Row},
28    triplet::TripletIter,
29};
30use crate::schema::types::{ColumnPath, SchemaDescPtr, SchemaDescriptor, Type, TypePtr};
31
/// Batch size that [`TreeBuilder::new`] hands to every triplet iterator unless it is
/// overridden with [`TreeBuilder::with_batch_size`].
const DEFAULT_BATCH_SIZE: usize = 1024;
34
/// Builder for a tree of [`Reader`]s.
///
/// Holds the options used while building a reader tree for a row group, and can also
/// produce a record iterator ([`ReaderIter`]) directly; [`RowIter`] uses it for every
/// row group it reads.
#[derive(Debug, Clone)]
pub struct TreeBuilder {
    /// Batch size (expected to be >= 1) passed to each triplet iterator.
    batch_size: usize,
}
42
43impl Default for TreeBuilder {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl TreeBuilder {
50    /// Creates new tree builder with default parameters.
51    pub fn new() -> Self {
52        Self {
53            batch_size: DEFAULT_BATCH_SIZE,
54        }
55    }
56
57    /// Sets batch size for this tree builder.
58    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
59        self.batch_size = batch_size;
60        self
61    }
62
63    /// Creates new root reader for provided schema and row group.
64    pub fn build(
65        &self,
66        descr: SchemaDescPtr,
67        row_group_reader: &dyn RowGroupReader,
68    ) -> Result<Reader> {
69        // Prepare lookup table of column path -> original column index
70        // This allows to prune columns and map schema leaf nodes to the column readers
71        let mut paths: HashMap<ColumnPath, usize> = HashMap::new();
72        let row_group_metadata = row_group_reader.metadata();
73
74        for col_index in 0..row_group_reader.num_columns() {
75            let col_meta = row_group_metadata.column(col_index);
76            let col_path = col_meta.column_path().clone();
77            paths.insert(col_path, col_index);
78        }
79
80        // Build child readers for the message type
81        let mut readers = Vec::new();
82        let mut path = Vec::new();
83
84        for field in descr.root_schema().get_fields() {
85            let reader =
86                self.reader_tree(field.clone(), &mut path, 0, 0, &paths, row_group_reader)?;
87            readers.push(reader);
88        }
89
90        // Return group reader for message type,
91        // it is always required with definition level 0
92        Ok(Reader::GroupReader(None, 0, readers))
93    }
94
95    /// Creates iterator of `Row`s directly from schema descriptor and row group.
96    pub fn as_iter(
97        &self,
98        descr: SchemaDescPtr,
99        row_group_reader: &dyn RowGroupReader,
100    ) -> Result<ReaderIter> {
101        let num_records = row_group_reader.metadata().num_rows() as usize;
102        ReaderIter::new(self.build(descr, row_group_reader)?, num_records)
103    }
104
105    /// Builds tree of readers for the current schema recursively.
106    fn reader_tree(
107        &self,
108        field: TypePtr,
109        path: &mut Vec<String>,
110        mut curr_def_level: i16,
111        mut curr_rep_level: i16,
112        paths: &HashMap<ColumnPath, usize>,
113        row_group_reader: &dyn RowGroupReader,
114    ) -> Result<Reader> {
115        assert!(field.get_basic_info().has_repetition());
116        // Update current definition and repetition levels for this type
117        let repetition = field.get_basic_info().repetition();
118        match repetition {
119            Repetition::OPTIONAL => {
120                curr_def_level += 1;
121            }
122            Repetition::REPEATED => {
123                curr_def_level += 1;
124                curr_rep_level += 1;
125            }
126            _ => {}
127        }
128
129        path.push(String::from(field.name()));
130        let reader = if field.is_primitive() {
131            let col_path = ColumnPath::new(path.to_vec());
132            let orig_index = *paths
133                .get(&col_path)
134                .ok_or(general_err!("Path {:?} not found", col_path))?;
135            let col_descr = row_group_reader
136                .metadata()
137                .column(orig_index)
138                .column_descr_ptr();
139            let col_reader = row_group_reader.get_column_reader(orig_index)?;
140            let column = TripletIter::new(col_descr, col_reader, self.batch_size);
141            let reader = Reader::PrimitiveReader(field.clone(), Box::new(column));
142            if repetition == Repetition::REPEATED {
143                Reader::RepeatedReader(
144                    field,
145                    curr_def_level - 1,
146                    curr_rep_level - 1,
147                    Box::new(reader),
148                )
149            } else {
150                reader
151            }
152        } else {
153            match field.get_basic_info().converted_type() {
154                // List types
155                ConvertedType::LIST => {
156                    assert_eq!(field.get_fields().len(), 1, "Invalid list type {field:?}");
157
158                    let repeated_field = field.get_fields()[0].clone();
159                    assert_eq!(
160                        repeated_field.get_basic_info().repetition(),
161                        Repetition::REPEATED,
162                        "Invalid list type {field:?}"
163                    );
164
165                    if Reader::is_element_type(&repeated_field) {
166                        // Support for backward compatible lists
167                        let reader = self.reader_tree(
168                            repeated_field,
169                            path,
170                            curr_def_level,
171                            curr_rep_level,
172                            paths,
173                            row_group_reader,
174                        )?;
175
176                        Reader::RepeatedReader(
177                            field,
178                            curr_def_level,
179                            curr_rep_level,
180                            Box::new(reader),
181                        )
182                    } else {
183                        let child_field = repeated_field.get_fields()[0].clone();
184
185                        path.push(String::from(repeated_field.name()));
186
187                        let reader = self.reader_tree(
188                            child_field,
189                            path,
190                            curr_def_level + 1,
191                            curr_rep_level + 1,
192                            paths,
193                            row_group_reader,
194                        )?;
195
196                        path.pop();
197
198                        Reader::RepeatedReader(
199                            field,
200                            curr_def_level,
201                            curr_rep_level,
202                            Box::new(reader),
203                        )
204                    }
205                }
206                // Map types (key-value pairs)
207                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
208                    assert_eq!(field.get_fields().len(), 1, "Invalid map type: {field:?}");
209                    assert!(
210                        !field.get_fields()[0].is_primitive(),
211                        "Invalid map type: {field:?}"
212                    );
213
214                    let key_value_type = field.get_fields()[0].clone();
215                    assert_eq!(
216                        key_value_type.get_basic_info().repetition(),
217                        Repetition::REPEATED,
218                        "Invalid map type: {field:?}"
219                    );
220                    assert_eq!(
221                        key_value_type.get_fields().len(),
222                        2,
223                        "Invalid map type: {field:?}"
224                    );
225
226                    path.push(String::from(key_value_type.name()));
227
228                    let key_type = &key_value_type.get_fields()[0];
229                    assert!(
230                        key_type.is_primitive(),
231                        "Map key type is expected to be a primitive type, but found {key_type:?}"
232                    );
233                    let key_reader = self.reader_tree(
234                        key_type.clone(),
235                        path,
236                        curr_def_level + 1,
237                        curr_rep_level + 1,
238                        paths,
239                        row_group_reader,
240                    )?;
241
242                    let value_type = &key_value_type.get_fields()[1];
243                    let value_reader = self.reader_tree(
244                        value_type.clone(),
245                        path,
246                        curr_def_level + 1,
247                        curr_rep_level + 1,
248                        paths,
249                        row_group_reader,
250                    )?;
251
252                    path.pop();
253
254                    Reader::KeyValueReader(
255                        field,
256                        curr_def_level,
257                        curr_rep_level,
258                        Box::new(key_reader),
259                        Box::new(value_reader),
260                    )
261                }
262                // A repeated field that is neither contained by a `LIST`- or
263                // `MAP`-annotated group nor annotated by `LIST` or `MAP`
264                // should be interpreted as a required list of required
265                // elements where the element type is the type of the field.
266                _ if repetition == Repetition::REPEATED => {
267                    let required_field = Type::group_type_builder(field.name())
268                        .with_repetition(Repetition::REQUIRED)
269                        .with_converted_type(field.get_basic_info().converted_type())
270                        .with_fields(field.get_fields().to_vec())
271                        .build()?;
272
273                    path.pop();
274
275                    let reader = self.reader_tree(
276                        Arc::new(required_field),
277                        path,
278                        curr_def_level,
279                        curr_rep_level,
280                        paths,
281                        row_group_reader,
282                    )?;
283
284                    return Ok(Reader::RepeatedReader(
285                        field,
286                        curr_def_level - 1,
287                        curr_rep_level - 1,
288                        Box::new(reader),
289                    ));
290                }
291                // Group types (structs)
292                _ => {
293                    let mut readers = Vec::new();
294                    for child in field.get_fields() {
295                        let reader = self.reader_tree(
296                            child.clone(),
297                            path,
298                            curr_def_level,
299                            curr_rep_level,
300                            paths,
301                            row_group_reader,
302                        )?;
303                        readers.push(reader);
304                    }
305                    Reader::GroupReader(Some(field), curr_def_level, readers)
306                }
307            }
308        };
309        path.pop();
310
311        Ok(Reader::option(repetition, curr_def_level, reader))
312    }
313}
314
315/// Reader tree for record assembly
316pub enum Reader {
317    /// Primitive reader with type information and triplet iterator
318    PrimitiveReader(TypePtr, Box<TripletIter>),
319    /// Optional reader with definition level of a parent and a reader
320    OptionReader(i16, Box<Reader>),
321    /// Group (struct) reader with type information, definition level and list of child
322    /// readers. When it represents message type, type information is None
323    GroupReader(Option<TypePtr>, i16, Vec<Reader>),
324    /// Reader for repeated values, e.g. lists, contains type information, definition
325    /// level, repetition level and a child reader
326    RepeatedReader(TypePtr, i16, i16, Box<Reader>),
327    /// Reader of key-value pairs, e.g. maps, contains type information, definition
328    /// level, repetition level, child reader for keys and child reader for values
329    KeyValueReader(TypePtr, i16, i16, Box<Reader>, Box<Reader>),
330}
331
332impl Reader {
333    /// Wraps reader in option reader based on repetition.
334    fn option(repetition: Repetition, def_level: i16, reader: Reader) -> Self {
335        if repetition == Repetition::OPTIONAL {
336            Reader::OptionReader(def_level - 1, Box::new(reader))
337        } else {
338            reader
339        }
340    }
341
342    /// Returns true if repeated type is an element type for the list.
343    /// Used to determine legacy list types.
344    /// This method is copied from Spark Parquet reader and is based on the reference:
345    /// <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
346    ///   #backward-compatibility-rules
347    fn is_element_type(repeated_type: &Type) -> bool {
348        // For legacy 2-level list types with primitive element type, e.g.:
349        //
350        //    // ARRAY<INT> (nullable list, non-null elements)
351        //    optional group my_list (LIST) {
352        //      repeated int32 element;
353        //    }
354        //
355        repeated_type.is_primitive() ||
356    // For legacy 2-level list types whose element type is a group type with 2 or more
357    // fields, e.g.:
358    //
359    //    // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
360    //    optional group my_list (LIST) {
361    //      repeated group element {
362    //        required binary str (UTF8);
363    //        required int32 num;
364    //      };
365    //    }
366    //
367    repeated_type.is_group() && repeated_type.get_fields().len() > 1 ||
368    // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0),
369    // e.g.:
370    //
371    //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
372    //    optional group my_list (LIST) {
373    //      repeated group array {
374    //        required binary str (UTF8);
375    //      };
376    //    }
377    //
378    repeated_type.name() == "array" ||
379    // For Parquet data generated by parquet-thrift, e.g.:
380    //
381    //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
382    //    optional group my_list (LIST) {
383    //      repeated group my_list_tuple {
384    //        required binary str (UTF8);
385    //      };
386    //    }
387    //
388    repeated_type.name().ends_with("_tuple")
389    }
390
391    /// Reads current record as `Row` from the reader tree.
392    /// Automatically advances all necessary readers.
393    /// This must be called on the root level reader (i.e., for Message type).
394    /// Otherwise, it will panic.
395    fn read(&mut self) -> Result<Row> {
396        match *self {
397            Reader::GroupReader(_, _, ref mut readers) => {
398                let mut fields = Vec::new();
399                for reader in readers {
400                    fields.push((String::from(reader.field_name()), reader.read_field()?));
401                }
402                Ok(make_row(fields))
403            }
404            _ => panic!("Cannot call read() on {self}"),
405        }
406    }
407
408    /// Reads current record as `Field` from the reader tree.
409    /// Automatically advances all necessary readers.
410    fn read_field(&mut self) -> Result<Field> {
411        let field = match *self {
412            Reader::PrimitiveReader(_, ref mut column) => {
413                let value = column.current_value()?;
414                column.read_next()?;
415                value
416            }
417            Reader::OptionReader(def_level, ref mut reader) => {
418                if reader.current_def_level() > def_level {
419                    reader.read_field()?
420                } else {
421                    reader.advance_columns()?;
422                    Field::Null
423                }
424            }
425            Reader::GroupReader(_, def_level, ref mut readers) => {
426                let mut fields = Vec::new();
427                for reader in readers {
428                    if reader.repetition() != Repetition::OPTIONAL
429                        || reader.current_def_level() > def_level
430                    {
431                        fields.push((String::from(reader.field_name()), reader.read_field()?));
432                    } else {
433                        reader.advance_columns()?;
434                        fields.push((String::from(reader.field_name()), Field::Null));
435                    }
436                }
437                let row = make_row(fields);
438                Field::Group(row)
439            }
440            Reader::RepeatedReader(_, def_level, rep_level, ref mut reader) => {
441                let mut elements = Vec::new();
442                loop {
443                    if reader.current_def_level() > def_level {
444                        elements.push(reader.read_field()?);
445                    } else {
446                        reader.advance_columns()?;
447                        // If the current definition level is equal to the definition
448                        // level of this repeated type, then the
449                        // result is an empty list and the repetition level
450                        // will always be <= rl.
451                        break;
452                    }
453
454                    // This covers case when we are out of repetition levels and should
455                    // close the group, or there are no values left to
456                    // buffer.
457                    if !reader.has_next() || reader.current_rep_level() <= rep_level {
458                        break;
459                    }
460                }
461                Field::ListInternal(make_list(elements))
462            }
463            Reader::KeyValueReader(_, def_level, rep_level, ref mut keys, ref mut values) => {
464                let mut pairs = Vec::new();
465                loop {
466                    if keys.current_def_level() > def_level {
467                        pairs.push((keys.read_field()?, values.read_field()?));
468                    } else {
469                        keys.advance_columns()?;
470                        values.advance_columns()?;
471                        // If the current definition level is equal to the definition
472                        // level of this repeated type, then the
473                        // result is an empty list and the repetition level
474                        // will always be <= rl.
475                        break;
476                    }
477
478                    // This covers case when we are out of repetition levels and should
479                    // close the group, or there are no values left to
480                    // buffer.
481                    if !keys.has_next() || keys.current_rep_level() <= rep_level {
482                        break;
483                    }
484                }
485
486                Field::MapInternal(make_map(pairs))
487            }
488        };
489        Ok(field)
490    }
491
492    /// Returns field name for the current reader.
493    fn field_name(&self) -> &str {
494        match *self {
495            Reader::PrimitiveReader(ref field, _) => field.name(),
496            Reader::OptionReader(_, ref reader) => reader.field_name(),
497            Reader::GroupReader(ref opt, ..) => match opt {
498                Some(ref field) => field.name(),
499                None => panic!("Field is None for group reader"),
500            },
501            Reader::RepeatedReader(ref field, ..) => field.name(),
502            Reader::KeyValueReader(ref field, ..) => field.name(),
503        }
504    }
505
506    /// Returns repetition for the current reader.
507    fn repetition(&self) -> Repetition {
508        match *self {
509            Reader::PrimitiveReader(ref field, _) => field.get_basic_info().repetition(),
510            Reader::OptionReader(_, ref reader) => reader.repetition(),
511            Reader::GroupReader(ref opt, ..) => match opt {
512                Some(ref field) => field.get_basic_info().repetition(),
513                None => panic!("Field is None for group reader"),
514            },
515            Reader::RepeatedReader(ref field, ..) => field.get_basic_info().repetition(),
516            Reader::KeyValueReader(ref field, ..) => field.get_basic_info().repetition(),
517        }
518    }
519
520    /// Returns true, if current reader has more values, false otherwise.
521    /// Method does not advance internal iterator.
522    fn has_next(&self) -> bool {
523        match *self {
524            Reader::PrimitiveReader(_, ref column) => column.has_next(),
525            Reader::OptionReader(_, ref reader) => reader.has_next(),
526            Reader::GroupReader(_, _, ref readers) => readers.first().unwrap().has_next(),
527            Reader::RepeatedReader(_, _, _, ref reader) => reader.has_next(),
528            Reader::KeyValueReader(_, _, _, ref keys, _) => keys.has_next(),
529        }
530    }
531
532    /// Returns current definition level,
533    /// Method does not advance internal iterator.
534    fn current_def_level(&self) -> i16 {
535        match *self {
536            Reader::PrimitiveReader(_, ref column) => column.current_def_level(),
537            Reader::OptionReader(_, ref reader) => reader.current_def_level(),
538            Reader::GroupReader(_, _, ref readers) => match readers.first() {
539                Some(reader) => reader.current_def_level(),
540                None => panic!("Current definition level: empty group reader"),
541            },
542            Reader::RepeatedReader(_, _, _, ref reader) => reader.current_def_level(),
543            Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_def_level(),
544        }
545    }
546
547    /// Returns current repetition level.
548    /// Method does not advance internal iterator.
549    fn current_rep_level(&self) -> i16 {
550        match *self {
551            Reader::PrimitiveReader(_, ref column) => column.current_rep_level(),
552            Reader::OptionReader(_, ref reader) => reader.current_rep_level(),
553            Reader::GroupReader(_, _, ref readers) => match readers.first() {
554                Some(reader) => reader.current_rep_level(),
555                None => panic!("Current repetition level: empty group reader"),
556            },
557            Reader::RepeatedReader(_, _, _, ref reader) => reader.current_rep_level(),
558            Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_rep_level(),
559        }
560    }
561
562    /// Advances leaf columns for the current reader.
563    fn advance_columns(&mut self) -> Result<()> {
564        match *self {
565            Reader::PrimitiveReader(_, ref mut column) => column.read_next().map(|_| ()),
566            Reader::OptionReader(_, ref mut reader) => reader.advance_columns(),
567            Reader::GroupReader(_, _, ref mut readers) => {
568                for reader in readers {
569                    reader.advance_columns()?;
570                }
571                Ok(())
572            }
573            Reader::RepeatedReader(_, _, _, ref mut reader) => reader.advance_columns(),
574            Reader::KeyValueReader(_, _, _, ref mut keys, ref mut values) => {
575                keys.advance_columns()?;
576                values.advance_columns()
577            }
578        }
579    }
580}
581
582impl fmt::Display for Reader {
583    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
584        let s = match self {
585            Reader::PrimitiveReader(..) => "PrimitiveReader",
586            Reader::OptionReader(..) => "OptionReader",
587            Reader::GroupReader(..) => "GroupReader",
588            Reader::RepeatedReader(..) => "RepeatedReader",
589            Reader::KeyValueReader(..) => "KeyValueReader",
590        };
591        write!(f, "{s}")
592    }
593}
594
595// ----------------------------------------------------------------------
596// Row iterators
597
598/// The enum Either with variants That represents a reference and a box of
599/// [`FileReader`].
600enum Either<'a> {
601    Left(&'a dyn FileReader),
602    Right(Box<dyn FileReader>),
603}
604
605impl Either<'_> {
606    fn reader(&self) -> &dyn FileReader {
607        match *self {
608            Either::Left(r) => r,
609            Either::Right(ref r) => &**r,
610        }
611    }
612}
613
614/// Access parquet data as an iterator of [`Row`]
615///
616/// # Caveats
617///
618/// Parquet stores data in a columnar fashion using [Dremel] encoding, and is therefore highly
619/// optimised for reading data by column, not row. As a consequence applications concerned with
620/// performance should prefer the columnar arrow or [ColumnReader] APIs.
621///
622/// Additionally the current implementation does not correctly handle repeated fields ([#2394]),
623/// and workloads looking to handle such schema should use the other APIs.
624///
625/// [#2394]: https://github.com/apache/arrow-rs/issues/2394
626/// [ColumnReader]: crate::file::reader::RowGroupReader::get_column_reader
627/// [Dremel]: https://research.google/pubs/pub36632/
628pub struct RowIter<'a> {
629    descr: SchemaDescPtr,
630    tree_builder: TreeBuilder,
631    file_reader: Option<Either<'a>>,
632    current_row_group: usize,
633    num_row_groups: usize,
634    row_iter: Option<ReaderIter>,
635}
636
637impl<'a> RowIter<'a> {
638    /// Creates a new iterator of [`Row`]s.
639    fn new(
640        file_reader: Option<Either<'a>>,
641        row_iter: Option<ReaderIter>,
642        descr: SchemaDescPtr,
643    ) -> Self {
644        let tree_builder = Self::tree_builder();
645        let num_row_groups = match file_reader {
646            Some(ref r) => r.reader().num_row_groups(),
647            None => 0,
648        };
649
650        Self {
651            descr,
652            file_reader,
653            tree_builder,
654            num_row_groups,
655            row_iter,
656            current_row_group: 0,
657        }
658    }
659
660    /// Creates iterator of [`Row`]s for all row groups in a
661    /// file.
662    pub fn from_file(proj: Option<Type>, reader: &'a dyn FileReader) -> Result<Self> {
663        let either = Either::Left(reader);
664        let descr =
665            Self::get_proj_descr(proj, reader.metadata().file_metadata().schema_descr_ptr())?;
666
667        Ok(Self::new(Some(either), None, descr))
668    }
669
670    /// Creates iterator of [`Row`]s for a specific row group.
671    pub fn from_row_group(proj: Option<Type>, reader: &'a dyn RowGroupReader) -> Result<Self> {
672        let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?;
673        let tree_builder = Self::tree_builder();
674        let row_iter = tree_builder.as_iter(descr.clone(), reader)?;
675
676        // For row group we need to set `current_row_group` >= `num_row_groups`, because
677        // we only have one row group and can't buffer more.
678        Ok(Self::new(None, Some(row_iter), descr))
679    }
680
681    /// Creates a iterator of [`Row`]s from a [`FileReader`] using the full file schema.
682    pub fn from_file_into(reader: Box<dyn FileReader>) -> Self {
683        let either = Either::Right(reader);
684        let descr = either
685            .reader()
686            .metadata()
687            .file_metadata()
688            .schema_descr_ptr();
689
690        Self::new(Some(either), None, descr)
691    }
692
693    /// Tries to create a iterator of [`Row`]s using projections.
694    /// Returns a error if a file reader is not the source of this iterator.
695    ///
696    /// The Projected schema can be a subset of or equal to the file schema,
697    /// when it is None, full file schema is assumed.
698    pub fn project(self, proj: Option<Type>) -> Result<Self> {
699        match self.file_reader {
700            Some(ref either) => {
701                let schema = either
702                    .reader()
703                    .metadata()
704                    .file_metadata()
705                    .schema_descr_ptr();
706                let descr = Self::get_proj_descr(proj, schema)?;
707
708                Ok(Self::new(self.file_reader, None, descr))
709            }
710            None => Err(general_err!("File reader is required to use projections")),
711        }
712    }
713
714    /// Helper method to get schema descriptor for projected schema.
715    /// If projection is None, then full schema is returned.
716    #[inline]
717    fn get_proj_descr(proj: Option<Type>, root_descr: SchemaDescPtr) -> Result<SchemaDescPtr> {
718        match proj {
719            Some(projection) => {
720                // check if projection is part of file schema
721                let root_schema = root_descr.root_schema();
722                if !root_schema.check_contains(&projection) {
723                    return Err(general_err!("Root schema does not contain projection"));
724                }
725                Ok(Arc::new(SchemaDescriptor::new(Arc::new(projection))))
726            }
727            None => Ok(root_descr),
728        }
729    }
730
731    /// Sets batch size for this row iter.
732    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
733        self.tree_builder = self.tree_builder.with_batch_size(batch_size);
734        self
735    }
736
737    /// Returns common tree builder, so the same settings are applied to both iterators
738    /// from file reader and row group.
739    #[inline]
740    fn tree_builder() -> TreeBuilder {
741        TreeBuilder::new()
742    }
743}
744
745impl Iterator for RowIter<'_> {
746    type Item = Result<Row>;
747
748    fn next(&mut self) -> Option<Result<Row>> {
749        let mut row = None;
750        if let Some(ref mut iter) = self.row_iter {
751            row = iter.next();
752        }
753
754        while row.is_none() && self.current_row_group < self.num_row_groups {
755            // We do not expect any failures when accessing a row group, and file reader
756            // must be set for selecting next row group.
757            if let Some(ref either) = self.file_reader {
758                let file_reader = either.reader();
759                let row_group_reader = &*file_reader
760                    .get_row_group(self.current_row_group)
761                    .expect("Row group is required to advance");
762
763                match self
764                    .tree_builder
765                    .as_iter(self.descr.clone(), row_group_reader)
766                {
767                    Ok(mut iter) => {
768                        row = iter.next();
769
770                        self.current_row_group += 1;
771                        self.row_iter = Some(iter);
772                    }
773                    Err(e) => return Some(Err(e)),
774                }
775            }
776        }
777
778        row
779    }
780}
781
782/// Internal iterator of [`Row`]s for a reader.
783pub struct ReaderIter {
784    root_reader: Reader,
785    records_left: usize,
786}
787
788impl ReaderIter {
789    fn new(mut root_reader: Reader, num_records: usize) -> Result<Self> {
790        // Prepare root reader by advancing all column vectors
791        root_reader.advance_columns()?;
792        Ok(Self {
793            root_reader,
794            records_left: num_records,
795        })
796    }
797}
798
799impl Iterator for ReaderIter {
800    type Item = Result<Row>;
801
802    fn next(&mut self) -> Option<Result<Row>> {
803        if self.records_left > 0 {
804            self.records_left -= 1;
805            Some(self.root_reader.read())
806        } else {
807            None
808        }
809    }
810}
811
812#[cfg(test)]
813mod tests {
814    use super::*;
815
816    use crate::data_type::Int64Type;
817    use crate::file::reader::SerializedFileReader;
818    use crate::file::writer::SerializedFileWriter;
819    use crate::record::api::RowAccessor;
820    use crate::schema::parser::parse_message_type;
821    use crate::util::test_common::file_util::{get_test_file, get_test_path};
822    use bytes::Bytes;
823
824    // Convenient macros to assemble row, list, map, and group.
825
826    macro_rules! row {
827        ($($e:tt)*) => {
828            {
829                make_row(vec![$($e)*])
830            }
831        }
832    }
833
834    macro_rules! list {
835        ($($e:tt)*) => {
836            {
837                Field::ListInternal(make_list(vec![$($e)*]))
838            }
839        }
840    }
841
842    macro_rules! map {
843        ($($e:tt)*) => {
844            {
845                Field::MapInternal(make_map(vec![$($e)*]))
846            }
847        }
848    }
849
850    macro_rules! group {
851        ( $( $e:expr ), * ) => {
852            {
853                Field::Group(row!($( $e ), *))
854            }
855        }
856    }
857
858    #[test]
859    fn test_file_reader_rows_nulls() {
860        let rows = test_file_reader_rows("nulls.snappy.parquet", None).unwrap();
861        let expected_rows = vec![
862            row![(
863                "b_struct".to_string(),
864                group![("b_c_int".to_string(), Field::Null)]
865            )],
866            row![(
867                "b_struct".to_string(),
868                group![("b_c_int".to_string(), Field::Null)]
869            )],
870            row![(
871                "b_struct".to_string(),
872                group![("b_c_int".to_string(), Field::Null)]
873            )],
874            row![(
875                "b_struct".to_string(),
876                group![("b_c_int".to_string(), Field::Null)]
877            )],
878            row![(
879                "b_struct".to_string(),
880                group![("b_c_int".to_string(), Field::Null)]
881            )],
882            row![(
883                "b_struct".to_string(),
884                group![("b_c_int".to_string(), Field::Null)]
885            )],
886            row![(
887                "b_struct".to_string(),
888                group![("b_c_int".to_string(), Field::Null)]
889            )],
890            row![(
891                "b_struct".to_string(),
892                group![("b_c_int".to_string(), Field::Null)]
893            )],
894        ];
895        assert_eq!(rows, expected_rows);
896    }
897
898    #[test]
899    fn test_file_reader_rows_nonnullable() {
900        let rows = test_file_reader_rows("nonnullable.impala.parquet", None).unwrap();
901        let expected_rows = vec![row![
902            ("ID".to_string(), Field::Long(8)),
903            ("Int_Array".to_string(), list![Field::Int(-1)]),
904            (
905                "int_array_array".to_string(),
906                list![list![Field::Int(-1), Field::Int(-2)], list![]]
907            ),
908            (
909                "Int_Map".to_string(),
910                map![(Field::Str("k1".to_string()), Field::Int(-1))]
911            ),
912            (
913                "int_map_array".to_string(),
914                list![
915                    map![],
916                    map![(Field::Str("k1".to_string()), Field::Int(1))],
917                    map![],
918                    map![]
919                ]
920            ),
921            (
922                "nested_Struct".to_string(),
923                group![
924                    ("a".to_string(), Field::Int(-1)),
925                    ("B".to_string(), list![Field::Int(-1)]),
926                    (
927                        "c".to_string(),
928                        group![(
929                            "D".to_string(),
930                            list![list![group![
931                                ("e".to_string(), Field::Int(-1)),
932                                ("f".to_string(), Field::Str("nonnullable".to_string()))
933                            ]]]
934                        )]
935                    ),
936                    ("G".to_string(), map![])
937                ]
938            )
939        ]];
940        assert_eq!(rows, expected_rows);
941    }
942
943    #[test]
944    fn test_file_reader_rows_nullable() {
945        let rows = test_file_reader_rows("nullable.impala.parquet", None).unwrap();
946        let expected_rows = vec![
947            row![
948                ("id".to_string(), Field::Long(1)),
949                (
950                    "int_array".to_string(),
951                    list![Field::Int(1), Field::Int(2), Field::Int(3)]
952                ),
953                (
954                    "int_array_Array".to_string(),
955                    list![
956                        list![Field::Int(1), Field::Int(2)],
957                        list![Field::Int(3), Field::Int(4)]
958                    ]
959                ),
960                (
961                    "int_map".to_string(),
962                    map![
963                        (Field::Str("k1".to_string()), Field::Int(1)),
964                        (Field::Str("k2".to_string()), Field::Int(100))
965                    ]
966                ),
967                (
968                    "int_Map_Array".to_string(),
969                    list![map![(Field::Str("k1".to_string()), Field::Int(1))]]
970                ),
971                (
972                    "nested_struct".to_string(),
973                    group![
974                        ("A".to_string(), Field::Int(1)),
975                        ("b".to_string(), list![Field::Int(1)]),
976                        (
977                            "C".to_string(),
978                            group![(
979                                "d".to_string(),
980                                list![
981                                    list![
982                                        group![
983                                            ("E".to_string(), Field::Int(10)),
984                                            ("F".to_string(), Field::Str("aaa".to_string()))
985                                        ],
986                                        group![
987                                            ("E".to_string(), Field::Int(-10)),
988                                            ("F".to_string(), Field::Str("bbb".to_string()))
989                                        ]
990                                    ],
991                                    list![group![
992                                        ("E".to_string(), Field::Int(11)),
993                                        ("F".to_string(), Field::Str("c".to_string()))
994                                    ]]
995                                ]
996                            )]
997                        ),
998                        (
999                            "g".to_string(),
1000                            map![(
1001                                Field::Str("foo".to_string()),
1002                                group![(
1003                                    "H".to_string(),
1004                                    group![("i".to_string(), list![Field::Double(1.1)])]
1005                                )]
1006                            )]
1007                        )
1008                    ]
1009                )
1010            ],
1011            row![
1012                ("id".to_string(), Field::Long(2)),
1013                (
1014                    "int_array".to_string(),
1015                    list![
1016                        Field::Null,
1017                        Field::Int(1),
1018                        Field::Int(2),
1019                        Field::Null,
1020                        Field::Int(3),
1021                        Field::Null
1022                    ]
1023                ),
1024                (
1025                    "int_array_Array".to_string(),
1026                    list![
1027                        list![Field::Null, Field::Int(1), Field::Int(2), Field::Null],
1028                        list![Field::Int(3), Field::Null, Field::Int(4)],
1029                        list![],
1030                        Field::Null
1031                    ]
1032                ),
1033                (
1034                    "int_map".to_string(),
1035                    map![
1036                        (Field::Str("k1".to_string()), Field::Int(2)),
1037                        (Field::Str("k2".to_string()), Field::Null)
1038                    ]
1039                ),
1040                (
1041                    "int_Map_Array".to_string(),
1042                    list![
1043                        map![
1044                            (Field::Str("k3".to_string()), Field::Null),
1045                            (Field::Str("k1".to_string()), Field::Int(1))
1046                        ],
1047                        Field::Null,
1048                        map![]
1049                    ]
1050                ),
1051                (
1052                    "nested_struct".to_string(),
1053                    group![
1054                        ("A".to_string(), Field::Null),
1055                        ("b".to_string(), list![Field::Null]),
1056                        (
1057                            "C".to_string(),
1058                            group![(
1059                                "d".to_string(),
1060                                list![
1061                                    list![
1062                                        group![
1063                                            ("E".to_string(), Field::Null),
1064                                            ("F".to_string(), Field::Null)
1065                                        ],
1066                                        group![
1067                                            ("E".to_string(), Field::Int(10)),
1068                                            ("F".to_string(), Field::Str("aaa".to_string()))
1069                                        ],
1070                                        group![
1071                                            ("E".to_string(), Field::Null),
1072                                            ("F".to_string(), Field::Null)
1073                                        ],
1074                                        group![
1075                                            ("E".to_string(), Field::Int(-10)),
1076                                            ("F".to_string(), Field::Str("bbb".to_string()))
1077                                        ],
1078                                        group![
1079                                            ("E".to_string(), Field::Null),
1080                                            ("F".to_string(), Field::Null)
1081                                        ]
1082                                    ],
1083                                    list![
1084                                        group![
1085                                            ("E".to_string(), Field::Int(11)),
1086                                            ("F".to_string(), Field::Str("c".to_string()))
1087                                        ],
1088                                        Field::Null
1089                                    ],
1090                                    list![],
1091                                    Field::Null
1092                                ]
1093                            )]
1094                        ),
1095                        (
1096                            "g".to_string(),
1097                            map![
1098                                (
1099                                    Field::Str("g1".to_string()),
1100                                    group![(
1101                                        "H".to_string(),
1102                                        group![(
1103                                            "i".to_string(),
1104                                            list![Field::Double(2.2), Field::Null]
1105                                        )]
1106                                    )]
1107                                ),
1108                                (
1109                                    Field::Str("g2".to_string()),
1110                                    group![("H".to_string(), group![("i".to_string(), list![])])]
1111                                ),
1112                                (Field::Str("g3".to_string()), Field::Null),
1113                                (
1114                                    Field::Str("g4".to_string()),
1115                                    group![(
1116                                        "H".to_string(),
1117                                        group![("i".to_string(), Field::Null)]
1118                                    )]
1119                                ),
1120                                (
1121                                    Field::Str("g5".to_string()),
1122                                    group![("H".to_string(), Field::Null)]
1123                                )
1124                            ]
1125                        )
1126                    ]
1127                )
1128            ],
1129            row![
1130                ("id".to_string(), Field::Long(3)),
1131                ("int_array".to_string(), list![]),
1132                ("int_array_Array".to_string(), list![Field::Null]),
1133                ("int_map".to_string(), map![]),
1134                ("int_Map_Array".to_string(), list![Field::Null, Field::Null]),
1135                (
1136                    "nested_struct".to_string(),
1137                    group![
1138                        ("A".to_string(), Field::Null),
1139                        ("b".to_string(), Field::Null),
1140                        ("C".to_string(), group![("d".to_string(), list![])]),
1141                        ("g".to_string(), map![])
1142                    ]
1143                )
1144            ],
1145            row![
1146                ("id".to_string(), Field::Long(4)),
1147                ("int_array".to_string(), Field::Null),
1148                ("int_array_Array".to_string(), list![]),
1149                ("int_map".to_string(), map![]),
1150                ("int_Map_Array".to_string(), list![]),
1151                (
1152                    "nested_struct".to_string(),
1153                    group![
1154                        ("A".to_string(), Field::Null),
1155                        ("b".to_string(), Field::Null),
1156                        ("C".to_string(), group![("d".to_string(), Field::Null)]),
1157                        ("g".to_string(), Field::Null)
1158                    ]
1159                )
1160            ],
1161            row![
1162                ("id".to_string(), Field::Long(5)),
1163                ("int_array".to_string(), Field::Null),
1164                ("int_array_Array".to_string(), Field::Null),
1165                ("int_map".to_string(), map![]),
1166                ("int_Map_Array".to_string(), Field::Null),
1167                (
1168                    "nested_struct".to_string(),
1169                    group![
1170                        ("A".to_string(), Field::Null),
1171                        ("b".to_string(), Field::Null),
1172                        ("C".to_string(), Field::Null),
1173                        (
1174                            "g".to_string(),
1175                            map![(
1176                                Field::Str("foo".to_string()),
1177                                group![(
1178                                    "H".to_string(),
1179                                    group![(
1180                                        "i".to_string(),
1181                                        list![Field::Double(2.2), Field::Double(3.3)]
1182                                    )]
1183                                )]
1184                            )]
1185                        )
1186                    ]
1187                )
1188            ],
1189            row![
1190                ("id".to_string(), Field::Long(6)),
1191                ("int_array".to_string(), Field::Null),
1192                ("int_array_Array".to_string(), Field::Null),
1193                ("int_map".to_string(), Field::Null),
1194                ("int_Map_Array".to_string(), Field::Null),
1195                ("nested_struct".to_string(), Field::Null)
1196            ],
1197            row![
1198                ("id".to_string(), Field::Long(7)),
1199                ("int_array".to_string(), Field::Null),
1200                (
1201                    "int_array_Array".to_string(),
1202                    list![Field::Null, list![Field::Int(5), Field::Int(6)]]
1203                ),
1204                (
1205                    "int_map".to_string(),
1206                    map![
1207                        (Field::Str("k1".to_string()), Field::Null),
1208                        (Field::Str("k3".to_string()), Field::Null)
1209                    ]
1210                ),
1211                ("int_Map_Array".to_string(), Field::Null),
1212                (
1213                    "nested_struct".to_string(),
1214                    group![
1215                        ("A".to_string(), Field::Int(7)),
1216                        (
1217                            "b".to_string(),
1218                            list![Field::Int(2), Field::Int(3), Field::Null]
1219                        ),
1220                        (
1221                            "C".to_string(),
1222                            group![(
1223                                "d".to_string(),
1224                                list![list![], list![Field::Null], Field::Null]
1225                            )]
1226                        ),
1227                        ("g".to_string(), Field::Null)
1228                    ]
1229                )
1230            ],
1231        ];
1232        assert_eq!(rows, expected_rows);
1233    }
1234
1235    #[test]
1236    fn test_file_reader_rows_projection() {
1237        let schema = "
1238      message spark_schema {
1239        REQUIRED DOUBLE c;
1240        REQUIRED INT32 b;
1241      }
1242    ";
1243        let schema = parse_message_type(schema).unwrap();
1244        let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1245        let expected_rows = vec![
1246            row![
1247                ("c".to_string(), Field::Double(1.0)),
1248                ("b".to_string(), Field::Int(1))
1249            ],
1250            row![
1251                ("c".to_string(), Field::Double(1.0)),
1252                ("b".to_string(), Field::Int(1))
1253            ],
1254            row![
1255                ("c".to_string(), Field::Double(1.0)),
1256                ("b".to_string(), Field::Int(1))
1257            ],
1258            row![
1259                ("c".to_string(), Field::Double(1.0)),
1260                ("b".to_string(), Field::Int(1))
1261            ],
1262            row![
1263                ("c".to_string(), Field::Double(1.0)),
1264                ("b".to_string(), Field::Int(1))
1265            ],
1266            row![
1267                ("c".to_string(), Field::Double(1.0)),
1268                ("b".to_string(), Field::Int(1))
1269            ],
1270        ];
1271        assert_eq!(rows, expected_rows);
1272    }
1273
1274    #[test]
1275    fn test_iter_columns_in_row() {
1276        let r = row![
1277            ("c".to_string(), Field::Double(1.0)),
1278            ("b".to_string(), Field::Int(1))
1279        ];
1280        let mut result = Vec::new();
1281        for (name, record) in r.get_column_iter() {
1282            result.push((name, record));
1283        }
1284        assert_eq!(
1285            vec![
1286                (&"c".to_string(), &Field::Double(1.0)),
1287                (&"b".to_string(), &Field::Int(1))
1288            ],
1289            result
1290        );
1291    }
1292
1293    #[test]
1294    fn test_into_columns_in_row() {
1295        let r = row![
1296            ("a".to_string(), Field::Str("My string".to_owned())),
1297            ("b".to_string(), Field::Int(1))
1298        ];
1299        assert_eq!(
1300            r.into_columns(),
1301            vec![
1302                ("a".to_string(), Field::Str("My string".to_owned())),
1303                ("b".to_string(), Field::Int(1)),
1304            ]
1305        );
1306    }
1307
1308    #[test]
1309    fn test_file_reader_rows_projection_map() {
1310        let schema = "
1311      message spark_schema {
1312        OPTIONAL group a (MAP) {
1313          REPEATED group key_value {
1314            REQUIRED BYTE_ARRAY key (UTF8);
1315            OPTIONAL group value (MAP) {
1316              REPEATED group key_value {
1317                REQUIRED INT32 key;
1318                REQUIRED BOOLEAN value;
1319              }
1320            }
1321          }
1322        }
1323      }
1324    ";
1325        let schema = parse_message_type(schema).unwrap();
1326        let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1327        let expected_rows = vec![
1328            row![(
1329                "a".to_string(),
1330                map![(
1331                    Field::Str("a".to_string()),
1332                    map![
1333                        (Field::Int(1), Field::Bool(true)),
1334                        (Field::Int(2), Field::Bool(false))
1335                    ]
1336                )]
1337            )],
1338            row![(
1339                "a".to_string(),
1340                map![(
1341                    Field::Str("b".to_string()),
1342                    map![(Field::Int(1), Field::Bool(true))]
1343                )]
1344            )],
1345            row![(
1346                "a".to_string(),
1347                map![(Field::Str("c".to_string()), Field::Null)]
1348            )],
1349            row![("a".to_string(), map![(Field::Str("d".to_string()), map![])])],
1350            row![(
1351                "a".to_string(),
1352                map![(
1353                    Field::Str("e".to_string()),
1354                    map![(Field::Int(1), Field::Bool(true))]
1355                )]
1356            )],
1357            row![(
1358                "a".to_string(),
1359                map![(
1360                    Field::Str("f".to_string()),
1361                    map![
1362                        (Field::Int(3), Field::Bool(true)),
1363                        (Field::Int(4), Field::Bool(false)),
1364                        (Field::Int(5), Field::Bool(true))
1365                    ]
1366                )]
1367            )],
1368        ];
1369        assert_eq!(rows, expected_rows);
1370    }
1371
1372    #[test]
1373    fn test_file_reader_rows_projection_list() {
1374        let schema = "
1375      message spark_schema {
1376        OPTIONAL group a (LIST) {
1377          REPEATED group list {
1378            OPTIONAL group element (LIST) {
1379              REPEATED group list {
1380                OPTIONAL group element (LIST) {
1381                  REPEATED group list {
1382                    OPTIONAL BYTE_ARRAY element (UTF8);
1383                  }
1384                }
1385              }
1386            }
1387          }
1388        }
1389      }
1390    ";
1391        let schema = parse_message_type(schema).unwrap();
1392        let rows = test_file_reader_rows("nested_lists.snappy.parquet", Some(schema)).unwrap();
1393        let expected_rows = vec![
1394            row![(
1395                "a".to_string(),
1396                list![
1397                    list![
1398                        list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1399                        list![Field::Str("c".to_string())]
1400                    ],
1401                    list![Field::Null, list![Field::Str("d".to_string())]]
1402                ]
1403            )],
1404            row![(
1405                "a".to_string(),
1406                list![
1407                    list![
1408                        list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1409                        list![Field::Str("c".to_string()), Field::Str("d".to_string())]
1410                    ],
1411                    list![Field::Null, list![Field::Str("e".to_string())]]
1412                ]
1413            )],
1414            row![(
1415                "a".to_string(),
1416                list![
1417                    list![
1418                        list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1419                        list![Field::Str("c".to_string()), Field::Str("d".to_string())],
1420                        list![Field::Str("e".to_string())]
1421                    ],
1422                    list![Field::Null, list![Field::Str("f".to_string())]]
1423                ]
1424            )],
1425        ];
1426        assert_eq!(rows, expected_rows);
1427    }
1428
1429    #[test]
1430    fn test_file_reader_rows_invalid_projection() {
1431        let schema = "
1432      message spark_schema {
1433        REQUIRED INT32 key;
1434        REQUIRED BOOLEAN value;
1435      }
1436    ";
1437        let schema = parse_message_type(schema).unwrap();
1438        let res = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema));
1439        assert_eq!(
1440            res.unwrap_err().to_string(),
1441            "Parquet error: Root schema does not contain projection"
1442        );
1443    }
1444
1445    #[test]
1446    fn test_row_group_rows_invalid_projection() {
1447        let schema = "
1448      message spark_schema {
1449        REQUIRED INT32 key;
1450        REQUIRED BOOLEAN value;
1451      }
1452    ";
1453        let schema = parse_message_type(schema).unwrap();
1454        let res = test_row_group_rows("nested_maps.snappy.parquet", Some(schema));
1455        assert_eq!(
1456            res.unwrap_err().to_string(),
1457            "Parquet error: Root schema does not contain projection"
1458        );
1459    }
1460
1461    #[test]
1462    #[should_panic(expected = "Invalid map type")]
1463    fn test_file_reader_rows_invalid_map_type() {
1464        let schema = "
1465      message spark_schema {
1466        OPTIONAL group a (MAP) {
1467          REPEATED group key_value {
1468            REQUIRED BYTE_ARRAY key (UTF8);
1469            OPTIONAL group value (MAP) {
1470              REPEATED group key_value {
1471                REQUIRED INT32 key;
1472              }
1473            }
1474          }
1475        }
1476      }
1477    ";
1478        let schema = parse_message_type(schema).unwrap();
1479        test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1480    }
1481
1482    #[test]
1483    fn test_file_reader_iter() {
1484        let path = get_test_path("alltypes_plain.parquet");
1485        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1486        let iter = RowIter::from_file_into(Box::new(reader));
1487
1488        let values: Vec<_> = iter.flat_map(|r| r.unwrap().get_int(0)).collect();
1489        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1490    }
1491
1492    #[test]
1493    fn test_file_reader_iter_projection() {
1494        let path = get_test_path("alltypes_plain.parquet");
1495        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1496        let schema = "message schema { OPTIONAL INT32 id; }";
1497        let proj = parse_message_type(schema).ok();
1498
1499        let iter = RowIter::from_file_into(Box::new(reader))
1500            .project(proj)
1501            .unwrap();
1502        let values: Vec<_> = iter.flat_map(|r| r.unwrap().get_int(0)).collect();
1503
1504        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1505    }
1506
1507    #[test]
1508    fn test_file_reader_iter_projection_err() {
1509        let schema = "
1510      message spark_schema {
1511        REQUIRED INT32 key;
1512        REQUIRED BOOLEAN value;
1513      }
1514    ";
1515        let proj = parse_message_type(schema).ok();
1516        let path = get_test_path("nested_maps.snappy.parquet");
1517        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1518        let res = RowIter::from_file_into(Box::new(reader)).project(proj);
1519
1520        assert_eq!(
1521            res.err().unwrap().to_string(),
1522            "Parquet error: Root schema does not contain projection"
1523        );
1524    }
1525
1526    #[test]
1527    fn test_tree_reader_handle_repeated_fields_with_no_annotation() {
1528        // Array field `phoneNumbers` does not contain LIST annotation.
1529        // We parse it as struct with `phone` repeated field as array.
1530        let rows = test_file_reader_rows("repeated_no_annotation.parquet", None).unwrap();
1531        let expected_rows = vec![
1532            row![
1533                ("id".to_string(), Field::Int(1)),
1534                ("phoneNumbers".to_string(), Field::Null)
1535            ],
1536            row![
1537                ("id".to_string(), Field::Int(2)),
1538                ("phoneNumbers".to_string(), Field::Null)
1539            ],
1540            row![
1541                ("id".to_string(), Field::Int(3)),
1542                (
1543                    "phoneNumbers".to_string(),
1544                    group![("phone".to_string(), list![])]
1545                )
1546            ],
1547            row![
1548                ("id".to_string(), Field::Int(4)),
1549                (
1550                    "phoneNumbers".to_string(),
1551                    group![(
1552                        "phone".to_string(),
1553                        list![group![
1554                            ("number".to_string(), Field::Long(5555555555)),
1555                            ("kind".to_string(), Field::Null)
1556                        ]]
1557                    )]
1558                )
1559            ],
1560            row![
1561                ("id".to_string(), Field::Int(5)),
1562                (
1563                    "phoneNumbers".to_string(),
1564                    group![(
1565                        "phone".to_string(),
1566                        list![group![
1567                            ("number".to_string(), Field::Long(1111111111)),
1568                            ("kind".to_string(), Field::Str("home".to_string()))
1569                        ]]
1570                    )]
1571                )
1572            ],
1573            row![
1574                ("id".to_string(), Field::Int(6)),
1575                (
1576                    "phoneNumbers".to_string(),
1577                    group![(
1578                        "phone".to_string(),
1579                        list![
1580                            group![
1581                                ("number".to_string(), Field::Long(1111111111)),
1582                                ("kind".to_string(), Field::Str("home".to_string()))
1583                            ],
1584                            group![
1585                                ("number".to_string(), Field::Long(2222222222)),
1586                                ("kind".to_string(), Field::Null)
1587                            ],
1588                            group![
1589                                ("number".to_string(), Field::Long(3333333333)),
1590                                ("kind".to_string(), Field::Str("mobile".to_string()))
1591                            ]
1592                        ]
1593                    )]
1594                )
1595            ],
1596        ];
1597
1598        assert_eq!(rows, expected_rows);
1599    }
1600
1601    #[test]
1602    fn test_tree_reader_handle_nested_repeated_fields_with_no_annotation() {
1603        // Create schema
1604        let schema = Arc::new(
1605            parse_message_type(
1606                "
1607            message schema {
1608                REPEATED group level1 {
1609                    REPEATED group level2 {
1610                        REQUIRED group level3 {
1611                            REQUIRED INT64 value3;
1612                        }
1613                    }
1614                    REQUIRED INT64 value1;
1615                }
1616            }",
1617            )
1618            .unwrap(),
1619        );
1620
1621        // Write Parquet file to buffer
1622        let mut buffer: Vec<u8> = Vec::new();
1623        let mut file_writer =
1624            SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
1625        let mut row_group_writer = file_writer.next_row_group().unwrap();
1626
1627        // Write column level1.level2.level3.value3
1628        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
1629        column_writer
1630            .typed::<Int64Type>()
1631            .write_batch(&[30, 31, 32], Some(&[2, 2, 2]), Some(&[0, 0, 0]))
1632            .unwrap();
1633        column_writer.close().unwrap();
1634
1635        // Write column level1.value1
1636        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
1637        column_writer
1638            .typed::<Int64Type>()
1639            .write_batch(&[10, 11, 12], Some(&[1, 1, 1]), Some(&[0, 0, 0]))
1640            .unwrap();
1641        column_writer.close().unwrap();
1642
1643        // Finalize Parquet file
1644        row_group_writer.close().unwrap();
1645        file_writer.close().unwrap();
1646        assert_eq!(&buffer[0..4], b"PAR1");
1647
1648        // Read Parquet file from buffer
1649        let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
1650        let rows: Vec<_> = file_reader
1651            .get_row_iter(None)
1652            .unwrap()
1653            .map(|row| row.unwrap())
1654            .collect();
1655
1656        let expected_rows = vec![
1657            row![(
1658                "level1".to_string(),
1659                list![group![
1660                    (
1661                        "level2".to_string(),
1662                        list![group![(
1663                            "level3".to_string(),
1664                            group![("value3".to_string(), Field::Long(30))]
1665                        )]]
1666                    ),
1667                    ("value1".to_string(), Field::Long(10))
1668                ]]
1669            )],
1670            row![(
1671                "level1".to_string(),
1672                list![group![
1673                    (
1674                        "level2".to_string(),
1675                        list![group![(
1676                            "level3".to_string(),
1677                            group![("value3".to_string(), Field::Long(31))]
1678                        )]]
1679                    ),
1680                    ("value1".to_string(), Field::Long(11))
1681                ]]
1682            )],
1683            row![(
1684                "level1".to_string(),
1685                list![group![
1686                    (
1687                        "level2".to_string(),
1688                        list![group![(
1689                            "level3".to_string(),
1690                            group![("value3".to_string(), Field::Long(32))]
1691                        )]]
1692                    ),
1693                    ("value1".to_string(), Field::Long(12))
1694                ]]
1695            )],
1696        ];
1697
1698        assert_eq!(rows, expected_rows);
1699    }
1700
1701    #[test]
1702    fn test_tree_reader_handle_primitive_repeated_fields_with_no_annotation() {
1703        // In this test the REPEATED fields are primitives
1704        let rows = test_file_reader_rows("repeated_primitive_no_list.parquet", None).unwrap();
1705        let expected_rows = vec![
1706            row![
1707                (
1708                    "Int32_list".to_string(),
1709                    Field::ListInternal(make_list([0, 1, 2, 3].map(Field::Int).to_vec()))
1710                ),
1711                (
1712                    "String_list".to_string(),
1713                    Field::ListInternal(make_list(
1714                        ["foo", "zero", "one", "two"]
1715                            .map(|s| Field::Str(s.to_string()))
1716                            .to_vec()
1717                    ))
1718                ),
1719                (
1720                    "group_of_lists".to_string(),
1721                    group![
1722                        (
1723                            "Int32_list_in_group".to_string(),
1724                            Field::ListInternal(make_list([0, 1, 2, 3].map(Field::Int).to_vec()))
1725                        ),
1726                        (
1727                            "String_list_in_group".to_string(),
1728                            Field::ListInternal(make_list(
1729                                ["foo", "zero", "one", "two"]
1730                                    .map(|s| Field::Str(s.to_string()))
1731                                    .to_vec()
1732                            ))
1733                        )
1734                    ]
1735                )
1736            ],
1737            row![
1738                (
1739                    "Int32_list".to_string(),
1740                    Field::ListInternal(make_list(vec![]))
1741                ),
1742                (
1743                    "String_list".to_string(),
1744                    Field::ListInternal(make_list(
1745                        ["three"].map(|s| Field::Str(s.to_string())).to_vec()
1746                    ))
1747                ),
1748                (
1749                    "group_of_lists".to_string(),
1750                    group![
1751                        (
1752                            "Int32_list_in_group".to_string(),
1753                            Field::ListInternal(make_list(vec![]))
1754                        ),
1755                        (
1756                            "String_list_in_group".to_string(),
1757                            Field::ListInternal(make_list(
1758                                ["three"].map(|s| Field::Str(s.to_string())).to_vec()
1759                            ))
1760                        )
1761                    ]
1762                )
1763            ],
1764            row![
1765                (
1766                    "Int32_list".to_string(),
1767                    Field::ListInternal(make_list(vec![Field::Int(4)]))
1768                ),
1769                (
1770                    "String_list".to_string(),
1771                    Field::ListInternal(make_list(
1772                        ["four"].map(|s| Field::Str(s.to_string())).to_vec()
1773                    ))
1774                ),
1775                (
1776                    "group_of_lists".to_string(),
1777                    group![
1778                        (
1779                            "Int32_list_in_group".to_string(),
1780                            Field::ListInternal(make_list(vec![Field::Int(4)]))
1781                        ),
1782                        (
1783                            "String_list_in_group".to_string(),
1784                            Field::ListInternal(make_list(
1785                                ["four"].map(|s| Field::Str(s.to_string())).to_vec()
1786                            ))
1787                        )
1788                    ]
1789                )
1790            ],
1791            row![
1792                (
1793                    "Int32_list".to_string(),
1794                    Field::ListInternal(make_list([5, 6, 7, 8].map(Field::Int).to_vec()))
1795                ),
1796                (
1797                    "String_list".to_string(),
1798                    Field::ListInternal(make_list(
1799                        ["five", "six", "seven", "eight"]
1800                            .map(|s| Field::Str(s.to_string()))
1801                            .to_vec()
1802                    ))
1803                ),
1804                (
1805                    "group_of_lists".to_string(),
1806                    group![
1807                        (
1808                            "Int32_list_in_group".to_string(),
1809                            Field::ListInternal(make_list([5, 6, 7, 8].map(Field::Int).to_vec()))
1810                        ),
1811                        (
1812                            "String_list_in_group".to_string(),
1813                            Field::ListInternal(make_list(
1814                                ["five", "six", "seven", "eight"]
1815                                    .map(|s| Field::Str(s.to_string()))
1816                                    .to_vec()
1817                            ))
1818                        )
1819                    ]
1820                )
1821            ],
1822        ];
1823        assert_eq!(rows, expected_rows);
1824    }
1825
1826    fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1827        let file = get_test_file(file_name);
1828        let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1829        let iter = file_reader.get_row_iter(schema)?;
1830        Ok(iter.map(|row| row.unwrap()).collect())
1831    }
1832
1833    fn test_row_group_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1834        let file = get_test_file(file_name);
1835        let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1836        // Check the first row group only, because files will contain only single row
1837        // group
1838        let row_group_reader = file_reader.get_row_group(0).unwrap();
1839        let iter = row_group_reader.get_row_iter(schema)?;
1840        Ok(iter.map(|row| row.unwrap()).collect())
1841    }
1842}