1use base64::prelude::BASE64_STANDARD;
27use base64::Engine;
28use std::collections::HashMap;
29use std::sync::Arc;
30
31use arrow_ipc::writer;
32use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
33
34use crate::basic::{
35 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
36};
37use crate::errors::{ParquetError, Result};
38use crate::file::{metadata::KeyValue, properties::WriterProperties};
39use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
40
41mod complex;
42mod primitive;
43
44use crate::arrow::ProjectionMask;
45pub(crate) use complex::{ParquetField, ParquetFieldType};
46
47use super::PARQUET_FIELD_ID_META_KEY;
48
49pub fn parquet_to_arrow_schema(
54 parquet_schema: &SchemaDescriptor,
55 key_value_metadata: Option<&Vec<KeyValue>>,
56) -> Result<Schema> {
57 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
58}
59
60pub fn parquet_to_arrow_schema_by_columns(
63 parquet_schema: &SchemaDescriptor,
64 mask: ProjectionMask,
65 key_value_metadata: Option<&Vec<KeyValue>>,
66) -> Result<Schema> {
67 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
68}
69
70pub(crate) fn parquet_to_arrow_schema_and_fields(
72 parquet_schema: &SchemaDescriptor,
73 mask: ProjectionMask,
74 key_value_metadata: Option<&Vec<KeyValue>>,
75) -> Result<(Schema, Option<ParquetField>)> {
76 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
77 let maybe_schema = metadata
78 .remove(super::ARROW_SCHEMA_META_KEY)
79 .map(|value| get_arrow_schema_from_metadata(&value))
80 .transpose()?;
81
82 if let Some(arrow_schema) = &maybe_schema {
84 arrow_schema.metadata().iter().for_each(|(k, v)| {
85 metadata.entry(k.clone()).or_insert_with(|| v.clone());
86 });
87 }
88
89 let hint = maybe_schema.as_ref().map(|s| s.fields());
90 let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
91 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
92 Ok((schema, field_levels.levels))
93}
94
/// Result of [`parquet_to_arrow_field_levels`].
///
/// Holds the top-level Arrow fields of a (possibly projected) Parquet schema,
/// together with the converted root [`ParquetField`] they were taken from.
#[derive(Debug, Clone)]
pub struct FieldLevels {
    /// Children of the converted root struct; empty when nothing was converted.
    pub(crate) fields: Fields,
    /// The converted root field, or `None` when `complex::convert_schema` returned `None`.
    pub(crate) levels: Option<ParquetField>,
}
107
108pub fn parquet_to_arrow_field_levels(
123 schema: &SchemaDescriptor,
124 mask: ProjectionMask,
125 hint: Option<&Fields>,
126) -> Result<FieldLevels> {
127 match complex::convert_schema(schema, mask, hint)? {
128 Some(field) => match &field.arrow_type {
129 DataType::Struct(fields) => Ok(FieldLevels {
130 fields: fields.clone(),
131 levels: Some(field),
132 }),
133 _ => unreachable!(),
134 },
135 None => Ok(FieldLevels {
136 fields: Fields::empty(),
137 levels: None,
138 }),
139 }
140}
141
142fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
144 let decoded = BASE64_STANDARD.decode(encoded_meta);
145 match decoded {
146 Ok(bytes) => {
147 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
148 &bytes[8..]
149 } else {
150 bytes.as_slice()
151 };
152 match arrow_ipc::root_as_message(slice) {
153 Ok(message) => message
154 .header_as_schema()
155 .map(arrow_ipc::convert::fb_to_schema)
156 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
157 Err(err) => {
158 Err(arrow_err!(
160 "Unable to get root as message stored in {}: {:?}",
161 super::ARROW_SCHEMA_META_KEY,
162 err
163 ))
164 }
165 }
166 }
167 Err(err) => {
168 Err(arrow_err!(
170 "Unable to decode the encoded schema stored in {}, {:?}",
171 super::ARROW_SCHEMA_META_KEY,
172 err
173 ))
174 }
175 }
176}
177
178fn encode_arrow_schema(schema: &Schema) -> String {
180 let options = writer::IpcWriteOptions::default();
181 let mut dictionary_tracker =
182 writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id());
183 let data_gen = writer::IpcDataGenerator::default();
184 let mut serialized_schema =
185 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
186
187 let schema_len = serialized_schema.ipc_message.len();
190 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
191 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
192 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
193 len_prefix_schema.append(&mut serialized_schema.ipc_message);
194
195 BASE64_STANDARD.encode(&len_prefix_schema)
196}
197
198pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
201 let encoded = encode_arrow_schema(schema);
202
203 let schema_kv = KeyValue {
204 key: super::ARROW_SCHEMA_META_KEY.to_string(),
205 value: Some(encoded),
206 };
207
208 let meta = props
209 .key_value_metadata
210 .get_or_insert_with(Default::default);
211
212 let schema_meta = meta
214 .iter()
215 .enumerate()
216 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
217 match schema_meta {
218 Some((i, _)) => {
219 meta.remove(i);
220 meta.push(schema_kv);
221 }
222 None => {
223 meta.push(schema_kv);
224 }
225 }
226}
227
228pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
233 arrow_to_parquet_schema_with_root(schema, "arrow_schema")
234}
235
236pub fn arrow_to_parquet_schema_with_root(schema: &Schema, root: &str) -> Result<SchemaDescriptor> {
238 let fields = schema
239 .fields()
240 .iter()
241 .map(|field| arrow_to_parquet_type(field).map(Arc::new))
242 .collect::<Result<_>>()?;
243 let group = Type::group_type_builder(root).with_fields(fields).build()?;
244 Ok(SchemaDescriptor::new(Arc::new(group)))
245}
246
247fn parse_key_value_metadata(
248 key_value_metadata: Option<&Vec<KeyValue>>,
249) -> Option<HashMap<String, String>> {
250 match key_value_metadata {
251 Some(key_values) => {
252 let map: HashMap<String, String> = key_values
253 .iter()
254 .filter_map(|kv| {
255 kv.value
256 .as_ref()
257 .map(|value| (kv.key.clone(), value.clone()))
258 })
259 .collect();
260
261 if map.is_empty() {
262 None
263 } else {
264 Some(map)
265 }
266 }
267 None => None,
268 }
269}
270
271pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
273 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
274 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
275
276 let basic_info = parquet_column.self_type().get_basic_info();
277 if basic_info.has_id() {
278 let mut meta = HashMap::with_capacity(1);
279 meta.insert(
280 PARQUET_FIELD_ID_META_KEY.to_string(),
281 basic_info.id().to_string(),
282 );
283 ret.set_metadata(meta);
284 }
285
286 Ok(ret)
287}
288
/// Returns the number of bytes a `FIXED_LEN_BYTE_ARRAY` needs to hold a
/// decimal with `precision` digits.
///
/// Parquet bounds the precision storable in `n` bytes by
/// `digits = floor(log10(2^(8n - 1) - 1))`. Solving for `n` gives
/// `n = (log2(10^digits + 1) + 1) / 8`, rounded up to a whole byte.
/// The computation is done in `f64`.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let largest_unscaled = 10.0_f64.powi(i32::from(precision));
    let bits_needed = (largest_unscaled + 1.0).log2() + 1.0;
    (bits_needed / 8.0).ceil() as usize
}
299
300fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
302 let name = field.name().as_str();
303 let repetition = if field.is_nullable() {
304 Repetition::OPTIONAL
305 } else {
306 Repetition::REQUIRED
307 };
308 let id = field_id(field);
309 match field.data_type() {
311 DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
312 .with_logical_type(Some(LogicalType::Unknown))
313 .with_repetition(repetition)
314 .with_id(id)
315 .build(),
316 DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
317 .with_repetition(repetition)
318 .with_id(id)
319 .build(),
320 DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
321 .with_logical_type(Some(LogicalType::Integer {
322 bit_width: 8,
323 is_signed: true,
324 }))
325 .with_repetition(repetition)
326 .with_id(id)
327 .build(),
328 DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
329 .with_logical_type(Some(LogicalType::Integer {
330 bit_width: 16,
331 is_signed: true,
332 }))
333 .with_repetition(repetition)
334 .with_id(id)
335 .build(),
336 DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
337 .with_repetition(repetition)
338 .with_id(id)
339 .build(),
340 DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
341 .with_repetition(repetition)
342 .with_id(id)
343 .build(),
344 DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
345 .with_logical_type(Some(LogicalType::Integer {
346 bit_width: 8,
347 is_signed: false,
348 }))
349 .with_repetition(repetition)
350 .with_id(id)
351 .build(),
352 DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
353 .with_logical_type(Some(LogicalType::Integer {
354 bit_width: 16,
355 is_signed: false,
356 }))
357 .with_repetition(repetition)
358 .with_id(id)
359 .build(),
360 DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
361 .with_logical_type(Some(LogicalType::Integer {
362 bit_width: 32,
363 is_signed: false,
364 }))
365 .with_repetition(repetition)
366 .with_id(id)
367 .build(),
368 DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
369 .with_logical_type(Some(LogicalType::Integer {
370 bit_width: 64,
371 is_signed: false,
372 }))
373 .with_repetition(repetition)
374 .with_id(id)
375 .build(),
376 DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
377 .with_repetition(repetition)
378 .with_id(id)
379 .with_logical_type(Some(LogicalType::Float16))
380 .with_length(2)
381 .build(),
382 DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
383 .with_repetition(repetition)
384 .with_id(id)
385 .build(),
386 DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
387 .with_repetition(repetition)
388 .with_id(id)
389 .build(),
390 DataType::Timestamp(TimeUnit::Second, _) => {
391 Type::primitive_type_builder(name, PhysicalType::INT64)
393 .with_repetition(repetition)
394 .with_id(id)
395 .build()
396 }
397 DataType::Timestamp(time_unit, tz) => {
398 Type::primitive_type_builder(name, PhysicalType::INT64)
399 .with_logical_type(Some(LogicalType::Timestamp {
400 is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
402 unit: match time_unit {
403 TimeUnit::Second => unreachable!(),
404 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
405 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
406 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
407 },
408 }))
409 .with_repetition(repetition)
410 .with_id(id)
411 .build()
412 }
413 DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
414 .with_logical_type(Some(LogicalType::Date))
415 .with_repetition(repetition)
416 .with_id(id)
417 .build(),
418 DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
420 .with_logical_type(Some(LogicalType::Date))
421 .with_repetition(repetition)
422 .with_id(id)
423 .build(),
424 DataType::Time32(TimeUnit::Second) => {
425 Type::primitive_type_builder(name, PhysicalType::INT32)
427 .with_repetition(repetition)
428 .with_id(id)
429 .build()
430 }
431 DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
432 .with_logical_type(Some(LogicalType::Time {
433 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
434 unit: match unit {
435 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
436 u => unreachable!("Invalid unit for Time32: {:?}", u),
437 },
438 }))
439 .with_repetition(repetition)
440 .with_id(id)
441 .build(),
442 DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
443 .with_logical_type(Some(LogicalType::Time {
444 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
445 unit: match unit {
446 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
447 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
448 u => unreachable!("Invalid unit for Time64: {:?}", u),
449 },
450 }))
451 .with_repetition(repetition)
452 .with_id(id)
453 .build(),
454 DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)),
455 DataType::Interval(_) => {
456 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
457 .with_converted_type(ConvertedType::INTERVAL)
458 .with_repetition(repetition)
459 .with_id(id)
460 .with_length(12)
461 .build()
462 }
463 DataType::Binary | DataType::LargeBinary => {
464 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
465 .with_repetition(repetition)
466 .with_id(id)
467 .build()
468 }
469 DataType::FixedSizeBinary(length) => {
470 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
471 .with_repetition(repetition)
472 .with_id(id)
473 .with_length(*length)
474 .build()
475 }
476 DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
477 .with_repetition(repetition)
478 .with_id(id)
479 .build(),
480 DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
481 let (physical_type, length) = if *precision > 1 && *precision <= 9 {
484 (PhysicalType::INT32, -1)
485 } else if *precision <= 18 {
486 (PhysicalType::INT64, -1)
487 } else {
488 (
489 PhysicalType::FIXED_LEN_BYTE_ARRAY,
490 decimal_length_from_precision(*precision) as i32,
491 )
492 };
493 Type::primitive_type_builder(name, physical_type)
494 .with_repetition(repetition)
495 .with_id(id)
496 .with_length(length)
497 .with_logical_type(Some(LogicalType::Decimal {
498 scale: *scale as i32,
499 precision: *precision as i32,
500 }))
501 .with_precision(*precision as i32)
502 .with_scale(*scale as i32)
503 .build()
504 }
505 DataType::Utf8 | DataType::LargeUtf8 => {
506 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
507 .with_logical_type(Some(LogicalType::String))
508 .with_repetition(repetition)
509 .with_id(id)
510 .build()
511 }
512 DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
513 .with_logical_type(Some(LogicalType::String))
514 .with_repetition(repetition)
515 .with_id(id)
516 .build(),
517 DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
518 Type::group_type_builder(name)
519 .with_fields(vec![Arc::new(
520 Type::group_type_builder("list")
521 .with_fields(vec![Arc::new(arrow_to_parquet_type(f)?)])
522 .with_repetition(Repetition::REPEATED)
523 .build()?,
524 )])
525 .with_logical_type(Some(LogicalType::List))
526 .with_repetition(repetition)
527 .with_id(id)
528 .build()
529 }
530 DataType::ListView(_) | DataType::LargeListView(_) => {
531 unimplemented!("ListView/LargeListView not implemented")
532 }
533 DataType::Struct(fields) => {
534 if fields.is_empty() {
535 return Err(arrow_err!("Parquet does not support writing empty structs",));
536 }
537 let fields = fields
539 .iter()
540 .map(|f| arrow_to_parquet_type(f).map(Arc::new))
541 .collect::<Result<_>>()?;
542 Type::group_type_builder(name)
543 .with_fields(fields)
544 .with_repetition(repetition)
545 .with_id(id)
546 .build()
547 }
548 DataType::Map(field, _) => {
549 if let DataType::Struct(struct_fields) = field.data_type() {
550 Type::group_type_builder(name)
551 .with_fields(vec![Arc::new(
552 Type::group_type_builder(field.name())
553 .with_fields(vec![
554 Arc::new(arrow_to_parquet_type(&struct_fields[0])?),
555 Arc::new(arrow_to_parquet_type(&struct_fields[1])?),
556 ])
557 .with_repetition(Repetition::REPEATED)
558 .build()?,
559 )])
560 .with_logical_type(Some(LogicalType::Map))
561 .with_repetition(repetition)
562 .with_id(id)
563 .build()
564 } else {
565 Err(arrow_err!(
566 "DataType::Map should contain a struct field child",
567 ))
568 }
569 }
570 DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
571 DataType::Dictionary(_, ref value) => {
572 let dict_field = field.clone().with_data_type(value.as_ref().clone());
574 arrow_to_parquet_type(&dict_field)
575 }
576 DataType::RunEndEncoded(_, _) => Err(arrow_err!(
577 "Converting RunEndEncodedType to parquet not supported",
578 )),
579 }
580}
581
582fn field_id(field: &Field) -> Option<i32> {
583 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
584 value.parse().ok() }
586
587#[cfg(test)]
588mod tests {
589 use super::*;
590
591 use std::{collections::HashMap, sync::Arc};
592
593 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
594
595 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
596 use crate::file::metadata::KeyValue;
597 use crate::file::reader::FileReader;
598 use crate::{
599 arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
600 schema::{parser::parse_message_type, types::SchemaDescriptor},
601 };
602
603 #[test]
604 fn test_flat_primitives() {
605 let message_type = "
606 message test_schema {
607 REQUIRED BOOLEAN boolean;
608 REQUIRED INT32 int8 (INT_8);
609 REQUIRED INT32 int16 (INT_16);
610 REQUIRED INT32 uint8 (INTEGER(8,false));
611 REQUIRED INT32 uint16 (INTEGER(16,false));
612 REQUIRED INT32 int32;
613 REQUIRED INT64 int64;
614 OPTIONAL DOUBLE double;
615 OPTIONAL FLOAT float;
616 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
617 OPTIONAL BINARY string (UTF8);
618 OPTIONAL BINARY string_2 (STRING);
619 OPTIONAL BINARY json (JSON);
620 }
621 ";
622 let parquet_group_type = parse_message_type(message_type).unwrap();
623
624 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
625 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
626
627 let arrow_fields = Fields::from(vec![
628 Field::new("boolean", DataType::Boolean, false),
629 Field::new("int8", DataType::Int8, false),
630 Field::new("int16", DataType::Int16, false),
631 Field::new("uint8", DataType::UInt8, false),
632 Field::new("uint16", DataType::UInt16, false),
633 Field::new("int32", DataType::Int32, false),
634 Field::new("int64", DataType::Int64, false),
635 Field::new("double", DataType::Float64, true),
636 Field::new("float", DataType::Float32, true),
637 Field::new("float16", DataType::Float16, true),
638 Field::new("string", DataType::Utf8, true),
639 Field::new("string_2", DataType::Utf8, true),
640 Field::new("json", DataType::Utf8, true),
641 ]);
642
643 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
644 }
645
646 #[test]
647 fn test_decimal_fields() {
648 let message_type = "
649 message test_schema {
650 REQUIRED INT32 decimal1 (DECIMAL(4,2));
651 REQUIRED INT64 decimal2 (DECIMAL(12,2));
652 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
653 REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
654 REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
655 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
656 REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
657 }
658 ";
659
660 let parquet_group_type = parse_message_type(message_type).unwrap();
661
662 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
663 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
664
665 let arrow_fields = Fields::from(vec![
666 Field::new("decimal1", DataType::Decimal128(4, 2), false),
667 Field::new("decimal2", DataType::Decimal128(12, 2), false),
668 Field::new("decimal3", DataType::Decimal128(30, 2), false),
669 Field::new("decimal4", DataType::Decimal128(33, 2), false),
670 Field::new("decimal5", DataType::Decimal128(38, 2), false),
671 Field::new("decimal6", DataType::Decimal256(39, 2), false),
672 Field::new("decimal7", DataType::Decimal256(39, 2), false),
673 ]);
674 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
675 }
676
677 #[test]
678 fn test_byte_array_fields() {
679 let message_type = "
680 message test_schema {
681 REQUIRED BYTE_ARRAY binary;
682 REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
683 }
684 ";
685
686 let parquet_group_type = parse_message_type(message_type).unwrap();
687
688 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
689 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
690
691 let arrow_fields = Fields::from(vec![
692 Field::new("binary", DataType::Binary, false),
693 Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
694 ]);
695 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
696 }
697
698 #[test]
699 fn test_duplicate_fields() {
700 let message_type = "
701 message test_schema {
702 REQUIRED BOOLEAN boolean;
703 REQUIRED INT32 int8 (INT_8);
704 }
705 ";
706
707 let parquet_group_type = parse_message_type(message_type).unwrap();
708
709 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
710 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
711
712 let arrow_fields = Fields::from(vec![
713 Field::new("boolean", DataType::Boolean, false),
714 Field::new("int8", DataType::Int8, false),
715 ]);
716 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
717
718 let converted_arrow_schema =
719 parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
720 .unwrap();
721 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
722 }
723
724 #[test]
725 fn test_parquet_lists() {
726 let mut arrow_fields = Vec::new();
727
728 let message_type = "
730 message test_schema {
731 REQUIRED GROUP my_list (LIST) {
732 REPEATED GROUP list {
733 OPTIONAL BINARY element (UTF8);
734 }
735 }
736 OPTIONAL GROUP my_list (LIST) {
737 REPEATED GROUP list {
738 REQUIRED BINARY element (UTF8);
739 }
740 }
741 OPTIONAL GROUP array_of_arrays (LIST) {
742 REPEATED GROUP list {
743 REQUIRED GROUP element (LIST) {
744 REPEATED GROUP list {
745 REQUIRED INT32 element;
746 }
747 }
748 }
749 }
750 OPTIONAL GROUP my_list (LIST) {
751 REPEATED GROUP element {
752 REQUIRED BINARY str (UTF8);
753 }
754 }
755 OPTIONAL GROUP my_list (LIST) {
756 REPEATED INT32 element;
757 }
758 OPTIONAL GROUP my_list (LIST) {
759 REPEATED GROUP element {
760 REQUIRED BINARY str (UTF8);
761 REQUIRED INT32 num;
762 }
763 }
764 OPTIONAL GROUP my_list (LIST) {
765 REPEATED GROUP array {
766 REQUIRED BINARY str (UTF8);
767 }
768
769 }
770 OPTIONAL GROUP my_list (LIST) {
771 REPEATED GROUP my_list_tuple {
772 REQUIRED BINARY str (UTF8);
773 }
774 }
775 REPEATED INT32 name;
776 }
777 ";
778
779 {
786 arrow_fields.push(Field::new_list(
787 "my_list",
788 Field::new("element", DataType::Utf8, true),
789 false,
790 ));
791 }
792
793 {
800 arrow_fields.push(Field::new_list(
801 "my_list",
802 Field::new("element", DataType::Utf8, false),
803 true,
804 ));
805 }
806
807 {
820 let arrow_inner_list = Field::new("element", DataType::Int32, false);
821 arrow_fields.push(Field::new_list(
822 "array_of_arrays",
823 Field::new_list("element", arrow_inner_list, false),
824 true,
825 ));
826 }
827
828 {
835 arrow_fields.push(Field::new_list(
836 "my_list",
837 Field::new("str", DataType::Utf8, false),
838 true,
839 ));
840 }
841
842 {
847 arrow_fields.push(Field::new_list(
848 "my_list",
849 Field::new("element", DataType::Int32, false),
850 true,
851 ));
852 }
853
854 {
862 let fields = vec![
863 Field::new("str", DataType::Utf8, false),
864 Field::new("num", DataType::Int32, false),
865 ];
866 arrow_fields.push(Field::new_list(
867 "my_list",
868 Field::new_struct("element", fields, false),
869 true,
870 ));
871 }
872
873 {
881 let fields = vec![Field::new("str", DataType::Utf8, false)];
882 arrow_fields.push(Field::new_list(
883 "my_list",
884 Field::new_struct("array", fields, false),
885 true,
886 ));
887 }
888
889 {
897 let fields = vec![Field::new("str", DataType::Utf8, false)];
898 arrow_fields.push(Field::new_list(
899 "my_list",
900 Field::new_struct("my_list_tuple", fields, false),
901 true,
902 ));
903 }
904
905 {
908 arrow_fields.push(Field::new_list(
909 "name",
910 Field::new("name", DataType::Int32, false),
911 false,
912 ));
913 }
914
915 let parquet_group_type = parse_message_type(message_type).unwrap();
916
917 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
918 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
919 let converted_fields = converted_arrow_schema.fields();
920
921 assert_eq!(arrow_fields.len(), converted_fields.len());
922 for i in 0..arrow_fields.len() {
923 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
924 }
925 }
926
927 #[test]
928 fn test_parquet_list_nullable() {
929 let mut arrow_fields = Vec::new();
930
931 let message_type = "
932 message test_schema {
933 REQUIRED GROUP my_list1 (LIST) {
934 REPEATED GROUP list {
935 OPTIONAL BINARY element (UTF8);
936 }
937 }
938 OPTIONAL GROUP my_list2 (LIST) {
939 REPEATED GROUP list {
940 REQUIRED BINARY element (UTF8);
941 }
942 }
943 REQUIRED GROUP my_list3 (LIST) {
944 REPEATED GROUP list {
945 REQUIRED BINARY element (UTF8);
946 }
947 }
948 }
949 ";
950
951 {
958 arrow_fields.push(Field::new_list(
959 "my_list1",
960 Field::new("element", DataType::Utf8, true),
961 false,
962 ));
963 }
964
965 {
972 arrow_fields.push(Field::new_list(
973 "my_list2",
974 Field::new("element", DataType::Utf8, false),
975 true,
976 ));
977 }
978
979 {
986 arrow_fields.push(Field::new_list(
987 "my_list3",
988 Field::new("element", DataType::Utf8, false),
989 false,
990 ));
991 }
992
993 let parquet_group_type = parse_message_type(message_type).unwrap();
994
995 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
996 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
997 let converted_fields = converted_arrow_schema.fields();
998
999 assert_eq!(arrow_fields.len(), converted_fields.len());
1000 for i in 0..arrow_fields.len() {
1001 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1002 }
1003 }
1004
1005 #[test]
1006 fn test_parquet_maps() {
1007 let mut arrow_fields = Vec::new();
1008
1009 let message_type = "
1011 message test_schema {
1012 REQUIRED group my_map1 (MAP) {
1013 REPEATED group key_value {
1014 REQUIRED binary key (UTF8);
1015 OPTIONAL int32 value;
1016 }
1017 }
1018 OPTIONAL group my_map2 (MAP) {
1019 REPEATED group map {
1020 REQUIRED binary str (UTF8);
1021 REQUIRED int32 num;
1022 }
1023 }
1024 OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1025 REPEATED group map {
1026 REQUIRED binary key (UTF8);
1027 OPTIONAL int32 value;
1028 }
1029 }
1030 REQUIRED group my_map4 (MAP) {
1031 REPEATED group map {
1032 OPTIONAL binary key (UTF8);
1033 REQUIRED int32 value;
1034 }
1035 }
1036 }
1037 ";
1038
1039 {
1047 arrow_fields.push(Field::new_map(
1048 "my_map1",
1049 "key_value",
1050 Field::new("key", DataType::Utf8, false),
1051 Field::new("value", DataType::Int32, true),
1052 false,
1053 false,
1054 ));
1055 }
1056
1057 {
1065 arrow_fields.push(Field::new_map(
1066 "my_map2",
1067 "map",
1068 Field::new("str", DataType::Utf8, false),
1069 Field::new("num", DataType::Int32, false),
1070 false,
1071 true,
1072 ));
1073 }
1074
1075 {
1083 arrow_fields.push(Field::new_map(
1084 "my_map3",
1085 "map",
1086 Field::new("key", DataType::Utf8, false),
1087 Field::new("value", DataType::Int32, true),
1088 false,
1089 true,
1090 ));
1091 }
1092
1093 {
1101 arrow_fields.push(Field::new_map(
1102 "my_map4",
1103 "map",
1104 Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Int32, false),
1106 false,
1107 false,
1108 ));
1109 }
1110
1111 let parquet_group_type = parse_message_type(message_type).unwrap();
1112
1113 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1114 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1115 let converted_fields = converted_arrow_schema.fields();
1116
1117 assert_eq!(arrow_fields.len(), converted_fields.len());
1118 for i in 0..arrow_fields.len() {
1119 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1120 }
1121 }
1122
1123 #[test]
1124 fn test_nested_schema() {
1125 let mut arrow_fields = Vec::new();
1126 {
1127 let group1_fields = Fields::from(vec![
1128 Field::new("leaf1", DataType::Boolean, false),
1129 Field::new("leaf2", DataType::Int32, false),
1130 ]);
1131 let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1132 arrow_fields.push(group1_struct);
1133
1134 let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1135 arrow_fields.push(leaf3_field);
1136 }
1137
1138 let message_type = "
1139 message test_schema {
1140 REQUIRED GROUP group1 {
1141 REQUIRED BOOLEAN leaf1;
1142 REQUIRED INT32 leaf2;
1143 }
1144 REQUIRED INT64 leaf3;
1145 }
1146 ";
1147 let parquet_group_type = parse_message_type(message_type).unwrap();
1148
1149 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1150 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1151 let converted_fields = converted_arrow_schema.fields();
1152
1153 assert_eq!(arrow_fields.len(), converted_fields.len());
1154 for i in 0..arrow_fields.len() {
1155 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1156 }
1157 }
1158
1159 #[test]
1160 fn test_nested_schema_partial() {
1161 let mut arrow_fields = Vec::new();
1162 {
1163 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1164 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1165 arrow_fields.push(group1);
1166
1167 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1168 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1169 arrow_fields.push(group2);
1170
1171 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1172 }
1173
1174 let message_type = "
1175 message test_schema {
1176 REQUIRED GROUP group1 {
1177 REQUIRED INT64 leaf1;
1178 REQUIRED INT64 leaf2;
1179 }
1180 REQUIRED GROUP group2 {
1181 REQUIRED INT64 leaf3;
1182 REQUIRED INT64 leaf4;
1183 }
1184 REQUIRED INT64 leaf5;
1185 }
1186 ";
1187 let parquet_group_type = parse_message_type(message_type).unwrap();
1188
1189 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1199 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1200 let converted_arrow_schema =
1201 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1202 let converted_fields = converted_arrow_schema.fields();
1203
1204 assert_eq!(arrow_fields.len(), converted_fields.len());
1205 for i in 0..arrow_fields.len() {
1206 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1207 }
1208 }
1209
1210 #[test]
1211 fn test_nested_schema_partial_ordering() {
1212 let mut arrow_fields = Vec::new();
1213 {
1214 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1215 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1216 arrow_fields.push(group1);
1217
1218 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1219 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1220 arrow_fields.push(group2);
1221
1222 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1223 }
1224
1225 let message_type = "
1226 message test_schema {
1227 REQUIRED GROUP group1 {
1228 REQUIRED INT64 leaf1;
1229 REQUIRED INT64 leaf2;
1230 }
1231 REQUIRED GROUP group2 {
1232 REQUIRED INT64 leaf3;
1233 REQUIRED INT64 leaf4;
1234 }
1235 REQUIRED INT64 leaf5;
1236 }
1237 ";
1238 let parquet_group_type = parse_message_type(message_type).unwrap();
1239
1240 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1250 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1251 let converted_arrow_schema =
1252 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1253 let converted_fields = converted_arrow_schema.fields();
1254
1255 assert_eq!(arrow_fields.len(), converted_fields.len());
1256 for i in 0..arrow_fields.len() {
1257 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1258 }
1259 }
1260
1261 #[test]
1262 fn test_repeated_nested_schema() {
1263 let mut arrow_fields = Vec::new();
1264 {
1265 arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1266
1267 let inner_group_list = Field::new_list(
1268 "innerGroup",
1269 Field::new_struct(
1270 "innerGroup",
1271 vec![Field::new("leaf3", DataType::Int32, true)],
1272 false,
1273 ),
1274 false,
1275 );
1276
1277 let outer_group_list = Field::new_list(
1278 "outerGroup",
1279 Field::new_struct(
1280 "outerGroup",
1281 vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1282 false,
1283 ),
1284 false,
1285 );
1286 arrow_fields.push(outer_group_list);
1287 }
1288
1289 let message_type = "
1290 message test_schema {
1291 OPTIONAL INT32 leaf1;
1292 REPEATED GROUP outerGroup {
1293 OPTIONAL INT32 leaf2;
1294 REPEATED GROUP innerGroup {
1295 OPTIONAL INT32 leaf3;
1296 }
1297 }
1298 }
1299 ";
1300 let parquet_group_type = parse_message_type(message_type).unwrap();
1301
1302 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1303 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1304 let converted_fields = converted_arrow_schema.fields();
1305
1306 assert_eq!(arrow_fields.len(), converted_fields.len());
1307 for i in 0..arrow_fields.len() {
1308 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1309 }
1310 }
1311
1312 #[test]
1313 fn test_column_desc_to_field() {
1314 let message_type = "
1315 message test_schema {
1316 REQUIRED BOOLEAN boolean;
1317 REQUIRED INT32 int8 (INT_8);
1318 REQUIRED INT32 uint8 (INTEGER(8,false));
1319 REQUIRED INT32 int16 (INT_16);
1320 REQUIRED INT32 uint16 (INTEGER(16,false));
1321 REQUIRED INT32 int32;
1322 REQUIRED INT64 int64;
1323 OPTIONAL DOUBLE double;
1324 OPTIONAL FLOAT float;
1325 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1326 OPTIONAL BINARY string (UTF8);
1327 REPEATED BOOLEAN bools;
1328 OPTIONAL INT32 date (DATE);
1329 OPTIONAL INT32 time_milli (TIME_MILLIS);
1330 OPTIONAL INT64 time_micro (TIME_MICROS);
1331 OPTIONAL INT64 time_nano (TIME(NANOS,false));
1332 OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1333 REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
1334 REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
1335 REPEATED INT32 int_list;
1336 REPEATED BINARY byte_list;
1337 REPEATED BINARY string_list (UTF8);
1338 REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1339 REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1340 REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1341 }
1342 ";
1343 let parquet_group_type = parse_message_type(message_type).unwrap();
1344
1345 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1346 let converted_arrow_fields = parquet_schema
1347 .columns()
1348 .iter()
1349 .map(|c| parquet_to_arrow_field(c).unwrap())
1350 .collect::<Vec<Field>>();
1351
1352 let arrow_fields = vec![
1353 Field::new("boolean", DataType::Boolean, false),
1354 Field::new("int8", DataType::Int8, false),
1355 Field::new("uint8", DataType::UInt8, false),
1356 Field::new("int16", DataType::Int16, false),
1357 Field::new("uint16", DataType::UInt16, false),
1358 Field::new("int32", DataType::Int32, false),
1359 Field::new("int64", DataType::Int64, false),
1360 Field::new("double", DataType::Float64, true),
1361 Field::new("float", DataType::Float32, true),
1362 Field::new("float16", DataType::Float16, true),
1363 Field::new("string", DataType::Utf8, true),
1364 Field::new_list(
1365 "bools",
1366 Field::new("bools", DataType::Boolean, false),
1367 false,
1368 ),
1369 Field::new("date", DataType::Date32, true),
1370 Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1371 Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1372 Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1373 Field::new(
1374 "ts_milli",
1375 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1376 true,
1377 ),
1378 Field::new(
1379 "ts_micro",
1380 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1381 false,
1382 ),
1383 Field::new(
1384 "ts_nano",
1385 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1386 false,
1387 ),
1388 Field::new_list(
1389 "int_list",
1390 Field::new("int_list", DataType::Int32, false),
1391 false,
1392 ),
1393 Field::new_list(
1394 "byte_list",
1395 Field::new("byte_list", DataType::Binary, false),
1396 false,
1397 ),
1398 Field::new_list(
1399 "string_list",
1400 Field::new("string_list", DataType::Utf8, false),
1401 false,
1402 ),
1403 Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1404 Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1405 Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1406 ];
1407
1408 assert_eq!(arrow_fields, converted_arrow_fields);
1409 }
1410
    /// Converts a hand-built Arrow schema with `arrow_to_parquet_schema` and checks each
    /// resulting column against the Parquet schema parsed from `message_type`.
    ///
    /// Expectations encoded below:
    /// - Arrow lists (`bools`, `bools_non_null`, `struct.int32`) are expected as `LIST`
    ///   groups with a `REPEATED GROUP list` wrapping an `element` leaf.
    /// - `time_milli_utc` / `time_micro_utc` carry an `adjusted_to_utc` metadata key and are
    ///   expected as `TIME(.., true)`. `time_milli` has no such key and is expected as
    ///   `TIME(MILLIS,false)`.
    /// - `ts_seconds` (`Timestamp(Second, "UTC")`) is expected as a plain `INT64` with no
    ///   converted type.
    /// - Timestamps with a timezone (`UTC`, `+00:00`, `-00:00`, `+01:00`) are expected as
    ///   `TIMESTAMP(.., true)`. `ts_micro` has no timezone and is expected as
    ///   `TIMESTAMP(MICROS,false)`.
    /// - The `Dictionary(Int32, Utf8)` field is expected as its value type, `BINARY (STRING)`.
    /// - `Decimal128(38, 2)` and `Decimal256(39, 2)` are expected as 16- and 17-byte
    ///   `FIXED_LEN_BYTE_ARRAY` columns.
    #[test]
    fn test_field_to_column_desc() {
        let message_type = "
        message arrow_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   int16 (INTEGER(16,true));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (STRING);
            OPTIONAL GROUP   bools (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN element;
                }
            }
            REQUIRED GROUP   bools_non_null (LIST) {
                REPEATED GROUP list {
                    REQUIRED BOOLEAN element;
                }
            }
            OPTIONAL INT32   date       (DATE);
            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
            OPTIONAL INT64   time_micro (TIME_MICROS);
            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
            REQUIRED INT64   ts_seconds;
            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
            REQUIRED GROUP struct {
                REQUIRED BOOLEAN bools;
                REQUIRED INT32 uint32 (INTEGER(32,false));
                REQUIRED GROUP int32 (LIST) {
                    REPEATED GROUP list {
                        OPTIONAL INT32 element;
                    }
                }
            }
            REQUIRED BINARY  dictionary_strings (STRING);
            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        // Arrow input, listed in the same order as the columns of `message_type` above.
        let arrow_fields = vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new_list(
                "bools",
                Field::new("element", DataType::Boolean, true),
                true,
            ),
            Field::new_list(
                "bools_non_null",
                Field::new("element", DataType::Boolean, false),
                false,
            ),
            Field::new("date", DataType::Date32, true),
            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
            // The `adjusted_to_utc` metadata key is what distinguishes this field from
            // `time_milli`; it is expected to produce TIME(MILLIS,true).
            Field::new(
                "time_milli_utc",
                DataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new(
                "time_micro_utc",
                DataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_seconds",
                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_micro_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            // Zero offsets written as "+00:00" / "-00:00" are expected to be UTC-adjusted
            // as well.
            Field::new(
                "ts_millis_zero_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                false,
            ),
            Field::new(
                "ts_millis_zero_negative_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
                false,
            ),
            // A non-zero offset is still expected to map to TIMESTAMP(.., true).
            Field::new(
                "ts_micro_non_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
                false,
            ),
            Field::new_struct(
                "struct",
                vec![
                    Field::new("bools", DataType::Boolean, false),
                    Field::new("uint32", DataType::UInt32, false),
                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
                ],
                false,
            ),
            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
            Field::new("decimal128", DataType::Decimal128(38, 2), false),
            Field::new("decimal256", DataType::Decimal256(39, 2), false),
        ];
        let arrow_schema = Schema::new(arrow_fields);
        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema).unwrap();

        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );
        parquet_schema
            .columns()
            .iter()
            .zip(converted_arrow_schema.columns())
            .for_each(|(a, b)| {
                // Columns whose expected (parsed) side has a logical type are compared in
                // full. Columns without one compare only name, physical type and converted
                // type, presumably because the Arrow conversion may attach a logical type
                // that the hand-written schema omits.
                match a.logical_type() {
                    Some(_) => {
                        assert_eq!(a, b)
                    }
                    None => {
                        assert_eq!(a.name(), b.name());
                        assert_eq!(a.physical_type(), b.physical_type());
                        assert_eq!(a.converted_type(), b.converted_type());
                    }
                };
            });
    }
