1use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
24use arrow_array::{
25 BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
26 FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
27 TimestampMicrosecondArray,
28};
29use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
30use num_bigint::BigInt;
31use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
32use parquet::file::statistics::Statistics;
33use rust_decimal::prelude::ToPrimitive;
34use uuid::Uuid;
35
36use crate::error::Result;
37use crate::spec::{
38 Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema,
39 SchemaVisitor, StructType, Type,
40};
41use crate::{Error, ErrorKind};
42
/// Name of the struct field that wraps a map's key/value pair when an Iceberg map is
/// converted to an Arrow `Map` type.
pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
/// Time zone attached to the Arrow timestamp types produced for the Iceberg
/// `Timestamptz` and `TimestamptzNs` primitives.
pub const UTC_TIME_ZONE: &str = "+00:00";
47
/// A post-order visitor over an Arrow schema.
///
/// The traversal functions in this file call a `before_*` hook, visit the child type,
/// call the matching `after_*` hook and finally pass the children's results to the
/// aggregating method of the parent (`schema`, `struct`, `list` or `map`).
pub trait ArrowSchemaVisitor {
    /// Result produced for each visited data type.
    type T;

    /// Result produced for the schema as a whole.
    type U;

    /// Called before the type of a schema or struct field is visited. No-op by default.
    fn before_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the type of a schema or struct field was visited. No-op by default.
    fn after_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before the element type of a list is visited. No-op by default.
    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the element type of a list was visited. No-op by default.
    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before the key type of a map is visited. No-op by default.
    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the key type of a map was visited. No-op by default.
    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before the value type of a map is visited. No-op by default.
    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the value type of a map was visited. No-op by default.
    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called once all top-level fields were visited; `values` holds one result per field,
    /// in field order.
    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U>;

    /// Called once all fields of a struct were visited; `results` holds one result per
    /// field, in field order.
    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T>;

    /// Called once the element type of a list was visited; `list` is the list data type
    /// itself and `value` the result for its element type.
    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;

    /// Called once the key and value types of a map were visited; `map` is the map data
    /// type itself.
    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T>;

