parquet/arrow/schema/
complex.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::primitive::convert_primitive;
22use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY};
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::ParquetError;
25use crate::errors::Result;
26use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
27use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
28
29fn get_repetition(t: &Type) -> Repetition {
30    let info = t.get_basic_info();
31    match info.has_repetition() {
32        true => info.repetition(),
33        false => Repetition::REQUIRED,
34    }
35}
36
/// Representation of a parquet schema element, in terms of arrow schema elements
///
/// Values of this type are produced by [`convert_schema`] and [`convert_type`].
/// A group element holds the children that were converted for it, see
/// [`ParquetField::children`]
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// The level which represents an insertion into the current list
    /// i.e. guaranteed to be > 0 for an element of list type
    pub rep_level: i16,
    /// The level at which this field is fully defined,
    /// i.e. guaranteed to be > 0 for a nullable type or child of a
    /// nullable type
    pub def_level: i16,
    /// Whether this field is nullable
    pub nullable: bool,
    /// The arrow type of the column data
    ///
    /// Note: In certain cases the data stored in parquet may have been coerced
    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
    pub arrow_type: DataType,
    /// The type of this field, i.e. whether it is a leaf column or a group
    /// of child fields
    pub field_type: ParquetFieldType,
}
57
58impl ParquetField {
59    /// Converts `self` into an arrow list, with its current type as the field type
60    ///
61    /// This is used to convert repeated columns, into their arrow representation
62    fn into_list(self, name: &str) -> Self {
63        ParquetField {
64            rep_level: self.rep_level,
65            def_level: self.def_level,
66            nullable: false,
67            arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
68            field_type: ParquetFieldType::Group {
69                children: vec![self],
70            },
71        }
72    }
73
74    /// Returns a list of [`ParquetField`] children if this is a group type
75    pub fn children(&self) -> Option<&[Self]> {
76        match &self.field_type {
77            ParquetFieldType::Primitive { .. } => None,
78            ParquetFieldType::Group { children } => Some(children),
79        }
80    }
81}
82
/// The kind of a [`ParquetField`]: either a leaf column or a group of child fields
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf column of the parquet schema
    Primitive {
        /// The index of the column in parquet
        col_idx: usize,
        /// The type of the column in parquet
        primitive_type: TypePtr,
    },
    /// A nested element (struct, list or map) made up of child fields
    Group {
        /// The child fields of this group
        children: Vec<ParquetField>,
    },
}
95
/// Encodes the context of the parent of the field currently under consideration
struct VisitorContext {
    /// The repetition level inherited from the parent
    rep_level: i16,
    /// The definition level inherited from the parent
    def_level: i16,
    /// An optional [`DataType`] sourced from the embedded arrow schema
    ///
    /// When present it is used as a hint for, and checked against, the field
    /// being visited
    data_type: Option<DataType>,
}
103
104impl VisitorContext {
105    /// Compute the resulting definition level, repetition level and nullability
106    /// for a child field with the given [`Repetition`]
107    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
108        match repetition {
109            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
110            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
111            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
112        }
113    }
114}
115
/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
///
/// See [Logical Types] for more information on the conversion algorithm
///
/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
struct Visitor {
    /// The column index of the next leaf column
    ///
    /// Advanced for every primitive that is visited, whether or not the
    /// column is included in `mask`
    next_col_idx: usize,