1587
1588 #[test]
1589 #[should_panic(expected = "Parquet does not support writing empty structs")]
1590 fn test_empty_struct_field() {
1591 let arrow_fields = vec![Field::new(
1592 "struct",
1593 DataType::Struct(Fields::empty()),
1594 false,
1595 )];
1596 let arrow_schema = Schema::new(arrow_fields);
1597 let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema);
1598
1599 assert!(converted_arrow_schema.is_err());
1600 converted_arrow_schema.unwrap();
1601 }
1602
1603 #[test]
1604 fn test_metadata() {
1605 let message_type = "
1606 message test_schema {
1607 OPTIONAL BINARY string (STRING);
1608 }
1609 ";
1610 let parquet_group_type = parse_message_type(message_type).unwrap();
1611
1612 let key_value_metadata = vec![
1613 KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1614 KeyValue::new("baz".to_owned(), None),
1615 ];
1616
1617 let mut expected_metadata: HashMap<String, String> = HashMap::new();
1618 expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1619
1620 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1621 let converted_arrow_schema =
1622 parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
1623
1624 assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
1625 }
1626
    /// Round-trips a schema covering a wide range of Arrow types through a Parquet file
    /// that contains no record batches, then checks two things:
    ///
    /// 1. The schema read back via `ParquetRecordBatchReaderBuilder` equals the original,
    ///    including field-level and schema-level metadata.
    /// 2. Every field carrying `PARQUET_FIELD_ID_META_KEY` metadata was written with that
    ///    value as its Parquet field id, at the Parquet path listed in the final assertion.
    #[test]
    fn test_arrow_schema_roundtrip() -> Result<()> {
        // Builds a metadata map from `(key, value)` string pairs.
        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
            a.iter()
                .map(|(a, b)| (a.to_string(), b.to_string()))
                .collect()
        };

        let schema = Schema::new_with_metadata(
            vec![
                Field::new("c1", DataType::Utf8, false)
                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
                Field::new("c2", DataType::Binary, false),
                Field::new("c3", DataType::FixedSizeBinary(3), false),
                Field::new("c4", DataType::Boolean, false),
                Field::new("c5", DataType::Date32, false),
                Field::new("c6", DataType::Date64, false),
                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
                Field::new(
                    "c16",
                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                    false,
                ),
                Field::new(
                    "c17",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "c18",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    false,
                ),
                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
                // Field ids on both the list (4) and its item (5).
                Field::new_list(
                    "c21",
                    Field::new("item", DataType::Boolean, true)
                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                    false,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
                Field::new(
                    "c22",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Boolean, true)),
                        5,
                    ),
                    false,
                ),
                Field::new_list(
                    "c23",
                    Field::new_large_list(
                        "inner",
                        Field::new(
                            "item",
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, false),
                                    Field::new("c", DataType::Float32, false),
                                    Field::new("d", DataType::Float16, false),
                                ]
                                .into(),
                            ),
                            false,
                        ),
                        true,
                    ),
                    false,
                ),
                Field::new(
                    "c24",
                    DataType::Struct(Fields::from(vec![
                        Field::new("a", DataType::Utf8, false),
                        Field::new("b", DataType::UInt16, false),
                    ])),
                    false,
                ),
                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
                // NOTE(review): trailing arguments are presumably (nullable, dict_id,
                // dict_is_ordered); confirm against `Field::new_dict`.
                Field::new_dict(
                    "c31",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
                Field::new("c32", DataType::LargeBinary, true),
                Field::new("c33", DataType::LargeUtf8, true),
                Field::new_large_list(
                    "c34",
                    Field::new_list(
                        "inner",
                        Field::new(
                            "item",
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, true),
                                ]
                                .into(),
                            ),
                            true,
                        ),
                        true,
                    ),
                    true,
                ),
                Field::new("c35", DataType::Null, true),
                Field::new("c36", DataType::Decimal128(2, 1), false),
                Field::new("c37", DataType::Decimal256(50, 20), false),
                Field::new("c38", DataType::Decimal128(18, 12), true),
                Field::new_map(
                    "c39",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                    // Presumably (sorted, nullable); see `Field::new_map`.
                    false,
                    true,
                ),
                // A map with a custom entries name and field ids on the map (7), its key (8),
                // its value list (9) and the list item (10).
                Field::new_map(
                    "c40",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false)
                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
                    Field::new_list(
                        "my_value",
                        Field::new("item", DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
                        true,
                    )
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
                    false,
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
                // Only the innermost list item carries a field id (11), so it is the only
                // c41 path expected in the id assertion below.
                Field::new_map(
                    "c41",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false),
                    Field::new_list(
                        "my_value",
                        Field::new("item", DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
                        true,
                    ),
                    false,
                    false,
                ),
            ],
            meta(&[("Key", "Value")]),
        );

        // Write a file containing only the schema: no batches are written before `close`.
        let file = tempfile::tempfile().unwrap();
        let writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
        writer.close()?;

        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        // The Arrow schema read back must match the original exactly.
        let read_schema = arrow_reader.schema();
        assert_eq!(&schema, read_schema.as_ref());

        // Depth-first walk of the Parquet schema tree, recording "path -> id" for every
        // node (group or leaf) that carries a field id.
        let mut stack = Vec::with_capacity(10);
        let mut out = Vec::with_capacity(10);

        let root = arrow_reader.parquet_schema().root_schema_ptr();
        stack.push((root.name().to_string(), root));

        while let Some((p, t)) = stack.pop() {
            if t.is_group() {
                for f in t.get_fields() {
                    stack.push((format!("{p}.{}", f.name()), f.clone()))
                }
            }

            let info = t.get_basic_info();
            if info.has_id() {
                out.push(format!("{p} -> {}", info.id()))
            }
        }
        // Sort so the assertion does not depend on traversal order.
        out.sort_unstable();
        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();

        // The expected paths show list items nested under a `list` group and map
        // key/value fields nested under the map's entries name.
        assert_eq!(
            &out,
            &[
                "arrow_schema.c1 -> 2",
                "arrow_schema.c21 -> 4",
                "arrow_schema.c21.list.item -> 5",
                "arrow_schema.c31 -> 6",
                "arrow_schema.c40 -> 7",
                "arrow_schema.c40.my_entries.my_key -> 8",
                "arrow_schema.c40.my_entries.my_value -> 9",
                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
            ]
        );

        Ok(())
    }