    /// Called for every data type that has no child fields.
    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
}
113
114fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Result<V::T> {
116 match r#type {
117 p if p.is_primitive()
118 || matches!(
119 p,
120 DataType::Boolean
121 | DataType::Utf8
122 | DataType::LargeUtf8
123 | DataType::Utf8View
124 | DataType::Binary
125 | DataType::LargeBinary
126 | DataType::BinaryView
127 | DataType::FixedSizeBinary(_)
128 ) =>
129 {
130 visitor.primitive(p)
131 }
132 DataType::List(element_field) => visit_list(r#type, element_field, visitor),
133 DataType::LargeList(element_field) => visit_list(r#type, element_field, visitor),
134 DataType::FixedSizeList(element_field, _) => visit_list(r#type, element_field, visitor),
135 DataType::Map(field, _) => match field.data_type() {
136 DataType::Struct(fields) => {
137 if fields.len() != 2 {
138 return Err(Error::new(
139 ErrorKind::DataInvalid,
140 "Map field must have exactly 2 fields",
141 ));
142 }
143
144 let key_field = &fields[0];
145 let value_field = &fields[1];
146
147 let key_result = {
148 visitor.before_map_key(key_field)?;
149 let ret = visit_type(key_field.data_type(), visitor)?;
150 visitor.after_map_key(key_field)?;
151 ret
152 };
153
154 let value_result = {
155 visitor.before_map_value(value_field)?;
156 let ret = visit_type(value_field.data_type(), visitor)?;
157 visitor.after_map_value(value_field)?;
158 ret
159 };
160
161 visitor.map(r#type, key_result, value_result)
162 }
163 _ => Err(Error::new(
164 ErrorKind::DataInvalid,
165 "Map field must have struct type",
166 )),
167 },
168 DataType::Struct(fields) => visit_struct(fields, visitor),
169 DataType::Dictionary(_key_type, value_type) => visit_type(value_type, visitor),
170 other => Err(Error::new(
171 ErrorKind::DataInvalid,
172 format!("Cannot visit Arrow data type: {other}"),
173 )),
174 }
175}
176
177fn visit_list<V: ArrowSchemaVisitor>(
179 data_type: &DataType,
180 element_field: &Field,
181 visitor: &mut V,
182) -> Result<V::T> {
183 visitor.before_list_element(element_field)?;
184 let value = visit_type(element_field.data_type(), visitor)?;
185 visitor.after_list_element(element_field)?;
186 visitor.list(data_type, value)
187}
188
189fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Result<V::T> {
191 let mut results = Vec::with_capacity(fields.len());
192 for field in fields {
193 visitor.before_field(field)?;
194 let result = visit_type(field.data_type(), visitor)?;
195 visitor.after_field(field)?;
196 results.push(result);
197 }
198
199 visitor.r#struct(fields, results)
200}
201
202fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) -> Result<V::U> {
204 let mut results = Vec::with_capacity(schema.fields().len());
205 for field in schema.fields() {
206 visitor.before_field(field)?;
207 let result = visit_type(field.data_type(), visitor)?;
208 visitor.after_field(field)?;
209 results.push(result);
210 }
211 visitor.schema(schema, results)
212}
213
214pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
220 let mut visitor = ArrowSchemaConverter::new();
221 visit_schema(schema, &mut visitor)
222}
223
224pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
226 let mut visitor = ArrowSchemaConverter::new();
227 visit_type(ty, &mut visitor)
228}
229
/// Arrow field metadata key under which an Iceberg field's doc string is stored.
const ARROW_FIELD_DOC_KEY: &str = "doc";
231
232pub(super) fn get_field_id(field: &Field) -> Result<i32> {
233 if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
234 return value.parse::<i32>().map_err(|e| {
235 Error::new(
236 ErrorKind::DataInvalid,
237 "Failed to parse field id".to_string(),
238 )
239 .with_context("value", value)
240 .with_source(e)
241 });
242 }
243 Err(Error::new(
244 ErrorKind::DataInvalid,
245 "Field id not found in metadata",
246 ))
247}
248
249fn get_field_doc(field: &Field) -> Option<String> {
250 if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
251 return Some(value.clone());
252 }
253 None
254}
255
256struct ArrowSchemaConverter;
257
258impl ArrowSchemaConverter {
259 fn new() -> Self {
260 Self {}
261 }
262
263 fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result<Vec<NestedFieldRef>> {
264 let mut results = Vec::with_capacity(fields.len());
265 for i in 0..fields.len() {
266 let field = &fields[i];
267 let field_type = &field_results[i];
268 let id = get_field_id(field)?;
269 let doc = get_field_doc(field);
270 let nested_field = NestedField {
271 id,
272 doc,
273 name: field.name().clone(),
274 required: !field.is_nullable(),
275 field_type: Box::new(field_type.clone()),
276 initial_default: None,
277 write_default: None,
278 };
279 results.push(Arc::new(nested_field));
280 }
281 Ok(results)
282 }
283}
284
285impl ArrowSchemaVisitor for ArrowSchemaConverter {
286 type T = Type;
287 type U = Schema;
288
289 fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
290 let fields = Self::convert_fields(schema.fields(), &values)?;
291 let builder = Schema::builder().with_fields(fields);
292 builder.build()
293 }
294
295 fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
296 let fields = Self::convert_fields(fields, &results)?;
297 Ok(Type::Struct(StructType::new(fields)))
298 }
299
300 fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
301 let element_field = match list {
302 DataType::List(element_field) => element_field,
303 DataType::LargeList(element_field) => element_field,
304 DataType::FixedSizeList(element_field, _) => element_field,
305 _ => {
306 return Err(Error::new(
307 ErrorKind::DataInvalid,
308 "List type must have list data type",
309 ));
310 }
311 };
312
313 let id = get_field_id(element_field)?;
314 let doc = get_field_doc(element_field);
315 let mut element_field =
316 NestedField::list_element(id, value.clone(), !element_field.is_nullable());
317 if let Some(doc) = doc {
318 element_field = element_field.with_doc(doc);
319 }
320 let element_field = Arc::new(element_field);
321 Ok(Type::List(ListType { element_field }))
322 }
323
324 fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
325 match map {
326 DataType::Map(field, _) => match field.data_type() {
327 DataType::Struct(fields) => {
328 if fields.len() != 2 {
329 return Err(Error::new(
330 ErrorKind::DataInvalid,
331 "Map field must have exactly 2 fields",
332 ));
333 }
334
335 let key_field = &fields[0];
336 let value_field = &fields[1];
337
338 let key_id = get_field_id(key_field)?;
339 let key_doc = get_field_doc(key_field);
340 let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
341 if let Some(doc) = key_doc {
342 key_field = key_field.with_doc(doc);
343 }
344 let key_field = Arc::new(key_field);
345
346 let value_id = get_field_id(value_field)?;
347 let value_doc = get_field_doc(value_field);
348 let mut value_field = NestedField::map_value_element(
349 value_id,
350 value.clone(),
351 !value_field.is_nullable(),
352 );
353 if let Some(doc) = value_doc {
354 value_field = value_field.with_doc(doc);
355 }
356 let value_field = Arc::new(value_field);
357
358 Ok(Type::Map(MapType {
359 key_field,
360 value_field,
361 }))
362 }
363 _ => Err(Error::new(
364 ErrorKind::DataInvalid,
365 "Map field must have struct type",
366 )),
367 },
368 _ => Err(Error::new(
369 ErrorKind::DataInvalid,
370 "Map type must have map data type",
371 )),
372 }
373 }
374
375 fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
376 match p {
377 DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
378 DataType::Int8 | DataType::Int16 | DataType::Int32 => {
379 Ok(Type::Primitive(PrimitiveType::Int))
380 }
381 DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
382 DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
383 DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
384 DataType::UInt64 => {
385 Err(Error::new(
387 ErrorKind::DataInvalid,
388 "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.",
389 ))
390 }
391 DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
392 DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
393 DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
394 Error::new(
395 ErrorKind::DataInvalid,
396 "Failed to create decimal type".to_string(),
397 )
398 .with_source(e)
399 }),
400 DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
401 DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
402 Ok(Type::Primitive(PrimitiveType::Time))
403 }
404 DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
405 Ok(Type::Primitive(PrimitiveType::Timestamp))
406 }
407 DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
408 Ok(Type::Primitive(PrimitiveType::TimestampNs))
409 }
410 DataType::Timestamp(unit, Some(zone))
411 if unit == &TimeUnit::Microsecond
412 && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
413 {
414 Ok(Type::Primitive(PrimitiveType::Timestamptz))
415 }
416 DataType::Timestamp(unit, Some(zone))
417 if unit == &TimeUnit::Nanosecond
418 && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
419 {
420 Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
421 }
422 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
423 Ok(Type::Primitive(PrimitiveType::Binary))
424 }
425 DataType::FixedSizeBinary(width) => {
426 Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
427 }
428 DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
429 Ok(Type::Primitive(PrimitiveType::String))
430 }
431 _ => Err(Error::new(
432 ErrorKind::DataInvalid,
433 format!("Unsupported Arrow data type: {p}"),
434 )),
435 }
436 }
437}
438
/// Visitor that builds Arrow schemas, fields and data types from Iceberg ones.
struct ToArrowSchemaConverter;
440
/// Intermediate result of [`ToArrowSchemaConverter`]: each visitor method returns
/// whichever of these three the visited Iceberg node converts to.
enum ArrowSchemaOrFieldOrType {
    /// Produced for the whole schema.
    Schema(ArrowSchema),
    /// Produced for a nested field.
    Field(Field),
    /// Produced for struct, list, map and primitive types.
    Type(DataType),
}
446
447impl SchemaVisitor for ToArrowSchemaConverter {
448 type T = ArrowSchemaOrFieldOrType;
449
450 fn schema(
451 &mut self,
452 _schema: &crate::spec::Schema,
453 value: ArrowSchemaOrFieldOrType,
454 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
455 let struct_type = match value {
456 ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields,
457 _ => unreachable!(),
458 };
459 Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
460 struct_type,
461 )))
462 }
463
464 fn field(
465 &mut self,
466 field: &crate::spec::NestedFieldRef,
467 value: ArrowSchemaOrFieldOrType,
468 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
469 let ty = match value {
470 ArrowSchemaOrFieldOrType::Type(ty) => ty,
471 _ => unreachable!(),
472 };
473 let metadata = if let Some(doc) = &field.doc {
474 HashMap::from([
475 (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()),
476 (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
477 ])
478 } else {
479 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string())])
480 };
481 Ok(ArrowSchemaOrFieldOrType::Field(
482 Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata),
483 ))
484 }
485
486 fn r#struct(
487 &mut self,
488 _: &crate::spec::StructType,
489 results: Vec<ArrowSchemaOrFieldOrType>,
490 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
491 let fields = results
492 .into_iter()
493 .map(|result| match result {
494 ArrowSchemaOrFieldOrType::Field(field) => field,
495 _ => unreachable!(),
496 })
497 .collect();
498 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)))
499 }
500
501 fn list(
502 &mut self,
503 list: &crate::spec::ListType,
504 value: ArrowSchemaOrFieldOrType,
505 ) -> crate::Result<Self::T> {
506 let field = match self.field(&list.element_field, value)? {
507 ArrowSchemaOrFieldOrType::Field(field) => field,
508 _ => unreachable!(),
509 };
510 let meta = if let Some(doc) = &list.element_field.doc {
511 HashMap::from([
512 (
513 PARQUET_FIELD_ID_META_KEY.to_string(),
514 list.element_field.id.to_string(),
515 ),
516 (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
517 ])
518 } else {
519 HashMap::from([(
520 PARQUET_FIELD_ID_META_KEY.to_string(),
521 list.element_field.id.to_string(),
522 )])
523 };
524 let field = field.with_metadata(meta);
525 Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new(
526 field,
527 ))))
528 }
529
530 fn map(
531 &mut self,
532 map: &crate::spec::MapType,
533 key_value: ArrowSchemaOrFieldOrType,
534 value: ArrowSchemaOrFieldOrType,
535 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
536 let key_field = match self.field(&map.key_field, key_value)? {
537 ArrowSchemaOrFieldOrType::Field(field) => field,
538 _ => unreachable!(),
539 };
540 let value_field = match self.field(&map.value_field, value)? {
541 ArrowSchemaOrFieldOrType::Field(field) => field,
542 _ => unreachable!(),
543 };
544 let field = Field::new(
545 DEFAULT_MAP_FIELD_NAME,
546 DataType::Struct(vec![key_field, value_field].into()),
547 false,
549 );
550
551 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
552 field.into(),
553 false,
554 )))
555 }
556
557 fn primitive(
558 &mut self,
559 p: &crate::spec::PrimitiveType,
560 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
561 match p {
562 crate::spec::PrimitiveType::Boolean => {
563 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
564 }
565 crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)),
566 crate::spec::PrimitiveType::Long => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)),
567 crate::spec::PrimitiveType::Float => {
568 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32))
569 }
570 crate::spec::PrimitiveType::Double => {
571 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64))
572 }
573 crate::spec::PrimitiveType::Decimal { precision, scale } => {
574 let (precision, scale) = {
575 let precision: u8 = precision.to_owned().try_into().map_err(|err| {
576 Error::new(
577 crate::ErrorKind::DataInvalid,
578 "incompatible precision for decimal type convert",
579 )
580 .with_source(err)
581 })?;
582 let scale = scale.to_owned().try_into().map_err(|err| {
583 Error::new(
584 crate::ErrorKind::DataInvalid,
585 "incompatible scale for decimal type convert",
586 )
587 .with_source(err)
588 })?;
589 (precision, scale)
590 };
591 validate_decimal_precision_and_scale::<Decimal128Type>(precision, scale).map_err(
592 |err| {
593 Error::new(
594 crate::ErrorKind::DataInvalid,
595 "incompatible precision and scale for decimal type convert",
596 )
597 .with_source(err)
598 },
599 )?;
600 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128(
601 precision, scale,
602 )))
603 }
604 crate::spec::PrimitiveType::Date => {
605 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
606 }
607 crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
608 DataType::Time64(TimeUnit::Microsecond),
609 )),
610 crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
611 DataType::Timestamp(TimeUnit::Microsecond, None),
612 )),
613 crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
614 DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
616 )),
617 crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
618 DataType::Timestamp(TimeUnit::Nanosecond, None),
619 )),
620 crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
621 DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
623 )),
624 crate::spec::PrimitiveType::String => {
625 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
626 }
627 crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type(
628 DataType::FixedSizeBinary(16),
629 )),
630 crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type(
631 len.to_i32()
632 .map(DataType::FixedSizeBinary)
633 .unwrap_or(DataType::LargeBinary),
634 )),
635 crate::spec::PrimitiveType::Binary => {
636 Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary))
637 }
638 }
639 }
640}
641
642pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
644 let mut converter = ToArrowSchemaConverter;
645 match crate::spec::visit_schema(schema, &mut converter)? {
646 ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
647 _ => unreachable!(),
648 }
649}
650
651pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> {
653 let mut converter = ToArrowSchemaConverter;
654 match crate::spec::visit_type(ty, &mut converter)? {
655 ArrowSchemaOrFieldOrType::Type(ty) => Ok(ty),
656 _ => unreachable!(),
657 }
658}
659
660pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send + Sync>> {
662 match (datum.data_type(), datum.literal()) {
663 (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(value)) => {
664 Ok(Arc::new(BooleanArray::new_scalar(*value)))
665 }
666 (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => {
667 Ok(Arc::new(Int32Array::new_scalar(*value)))
668 }
669 (PrimitiveType::Long, PrimitiveLiteral::Long(value)) => {
670 Ok(Arc::new(Int64Array::new_scalar(*value)))
671 }
672 (PrimitiveType::Float, PrimitiveLiteral::Float(value)) => {
673 Ok(Arc::new(Float32Array::new_scalar(value.to_f32().unwrap())))
674 }
675 (PrimitiveType::Double, PrimitiveLiteral::Double(value)) => {
676 Ok(Arc::new(Float64Array::new_scalar(value.to_f64().unwrap())))
677 }
678 (PrimitiveType::String, PrimitiveLiteral::String(value)) => {
679 Ok(Arc::new(StringArray::new_scalar(value.as_str())))
680 }
681 (PrimitiveType::Binary, PrimitiveLiteral::Binary(value)) => {
682 Ok(Arc::new(BinaryArray::new_scalar(value.as_slice())))
683 }
684 (PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
685 Ok(Arc::new(Date32Array::new_scalar(*value)))
686 }
687 (PrimitiveType::Timestamp, PrimitiveLiteral::Long(value)) => {
688 Ok(Arc::new(TimestampMicrosecondArray::new_scalar(*value)))
689 }
690 (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
691 TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
692 ))),
693 (PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
694 let array = Decimal128Array::from_value(*value, 1)
695 .with_precision_and_scale(*precision as _, *scale as _)
696 .unwrap();
697 Ok(Arc::new(Scalar::new(array)))
698 }
699 (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(value)) => {
700 let bytes = Uuid::from_u128(*value).into_bytes();
701 let array = FixedSizeBinaryArray::try_from_iter(vec![bytes].into_iter()).unwrap();
702 Ok(Arc::new(Scalar::new(array)))
703 }
704
705 (primitive_type, _) => Err(Error::new(
706 ErrorKind::FeatureUnsupported,
707 format!("Converting datum from type {primitive_type:?} to arrow not supported yet."),
708 )),
709 }
710}
711
712pub(crate) fn get_parquet_stat_min_as_datum(
713 primitive_type: &PrimitiveType,
714 stats: &Statistics,
715) -> Result<Option<Datum>> {
716 Ok(match (primitive_type, stats) {
717 (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
718 stats.min_opt().map(|val| Datum::bool(*val))
719 }
720 (PrimitiveType::Int, Statistics::Int32(stats)) => {
721 stats.min_opt().map(|val| Datum::int(*val))
722 }
723 (PrimitiveType::Date, Statistics::Int32(stats)) => {
724 stats.min_opt().map(|val| Datum::date(*val))
725 }
726 (PrimitiveType::Long, Statistics::Int64(stats)) => {
727 stats.min_opt().map(|val| Datum::long(*val))
728 }
729 (PrimitiveType::Time, Statistics::Int64(stats)) => {
730 let Some(val) = stats.min_opt() else {
731 return Ok(None);
732 };
733
734 Some(Datum::time_micros(*val)?)
735 }
736 (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
737 stats.min_opt().map(|val| Datum::timestamp_micros(*val))
738 }
739 (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
740 stats.min_opt().map(|val| Datum::timestamptz_micros(*val))
741 }
742 (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
743 stats.min_opt().map(|val| Datum::timestamp_nanos(*val))
744 }
745 (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
746 stats.min_opt().map(|val| Datum::timestamptz_nanos(*val))
747 }
748 (PrimitiveType::Float, Statistics::Float(stats)) => {
749 stats.min_opt().map(|val| Datum::float(*val))
750 }
751 (PrimitiveType::Double, Statistics::Double(stats)) => {
752 stats.min_opt().map(|val| Datum::double(*val))
753 }
754 (PrimitiveType::String, Statistics::ByteArray(stats)) => {
755 let Some(val) = stats.min_opt() else {
756 return Ok(None);
757 };
758
759 Some(Datum::string(val.as_utf8()?))
760 }
761 (
762 PrimitiveType::Decimal {
763 precision: _,
764 scale: _,
765 },
766 Statistics::ByteArray(stats),
767 ) => {
768 let Some(bytes) = stats.min_bytes_opt() else {
769 return Ok(None);
770 };
771 Some(Datum::new(
772 primitive_type.clone(),
773 PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
774 ))
775 }
776 (
777 PrimitiveType::Decimal {
778 precision: _,
779 scale: _,
780 },
781 Statistics::FixedLenByteArray(stats),
782 ) => {
783 let Some(bytes) = stats.min_bytes_opt() else {
784 return Ok(None);
785 };
786 let unscaled_value = BigInt::from_signed_bytes_be(bytes);
787 Some(Datum::new(
788 primitive_type.clone(),
789 PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
790 Error::new(
791 ErrorKind::DataInvalid,
792 format!("Can't convert bytes to i128: {bytes:?}"),
793 )
794 })?),
795 ))
796 }
797 (
798 PrimitiveType::Decimal {
799 precision: _,
800 scale: _,
801 },
802 Statistics::Int32(stats),
803 ) => stats.min_opt().map(|val| {
804 Datum::new(
805 primitive_type.clone(),
806 PrimitiveLiteral::Int128(i128::from(*val)),
807 )
808 }),
809
810 (
811 PrimitiveType::Decimal {
812 precision: _,
813 scale: _,
814 },
815 Statistics::Int64(stats),
816 ) => stats.min_opt().map(|val| {
817 Datum::new(
818 primitive_type.clone(),
819 PrimitiveLiteral::Int128(i128::from(*val)),
820 )
821 }),
822 (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
823 let Some(bytes) = stats.min_bytes_opt() else {
824 return Ok(None);
825 };
826 if bytes.len() != 16 {
827 return Err(Error::new(
828 ErrorKind::Unexpected,
829 "Invalid length of uuid bytes.",
830 ));
831 }
832 Some(Datum::uuid(Uuid::from_bytes(
833 bytes[..16].try_into().unwrap(),
834 )))
835 }
836 (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
837 let Some(bytes) = stat.min_bytes_opt() else {
838 return Ok(None);
839 };
840 if bytes.len() != *len as usize {
841 return Err(Error::new(
842 ErrorKind::Unexpected,
843 "Invalid length of fixed bytes.",
844 ));
845 }
846 Some(Datum::fixed(bytes.to_vec()))
847 }
848 (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
849 return Ok(stat
850 .min_bytes_opt()
851 .map(|bytes| Datum::binary(bytes.to_vec())));
852 }
853 _ => {
854 return Ok(None);
855 }
856 })
857}
858
859pub(crate) fn get_parquet_stat_max_as_datum(
860 primitive_type: &PrimitiveType,
861 stats: &Statistics,
862) -> Result<Option<Datum>> {
863 Ok(match (primitive_type, stats) {
864 (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
865 stats.max_opt().map(|val| Datum::bool(*val))
866 }
867 (PrimitiveType::Int, Statistics::Int32(stats)) => {
868 stats.max_opt().map(|val| Datum::int(*val))
869 }
870 (PrimitiveType::Date, Statistics::Int32(stats)) => {
871 stats.max_opt().map(|val| Datum::date(*val))
872 }
873 (PrimitiveType::Long, Statistics::Int64(stats)) => {
874 stats.max_opt().map(|val| Datum::long(*val))
875 }
876 (PrimitiveType::Time, Statistics::Int64(stats)) => {
877 let Some(val) = stats.max_opt() else {
878 return Ok(None);
879 };
880
881 Some(Datum::time_micros(*val)?)
882 }
883 (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
884 stats.max_opt().map(|val| Datum::timestamp_micros(*val))
885 }
886 (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
887 stats.max_opt().map(|val| Datum::timestamptz_micros(*val))
888 }
889 (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
890 stats.max_opt().map(|val| Datum::timestamp_nanos(*val))
891 }
892 (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
893 stats.max_opt().map(|val| Datum::timestamptz_nanos(*val))
894 }
895 (PrimitiveType::Float, Statistics::Float(stats)) => {
896 stats.max_opt().map(|val| Datum::float(*val))
897 }
898 (PrimitiveType::Double, Statistics::Double(stats)) => {
899 stats.max_opt().map(|val| Datum::double(*val))
900 }
901 (PrimitiveType::String, Statistics::ByteArray(stats)) => {
902 let Some(val) = stats.max_opt() else {
903 return Ok(None);
904 };
905
906 Some(Datum::string(val.as_utf8()?))
907 }
908 (
909 PrimitiveType::Decimal {
910 precision: _,
911 scale: _,
912 },
913 Statistics::ByteArray(stats),
914 ) => {
915 let Some(bytes) = stats.max_bytes_opt() else {
916 return Ok(None);
917 };
918 Some(Datum::new(
919 primitive_type.clone(),
920 PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
921 ))
922 }
923 (
924 PrimitiveType::Decimal {
925 precision: _,
926 scale: _,
927 },
928 Statistics::FixedLenByteArray(stats),
929 ) => {
930 let Some(bytes) = stats.max_bytes_opt() else {
931 return Ok(None);
932 };
933 let unscaled_value = BigInt::from_signed_bytes_be(bytes);
934 Some(Datum::new(
935 primitive_type.clone(),
936 PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
937 Error::new(
938 ErrorKind::DataInvalid,
939 format!("Can't convert bytes to i128: {bytes:?}"),
940 )
941 })?),
942 ))
943 }
944 (
945 PrimitiveType::Decimal {
946 precision: _,
947 scale: _,
948 },
949 Statistics::Int32(stats),
950 ) => stats.max_opt().map(|val| {
951 Datum::new(
952 primitive_type.clone(),
953 PrimitiveLiteral::Int128(i128::from(*val)),
954 )
955 }),
956
957 (
958 PrimitiveType::Decimal {
959 precision: _,
960 scale: _,
961 },
962 Statistics::Int64(stats),
963 ) => stats.max_opt().map(|val| {
964 Datum::new(
965 primitive_type.clone(),
966 PrimitiveLiteral::Int128(i128::from(*val)),
967 )
968 }),
969 (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
970 let Some(bytes) = stats.max_bytes_opt() else {
971 return Ok(None);
972 };
973 if bytes.len() != 16 {
974 return Err(Error::new(
975 ErrorKind::Unexpected,
976 "Invalid length of uuid bytes.",
977 ));
978 }
979 Some(Datum::uuid(Uuid::from_bytes(
980 bytes[..16].try_into().unwrap(),
981 )))
982 }
983 (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
984 let Some(bytes) = stat.max_bytes_opt() else {
985 return Ok(None);
986 };
987 if bytes.len() != *len as usize {
988 return Err(Error::new(
989 ErrorKind::Unexpected,
990 "Invalid length of fixed bytes.",
991 ));
992 }
993 Some(Datum::fixed(bytes.to_vec()))
994 }
995 (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
996 return Ok(stat
997 .max_bytes_opt()
998 .map(|bytes| Datum::binary(bytes.to_vec())));
999 }
1000 _ => {
1001 return Ok(None);
1002 }
1003 })
1004}
1005
/// Fallible conversion from an Arrow schema to an Iceberg schema.
impl TryFrom<&ArrowSchema> for crate::spec::Schema {
    type Error = Error;