    /// Mask of columns to include
    mask: ProjectionMask,
}
128
129impl Visitor {
130    fn visit_primitive(
131        &mut self,
132        primitive_type: &TypePtr,
133        context: VisitorContext,
134    ) -> Result<Option<ParquetField>> {
135        let col_idx = self.next_col_idx;
136        self.next_col_idx += 1;
137
138        if !self.mask.leaf_included(col_idx) {
139            return Ok(None);
140        }
141
142        let repetition = get_repetition(primitive_type);
143        let (def_level, rep_level, nullable) = context.levels(repetition);
144
145        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
146
147        let primitive_field = ParquetField {
148            rep_level,
149            def_level,
150            nullable,
151            arrow_type,
152            field_type: ParquetFieldType::Primitive {
153                primitive_type: primitive_type.clone(),
154                col_idx,
155            },
156        };
157
158        Ok(Some(match repetition {
159            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
160            _ => primitive_field,
161        }))
162    }
163
164    fn visit_struct(
165        &mut self,
166        struct_type: &TypePtr,
167        context: VisitorContext,
168    ) -> Result<Option<ParquetField>> {
169        // The root type will not have a repetition level
170        let repetition = get_repetition(struct_type);
171        let (def_level, rep_level, nullable) = context.levels(repetition);
172
173        let parquet_fields = struct_type.get_fields();
174
175        // Extract the arrow fields
176        let arrow_fields = match &context.data_type {
177            Some(DataType::Struct(fields)) => {
178                if fields.len() != parquet_fields.len() {
179                    return Err(arrow_err!(
180                        "incompatible arrow schema, expected {} struct fields got {}",
181                        parquet_fields.len(),
182                        fields.len()
183                    ));
184                }
185                Some(fields)
186            }
187            Some(d) => {
188                return Err(arrow_err!(
189                    "incompatible arrow schema, expected struct got {}",
190                    d
191                ))
192            }
193            None => None,
194        };
195
196        let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
197        let mut children = Vec::with_capacity(parquet_fields.len());
198
199        // Perform a DFS of children
200        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
201            let data_type = match arrow_fields {
202                Some(fields) => {
203                    let field = &fields[idx];
204                    if field.name() != parquet_field.name() {
205                        return Err(arrow_err!(
206                            "incompatible arrow schema, expected field named {} got {}",
207                            parquet_field.name(),
208                            field.name()
209                        ));
210                    }
211                    Some(field.data_type().clone())
212                }
213                None => None,
214            };
215
216            let arrow_field = arrow_fields.map(|x| &*x[idx]);
217            let child_ctx = VisitorContext {
218                rep_level,
219                def_level,
220                data_type,
221            };
222
223            if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
224                // The child type returned may be different from what is encoded in the arrow
225                // schema in the event of a mismatch or a projection
226                child_fields.push(convert_field(parquet_field, &child, arrow_field));
227                children.push(child);
228            }
229        }
230
231        if children.is_empty() {
232            return Ok(None);
233        }
234
235        let struct_field = ParquetField {
236            rep_level,
237            def_level,
238            nullable,
239            arrow_type: DataType::Struct(child_fields.finish().fields),
240            field_type: ParquetFieldType::Group { children },
241        };
242
243        Ok(Some(match repetition {
244            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
245            _ => struct_field,
246        }))
247    }
248
249    fn visit_map(
250        &mut self,
251        map_type: &TypePtr,
252        context: VisitorContext,
253    ) -> Result<Option<ParquetField>> {
254        let rep_level = context.rep_level + 1;
255        let (def_level, nullable) = match get_repetition(map_type) {
256            Repetition::REQUIRED => (context.def_level + 1, false),
257            Repetition::OPTIONAL => (context.def_level + 2, true),
258            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
259        };
260
261        if map_type.get_fields().len() != 1 {
262            return Err(arrow_err!(
263                "Map field must have exactly one key_value child, found {}",
264                map_type.get_fields().len()
265            ));
266        }
267
268        // Add map entry (key_value) to context
269        let map_key_value = &map_type.get_fields()[0];
270        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
271            return Err(arrow_err!("Child of map field must be repeated"));
272        }
273
274        if map_key_value.get_fields().len() != 2 {
275            // According to the specification the values are optional (#1642)
276            return Err(arrow_err!(
277                "Child of map field must have two children, found {}",
278                map_key_value.get_fields().len()
279            ));
280        }
281
282        // Get key and value, and create context for each
283        let map_key = &map_key_value.get_fields()[0];
284        let map_value = &map_key_value.get_fields()[1];
285
286        match map_key.get_basic_info().repetition() {
287            Repetition::REPEATED => {
288                return Err(arrow_err!("Map keys cannot be repeated"));
289            }
290            Repetition::REQUIRED | Repetition::OPTIONAL => {
291                // Relaxed check for having repetition REQUIRED as there exists
292                // parquet writers and files that do not conform to this standard.
293                // This allows us to consume a broader range of existing files even
294                // if they are out of spec.
295            }
296        }
297
298        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
299            return Err(arrow_err!("Map values cannot be repeated"));
300        }
301
302        // Extract the arrow fields
303        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
304            Some(DataType::Map(field, sorted)) => match field.data_type() {
305                DataType::Struct(fields) => {
306                    if fields.len() != 2 {
307                        return Err(arrow_err!(
308                            "Map data type should contain struct with two children, got {}",
309                            fields.len()
310                        ));
311                    }
312
313                    (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
314                }
315                d => {
316                    return Err(arrow_err!("Map data type should contain struct got {}", d));
317                }
318            },
319            Some(d) => {
320                return Err(arrow_err!(
321                    "incompatible arrow schema, expected map got {}",
322                    d
323                ))
324            }
325            None => (None, None, None, false),
326        };
327
328        let maybe_key = {
329            let context = VisitorContext {
330                rep_level,
331                def_level,
332                data_type: arrow_key.map(|x| x.data_type().clone()),
333            };
334
335            self.dispatch(map_key, context)?
336        };
337
338        let maybe_value = {
339            let context = VisitorContext {
340                rep_level,
341                def_level,
342                data_type: arrow_value.map(|x| x.data_type().clone()),
343            };
344
345            self.dispatch(map_value, context)?
346        };
347
348        // Need both columns to be projected
349        match (maybe_key, maybe_value) {
350            (Some(key), Some(value)) => {
351                let key_field = Arc::new(
352                    convert_field(map_key, &key, arrow_key)
353                        // The key is always non-nullable (#5630)
354                        .with_nullable(false),
355                );
356                let value_field = Arc::new(convert_field(map_value, &value, arrow_value));
357                let field_metadata = match arrow_map {
358                    Some(field) => field.metadata().clone(),
359                    _ => HashMap::default(),
360                };
361
362                let map_field = Field::new_struct(
363                    map_key_value.name(),
364                    [key_field, value_field],
365                    false, // The inner map field is always non-nullable (#1697)
366                )
367                .with_metadata(field_metadata);
368
369                Ok(Some(ParquetField {
370                    rep_level,
371                    def_level,
372                    nullable,
373                    arrow_type: DataType::Map(Arc::new(map_field), sorted),
374                    field_type: ParquetFieldType::Group {
375                        children: vec![key, value],
376                    },
377                }))
378            }
379            _ => Ok(None),
380        }
381    }
382
383    fn visit_list(
384        &mut self,
385        list_type: &TypePtr,
386        context: VisitorContext,
387    ) -> Result<Option<ParquetField>> {
388        if list_type.is_primitive() {
389            return Err(arrow_err!(
390                "{:?} is a list type and can't be processed as primitive.",
391                list_type
392            ));
393        }
394
395        let fields = list_type.get_fields();
396        if fields.len() != 1 {
397            return Err(arrow_err!(
398                "list type must have a single child, found {}",
399                fields.len()
400            ));
401        }
402
403        let repeated_field = &fields[0];
404        if get_repetition(repeated_field) != Repetition::REPEATED {
405            return Err(arrow_err!("List child must be repeated"));
406        }
407
408        // If the list is nullable
409        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
410            Repetition::REQUIRED => (context.def_level, false),
411            Repetition::OPTIONAL => (context.def_level + 1, true),
412            Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
413        };
414
415        let arrow_field = match &context.data_type {
416            Some(DataType::List(f)) => Some(f.as_ref()),
417            Some(DataType::LargeList(f)) => Some(f.as_ref()),
418            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
419            Some(d) => {
420                return Err(arrow_err!(
421                    "incompatible arrow schema, expected list got {}",
422                    d
423                ))
424            }
425            None => None,
426        };
427
428        if repeated_field.is_primitive() {
429            // If the repeated field is not a group, then its type is the element type and elements are required.
430            //
431            // required/optional group my_list (LIST) {
432            //   repeated int32 element;
433            // }
434            //
435            let context = VisitorContext {
436                rep_level: context.rep_level,
437                def_level,
438                data_type: arrow_field.map(|f| f.data_type().clone()),
439            };
440
441            return match self.visit_primitive(repeated_field, context) {
442                Ok(Some(mut field)) => {
443                    // visit_primitive will infer a non-nullable list, update if necessary
444                    field.nullable = nullable;
445                    Ok(Some(field))
446                }
447                r => r,
448            };
449        }
450
451        let items = repeated_field.get_fields();
452        if items.len() != 1
453            || repeated_field.name() == "array"
454            || repeated_field.name() == format!("{}_tuple", list_type.name())
455        {
456            // If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
457            //
458            // If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name
459            // with _tuple appended then the repeated type is the element type and elements are required.
460            let context = VisitorContext {
461                rep_level: context.rep_level,
462                def_level,
463                data_type: arrow_field.map(|f| f.data_type().clone()),
464            };
465
466            return match self.visit_struct(repeated_field, context) {
467                Ok(Some(mut field)) => {
468                    field.nullable = nullable;
469                    Ok(Some(field))
470                }
471                r => r,
472            };
473        }
474
475        // Regular list handling logic
476        let item_type = &items[0];
477        let rep_level = context.rep_level + 1;
478        let def_level = def_level + 1;
479
480        let new_context = VisitorContext {
481            def_level,
482            rep_level,
483            data_type: arrow_field.map(|f| f.data_type().clone()),
484        };
485
486        match self.dispatch(item_type, new_context) {
487            Ok(Some(item)) => {
488                let item_field = Arc::new(convert_field(item_type, &item, arrow_field));
489
490                // Use arrow type as hint for index size
491                let arrow_type = match context.data_type {
492                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
493                    Some(DataType::FixedSizeList(_, len)) => {
494                        DataType::FixedSizeList(item_field, len)
495                    }
496                    _ => DataType::List(item_field),
497                };
498
499                Ok(Some(ParquetField {
500                    rep_level,
501                    def_level,
502                    nullable,
503                    arrow_type,
504                    field_type: ParquetFieldType::Group {
505                        children: vec![item],
506                    },
507                }))
508            }
509            r => r,
510        }
511    }
512
513    fn dispatch(
514        &mut self,
515        cur_type: &TypePtr,
516        context: VisitorContext,
517    ) -> Result<Option<ParquetField>> {
518        if cur_type.is_primitive() {
519            self.visit_primitive(cur_type, context)
520        } else {
521            match cur_type.get_basic_info().converted_type() {
522                ConvertedType::LIST => self.visit_list(cur_type, context),
523                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
524                    self.visit_map(cur_type, context)
525                }
526                _ => self.visit_struct(cur_type, context),
527            }
528        }
529    }
530}
531
532/// Computes the [`Field`] for a child column
533///
534/// The resulting [`Field`] will have the type dictated by `field`, a name
535/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
536fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&Field>) -> Field {
537    let name = parquet_type.name();
538    let data_type = field.arrow_type.clone();
539    let nullable = field.nullable;
540
541    match arrow_hint {
542        Some(hint) => {
543            // If the inferred type is a dictionary, preserve dictionary metadata
544            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
545                (DataType::Dictionary(_, _), Some(id), Some(ordered)) => {
546                    Field::new_dict(name, data_type, nullable, id, ordered)
547                }
548                _ => Field::new(name, data_type, nullable),
549            };
550
551            field.with_metadata(hint.metadata().clone())
552        }
553        None => {
554            let mut ret = Field::new(name, data_type, nullable);
555            let basic_info = parquet_type.get_basic_info();
556            if basic_info.has_id() {
557                let mut meta = HashMap::with_capacity(1);
558                meta.insert(
559                    PARQUET_FIELD_ID_META_KEY.to_string(),
560                    basic_info.id().to_string(),
561                );
562                ret.set_metadata(meta);
563            }
564            ret
565        }
566    }
567}
568
569/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
570/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
571/// [`Fields`] embedded in the parquet metadata
572///
573/// Note: This does not support out of order column projection
574pub fn convert_schema(
575    schema: &SchemaDescriptor,
576    mask: ProjectionMask,
577    embedded_arrow_schema: Option<&Fields>,
578) -> Result<Option<ParquetField>> {
579    let mut visitor = Visitor {
580        next_col_idx: 0,
581        mask,
582    };
583
584    let context = VisitorContext {
585        rep_level: 0,
586        def_level: 0,
587        data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
588    };
589
590    visitor.dispatch(&schema.root_schema_ptr(), context)
591}
592
593/// Computes the [`ParquetField`] for the provided `parquet_type`
594pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
595    let mut visitor = Visitor {
596        next_col_idx: 0,
597        mask: ProjectionMask::all(),
598    };
599
600    let context = VisitorContext {
601        rep_level: 0,
602        def_level: 0,
603        data_type: None,
604    };
605
606    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
607}