1843
1844 #[test]
1845 fn test_read_parquet_field_ids_raw() -> Result<()> {
1846 let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
1847 a.iter()
1848 .map(|(a, b)| (a.to_string(), b.to_string()))
1849 .collect()
1850 };
1851 let schema = Schema::new_with_metadata(
1852 vec![
1853 Field::new("c1", DataType::Utf8, true)
1854 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
1855 Field::new("c2", DataType::Utf8, true)
1856 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
1857 ],
1858 HashMap::new(),
1859 );
1860
1861 let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
1862 let parquet_bytes = writer.into_inner()?;
1863
1864 let reader =
1865 crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
1866 let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
1867
1868 let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
1870
1871 let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
1872 let parq_fields = parq_schema_descr.root_schema().get_fields();
1873 assert_eq!(parq_fields.len(), 2);
1874 assert_eq!(parq_fields[0].get_basic_info().id(), 1);
1875 assert_eq!(parq_fields[1].get_basic_info().id(), 2);
1876
1877 Ok(())
1878 }
1879
1880 #[test]
1881 fn test_arrow_schema_roundtrip_lists() -> Result<()> {
1882 let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
1883 .iter()
1884 .cloned()
1885 .collect();
1886
1887 let schema = Schema::new_with_metadata(
1888 vec![
1889 Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
1890 Field::new(
1891 "c22",
1892 DataType::FixedSizeList(
1893 Arc::new(Field::new("items", DataType::Boolean, false)),
1894 5,
1895 ),
1896 false,
1897 ),
1898 Field::new_list(
1899 "c23",
1900 Field::new_large_list(
1901 "items",
1902 Field::new_struct(
1903 "items",
1904 vec![
1905 Field::new("a", DataType::Int16, true),
1906 Field::new("b", DataType::Float64, false),
1907 ],
1908 true,
1909 ),
1910 true,
1911 ),
1912 true,
1913 ),
1914 ],
1915 metadata,
1916 );
1917
1918 let file = tempfile::tempfile().unwrap();
1920 let writer =
1921 ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
1922 writer.close()?;
1923
1924 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1926 let read_schema = arrow_reader.schema();
1927 assert_eq!(&schema, read_schema.as_ref());
1928 Ok(())
1929 }
1930
1931 #[test]
1932 fn test_get_arrow_schema_from_metadata() {
1933 assert!(get_arrow_schema_from_metadata("").is_err());
1934 }
1935}