1use std::{collections::HashMap, fmt, sync::Arc};
22
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::{ParquetError, Result};
25use crate::file::reader::{FileReader, RowGroupReader};
26use crate::record::{
27 api::{make_list, make_map, make_row, Field, Row},
28 triplet::TripletIter,
29};
30use crate::schema::types::{ColumnPath, SchemaDescPtr, SchemaDescriptor, Type, TypePtr};
31
/// Batch size a [`TreeBuilder`] starts with unless it is overridden through
/// [`TreeBuilder::with_batch_size`]; the value is handed to the `TripletIter`
/// created for every primitive column.
const DEFAULT_BATCH_SIZE: usize = 1024;
34
/// Builds a [`Reader`] tree for the columns of a row group.
///
/// The only state is the batch size that is passed on to the `TripletIter` of
/// each primitive column. `Debug` and `Clone` are derived so the builder can be
/// logged and copied like any other small configuration value.
#[derive(Debug, Clone)]
pub struct TreeBuilder {
    /// Batch size passed to `TripletIter::new` for every leaf column.
    batch_size: usize,
}
42
43impl Default for TreeBuilder {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49impl TreeBuilder {
50 pub fn new() -> Self {
52 Self {
53 batch_size: DEFAULT_BATCH_SIZE,
54 }
55 }
56
57 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
59 self.batch_size = batch_size;
60 self
61 }
62
63 pub fn build(
65 &self,
66 descr: SchemaDescPtr,
67 row_group_reader: &dyn RowGroupReader,
68 ) -> Result<Reader> {
69 let mut paths: HashMap<ColumnPath, usize> = HashMap::new();
72 let row_group_metadata = row_group_reader.metadata();
73
74 for col_index in 0..row_group_reader.num_columns() {
75 let col_meta = row_group_metadata.column(col_index);
76 let col_path = col_meta.column_path().clone();
77 paths.insert(col_path, col_index);
78 }
79
80 let mut readers = Vec::new();
82 let mut path = Vec::new();
83
84 for field in descr.root_schema().get_fields() {
85 let reader =
86 self.reader_tree(field.clone(), &mut path, 0, 0, &paths, row_group_reader)?;
87 readers.push(reader);
88 }
89
90 Ok(Reader::GroupReader(None, 0, readers))
93 }
94
95 pub fn as_iter(
97 &self,
98 descr: SchemaDescPtr,
99 row_group_reader: &dyn RowGroupReader,
100 ) -> Result<ReaderIter> {
101 let num_records = row_group_reader.metadata().num_rows() as usize;
102 ReaderIter::new(self.build(descr, row_group_reader)?, num_records)
103 }
104
105 fn reader_tree(
107 &self,
108 field: TypePtr,
109 path: &mut Vec<String>,
110 mut curr_def_level: i16,
111 mut curr_rep_level: i16,
112 paths: &HashMap<ColumnPath, usize>,
113 row_group_reader: &dyn RowGroupReader,
114 ) -> Result<Reader> {
115 assert!(field.get_basic_info().has_repetition());
116 let repetition = field.get_basic_info().repetition();
118 match repetition {
119 Repetition::OPTIONAL => {
120 curr_def_level += 1;
121 }
122 Repetition::REPEATED => {
123 curr_def_level += 1;
124 curr_rep_level += 1;
125 }
126 _ => {}
127 }
128
129 path.push(String::from(field.name()));
130 let reader = if field.is_primitive() {
131 let col_path = ColumnPath::new(path.to_vec());
132 let orig_index = *paths
133 .get(&col_path)
134 .ok_or(general_err!("Path {:?} not found", col_path))?;
135 let col_descr = row_group_reader
136 .metadata()
137 .column(orig_index)
138 .column_descr_ptr();
139 let col_reader = row_group_reader.get_column_reader(orig_index)?;
140 let column = TripletIter::new(col_descr, col_reader, self.batch_size);
141 let reader = Reader::PrimitiveReader(field.clone(), Box::new(column));
142 if repetition == Repetition::REPEATED {
143 Reader::RepeatedReader(
144 field,
145 curr_def_level - 1,
146 curr_rep_level - 1,
147 Box::new(reader),
148 )
149 } else {
150 reader
151 }
152 } else {
153 match field.get_basic_info().converted_type() {
154 ConvertedType::LIST => {
156 assert_eq!(field.get_fields().len(), 1, "Invalid list type {field:?}");
157
158 let repeated_field = field.get_fields()[0].clone();
159 assert_eq!(
160 repeated_field.get_basic_info().repetition(),
161 Repetition::REPEATED,
162 "Invalid list type {field:?}"
163 );
164
165 if Reader::is_element_type(&repeated_field) {
166 let reader = self.reader_tree(
168 repeated_field,
169 path,
170 curr_def_level,
171 curr_rep_level,
172 paths,
173 row_group_reader,
174 )?;
175
176 Reader::RepeatedReader(
177 field,
178 curr_def_level,
179 curr_rep_level,
180 Box::new(reader),
181 )
182 } else {
183 let child_field = repeated_field.get_fields()[0].clone();
184
185 path.push(String::from(repeated_field.name()));
186
187 let reader = self.reader_tree(
188 child_field,
189 path,
190 curr_def_level + 1,
191 curr_rep_level + 1,
192 paths,
193 row_group_reader,
194 )?;
195
196 path.pop();
197
198 Reader::RepeatedReader(
199 field,
200 curr_def_level,
201 curr_rep_level,
202 Box::new(reader),
203 )
204 }
205 }
206 ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
208 assert_eq!(field.get_fields().len(), 1, "Invalid map type: {field:?}");
209 assert!(
210 !field.get_fields()[0].is_primitive(),
211 "Invalid map type: {field:?}"
212 );
213
214 let key_value_type = field.get_fields()[0].clone();
215 assert_eq!(
216 key_value_type.get_basic_info().repetition(),
217 Repetition::REPEATED,
218 "Invalid map type: {field:?}"
219 );
220 assert_eq!(
221 key_value_type.get_fields().len(),
222 2,
223 "Invalid map type: {field:?}"
224 );
225
226 path.push(String::from(key_value_type.name()));
227
228 let key_type = &key_value_type.get_fields()[0];
229 assert!(
230 key_type.is_primitive(),
231 "Map key type is expected to be a primitive type, but found {key_type:?}"
232 );
233 let key_reader = self.reader_tree(
234 key_type.clone(),
235 path,
236 curr_def_level + 1,
237 curr_rep_level + 1,
238 paths,
239 row_group_reader,
240 )?;
241
242 let value_type = &key_value_type.get_fields()[1];
243 let value_reader = self.reader_tree(
244 value_type.clone(),
245 path,
246 curr_def_level + 1,
247 curr_rep_level + 1,
248 paths,
249 row_group_reader,
250 )?;
251
252 path.pop();
253
254 Reader::KeyValueReader(
255 field,
256 curr_def_level,
257 curr_rep_level,
258 Box::new(key_reader),
259 Box::new(value_reader),
260 )
261 }
262 _ if repetition == Repetition::REPEATED => {
267 let required_field = Type::group_type_builder(field.name())
268 .with_repetition(Repetition::REQUIRED)
269 .with_converted_type(field.get_basic_info().converted_type())
270 .with_fields(field.get_fields().to_vec())
271 .build()?;
272
273 path.pop();
274
275 let reader = self.reader_tree(
276 Arc::new(required_field),
277 path,
278 curr_def_level,
279 curr_rep_level,
280 paths,
281 row_group_reader,
282 )?;
283
284 return Ok(Reader::RepeatedReader(
285 field,
286 curr_def_level - 1,
287 curr_rep_level - 1,
288 Box::new(reader),
289 ));
290 }
291 _ => {
293 let mut readers = Vec::new();
294 for child in field.get_fields() {
295 let reader = self.reader_tree(
296 child.clone(),
297 path,
298 curr_def_level,
299 curr_rep_level,
300 paths,
301 row_group_reader,
302 )?;
303 readers.push(reader);
304 }
305 Reader::GroupReader(Some(field), curr_def_level, readers)
306 }
307 }
308 };
309 path.pop();
310
311 Ok(Reader::option(repetition, curr_def_level, reader))
312 }
313}
314
/// Node of the reader tree that assembles record fields from the columns of a
/// row group.
pub enum Reader {
    /// Leaf reader: the primitive schema field and the triplet iterator over its
    /// column.
    PrimitiveReader(TypePtr, Box<TripletIter>),
    /// Reader of an optional field: a definition level threshold and the wrapped
    /// reader. The wrapped value is read only when the current definition level
    /// is greater than the threshold; otherwise `Field::Null` is produced.
    OptionReader(i16, Box<Reader>),
    /// Group of readers: the group field (`None` for the root group created by
    /// `TreeBuilder::build`), the definition level of the group, and one reader
    /// per child.
    GroupReader(Option<TypePtr>, i16, Vec<Reader>),
    /// Reader producing a list: the field, a definition level, a repetition
    /// level, and the reader of the elements.
    RepeatedReader(TypePtr, i16, i16, Box<Reader>),
    /// Reader producing a map: the field, a definition level, a repetition
    /// level, the key reader and the value reader.
    KeyValueReader(TypePtr, i16, i16, Box<Reader>, Box<Reader>),
}
331
332impl Reader {
333 fn option(repetition: Repetition, def_level: i16, reader: Reader) -> Self {
335 if repetition == Repetition::OPTIONAL {
336 Reader::OptionReader(def_level - 1, Box::new(reader))
337 } else {
338 reader
339 }
340 }
341
342 fn is_element_type(repeated_type: &Type) -> bool {
348 repeated_type.is_primitive() ||
356 repeated_type.is_group() && repeated_type.get_fields().len() > 1 ||
368 repeated_type.name() == "array" ||
379 repeated_type.name().ends_with("_tuple")
389 }
390
391 fn read(&mut self) -> Result<Row> {
396 match *self {
397 Reader::GroupReader(_, _, ref mut readers) => {
398 let mut fields = Vec::new();
399 for reader in readers {
400 fields.push((String::from(reader.field_name()), reader.read_field()?));
401 }
402 Ok(make_row(fields))
403 }
404 _ => panic!("Cannot call read() on {self}"),
405 }
406 }
407
408 fn read_field(&mut self) -> Result<Field> {
411 let field = match *self {
412 Reader::PrimitiveReader(_, ref mut column) => {
413 let value = column.current_value()?;
414 column.read_next()?;
415 value
416 }
417 Reader::OptionReader(def_level, ref mut reader) => {
418 if reader.current_def_level() > def_level {
419 reader.read_field()?
420 } else {
421 reader.advance_columns()?;
422 Field::Null
423 }
424 }
425 Reader::GroupReader(_, def_level, ref mut readers) => {
426 let mut fields = Vec::new();
427 for reader in readers {
428 if reader.repetition() != Repetition::OPTIONAL
429 || reader.current_def_level() > def_level
430 {
431 fields.push((String::from(reader.field_name()), reader.read_field()?));
432 } else {
433 reader.advance_columns()?;
434 fields.push((String::from(reader.field_name()), Field::Null));
435 }
436 }
437 let row = make_row(fields);
438 Field::Group(row)
439 }
440 Reader::RepeatedReader(_, def_level, rep_level, ref mut reader) => {
441 let mut elements = Vec::new();
442 loop {
443 if reader.current_def_level() > def_level {
444 elements.push(reader.read_field()?);
445 } else {
446 reader.advance_columns()?;
447 break;
452 }
453
454 if !reader.has_next() || reader.current_rep_level() <= rep_level {
458 break;
459 }
460 }
461 Field::ListInternal(make_list(elements))
462 }
463 Reader::KeyValueReader(_, def_level, rep_level, ref mut keys, ref mut values) => {
464 let mut pairs = Vec::new();
465 loop {
466 if keys.current_def_level() > def_level {
467 pairs.push((keys.read_field()?, values.read_field()?));
468 } else {
469 keys.advance_columns()?;
470 values.advance_columns()?;
471 break;
476 }
477
478 if !keys.has_next() || keys.current_rep_level() <= rep_level {
482 break;
483 }
484 }
485
486 Field::MapInternal(make_map(pairs))
487 }
488 };
489 Ok(field)
490 }
491
492 fn field_name(&self) -> &str {
494 match *self {
495 Reader::PrimitiveReader(ref field, _) => field.name(),
496 Reader::OptionReader(_, ref reader) => reader.field_name(),
497 Reader::GroupReader(ref opt, ..) => match opt {
498 Some(ref field) => field.name(),
499 None => panic!("Field is None for group reader"),
500 },
501 Reader::RepeatedReader(ref field, ..) => field.name(),
502 Reader::KeyValueReader(ref field, ..) => field.name(),
503 }
504 }
505
506 fn repetition(&self) -> Repetition {
508 match *self {
509 Reader::PrimitiveReader(ref field, _) => field.get_basic_info().repetition(),
510 Reader::OptionReader(_, ref reader) => reader.repetition(),
511 Reader::GroupReader(ref opt, ..) => match opt {
512 Some(ref field) => field.get_basic_info().repetition(),
513 None => panic!("Field is None for group reader"),
514 },
515 Reader::RepeatedReader(ref field, ..) => field.get_basic_info().repetition(),
516 Reader::KeyValueReader(ref field, ..) => field.get_basic_info().repetition(),
517 }
518 }
519
520 fn has_next(&self) -> bool {
523 match *self {
524 Reader::PrimitiveReader(_, ref column) => column.has_next(),
525 Reader::OptionReader(_, ref reader) => reader.has_next(),
526 Reader::GroupReader(_, _, ref readers) => readers.first().unwrap().has_next(),
527 Reader::RepeatedReader(_, _, _, ref reader) => reader.has_next(),
528 Reader::KeyValueReader(_, _, _, ref keys, _) => keys.has_next(),
529 }
530 }
531
532 fn current_def_level(&self) -> i16 {
535 match *self {
536 Reader::PrimitiveReader(_, ref column) => column.current_def_level(),
537 Reader::OptionReader(_, ref reader) => reader.current_def_level(),
538 Reader::GroupReader(_, _, ref readers) => match readers.first() {
539 Some(reader) => reader.current_def_level(),
540 None => panic!("Current definition level: empty group reader"),
541 },
542 Reader::RepeatedReader(_, _, _, ref reader) => reader.current_def_level(),
543 Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_def_level(),
544 }
545 }
546
547 fn current_rep_level(&self) -> i16 {
550 match *self {
551 Reader::PrimitiveReader(_, ref column) => column.current_rep_level(),
552 Reader::OptionReader(_, ref reader) => reader.current_rep_level(),
553 Reader::GroupReader(_, _, ref readers) => match readers.first() {
554 Some(reader) => reader.current_rep_level(),
555 None => panic!("Current repetition level: empty group reader"),
556 },
557 Reader::RepeatedReader(_, _, _, ref reader) => reader.current_rep_level(),
558 Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_rep_level(),
559 }
560 }
561
562 fn advance_columns(&mut self) -> Result<()> {
564 match *self {
565 Reader::PrimitiveReader(_, ref mut column) => column.read_next().map(|_| ()),
566 Reader::OptionReader(_, ref mut reader) => reader.advance_columns(),
567 Reader::GroupReader(_, _, ref mut readers) => {
568 for reader in readers {
569 reader.advance_columns()?;
570 }
571 Ok(())
572 }
573 Reader::RepeatedReader(_, _, _, ref mut reader) => reader.advance_columns(),
574 Reader::KeyValueReader(_, _, _, ref mut keys, ref mut values) => {
575 keys.advance_columns()?;
576 values.advance_columns()
577 }
578 }
579 }
580}
581
582impl fmt::Display for Reader {
583 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
584 let s = match self {
585 Reader::PrimitiveReader(..) => "PrimitiveReader",
586 Reader::OptionReader(..) => "OptionReader",
587 Reader::GroupReader(..) => "GroupReader",
588 Reader::RepeatedReader(..) => "RepeatedReader",
589 Reader::KeyValueReader(..) => "KeyValueReader",
590 };
591 write!(f, "{s}")
592 }
593}
594
/// A file reader that is either borrowed or owned, so that [`RowIter`] can be
/// created from both.
enum Either<'a> {
    /// Borrowed file reader, as used by `RowIter::from_file`.
    Left(&'a dyn FileReader),
    /// Owned, boxed file reader, as used by `RowIter::from_file_into`.
    Right(Box<dyn FileReader>),
}
604
605impl Either<'_> {
606 fn reader(&self) -> &dyn FileReader {
607 match *self {
608 Either::Left(r) => r,
609 Either::Right(ref r) => &**r,
610 }
611 }
612}
613
/// Iterator of `Result<Row>` over a whole file (row group after row group) or
/// over a single row group.
pub struct RowIter<'a> {
    /// Schema descriptor (full or projected) the readers are built for.
    descr: SchemaDescPtr,
    /// Builder used to create the reader iterator of each row group.
    tree_builder: TreeBuilder,
    /// File reader to take row groups from; `None` when the iterator was
    /// created from a single row group.
    file_reader: Option<Either<'a>>,
    /// Index of the next row group to open.
    current_row_group: usize,
    /// Number of row groups of the file reader; 0 without a file reader.
    num_row_groups: usize,
    /// Iterator over the rows of the row group currently being read, if any.
    row_iter: Option<ReaderIter>,
}
636
637impl<'a> RowIter<'a> {
638 fn new(
640 file_reader: Option<Either<'a>>,
641 row_iter: Option<ReaderIter>,
642 descr: SchemaDescPtr,
643 ) -> Self {
644 let tree_builder = Self::tree_builder();
645 let num_row_groups = match file_reader {
646 Some(ref r) => r.reader().num_row_groups(),
647 None => 0,
648 };
649
650 Self {
651 descr,
652 file_reader,
653 tree_builder,
654 num_row_groups,
655 row_iter,
656 current_row_group: 0,
657 }
658 }
659
660 pub fn from_file(proj: Option<Type>, reader: &'a dyn FileReader) -> Result<Self> {
663 let either = Either::Left(reader);
664 let descr =
665 Self::get_proj_descr(proj, reader.metadata().file_metadata().schema_descr_ptr())?;
666
667 Ok(Self::new(Some(either), None, descr))
668 }
669
670 pub fn from_row_group(proj: Option<Type>, reader: &'a dyn RowGroupReader) -> Result<Self> {
672 let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?;
673 let tree_builder = Self::tree_builder();
674 let row_iter = tree_builder.as_iter(descr.clone(), reader)?;
675
676 Ok(Self::new(None, Some(row_iter), descr))
679 }
680
681 pub fn from_file_into(reader: Box<dyn FileReader>) -> Self {
683 let either = Either::Right(reader);
684 let descr = either
685 .reader()
686 .metadata()
687 .file_metadata()
688 .schema_descr_ptr();
689
690 Self::new(Some(either), None, descr)
691 }
692
693 pub fn project(self, proj: Option<Type>) -> Result<Self> {
699 match self.file_reader {
700 Some(ref either) => {
701 let schema = either
702 .reader()
703 .metadata()
704 .file_metadata()
705 .schema_descr_ptr();
706 let descr = Self::get_proj_descr(proj, schema)?;
707
708 Ok(Self::new(self.file_reader, None, descr))
709 }
710 None => Err(general_err!("File reader is required to use projections")),
711 }
712 }
713
714 #[inline]
717 fn get_proj_descr(proj: Option<Type>, root_descr: SchemaDescPtr) -> Result<SchemaDescPtr> {
718 match proj {
719 Some(projection) => {
720 let root_schema = root_descr.root_schema();
722 if !root_schema.check_contains(&projection) {
723 return Err(general_err!("Root schema does not contain projection"));
724 }
725 Ok(Arc::new(SchemaDescriptor::new(Arc::new(projection))))
726 }
727 None => Ok(root_descr),
728 }
729 }
730
731 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
733 self.tree_builder = self.tree_builder.with_batch_size(batch_size);
734 self
735 }
736
737 #[inline]
740 fn tree_builder() -> TreeBuilder {
741 TreeBuilder::new()
742 }
743}
744
745impl Iterator for RowIter<'_> {
746 type Item = Result<Row>;
747
748 fn next(&mut self) -> Option<Result<Row>> {
749 let mut row = None;
750 if let Some(ref mut iter) = self.row_iter {
751 row = iter.next();
752 }
753
754 while row.is_none() && self.current_row_group < self.num_row_groups {
755 if let Some(ref either) = self.file_reader {
758 let file_reader = either.reader();
759 let row_group_reader = &*file_reader
760 .get_row_group(self.current_row_group)
761 .expect("Row group is required to advance");
762
763 match self
764 .tree_builder
765 .as_iter(self.descr.clone(), row_group_reader)
766 {
767 Ok(mut iter) => {
768 row = iter.next();
769
770 self.current_row_group += 1;
771 self.row_iter = Some(iter);
772 }
773 Err(e) => return Some(Err(e)),
774 }
775 }
776 }
777
778 row
779 }
780}
781
/// Iterator of `Result<Row>` over the records of one reader tree.
pub struct ReaderIter {
    /// Root of the reader tree; each record is read from it with `read()`.
    root_reader: Reader,
    /// Number of records that have not been returned yet.
    records_left: usize,
}
787
788impl ReaderIter {
789 fn new(mut root_reader: Reader, num_records: usize) -> Result<Self> {
790 root_reader.advance_columns()?;
792 Ok(Self {
793 root_reader,
794 records_left: num_records,
795 })
796 }
797}
798
799impl Iterator for ReaderIter {
800 type Item = Result<Row>;
801
802 fn next(&mut self) -> Option<Result<Row>> {
803 if self.records_left > 0 {
804 self.records_left -= 1;
805 Some(self.root_reader.read())
806 } else {
807 None
808 }
809 }
810}
811
812#[cfg(test)]
813mod tests {
814 use super::*;
815
816 use crate::data_type::Int64Type;
817 use crate::file::reader::SerializedFileReader;
818 use crate::file::writer::SerializedFileWriter;
819 use crate::record::api::RowAccessor;
820 use crate::schema::parser::parse_message_type;
821 use crate::util::test_common::file_util::{get_test_file, get_test_path};
822 use bytes::Bytes;
823
824 macro_rules! row {
827 ($($e:tt)*) => {
828 {
829 make_row(vec![$($e)*])
830 }
831 }
832 }
833
834 macro_rules! list {
835 ($($e:tt)*) => {
836 {
837 Field::ListInternal(make_list(vec![$($e)*]))
838 }
839 }
840 }
841
842 macro_rules! map {
843 ($($e:tt)*) => {
844 {
845 Field::MapInternal(make_map(vec![$($e)*]))
846 }
847 }
848 }
849
850 macro_rules! group {
851 ( $( $e:expr ), * ) => {
852 {
853 Field::Group(row!($( $e ), *))
854 }
855 }
856 }
857
858 #[test]
859 fn test_file_reader_rows_nulls() {
860 let rows = test_file_reader_rows("nulls.snappy.parquet", None).unwrap();
861 let expected_rows = vec![
862 row![(
863 "b_struct".to_string(),
864 group![("b_c_int".to_string(), Field::Null)]
865 )],
866 row![(
867 "b_struct".to_string(),
868 group![("b_c_int".to_string(), Field::Null)]
869 )],
870 row![(
871 "b_struct".to_string(),
872 group![("b_c_int".to_string(), Field::Null)]
873 )],
874 row![(
875 "b_struct".to_string(),
876 group![("b_c_int".to_string(), Field::Null)]
877 )],
878 row![(
879 "b_struct".to_string(),
880 group![("b_c_int".to_string(), Field::Null)]
881 )],
882 row![(
883 "b_struct".to_string(),
884 group![("b_c_int".to_string(), Field::Null)]
885 )],
886 row![(
887 "b_struct".to_string(),
888 group![("b_c_int".to_string(), Field::Null)]
889 )],
890 row![(
891 "b_struct".to_string(),
892 group![("b_c_int".to_string(), Field::Null)]
893 )],
894 ];
895 assert_eq!(rows, expected_rows);
896 }
897
898 #[test]
899 fn test_file_reader_rows_nonnullable() {
900 let rows = test_file_reader_rows("nonnullable.impala.parquet", None).unwrap();
901 let expected_rows = vec![row![
902 ("ID".to_string(), Field::Long(8)),
903 ("Int_Array".to_string(), list![Field::Int(-1)]),
904 (
905 "int_array_array".to_string(),
906 list![list![Field::Int(-1), Field::Int(-2)], list![]]
907 ),
908 (
909 "Int_Map".to_string(),
910 map![(Field::Str("k1".to_string()), Field::Int(-1))]
911 ),
912 (
913 "int_map_array".to_string(),
914 list![
915 map![],
916 map![(Field::Str("k1".to_string()), Field::Int(1))],
917 map![],
918 map![]
919 ]
920 ),
921 (
922 "nested_Struct".to_string(),
923 group![
924 ("a".to_string(), Field::Int(-1)),
925 ("B".to_string(), list![Field::Int(-1)]),
926 (
927 "c".to_string(),
928 group![(
929 "D".to_string(),
930 list![list![group![
931 ("e".to_string(), Field::Int(-1)),
932 ("f".to_string(), Field::Str("nonnullable".to_string()))
933 ]]]
934 )]
935 ),
936 ("G".to_string(), map![])
937 ]
938 )
939 ]];
940 assert_eq!(rows, expected_rows);
941 }
942
943 #[test]
944 fn test_file_reader_rows_nullable() {
945 let rows = test_file_reader_rows("nullable.impala.parquet", None).unwrap();
946 let expected_rows = vec![
947 row![
948 ("id".to_string(), Field::Long(1)),
949 (
950 "int_array".to_string(),
951 list![Field::Int(1), Field::Int(2), Field::Int(3)]
952 ),
953 (
954 "int_array_Array".to_string(),
955 list![
956 list![Field::Int(1), Field::Int(2)],
957 list![Field::Int(3), Field::Int(4)]
958 ]
959 ),
960 (
961 "int_map".to_string(),
962 map![
963 (Field::Str("k1".to_string()), Field::Int(1)),
964 (Field::Str("k2".to_string()), Field::Int(100))
965 ]
966 ),
967 (
968 "int_Map_Array".to_string(),
969 list![map![(Field::Str("k1".to_string()), Field::Int(1))]]
970 ),
971 (
972 "nested_struct".to_string(),
973 group![
974 ("A".to_string(), Field::Int(1)),
975 ("b".to_string(), list![Field::Int(1)]),
976 (
977 "C".to_string(),
978 group![(
979 "d".to_string(),
980 list![
981 list![
982 group![
983 ("E".to_string(), Field::Int(10)),
984 ("F".to_string(), Field::Str("aaa".to_string()))
985 ],
986 group![
987 ("E".to_string(), Field::Int(-10)),
988 ("F".to_string(), Field::Str("bbb".to_string()))
989 ]
990 ],
991 list![group![
992 ("E".to_string(), Field::Int(11)),
993 ("F".to_string(), Field::Str("c".to_string()))
994 ]]
995 ]
996 )]
997 ),
998 (
999 "g".to_string(),
1000 map![(
1001 Field::Str("foo".to_string()),
1002 group![(
1003 "H".to_string(),
1004 group![("i".to_string(), list![Field::Double(1.1)])]
1005 )]
1006 )]
1007 )
1008 ]
1009 )
1010 ],
1011 row![
1012 ("id".to_string(), Field::Long(2)),
1013 (
1014 "int_array".to_string(),
1015 list![
1016 Field::Null,
1017 Field::Int(1),
1018 Field::Int(2),
1019 Field::Null,
1020 Field::Int(3),
1021 Field::Null
1022 ]
1023 ),
1024 (
1025 "int_array_Array".to_string(),
1026 list![
1027 list![Field::Null, Field::Int(1), Field::Int(2), Field::Null],
1028 list![Field::Int(3), Field::Null, Field::Int(4)],
1029 list![],
1030 Field::Null
1031 ]
1032 ),
1033 (
1034 "int_map".to_string(),
1035 map![
1036 (Field::Str("k1".to_string()), Field::Int(2)),
1037 (Field::Str("k2".to_string()), Field::Null)
1038 ]
1039 ),
1040 (
1041 "int_Map_Array".to_string(),
1042 list![
1043 map![
1044 (Field::Str("k3".to_string()), Field::Null),
1045 (Field::Str("k1".to_string()), Field::Int(1))
1046 ],
1047 Field::Null,
1048 map![]
1049 ]
1050 ),
1051 (
1052 "nested_struct".to_string(),
1053 group![
1054 ("A".to_string(), Field::Null),
1055 ("b".to_string(), list![Field::Null]),
1056 (
1057 "C".to_string(),
1058 group![(
1059 "d".to_string(),
1060 list![
1061 list![
1062 group![
1063 ("E".to_string(), Field::Null),
1064 ("F".to_string(), Field::Null)
1065 ],
1066 group![
1067 ("E".to_string(), Field::Int(10)),
1068 ("F".to_string(), Field::Str("aaa".to_string()))
1069 ],
1070 group![
1071 ("E".to_string(), Field::Null),
1072 ("F".to_string(), Field::Null)
1073 ],
1074 group![
1075 ("E".to_string(), Field::Int(-10)),
1076 ("F".to_string(), Field::Str("bbb".to_string()))
1077 ],
1078 group![
1079 ("E".to_string(), Field::Null),
1080 ("F".to_string(), Field::Null)
1081 ]
1082 ],
1083 list![
1084 group![
1085 ("E".to_string(), Field::Int(11)),
1086 ("F".to_string(), Field::Str("c".to_string()))
1087 ],
1088 Field::Null
1089 ],
1090 list![],
1091 Field::Null
1092 ]
1093 )]
1094 ),
1095 (
1096 "g".to_string(),
1097 map![
1098 (
1099 Field::Str("g1".to_string()),
1100 group![(
1101 "H".to_string(),
1102 group![(
1103 "i".to_string(),
1104 list![Field::Double(2.2), Field::Null]
1105 )]
1106 )]
1107 ),
1108 (
1109 Field::Str("g2".to_string()),
1110 group![("H".to_string(), group![("i".to_string(), list![])])]
1111 ),
1112 (Field::Str("g3".to_string()), Field::Null),
1113 (
1114 Field::Str("g4".to_string()),
1115 group![(
1116 "H".to_string(),
1117 group![("i".to_string(), Field::Null)]
1118 )]
1119 ),
1120 (
1121 Field::Str("g5".to_string()),
1122 group![("H".to_string(), Field::Null)]
1123 )
1124 ]
1125 )
1126 ]
1127 )
1128 ],
1129 row![
1130 ("id".to_string(), Field::Long(3)),
1131 ("int_array".to_string(), list![]),
1132 ("int_array_Array".to_string(), list![Field::Null]),
1133 ("int_map".to_string(), map![]),
1134 ("int_Map_Array".to_string(), list![Field::Null, Field::Null]),
1135 (
1136 "nested_struct".to_string(),
1137 group![
1138 ("A".to_string(), Field::Null),
1139 ("b".to_string(), Field::Null),
1140 ("C".to_string(), group![("d".to_string(), list![])]),
1141 ("g".to_string(), map![])
1142 ]
1143 )
1144 ],
1145 row![
1146 ("id".to_string(), Field::Long(4)),
1147 ("int_array".to_string(), Field::Null),
1148 ("int_array_Array".to_string(), list![]),
1149 ("int_map".to_string(), map![]),
1150 ("int_Map_Array".to_string(), list![]),
1151 (
1152 "nested_struct".to_string(),
1153 group![
1154 ("A".to_string(), Field::Null),
1155 ("b".to_string(), Field::Null),
1156 ("C".to_string(), group![("d".to_string(), Field::Null)]),
1157 ("g".to_string(), Field::Null)
1158 ]
1159 )
1160 ],
1161 row![
1162 ("id".to_string(), Field::Long(5)),
1163 ("int_array".to_string(), Field::Null),
1164 ("int_array_Array".to_string(), Field::Null),
1165 ("int_map".to_string(), map![]),
1166 ("int_Map_Array".to_string(), Field::Null),
1167 (
1168 "nested_struct".to_string(),
1169 group![
1170 ("A".to_string(), Field::Null),
1171 ("b".to_string(), Field::Null),
1172 ("C".to_string(), Field::Null),
1173 (
1174 "g".to_string(),
1175 map![(
1176 Field::Str("foo".to_string()),
1177 group![(
1178 "H".to_string(),
1179 group![(
1180 "i".to_string(),
1181 list![Field::Double(2.2), Field::Double(3.3)]
1182 )]
1183 )]
1184 )]
1185 )
1186 ]
1187 )
1188 ],
1189 row![
1190 ("id".to_string(), Field::Long(6)),
1191 ("int_array".to_string(), Field::Null),
1192 ("int_array_Array".to_string(), Field::Null),
1193 ("int_map".to_string(), Field::Null),
1194 ("int_Map_Array".to_string(), Field::Null),
1195 ("nested_struct".to_string(), Field::Null)
1196 ],
1197 row![
1198 ("id".to_string(), Field::Long(7)),
1199 ("int_array".to_string(), Field::Null),
1200 (
1201 "int_array_Array".to_string(),
1202 list![Field::Null, list![Field::Int(5), Field::Int(6)]]
1203 ),
1204 (
1205 "int_map".to_string(),
1206 map![
1207 (Field::Str("k1".to_string()), Field::Null),
1208 (Field::Str("k3".to_string()), Field::Null)
1209 ]
1210 ),
1211 ("int_Map_Array".to_string(), Field::Null),
1212 (
1213 "nested_struct".to_string(),
1214 group![
1215 ("A".to_string(), Field::Int(7)),
1216 (
1217 "b".to_string(),
1218 list![Field::Int(2), Field::Int(3), Field::Null]
1219 ),
1220 (
1221 "C".to_string(),
1222 group![(
1223 "d".to_string(),
1224 list![list![], list![Field::Null], Field::Null]
1225 )]
1226 ),
1227 ("g".to_string(), Field::Null)
1228 ]
1229 )
1230 ],
1231 ];
1232 assert_eq!(rows, expected_rows);
1233 }
1234
1235 #[test]
1236 fn test_file_reader_rows_projection() {
1237 let schema = "
1238 message spark_schema {
1239 REQUIRED DOUBLE c;
1240 REQUIRED INT32 b;
1241 }
1242 ";
1243 let schema = parse_message_type(schema).unwrap();
1244 let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1245 let expected_rows = vec![
1246 row![
1247 ("c".to_string(), Field::Double(1.0)),
1248 ("b".to_string(), Field::Int(1))
1249 ],
1250 row![
1251 ("c".to_string(), Field::Double(1.0)),
1252 ("b".to_string(), Field::Int(1))
1253 ],
1254 row![
1255 ("c".to_string(), Field::Double(1.0)),
1256 ("b".to_string(), Field::Int(1))
1257 ],
1258 row![
1259 ("c".to_string(), Field::Double(1.0)),
1260 ("b".to_string(), Field::Int(1))
1261 ],
1262 row![
1263 ("c".to_string(), Field::Double(1.0)),
1264 ("b".to_string(), Field::Int(1))
1265 ],
1266 row![
1267 ("c".to_string(), Field::Double(1.0)),
1268 ("b".to_string(), Field::Int(1))
1269 ],
1270 ];
1271 assert_eq!(rows, expected_rows);
1272 }
1273
1274 #[test]
1275 fn test_iter_columns_in_row() {
1276 let r = row![
1277 ("c".to_string(), Field::Double(1.0)),
1278 ("b".to_string(), Field::Int(1))
1279 ];
1280 let mut result = Vec::new();
1281 for (name, record) in r.get_column_iter() {
1282 result.push((name, record));
1283 }
1284 assert_eq!(
1285 vec![
1286 (&"c".to_string(), &Field::Double(1.0)),
1287 (&"b".to_string(), &Field::Int(1))
1288 ],
1289 result
1290 );
1291 }
1292
1293 #[test]
1294 fn test_into_columns_in_row() {
1295 let r = row![
1296 ("a".to_string(), Field::Str("My string".to_owned())),
1297 ("b".to_string(), Field::Int(1))
1298 ];
1299 assert_eq!(
1300 r.into_columns(),
1301 vec![
1302 ("a".to_string(), Field::Str("My string".to_owned())),
1303 ("b".to_string(), Field::Int(1)),
1304 ]
1305 );
1306 }
1307
1308 #[test]
1309 fn test_file_reader_rows_projection_map() {
1310 let schema = "
1311 message spark_schema {
1312 OPTIONAL group a (MAP) {
1313 REPEATED group key_value {
1314 REQUIRED BYTE_ARRAY key (UTF8);
1315 OPTIONAL group value (MAP) {
1316 REPEATED group key_value {
1317 REQUIRED INT32 key;
1318 REQUIRED BOOLEAN value;
1319 }
1320 }
1321 }
1322 }
1323 }
1324 ";
1325 let schema = parse_message_type(schema).unwrap();
1326 let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1327 let expected_rows = vec![
1328 row![(
1329 "a".to_string(),
1330 map![(
1331 Field::Str("a".to_string()),
1332 map![
1333 (Field::Int(1), Field::Bool(true)),
1334 (Field::Int(2), Field::Bool(false))
1335 ]
1336 )]
1337 )],
1338 row![(
1339 "a".to_string(),
1340 map![(
1341 Field::Str("b".to_string()),
1342 map![(Field::Int(1), Field::Bool(true))]
1343 )]
1344 )],
1345 row![(
1346 "a".to_string(),
1347 map![(Field::Str("c".to_string()), Field::Null)]
1348 )],
1349 row![("a".to_string(), map![(Field::Str("d".to_string()), map![])])],
1350 row![(
1351 "a".to_string(),
1352 map![(
1353 Field::Str("e".to_string()),
1354 map![(Field::Int(1), Field::Bool(true))]
1355 )]
1356 )],
1357 row![(
1358 "a".to_string(),
1359 map![(
1360 Field::Str("f".to_string()),
1361 map![
1362 (Field::Int(3), Field::Bool(true)),
1363 (Field::Int(4), Field::Bool(false)),
1364 (Field::Int(5), Field::Bool(true))
1365 ]
1366 )]
1367 )],
1368 ];
1369 assert_eq!(rows, expected_rows);
1370 }
1371
1372 #[test]
1373 fn test_file_reader_rows_projection_list() {
1374 let schema = "
1375 message spark_schema {
1376 OPTIONAL group a (LIST) {
1377 REPEATED group list {
1378 OPTIONAL group element (LIST) {
1379 REPEATED group list {
1380 OPTIONAL group element (LIST) {
1381 REPEATED group list {
1382 OPTIONAL BYTE_ARRAY element (UTF8);
1383 }
1384 }
1385 }
1386 }
1387 }
1388 }
1389 }
1390 ";
1391 let schema = parse_message_type(schema).unwrap();
1392 let rows = test_file_reader_rows("nested_lists.snappy.parquet", Some(schema)).unwrap();
1393 let expected_rows = vec![
1394 row![(
1395 "a".to_string(),
1396 list![
1397 list![
1398 list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1399 list![Field::Str("c".to_string())]
1400 ],
1401 list![Field::Null, list![Field::Str("d".to_string())]]
1402 ]
1403 )],
1404 row![(
1405 "a".to_string(),
1406 list![
1407 list![
1408 list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1409 list![Field::Str("c".to_string()), Field::Str("d".to_string())]
1410 ],
1411 list![Field::Null, list![Field::Str("e".to_string())]]
1412 ]
1413 )],
1414 row![(
1415 "a".to_string(),
1416 list![
1417 list![
1418 list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1419 list![Field::Str("c".to_string()), Field::Str("d".to_string())],
1420 list![Field::Str("e".to_string())]
1421 ],
1422 list![Field::Null, list![Field::Str("f".to_string())]]
1423 ]
1424 )],
1425 ];
1426 assert_eq!(rows, expected_rows);
1427 }
1428
/// Reading `nested_maps.snappy.parquet` through a `key`/`value` projection must come
/// back as an error whose message is `Root schema does not contain projection`.
#[test]
fn test_file_reader_rows_invalid_projection() {
    let projection = parse_message_type(
        "
message spark_schema {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
",
    )
    .unwrap();

    let error =
        test_file_reader_rows("nested_maps.snappy.parquet", Some(projection)).unwrap_err();
    assert_eq!(
        error.to_string(),
        "Parquet error: Root schema does not contain projection"
    );
}
1444
/// Same rejected projection as the file-level test, but issued through the row-group
/// helper: the error message must still be `Root schema does not contain projection`.
#[test]
fn test_row_group_rows_invalid_projection() {
    let projection = parse_message_type(
        "
message spark_schema {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
",
    )
    .unwrap();

    let error = test_row_group_rows("nested_maps.snappy.parquet", Some(projection)).unwrap_err();
    assert_eq!(
        error.to_string(),
        "Parquet error: Root schema does not contain projection"
    );
}
1460
/// A projection whose inner MAP group declares only a `key` field is expected to
/// panic with a message containing `Invalid map type`.
#[test]
#[should_panic(expected = "Invalid map type")]
fn test_file_reader_rows_invalid_map_type() {
    let projection = parse_message_type(
        "
message spark_schema {
OPTIONAL group a (MAP) {
REPEATED group key_value {
REQUIRED BYTE_ARRAY key (UTF8);
OPTIONAL group value (MAP) {
REPEATED group key_value {
REQUIRED INT32 key;
}
}
}
}
}
",
    )
    .unwrap();

    let outcome = test_file_reader_rows("nested_maps.snappy.parquet", Some(projection));
    outcome.unwrap();
}
1481
/// Walks every row of `alltypes_plain.parquet` via `RowIter::from_file_into` and
/// checks the int read from field 0 of each row.
#[test]
fn test_file_reader_iter() {
    let file_path = get_test_path("alltypes_plain.parquet");
    let file_reader = SerializedFileReader::try_from(file_path.as_path()).unwrap();

    let values: Vec<_> = RowIter::from_file_into(Box::new(file_reader))
        .map(|record| record.unwrap())
        .flat_map(|row| row.get_int(0))
        .collect();

    assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
1491
/// Same walk over `alltypes_plain.parquet`, but with the iterator restricted to the
/// `id` column through `project`; field 0 of each row must yield the same ints.
#[test]
fn test_file_reader_iter_projection() {
    let file_path = get_test_path("alltypes_plain.parquet");
    let file_reader = SerializedFileReader::try_from(file_path.as_path()).unwrap();
    let projection = parse_message_type("message schema { OPTIONAL INT32 id; }").ok();

    let projected_rows = RowIter::from_file_into(Box::new(file_reader))
        .project(projection)
        .unwrap();
    let values: Vec<_> = projected_rows
        .map(|record| record.unwrap())
        .flat_map(|row| row.get_int(0))
        .collect();

    assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
1506
/// `RowIter::project` on `nested_maps.snappy.parquet` with a `key`/`value` projection
/// must return an error reading `Root schema does not contain projection`.
#[test]
fn test_file_reader_iter_projection_err() {
    let projection = parse_message_type(
        "
message spark_schema {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
",
    )
    .ok();
    let file_path = get_test_path("nested_maps.snappy.parquet");
    let file_reader = SerializedFileReader::try_from(file_path.as_path()).unwrap();

    let outcome = RowIter::from_file_into(Box::new(file_reader)).project(projection);
    let message = outcome.err().unwrap().to_string();

    assert_eq!(
        message,
        "Parquet error: Root schema does not contain projection"
    );
}
1525
/// Reads `repeated_no_annotation.parquet` without a projection and checks that the
/// repeated `phone` group under `phoneNumbers` is surfaced as a list of groups.
#[test]
fn test_tree_reader_handle_repeated_fields_with_no_annotation() {
    // One `phone` entry; `None` stands for a null `kind`.
    let phone = |number, kind: Option<&str>| {
        let kind = match kind {
            Some(label) => Field::Str(label.to_string()),
            None => Field::Null,
        };
        group![
            ("number".to_string(), Field::Long(number)),
            ("kind".to_string(), kind)
        ]
    };
    // Wraps an already-built list of phones into the `phoneNumbers` group value.
    let phone_numbers = |phones: Field| group![("phone".to_string(), phones)];

    let rows = test_file_reader_rows("repeated_no_annotation.parquet", None).unwrap();

    let expected_rows = vec![
        row![
            ("id".to_string(), Field::Int(1)),
            ("phoneNumbers".to_string(), Field::Null)
        ],
        row![
            ("id".to_string(), Field::Int(2)),
            ("phoneNumbers".to_string(), Field::Null)
        ],
        row![
            ("id".to_string(), Field::Int(3)),
            ("phoneNumbers".to_string(), phone_numbers(list![]))
        ],
        row![
            ("id".to_string(), Field::Int(4)),
            (
                "phoneNumbers".to_string(),
                phone_numbers(list![phone(5555555555, None)])
            )
        ],
        row![
            ("id".to_string(), Field::Int(5)),
            (
                "phoneNumbers".to_string(),
                phone_numbers(list![phone(1111111111, Some("home"))])
            )
        ],
        row![
            ("id".to_string(), Field::Int(6)),
            (
                "phoneNumbers".to_string(),
                phone_numbers(list![
                    phone(1111111111, Some("home")),
                    phone(2222222222, None),
                    phone(3333333333, Some("mobile"))
                ])
            )
        ],
    ];

    assert_eq!(rows, expected_rows);
}
1600
/// Writes a three-row file with nested un-annotated REPEATED groups into memory,
/// reads it back through `get_row_iter`, and checks each repeated level is decoded
/// as a list holding a single group.
#[test]
fn test_tree_reader_handle_nested_repeated_fields_with_no_annotation() {
    let message_type = "
message schema {
REPEATED group level1 {
REPEATED group level2 {
REQUIRED group level3 {
REQUIRED INT64 value3;
}
}
REQUIRED INT64 value1;
}
}";
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    let mut buffer: Vec<u8> = Vec::new();
    let mut file_writer =
        SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
    let mut row_group_writer = file_writer.next_row_group().unwrap();

    // Values and definition levels per column, in the order `next_column` yields them:
    // the batch holding 30..32 first, then the one holding 10..12. Every value uses
    // repetition level 0.
    let column_batches: [(&[i64], &[i16]); 2] = [
        (&[30, 31, 32], &[2, 2, 2]),
        (&[10, 11, 12], &[1, 1, 1]),
    ];
    for (values, def_levels) in column_batches {
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<Int64Type>()
            .write_batch(values, Some(def_levels), Some(&[0, 0, 0]))
            .unwrap();
        column_writer.close().unwrap();
    }

    row_group_writer.close().unwrap();
    file_writer.close().unwrap();
    assert_eq!(&buffer[0..4], b"PAR1");

    let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
    let rows: Vec<_> = file_reader
        .get_row_iter(None)
        .unwrap()
        .map(Result::unwrap)
        .collect();

    // Every expected row has the same shape; only the two leaf values differ.
    let expected_row = |value3, value1| {
        row![(
            "level1".to_string(),
            list![group![
                (
                    "level2".to_string(),
                    list![group![(
                        "level3".to_string(),
                        group![("value3".to_string(), Field::Long(value3))]
                    )]]
                ),
                ("value1".to_string(), Field::Long(value1))
            ]]
        )]
    };
    let expected_rows = vec![
        expected_row(30, 10),
        expected_row(31, 11),
        expected_row(32, 12),
    ];

    assert_eq!(rows, expected_rows);
}
1700
/// Reads `repeated_primitive_no_list.parquet` without a projection and checks that
/// repeated primitive columns, both top-level and inside `group_of_lists`, decode as
/// lists (including empty ones).
#[test]
fn test_tree_reader_handle_primitive_repeated_fields_with_no_annotation() {
    /// Builds a list field of `Field::Int` values.
    fn int_list(values: &[i32]) -> Field {
        Field::ListInternal(make_list(
            values.iter().copied().map(Field::Int).collect(),
        ))
    }

    /// Builds a list field of `Field::Str` values.
    fn str_list(values: &[&str]) -> Field {
        Field::ListInternal(make_list(
            values
                .iter()
                .map(|&value| Field::Str(value.to_string()))
                .collect(),
        ))
    }

    // In every expected row the lists inside `group_of_lists` repeat the top-level
    // ones, so a row is fully described by its int values and its string values.
    let expected_row = |ints: &[i32], strs: &[&str]| {
        row![
            ("Int32_list".to_string(), int_list(ints)),
            ("String_list".to_string(), str_list(strs)),
            (
                "group_of_lists".to_string(),
                group![
                    ("Int32_list_in_group".to_string(), int_list(ints)),
                    ("String_list_in_group".to_string(), str_list(strs))
                ]
            )
        ]
    };

    let rows = test_file_reader_rows("repeated_primitive_no_list.parquet", None).unwrap();
    let expected_rows = vec![
        expected_row(&[0, 1, 2, 3], &["foo", "zero", "one", "two"]),
        expected_row(&[], &["three"]),
        expected_row(&[4], &["four"]),
        expected_row(&[5, 6, 7, 8], &["five", "six", "seven", "eight"]),
    ];
    assert_eq!(rows, expected_rows);
}
1825
1826 fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1827 let file = get_test_file(file_name);
1828 let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1829 let iter = file_reader.get_row_iter(schema)?;
1830 Ok(iter.map(|row| row.unwrap()).collect())
1831 }
1832
1833 fn test_row_group_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1834 let file = get_test_file(file_name);
1835 let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1836 let row_group_reader = file_reader.get_row_group(0).unwrap();
1839 let iter = row_group_reader.get_row_iter(schema)?;
1840 Ok(iter.map(|row| row.unwrap()).collect())
1841 }
1842}