    /// Delegates to [`arrow_schema_to_schema`].
    fn try_from(schema: &ArrowSchema) -> crate::Result<Self> {
        arrow_schema_to_schema(schema)
    }
}
1013
/// Fallible conversion from an Iceberg schema to an Arrow schema.
impl TryFrom<&crate::spec::Schema> for ArrowSchema {
    type Error = Error;

    /// Delegates to [`schema_to_arrow_schema`].
    fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
        schema_to_arrow_schema(schema)
    }
}
1021
1022pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
1044 let make_ree = |values_type: DataType| -> DataType {
1048 let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
1049 let values_field = Arc::new(Field::new("values", values_type, true));
1050 DataType::RunEndEncoded(run_ends_field, values_field)
1051 };
1052
1053 match datum.data_type() {
1055 PrimitiveType::Boolean => make_ree(DataType::Boolean),
1056 PrimitiveType::Int => make_ree(DataType::Int32),
1057 PrimitiveType::Long => make_ree(DataType::Int64),
1058 PrimitiveType::Float => make_ree(DataType::Float32),
1059 PrimitiveType::Double => make_ree(DataType::Float64),
1060 PrimitiveType::Date => make_ree(DataType::Date32),
1061 PrimitiveType::Time => make_ree(DataType::Int64),
1062 PrimitiveType::Timestamp => make_ree(DataType::Int64),
1063 PrimitiveType::Timestamptz => make_ree(DataType::Int64),
1064 PrimitiveType::TimestampNs => make_ree(DataType::Int64),
1065 PrimitiveType::TimestamptzNs => make_ree(DataType::Int64),
1066 PrimitiveType::String => make_ree(DataType::Utf8),
1067 PrimitiveType::Uuid => make_ree(DataType::Binary),
1068 PrimitiveType::Fixed(_) => make_ree(DataType::Binary),
1069 PrimitiveType::Binary => make_ree(DataType::Binary),
1070 PrimitiveType::Decimal { precision, scale } => {
1071 make_ree(DataType::Decimal128(*precision as u8, *scale as i8))
1072 }
1073 }
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078 use std::collections::HashMap;
1079 use std::sync::Arc;
1080
1081 use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1082 use rust_decimal::Decimal;
1083
1084 use super::*;
1085 use crate::spec::{Literal, Schema};
1086
1087 fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
1089 Field::new(name, ty, nullable).with_metadata(HashMap::from([(
1090 PARQUET_FIELD_ID_META_KEY.to_string(),
1091 value.to_string(),
1092 )]))
1093 }
1094
/// Test fixture: an Arrow schema with primitive, list, map, struct and dictionary
/// fields, each tagged with a field id through `simple_field`.
fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
    // Key/value entry fields of the map column.
    let fields = Fields::from(vec![
        simple_field("key", DataType::Int32, false, "28"),
        simple_field("value", DataType::Utf8, true, "29"),
    ]);

    let r#struct = DataType::Struct(fields);
    let map = DataType::Map(
        Arc::new(simple_field(DEFAULT_MAP_FIELD_NAME, r#struct, false, "17")),
        false,
    );
    let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    // Member fields of the struct column.
    let fields = Fields::from(vec![
        simple_field("aa", DataType::Int32, false, "18"),
        simple_field("bb", DataType::Utf8, true, "19"),
        simple_field(
            "cc",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            false,
            "20",
        ),
    ]);

    let r#struct = DataType::Struct(fields);

    ArrowSchema::new(vec![
        simple_field("a", DataType::Int32, false, "2"),
        simple_field("b", DataType::Int64, false, "1"),
        simple_field("c", DataType::Utf8, false, "3"),
        simple_field("n", DataType::Utf8, false, "21"),
        simple_field(
            "d",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            true,
            "4",
        ),
        simple_field("e", DataType::Boolean, true, "6"),
        simple_field("f", DataType::Float32, false, "5"),
        simple_field("g", DataType::Float64, false, "7"),
        simple_field("p", DataType::Decimal128(10, 2), false, "27"),
        simple_field("h", DataType::Date32, false, "8"),
        simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
        // Both spellings of the UTC zone are covered: "UTC" and "+00:00".
        simple_field(
            "j",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
            "10",
        ),
        simple_field(
            "k",
            DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
            false,
            "12",
        ),
        simple_field("l", DataType::Binary, false, "13"),
        simple_field("o", DataType::LargeBinary, false, "22"),
        simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
        // All three Arrow list flavours are covered.
        simple_field(
            "list",
            DataType::List(Arc::new(simple_field(
                "element",
                DataType::Int32,
                false,
                "15",
            ))),
            true,
            "14",
        ),
        simple_field(
            "large_list",
            DataType::LargeList(Arc::new(simple_field(
                "element",
                DataType::Utf8,
                false,
                "23",
            ))),
            true,
            "24",
        ),
        simple_field(
            "fixed_list",
            DataType::FixedSizeList(
                Arc::new(simple_field("element", DataType::Binary, false, "26")),
                10,
            ),
            true,
            "25",
        ),
        simple_field("map", map, false, "16"),
        simple_field("struct", r#struct, false, "17"),
        simple_field("dictionary", dictionary, false, "30"),
    ])
}
1189
1190 fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema {
1191 let schema_json = r#"{
1192 "type":"struct",
1193 "schema-id":0,
1194 "fields":[
1195 {
1196 "id":2,
1197 "name":"a",
1198 "required":true,
1199 "type":"int"
1200 },
1201 {
1202 "id":1,
1203 "name":"b",
1204 "required":true,
1205 "type":"long"
1206 },
1207 {
1208 "id":3,
1209 "name":"c",
1210 "required":true,
1211 "type":"string"
1212 },
1213 {
1214 "id":21,
1215 "name":"n",
1216 "required":true,
1217 "type":"string"
1218 },
1219 {
1220 "id":4,
1221 "name":"d",
1222 "required":false,
1223 "type":"timestamp"
1224 },
1225 {
1226 "id":6,
1227 "name":"e",
1228 "required":false,
1229 "type":"boolean"
1230 },
1231 {
1232 "id":5,
1233 "name":"f",
1234 "required":true,
1235 "type":"float"
1236 },
1237 {
1238 "id":7,
1239 "name":"g",
1240 "required":true,
1241 "type":"double"
1242 },
1243 {
1244 "id":27,
1245 "name":"p",
1246 "required":true,
1247 "type":"decimal(10,2)"
1248 },
1249 {
1250 "id":8,
1251 "name":"h",
1252 "required":true,
1253 "type":"date"
1254 },
1255 {
1256 "id":9,
1257 "name":"i",
1258 "required":true,
1259 "type":"time"
1260 },
1261 {
1262 "id":10,
1263 "name":"j",
1264 "required":true,
1265 "type":"timestamptz"
1266 },
1267 {
1268 "id":12,
1269 "name":"k",
1270 "required":true,
1271 "type":"timestamptz"
1272 },
1273 {
1274 "id":13,
1275 "name":"l",
1276 "required":true,
1277 "type":"binary"
1278 },
1279 {
1280 "id":22,
1281 "name":"o",
1282 "required":true,
1283 "type":"binary"
1284 },
1285 {
1286 "id":11,
1287 "name":"m",
1288 "required":true,
1289 "type":"fixed[10]"
1290 },
1291 {
1292 "id":14,
1293 "name":"list",
1294 "required": false,
1295 "type": {
1296 "type": "list",
1297 "element-id": 15,
1298 "element-required": true,
1299 "element": "int"
1300 }
1301 },
1302 {
1303 "id":24,
1304 "name":"large_list",
1305 "required": false,
1306 "type": {
1307 "type": "list",
1308 "element-id": 23,
1309 "element-required": true,
1310 "element": "string"
1311 }
1312 },
1313 {
1314 "id":25,
1315 "name":"fixed_list",
1316 "required": false,
1317 "type": {
1318 "type": "list",
1319 "element-id": 26,
1320 "element-required": true,
1321 "element": "binary"
1322 }
1323 },
1324 {
1325 "id":16,
1326 "name":"map",
1327 "required": true,
1328 "type": {
1329 "type": "map",
1330 "key-id": 28,
1331 "key": "int",
1332 "value-id": 29,
1333 "value-required": false,
1334 "value": "string"
1335 }
1336 },
1337 {
1338 "id":17,
1339 "name":"struct",
1340 "required": true,
1341 "type": {
1342 "type": "struct",
1343 "fields": [
1344 {
1345 "id":18,
1346 "name":"aa",
1347 "required":true,
1348 "type":"int"
1349 },
1350 {
1351 "id":19,
1352 "name":"bb",
1353 "required":false,
1354 "type":"string"
1355 },
1356 {
1357 "id":20,
1358 "name":"cc",
1359 "required":true,
1360 "type":"timestamp"
1361 }
1362 ]
1363 }
1364 },
1365 {
1366 "id":30,
1367 "name":"dictionary",
1368 "required":true,
1369 "type":"string"
1370 }
1371 ],
1372 "identifier-field-ids":[]
1373 }"#;
1374
1375 let schema: Schema = serde_json::from_str(schema_json).unwrap();
1376 schema
1377 }
1378
/// Converting the Arrow fixture must yield exactly the Iceberg fixture.
#[test]
fn test_arrow_schema_to_schema() {
    let expected = iceberg_schema_for_arrow_schema_to_schema_test();
    let input = arrow_schema_for_arrow_schema_to_schema_test();

    let actual = arrow_schema_to_schema(&input).unwrap();

    pretty_assertions::assert_eq!(actual, expected);
}
1386
1387 fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
1388 let fields = Fields::from(vec![
1389 simple_field("key", DataType::Int32, false, "28"),
1390 simple_field("value", DataType::Utf8, true, "29"),
1391 ]);
1392
1393 let r#struct = DataType::Struct(fields);
1394 let map = DataType::Map(
1395 Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
1396 false,
1397 );
1398
1399 let fields = Fields::from(vec![
1400 simple_field("aa", DataType::Int32, false, "18"),
1401 simple_field("bb", DataType::Utf8, true, "19"),
1402 simple_field(
1403 "cc",
1404 DataType::Timestamp(TimeUnit::Microsecond, None),
1405 false,
1406 "20",
1407 ),
1408 ]);
1409
1410 let r#struct = DataType::Struct(fields);
1411
1412 ArrowSchema::new(vec![
1413 simple_field("a", DataType::Int32, false, "2"),
1414 simple_field("b", DataType::Int64, false, "1"),
1415 simple_field("c", DataType::Utf8, false, "3"),
1416 simple_field("n", DataType::Utf8, false, "21"),
1417 simple_field(
1418 "d",
1419 DataType::Timestamp(TimeUnit::Microsecond, None),
1420 true,
1421 "4",
1422 ),
1423 simple_field("e", DataType::Boolean, true, "6"),
1424 simple_field("f", DataType::Float32, false, "5"),
1425 simple_field("g", DataType::Float64, false, "7"),
1426 simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1427 simple_field("h", DataType::Date32, false, "8"),
1428 simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1429 simple_field(
1430 "j",
1431 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1432 false,
1433 "10",
1434 ),
1435 simple_field(
1436 "k",
1437 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1438 false,
1439 "12",
1440 ),
1441 simple_field("l", DataType::LargeBinary, false, "13"),
1442 simple_field("o", DataType::LargeBinary, false, "22"),
1443 simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1444 simple_field(
1445 "list",
1446 DataType::List(Arc::new(simple_field(
1447 "element",
1448 DataType::Int32,
1449 false,
1450 "15",
1451 ))),
1452 true,
1453 "14",
1454 ),
1455 simple_field(
1456 "large_list",
1457 DataType::List(Arc::new(simple_field(
1458 "element",
1459 DataType::Utf8,
1460 false,
1461 "23",
1462 ))),
1463 true,
1464 "24",
1465 ),
1466 simple_field(
1467 "fixed_list",
1468 DataType::List(Arc::new(simple_field(
1469 "element",
1470 DataType::LargeBinary,
1471 false,
1472 "26",
1473 ))),
1474 true,
1475 "25",
1476 ),
1477 simple_field("map", map, false, "16"),
1478 simple_field("struct", r#struct, false, "17"),
1479 simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"),
1480 ])
1481 }
1482
1483 fn iceberg_schema_for_schema_to_arrow_schema() -> Schema {
1484 let schema_json = r#"{
1485 "type":"struct",
1486 "schema-id":0,
1487 "fields":[
1488 {
1489 "id":2,
1490 "name":"a",
1491 "required":true,
1492 "type":"int"
1493 },
1494 {
1495 "id":1,
1496 "name":"b",
1497 "required":true,
1498 "type":"long"
1499 },
1500 {
1501 "id":3,
1502 "name":"c",
1503 "required":true,
1504 "type":"string"
1505 },
1506 {
1507 "id":21,
1508 "name":"n",
1509 "required":true,
1510 "type":"string"
1511 },
1512 {
1513 "id":4,
1514 "name":"d",
1515 "required":false,
1516 "type":"timestamp"
1517 },
1518 {
1519 "id":6,
1520 "name":"e",
1521 "required":false,
1522 "type":"boolean"
1523 },
1524 {
1525 "id":5,
1526 "name":"f",
1527 "required":true,
1528 "type":"float"
1529 },
1530 {
1531 "id":7,
1532 "name":"g",
1533 "required":true,
1534 "type":"double"
1535 },
1536 {
1537 "id":27,
1538 "name":"p",
1539 "required":true,
1540 "type":"decimal(10,2)"
1541 },
1542 {
1543 "id":8,
1544 "name":"h",
1545 "required":true,
1546 "type":"date"
1547 },
1548 {
1549 "id":9,
1550 "name":"i",
1551 "required":true,
1552 "type":"time"
1553 },
1554 {
1555 "id":10,
1556 "name":"j",
1557 "required":true,
1558 "type":"timestamptz"
1559 },
1560 {
1561 "id":12,
1562 "name":"k",
1563 "required":true,
1564 "type":"timestamptz"
1565 },
1566 {
1567 "id":13,
1568 "name":"l",
1569 "required":true,
1570 "type":"binary"
1571 },
1572 {
1573 "id":22,
1574 "name":"o",
1575 "required":true,
1576 "type":"binary"
1577 },
1578 {
1579 "id":11,
1580 "name":"m",
1581 "required":true,
1582 "type":"fixed[10]"
1583 },
1584 {
1585 "id":14,
1586 "name":"list",
1587 "required": false,
1588 "type": {
1589 "type": "list",
1590 "element-id": 15,
1591 "element-required": true,
1592 "element": "int"
1593 }
1594 },
1595 {
1596 "id":24,
1597 "name":"large_list",
1598 "required": false,
1599 "type": {
1600 "type": "list",
1601 "element-id": 23,
1602 "element-required": true,
1603 "element": "string"
1604 }
1605 },
1606 {
1607 "id":25,
1608 "name":"fixed_list",
1609 "required": false,
1610 "type": {
1611 "type": "list",
1612 "element-id": 26,
1613 "element-required": true,
1614 "element": "binary"
1615 }
1616 },
1617 {
1618 "id":16,
1619 "name":"map",
1620 "required": true,
1621 "type": {
1622 "type": "map",
1623 "key-id": 28,
1624 "key": "int",
1625 "value-id": 29,
1626 "value-required": false,
1627 "value": "string"
1628 }
1629 },
1630 {
1631 "id":17,
1632 "name":"struct",
1633 "required": true,
1634 "type": {
1635 "type": "struct",
1636 "fields": [
1637 {
1638 "id":18,
1639 "name":"aa",
1640 "required":true,
1641 "type":"int"
1642 },
1643 {
1644 "id":19,
1645 "name":"bb",
1646 "required":false,
1647 "type":"string"
1648 },
1649 {
1650 "id":20,
1651 "name":"cc",
1652 "required":true,
1653 "type":"timestamp"
1654 }
1655 ]
1656 }
1657 },
1658 {
1659 "id":30,
1660 "name":"uuid",
1661 "required":true,
1662 "type":"uuid"
1663 }
1664 ],
1665 "identifier-field-ids":[]
1666 }"#;
1667
1668 let schema: Schema = serde_json::from_str(schema_json).unwrap();
1669 schema
1670 }
1671
/// Converting the Iceberg fixture must yield exactly the Arrow fixture.
///
/// Uses `pretty_assertions::assert_eq!`, like `test_arrow_schema_to_schema`,
/// so that a mismatch between the two large schemas is reported as a
/// field-level diff instead of two long single-line `Debug` dumps.
#[test]
fn test_schema_to_arrow_schema() {
    let arrow_schema = arrow_schema_for_schema_to_arrow_schema_test();
    let schema = iceberg_schema_for_schema_to_arrow_schema();
    let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap();
    pretty_assertions::assert_eq!(converted_arrow_schema, arrow_schema);
}
1679
/// Exercises `type_to_arrow_type` / `arrow_type_to_type` on a primitive, on
/// structs (with and without field ids, with and without defaults) and on
/// dictionary-encoded types.
#[test]
fn test_type_conversion() {
    // Builds a primitive-typed Iceberg struct member with optional defaults.
    let nested = |id: i32,
                  name: &str,
                  required: bool,
                  ty: PrimitiveType,
                  initial_default: Option<Literal>,
                  write_default: Option<Literal>|
     -> NestedFieldRef {
        NestedField {
            id,
            doc: None,
            name: name.to_string(),
            required,
            field_type: Box::new(Type::Primitive(ty)),
            initial_default,
            write_default,
        }
        .into()
    };

    // Primitive: Int32 <-> int in both directions.
    {
        let iceberg_int = Type::Primitive(PrimitiveType::Int);
        let arrow_int = DataType::Int32;
        assert_eq!(arrow_int, type_to_arrow_type(&iceberg_int).unwrap());
        assert_eq!(iceberg_int, arrow_type_to_type(&arrow_int).unwrap());
    }

    // Structs.
    {
        // Fields without a field id in their metadata are rejected.
        let without_ids = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, true),
        ]));
        let conversion_error = arrow_type_to_type(&without_ids).unwrap_err();
        assert_eq!(
            conversion_error.to_string(),
            "DataInvalid => Field id not found in metadata"
        );

        // With field ids the struct converts in both directions.
        let arrow_struct = DataType::Struct(Fields::from(vec![
            simple_field("a", DataType::Int64, false, "1"),
            simple_field("b", DataType::Utf8, true, "2"),
        ]));
        let plain_struct = Type::Struct(StructType::new(vec![
            nested(1, "a", true, PrimitiveType::Long, None, None),
            nested(2, "b", false, PrimitiveType::String, None, None),
        ]));
        assert_eq!(plain_struct, arrow_type_to_type(&arrow_struct).unwrap());
        assert_eq!(arrow_struct, type_to_arrow_type(&plain_struct).unwrap());

        // Defaults on the Iceberg side must not change the resulting Arrow type.
        let struct_with_defaults = Type::Struct(StructType::new(vec![
            nested(
                1,
                "a",
                true,
                PrimitiveType::Long,
                Some(Literal::Primitive(PrimitiveLiteral::Int(114514))),
                None,
            ),
            nested(
                2,
                "b",
                false,
                PrimitiveType::String,
                None,
                Some(Literal::Primitive(PrimitiveLiteral::String(
                    "514".to_string(),
                ))),
            ),
        ]));
        assert_eq!(
            arrow_struct,
            type_to_arrow_type(&struct_with_defaults).unwrap()
        );
    }

    // Dictionaries convert according to their value type, not their key type.
    {
        let int_valued =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int8));
        assert_eq!(
            Type::Primitive(PrimitiveType::Int),
            arrow_type_to_type(&int_valued).unwrap(),
            "Expected dictionary conversion to use the contained value"
        );

        let bool_valued =
            DataType::Dictionary(Box::new(DataType::Utf8), Box::new(DataType::Boolean));
        assert_eq!(
            Type::Primitive(PrimitiveType::Boolean),
            arrow_type_to_type(&bool_valued).unwrap()
        );
    }
}
1782
/// Unsigned Arrow integers up to 32 bits map to Iceberg integer types, while
/// `UInt64` is rejected with an error.
#[test]
fn test_unsigned_integer_type_conversion() {
    // Single-column schema: a required field "test" with field id 1.
    let schema_with =
        |ty: DataType| ArrowSchema::new(vec![simple_field("test", ty, false, "1")]);

    let test_cases = [
        (DataType::UInt8, PrimitiveType::Int),
        (DataType::UInt16, PrimitiveType::Int),
        (DataType::UInt32, PrimitiveType::Long),
    ];

    for (arrow_type, expected_iceberg_type) in test_cases {
        let iceberg_schema = arrow_schema_to_schema(&schema_with(arrow_type.clone())).unwrap();
        let iceberg_field = iceberg_schema.as_struct().fields().first().unwrap();

        let maps_as_expected = matches!(
            iceberg_field.field_type.as_ref(),
            Type::Primitive(t) if *t == expected_iceberg_type
        );
        assert!(
            maps_as_expected,
            "Expected {arrow_type:?} to map to {expected_iceberg_type:?}"
        );
    }

    // UInt64 must fail and name the offending type in the error message.
    let result = arrow_schema_to_schema(&schema_with(DataType::UInt64));
    assert!(result.is_err());
    let message = result.unwrap_err().to_string();
    assert!(message.contains("UInt64 is not supported"));
}
1823
/// Checks that `get_arrow_datum` turns each kind of Iceberg `Datum` into a
/// scalar Arrow array of the expected concrete type holding the same value.
#[test]
fn test_datum_conversion() {
    /// Converts `datum`, downcasts the result to the concrete array type `A`
    /// and asserts that it is flagged as a scalar.
    ///
    /// `#[track_caller]` makes a failure point at the calling line.
    #[track_caller]
    fn scalar_array<A: Clone + 'static>(datum: &Datum) -> A {
        let arrow_datum = get_arrow_datum(datum).unwrap();
        let (array, is_scalar) = arrow_datum.get();
        let typed = array.as_any().downcast_ref::<A>().unwrap().clone();
        assert!(is_scalar);
        typed
    }

    let booleans: BooleanArray = scalar_array(&Datum::bool(true));
    assert!(booleans.value(0));

    let ints: Int32Array = scalar_array(&Datum::int(42));
    assert_eq!(ints.value(0), 42);

    let longs: Int64Array = scalar_array(&Datum::long(42));
    assert_eq!(longs.value(0), 42);

    let floats: Float32Array = scalar_array(&Datum::float(42.42));
    assert_eq!(floats.value(0), 42.42);

    let doubles: Float64Array = scalar_array(&Datum::double(42.42));
    assert_eq!(doubles.value(0), 42.42);

    let strings: StringArray = scalar_array(&Datum::string("abc"));
    assert_eq!(strings.value(0), "abc");

    let binaries: BinaryArray = scalar_array(&Datum::binary(vec![1, 2, 3, 4]));
    assert_eq!(binaries.value(0), &[1, 2, 3, 4]);

    let dates: Date32Array = scalar_array(&Datum::date(42));
    assert_eq!(dates.value(0), 42);

    let timestamps: TimestampMicrosecondArray = scalar_array(&Datum::timestamp_micros(42));
    assert_eq!(timestamps.value(0), 42);

    // Zoned timestamps must additionally carry the "+00:00" time zone.
    let zoned: TimestampMicrosecondArray = scalar_array(&Datum::timestamptz_micros(42));
    assert_eq!(zoned.timezone(), Some("+00:00"));
    assert_eq!(zoned.value(0), 42);

    // Decimal::new(123, 2) is 1.23: unscaled value 123 at scale 2.
    let decimal_datum = Datum::decimal_with_precision(Decimal::new(123, 2), 30).unwrap();
    let decimals: Decimal128Array = scalar_array(&decimal_datum);
    assert_eq!(decimals.precision(), 30);
    assert_eq!(decimals.scale(), 2);
    assert_eq!(decimals.value(0), 123);

    // Every byte of this UUID is 0x42 (66).
    let uuid_datum = Datum::uuid_from_str("42424242-4242-4242-4242-424242424242").unwrap();
    let uuids: FixedSizeBinaryArray = scalar_array(&uuid_datum);
    assert_eq!(uuids.value(0), [66u8; 16]);
}
1935}