1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::primitive::convert_primitive;
22use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY};
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::ParquetError;
25use crate::errors::Result;
26use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
27use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
28
29fn get_repetition(t: &Type) -> Repetition {
30 let info = t.get_basic_info();
31 match info.has_repetition() {
32 true => info.repetition(),
33 false => Repetition::REQUIRED,
34 }
35}
36
/// A parquet schema element converted into its arrow representation, together
/// with the definition/repetition levels computed for it during conversion.
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// Repetition level of this element, accumulated along its path from the
    /// schema root (see `VisitorContext::levels`, `visit_list` and `visit_map`).
    pub rep_level: i16,
    /// Definition level of this element, accumulated along its path from the
    /// schema root (see `VisitorContext::levels`, `visit_list` and `visit_map`).
    pub def_level: i16,
    /// Whether the arrow field produced for this element is nullable.
    pub nullable: bool,
    /// The arrow data type this element converts to.
    pub arrow_type: DataType,
    /// Whether this element is a leaf column or a group of converted children.
    pub field_type: ParquetFieldType,
}
57
58impl ParquetField {
59 fn into_list(self, name: &str) -> Self {
63 ParquetField {
64 rep_level: self.rep_level,
65 def_level: self.def_level,
66 nullable: false,
67 arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
68 field_type: ParquetFieldType::Group {
69 children: vec![self],
70 },
71 }
72 }
73
74 pub fn children(&self) -> Option<&[Self]> {
76 match &self.field_type {
77 ParquetFieldType::Primitive { .. } => None,
78 ParquetFieldType::Group { children } => Some(children),
79 }
80 }
81}
82
/// The kind of a [`ParquetField`]: a leaf column or a group with children.
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf column.
    Primitive {
        /// Index of this leaf among the schema's leaf columns, assigned in
        /// depth-first visiting order (projected-out leaves still consume an
        /// index).
        col_idx: usize,
        /// The parquet type of this leaf column.
        primitive_type: TypePtr,
    },
    /// A struct, list or map.
    Group {
        /// Converted child elements kept by the projection: for a list the
        /// single element, for a map the key and the value.
        children: Vec<ParquetField>,
    },
}
95
/// State passed from an enclosing schema element to the child being visited.
struct VisitorContext {
    /// Repetition level of the enclosing element; the child's own repetition
    /// is applied on top of it (see `levels`).
    rep_level: i16,
    /// Definition level of the enclosing element; the child's own repetition
    /// is applied on top of it (see `levels`).
    def_level: i16,
    /// Arrow type hint for the element being visited, taken from the embedded
    /// arrow schema when one is available.
    data_type: Option<DataType>,
}
103
104impl VisitorContext {
105 fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
108 match repetition {
109 Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
110 Repetition::REQUIRED => (self.def_level, self.rep_level, false),
111 Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
112 }
113 }
114}
115
/// Depth-first converter from a parquet schema tree to [`ParquetField`]s.
struct Visitor {
    /// Index assigned to the next leaf column visited. It is advanced for every
    /// leaf, including leaves excluded by `mask`.
    next_col_idx: usize,

    /// Selects which leaf columns are converted.
    mask: ProjectionMask,
}
128
129impl Visitor {
130 fn visit_primitive(
131 &mut self,
132 primitive_type: &TypePtr,
133 context: VisitorContext,
134 ) -> Result<Option<ParquetField>> {
135 let col_idx = self.next_col_idx;
136 self.next_col_idx += 1;
137
138 if !self.mask.leaf_included(col_idx) {
139 return Ok(None);
140 }
141
142 let repetition = get_repetition(primitive_type);
143 let (def_level, rep_level, nullable) = context.levels(repetition);
144
145 let arrow_type = convert_primitive(primitive_type, context.data_type)?;
146
147 let primitive_field = ParquetField {
148 rep_level,
149 def_level,
150 nullable,
151 arrow_type,
152 field_type: ParquetFieldType::Primitive {
153 primitive_type: primitive_type.clone(),
154 col_idx,
155 },
156 };
157
158 Ok(Some(match repetition {
159 Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
160 _ => primitive_field,
161 }))
162 }
163
164 fn visit_struct(
165 &mut self,
166 struct_type: &TypePtr,
167 context: VisitorContext,
168 ) -> Result<Option<ParquetField>> {
169 let repetition = get_repetition(struct_type);
171 let (def_level, rep_level, nullable) = context.levels(repetition);
172
173 let parquet_fields = struct_type.get_fields();
174
175 let arrow_fields = match &context.data_type {
177 Some(DataType::Struct(fields)) => {
178 if fields.len() != parquet_fields.len() {
179 return Err(arrow_err!(
180 "incompatible arrow schema, expected {} struct fields got {}",
181 parquet_fields.len(),
182 fields.len()
183 ));
184 }
185 Some(fields)
186 }
187 Some(d) => {
188 return Err(arrow_err!(
189 "incompatible arrow schema, expected struct got {}",
190 d
191 ))
192 }
193 None => None,
194 };
195
196 let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
197 let mut children = Vec::with_capacity(parquet_fields.len());
198
199 for (idx, parquet_field) in parquet_fields.iter().enumerate() {
201 let data_type = match arrow_fields {
202 Some(fields) => {
203 let field = &fields[idx];
204 if field.name() != parquet_field.name() {
205 return Err(arrow_err!(
206 "incompatible arrow schema, expected field named {} got {}",
207 parquet_field.name(),
208 field.name()
209 ));
210 }
211 Some(field.data_type().clone())
212 }
213 None => None,
214 };
215
216 let arrow_field = arrow_fields.map(|x| &*x[idx]);
217 let child_ctx = VisitorContext {
218 rep_level,
219 def_level,
220 data_type,
221 };
222
223 if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
224 child_fields.push(convert_field(parquet_field, &child, arrow_field));
227 children.push(child);
228 }
229 }
230
231 if children.is_empty() {
232 return Ok(None);
233 }
234
235 let struct_field = ParquetField {
236 rep_level,
237 def_level,
238 nullable,
239 arrow_type: DataType::Struct(child_fields.finish().fields),
240 field_type: ParquetFieldType::Group { children },
241 };
242
243 Ok(Some(match repetition {
244 Repetition::REPEATED => struct_field.into_list(struct_type.name()),
245 _ => struct_field,
246 }))
247 }
248
249 fn visit_map(
250 &mut self,
251 map_type: &TypePtr,
252 context: VisitorContext,
253 ) -> Result<Option<ParquetField>> {
254 let rep_level = context.rep_level + 1;
255 let (def_level, nullable) = match get_repetition(map_type) {
256 Repetition::REQUIRED => (context.def_level + 1, false),
257 Repetition::OPTIONAL => (context.def_level + 2, true),
258 Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
259 };
260
261 if map_type.get_fields().len() != 1 {
262 return Err(arrow_err!(
263 "Map field must have exactly one key_value child, found {}",
264 map_type.get_fields().len()
265 ));
266 }
267
268 let map_key_value = &map_type.get_fields()[0];
270 if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
271 return Err(arrow_err!("Child of map field must be repeated"));
272 }
273
274 if map_key_value.get_fields().len() != 2 {
275 return Err(arrow_err!(
277 "Child of map field must have two children, found {}",
278 map_key_value.get_fields().len()
279 ));
280 }
281
282 let map_key = &map_key_value.get_fields()[0];
284 let map_value = &map_key_value.get_fields()[1];
285
286 match map_key.get_basic_info().repetition() {
287 Repetition::REPEATED => {
288 return Err(arrow_err!("Map keys cannot be repeated"));
289 }
290 Repetition::REQUIRED | Repetition::OPTIONAL => {
291 }
296 }
297
298 if map_value.get_basic_info().repetition() == Repetition::REPEATED {
299 return Err(arrow_err!("Map values cannot be repeated"));
300 }
301
302 let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
304 Some(DataType::Map(field, sorted)) => match field.data_type() {
305 DataType::Struct(fields) => {
306 if fields.len() != 2 {
307 return Err(arrow_err!(
308 "Map data type should contain struct with two children, got {}",
309 fields.len()
310 ));
311 }
312
313 (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
314 }
315 d => {
316 return Err(arrow_err!("Map data type should contain struct got {}", d));
317 }
318 },
319 Some(d) => {
320 return Err(arrow_err!(
321 "incompatible arrow schema, expected map got {}",
322 d
323 ))
324 }
325 None => (None, None, None, false),
326 };
327
328 let maybe_key = {
329 let context = VisitorContext {
330 rep_level,
331 def_level,
332 data_type: arrow_key.map(|x| x.data_type().clone()),
333 };
334
335 self.dispatch(map_key, context)?
336 };
337
338 let maybe_value = {
339 let context = VisitorContext {
340 rep_level,
341 def_level,
342 data_type: arrow_value.map(|x| x.data_type().clone()),
343 };
344
345 self.dispatch(map_value, context)?
346 };
347
348 match (maybe_key, maybe_value) {
350 (Some(key), Some(value)) => {
351 let key_field = Arc::new(
352 convert_field(map_key, &key, arrow_key)
353 .with_nullable(false),
355 );
356 let value_field = Arc::new(convert_field(map_value, &value, arrow_value));
357 let field_metadata = match arrow_map {
358 Some(field) => field.metadata().clone(),
359 _ => HashMap::default(),
360 };
361
362 let map_field = Field::new_struct(
363 map_key_value.name(),
364 [key_field, value_field],
365 false, )
367 .with_metadata(field_metadata);
368
369 Ok(Some(ParquetField {
370 rep_level,
371 def_level,
372 nullable,
373 arrow_type: DataType::Map(Arc::new(map_field), sorted),
374 field_type: ParquetFieldType::Group {
375 children: vec![key, value],
376 },
377 }))
378 }
379 _ => Ok(None),
380 }
381 }
382
383 fn visit_list(
384 &mut self,
385 list_type: &TypePtr,
386 context: VisitorContext,
387 ) -> Result<Option<ParquetField>> {
388 if list_type.is_primitive() {
389 return Err(arrow_err!(
390 "{:?} is a list type and can't be processed as primitive.",
391 list_type
392 ));
393 }
394
395 let fields = list_type.get_fields();
396 if fields.len() != 1 {
397 return Err(arrow_err!(
398 "list type must have a single child, found {}",
399 fields.len()
400 ));
401 }
402
403 let repeated_field = &fields[0];
404 if get_repetition(repeated_field) != Repetition::REPEATED {
405 return Err(arrow_err!("List child must be repeated"));
406 }
407
408 let (def_level, nullable) = match list_type.get_basic_info().repetition() {
410 Repetition::REQUIRED => (context.def_level, false),
411 Repetition::OPTIONAL => (context.def_level + 1, true),
412 Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
413 };
414
415 let arrow_field = match &context.data_type {
416 Some(DataType::List(f)) => Some(f.as_ref()),
417 Some(DataType::LargeList(f)) => Some(f.as_ref()),
418 Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
419 Some(d) => {
420 return Err(arrow_err!(
421 "incompatible arrow schema, expected list got {}",
422 d
423 ))
424 }
425 None => None,
426 };
427
428 if repeated_field.is_primitive() {
429 let context = VisitorContext {
436 rep_level: context.rep_level,
437 def_level,
438 data_type: arrow_field.map(|f| f.data_type().clone()),
439 };
440
441 return match self.visit_primitive(repeated_field, context) {
442 Ok(Some(mut field)) => {
443 field.nullable = nullable;
445 Ok(Some(field))
446 }
447 r => r,
448 };
449 }
450
451 let items = repeated_field.get_fields();
452 if items.len() != 1
453 || repeated_field.name() == "array"
454 || repeated_field.name() == format!("{}_tuple", list_type.name())
455 {
456 let context = VisitorContext {
461 rep_level: context.rep_level,
462 def_level,
463 data_type: arrow_field.map(|f| f.data_type().clone()),
464 };
465
466 return match self.visit_struct(repeated_field, context) {
467 Ok(Some(mut field)) => {
468 field.nullable = nullable;
469 Ok(Some(field))
470 }
471 r => r,
472 };
473 }
474
475 let item_type = &items[0];
477 let rep_level = context.rep_level + 1;
478 let def_level = def_level + 1;
479
480 let new_context = VisitorContext {
481 def_level,
482 rep_level,
483 data_type: arrow_field.map(|f| f.data_type().clone()),
484 };
485
486 match self.dispatch(item_type, new_context) {
487 Ok(Some(item)) => {
488 let item_field = Arc::new(convert_field(item_type, &item, arrow_field));
489
490 let arrow_type = match context.data_type {
492 Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
493 Some(DataType::FixedSizeList(_, len)) => {
494 DataType::FixedSizeList(item_field, len)
495 }
496 _ => DataType::List(item_field),
497 };
498
499 Ok(Some(ParquetField {
500 rep_level,
501 def_level,
502 nullable,
503 arrow_type,
504 field_type: ParquetFieldType::Group {
505 children: vec![item],
506 },
507 }))
508 }
509 r => r,
510 }
511 }
512
513 fn dispatch(
514 &mut self,
515 cur_type: &TypePtr,
516 context: VisitorContext,
517 ) -> Result<Option<ParquetField>> {
518 if cur_type.is_primitive() {
519 self.visit_primitive(cur_type, context)
520 } else {
521 match cur_type.get_basic_info().converted_type() {
522 ConvertedType::LIST => self.visit_list(cur_type, context),
523 ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
524 self.visit_map(cur_type, context)
525 }
526 _ => self.visit_struct(cur_type, context),
527 }
528 }
529 }
530}
531
532fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&Field>) -> Field {
537 let name = parquet_type.name();
538 let data_type = field.arrow_type.clone();
539 let nullable = field.nullable;
540
541 match arrow_hint {
542 Some(hint) => {
543 let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
545 (DataType::Dictionary(_, _), Some(id), Some(ordered)) => {
546 Field::new_dict(name, data_type, nullable, id, ordered)
547 }
548 _ => Field::new(name, data_type, nullable),
549 };
550
551 field.with_metadata(hint.metadata().clone())
552 }
553 None => {
554 let mut ret = Field::new(name, data_type, nullable);
555 let basic_info = parquet_type.get_basic_info();
556 if basic_info.has_id() {
557 let mut meta = HashMap::with_capacity(1);
558 meta.insert(
559 PARQUET_FIELD_ID_META_KEY.to_string(),
560 basic_info.id().to_string(),
561 );
562 ret.set_metadata(meta);
563 }
564 ret
565 }
566 }
567}
568
569pub fn convert_schema(
575 schema: &SchemaDescriptor,
576 mask: ProjectionMask,
577 embedded_arrow_schema: Option<&Fields>,
578) -> Result<Option<ParquetField>> {
579 let mut visitor = Visitor {
580 next_col_idx: 0,
581 mask,
582 };
583
584 let context = VisitorContext {
585 rep_level: 0,
586 def_level: 0,
587 data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
588 };
589
590 visitor.dispatch(&schema.root_schema_ptr(), context)
591}
592
593pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
595 let mut visitor = Visitor {
596 next_col_idx: 0,
597 mask: ProjectionMask::all(),
598 };
599
600 let context = VisitorContext {
601 rep_level: 0,
602 def_level: 0,
603 data_type: None,
604 };
605
606 Ok(visitor.dispatch(parquet_type, context)?.unwrap())
607}