1use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray,
20 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray,
21 LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray,
22 Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
23};
24use arrow_schema::{DataType, FieldRef};
25use uuid::Uuid;
26
27use super::get_field_id;
28use crate::spec::{
29 ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType,
30 SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
31 visit_type_with_partner,
32};
33use crate::{Error, ErrorKind, Result};
34
/// Visitor that converts Arrow arrays into Iceberg literals.
///
/// It implements `SchemaWithPartnerVisitor<ArrayRef>` and produces a
/// `Vec<Option<Literal>>` for every visited node of the Iceberg type.
struct ArrowArrayToIcebergStructConverter;
36
37impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
38 type T = Vec<Option<Literal>>;
39
40 fn schema(
41 &mut self,
42 _schema: &crate::spec::Schema,
43 _partner: &ArrayRef,
44 value: Vec<Option<Literal>>,
45 ) -> Result<Vec<Option<Literal>>> {
46 Ok(value)
47 }
48
49 fn field(
50 &mut self,
51 field: &crate::spec::NestedFieldRef,
52 _partner: &ArrayRef,
53 value: Vec<Option<Literal>>,
54 ) -> Result<Vec<Option<Literal>>> {
55 if field.required && value.iter().any(Option::is_none) {
57 return Err(Error::new(
58 ErrorKind::DataInvalid,
59 "The field is required but has null value",
60 )
61 .with_context("field_id", field.id.to_string())
62 .with_context("field_name", &field.name));
63 }
64 Ok(value)
65 }
66
67 fn r#struct(
68 &mut self,
69 _struct: &StructType,
70 array: &ArrayRef,
71 results: Vec<Vec<Option<Literal>>>,
72 ) -> Result<Vec<Option<Literal>>> {
73 let row_len = results.first().map(|column| column.len()).unwrap_or(0);
74 if let Some(col) = results.iter().find(|col| col.len() != row_len) {
75 return Err(Error::new(
76 ErrorKind::DataInvalid,
77 "The struct columns have different row length",
78 )
79 .with_context("first col length", row_len.to_string())
80 .with_context("actual col length", col.len().to_string()));
81 }
82
83 let mut struct_literals = Vec::with_capacity(row_len);
84 let mut columns_iters = results
85 .into_iter()
86 .map(|column| column.into_iter())
87 .collect::<Vec<_>>();
88
89 for i in 0..row_len {
90 let mut literals = Vec::with_capacity(columns_iters.len());
91 for column_iter in columns_iters.iter_mut() {
92 literals.push(column_iter.next().unwrap());
93 }
94 if array.is_null(i) {
95 struct_literals.push(None);
96 } else {
97 struct_literals.push(Some(Literal::Struct(Struct::from_iter(literals))));
98 }
99 }
100
101 Ok(struct_literals)
102 }
103
104 fn list(
105 &mut self,
106 list: &ListType,
107 array: &ArrayRef,
108 elements: Vec<Option<Literal>>,
109 ) -> Result<Vec<Option<Literal>>> {
110 if list.element_field.required && elements.iter().any(Option::is_none) {
111 return Err(Error::new(
112 ErrorKind::DataInvalid,
113 "The list should not have null value",
114 ));
115 }
116 match array.data_type() {
117 DataType::List(_) => {
118 let offset = array
119 .as_any()
120 .downcast_ref::<ListArray>()
121 .ok_or_else(|| {
122 Error::new(ErrorKind::DataInvalid, "The partner is not a list array")
123 })?
124 .offsets();
125 let mut result = Vec::with_capacity(offset.len() - 1);
127 for i in 0..offset.len() - 1 {
128 let start = offset[i] as usize;
129 let end = offset[i + 1] as usize;
130 result.push(Some(Literal::List(elements[start..end].to_vec())));
131 }
132 Ok(result)
133 }
134 DataType::LargeList(_) => {
135 let offset = array
136 .as_any()
137 .downcast_ref::<LargeListArray>()
138 .ok_or_else(|| {
139 Error::new(
140 ErrorKind::DataInvalid,
141 "The partner is not a large list array",
142 )
143 })?
144 .offsets();
145 let mut result = Vec::with_capacity(offset.len() - 1);
147 for i in 0..offset.len() - 1 {
148 let start = offset[i] as usize;
149 let end = offset[i + 1] as usize;
150 result.push(Some(Literal::List(elements[start..end].to_vec())));
151 }
152 Ok(result)
153 }
154 DataType::FixedSizeList(_, len) => {
155 let mut result = Vec::with_capacity(elements.len() / *len as usize);
156 for i in 0..elements.len() / *len as usize {
157 let start = i * *len as usize;
158 let end = (i + 1) * *len as usize;
159 result.push(Some(Literal::List(elements[start..end].to_vec())));
160 }
161 Ok(result)
162 }
163 _ => Err(Error::new(
164 ErrorKind::DataInvalid,
165 "The partner is not a list type",
166 )),
167 }
168 }
169
170 fn map(
171 &mut self,
172 _map: &MapType,
173 partner: &ArrayRef,
174 key_values: Vec<Option<Literal>>,
175 values: Vec<Option<Literal>>,
176 ) -> Result<Vec<Option<Literal>>> {
177 if key_values.len() != values.len() {
179 return Err(Error::new(
180 ErrorKind::DataInvalid,
181 "The key value and value of map should have the same row length",
182 ));
183 }
184
185 let offsets = partner
186 .as_any()
187 .downcast_ref::<MapArray>()
188 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "The partner is not a map array"))?
189 .offsets();
190 let mut result = Vec::with_capacity(offsets.len() - 1);
192 for i in 0..offsets.len() - 1 {
193 let start = offsets[i] as usize;
194 let end = offsets[i + 1] as usize;
195 let mut map = Map::new();
196 for (key, value) in key_values[start..end].iter().zip(values[start..end].iter()) {
197 map.insert(key.clone().unwrap(), value.clone());
198 }
199 result.push(Some(Literal::Map(map)));
200 }
201 Ok(result)
202 }
203
204 fn primitive(&mut self, p: &PrimitiveType, partner: &ArrayRef) -> Result<Vec<Option<Literal>>> {
205 match p {
206 PrimitiveType::Boolean => {
207 let array = partner
208 .as_any()
209 .downcast_ref::<BooleanArray>()
210 .ok_or_else(|| {
211 Error::new(ErrorKind::DataInvalid, "The partner is not a boolean array")
212 })?;
213 Ok(array.iter().map(|v| v.map(Literal::bool)).collect())
214 }
215 PrimitiveType::Int => {
216 let array = partner
217 .as_any()
218 .downcast_ref::<Int32Array>()
219 .ok_or_else(|| {
220 Error::new(ErrorKind::DataInvalid, "The partner is not a int32 array")
221 })?;
222 Ok(array.iter().map(|v| v.map(Literal::int)).collect())
223 }
224 PrimitiveType::Long => {
225 let array = partner
226 .as_any()
227 .downcast_ref::<Int64Array>()
228 .ok_or_else(|| {
229 Error::new(ErrorKind::DataInvalid, "The partner is not a int64 array")
230 })?;
231 Ok(array.iter().map(|v| v.map(Literal::long)).collect())
232 }
233 PrimitiveType::Float => {
234 let array = partner
235 .as_any()
236 .downcast_ref::<Float32Array>()
237 .ok_or_else(|| {
238 Error::new(ErrorKind::DataInvalid, "The partner is not a float32 array")
239 })?;
240 Ok(array.iter().map(|v| v.map(Literal::float)).collect())
241 }
242 PrimitiveType::Double => {
243 let array = partner
244 .as_any()
245 .downcast_ref::<Float64Array>()
246 .ok_or_else(|| {
247 Error::new(ErrorKind::DataInvalid, "The partner is not a float64 array")
248 })?;
249 Ok(array.iter().map(|v| v.map(Literal::double)).collect())
250 }
251 PrimitiveType::Decimal { precision, scale } => {
252 let array = partner
253 .as_any()
254 .downcast_ref::<Decimal128Array>()
255 .ok_or_else(|| {
256 Error::new(
257 ErrorKind::DataInvalid,
258 "The partner is not a decimal128 array",
259 )
260 })?;
261 if let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type() {
262 if *arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale {
263 return Err(Error::new(
264 ErrorKind::DataInvalid,
265 format!(
266 "The precision or scale ({},{}) of arrow decimal128 array is not compatible with iceberg decimal type ({},{})",
267 arrow_precision, arrow_scale, precision, scale
268 ),
269 ));
270 }
271 }
272 Ok(array.iter().map(|v| v.map(Literal::decimal)).collect())
273 }
274 PrimitiveType::Date => {
275 let array = partner
276 .as_any()
277 .downcast_ref::<Date32Array>()
278 .ok_or_else(|| {
279 Error::new(ErrorKind::DataInvalid, "The partner is not a date32 array")
280 })?;
281 Ok(array.iter().map(|v| v.map(Literal::date)).collect())
282 }
283 PrimitiveType::Time => {
284 let array = partner
285 .as_any()
286 .downcast_ref::<Time64MicrosecondArray>()
287 .ok_or_else(|| {
288 Error::new(ErrorKind::DataInvalid, "The partner is not a time64 array")
289 })?;
290 Ok(array.iter().map(|v| v.map(Literal::time)).collect())
291 }
292 PrimitiveType::Timestamp => {
293 let array = partner
294 .as_any()
295 .downcast_ref::<TimestampMicrosecondArray>()
296 .ok_or_else(|| {
297 Error::new(
298 ErrorKind::DataInvalid,
299 "The partner is not a timestamp array",
300 )
301 })?;
302 Ok(array.iter().map(|v| v.map(Literal::timestamp)).collect())
303 }
304 PrimitiveType::Timestamptz => {
305 let array = partner
306 .as_any()
307 .downcast_ref::<TimestampMicrosecondArray>()
308 .ok_or_else(|| {
309 Error::new(
310 ErrorKind::DataInvalid,
311 "The partner is not a timestamptz array",
312 )
313 })?;
314 Ok(array.iter().map(|v| v.map(Literal::timestamptz)).collect())
315 }
316 PrimitiveType::TimestampNs => {
317 let array = partner
318 .as_any()
319 .downcast_ref::<TimestampNanosecondArray>()
320 .ok_or_else(|| {
321 Error::new(
322 ErrorKind::DataInvalid,
323 "The partner is not a timestamp_ns array",
324 )
325 })?;
326 Ok(array
327 .iter()
328 .map(|v| v.map(Literal::timestamp_nano))
329 .collect())
330 }
331 PrimitiveType::TimestamptzNs => {
332 let array = partner
333 .as_any()
334 .downcast_ref::<TimestampNanosecondArray>()
335 .ok_or_else(|| {
336 Error::new(
337 ErrorKind::DataInvalid,
338 "The partner is not a timestamptz_ns array",
339 )
340 })?;
341 Ok(array
342 .iter()
343 .map(|v| v.map(Literal::timestamptz_nano))
344 .collect())
345 }
346 PrimitiveType::String => {
347 if let Some(array) = partner.as_any().downcast_ref::<LargeStringArray>() {
348 Ok(array.iter().map(|v| v.map(Literal::string)).collect())
349 } else if let Some(array) = partner.as_any().downcast_ref::<StringArray>() {
350 Ok(array.iter().map(|v| v.map(Literal::string)).collect())
351 } else {
352 return Err(Error::new(
353 ErrorKind::DataInvalid,
354 "The partner is not a string array",
355 ));
356 }
357 }
358 PrimitiveType::Uuid => {
359 if let Some(array) = partner.as_any().downcast_ref::<FixedSizeBinaryArray>() {
360 if array.value_length() != 16 {
361 return Err(Error::new(
362 ErrorKind::DataInvalid,
363 "The partner is not a uuid array",
364 ));
365 }
366 Ok(array
367 .iter()
368 .map(|v| {
369 v.map(|v| {
370 Ok(Literal::uuid(Uuid::from_bytes(v.try_into().map_err(
371 |_| {
372 Error::new(
373 ErrorKind::DataInvalid,
374 "Failed to convert binary to uuid",
375 )
376 },
377 )?)))
378 })
379 .transpose()
380 })
381 .collect::<Result<Vec<_>>>()?)
382 } else {
383 Err(Error::new(
384 ErrorKind::DataInvalid,
385 "The partner is not a uuid array",
386 ))
387 }
388 }
389 PrimitiveType::Fixed(len) => {
390 let array = partner
391 .as_any()
392 .downcast_ref::<FixedSizeBinaryArray>()
393 .ok_or_else(|| {
394 Error::new(ErrorKind::DataInvalid, "The partner is not a fixed array")
395 })?;
396 if array.value_length() != *len as i32 {
397 return Err(Error::new(
398 ErrorKind::DataInvalid,
399 "The length of fixed size binary array is not compatible with iceberg fixed type",
400 ));
401 }
402 Ok(array
403 .iter()
404 .map(|v| v.map(|v| Literal::fixed(v.iter().cloned())))
405 .collect())
406 }
407 PrimitiveType::Binary => {
408 if let Some(array) = partner.as_any().downcast_ref::<LargeBinaryArray>() {
409 Ok(array
410 .iter()
411 .map(|v| v.map(|v| Literal::binary(v.to_vec())))
412 .collect())
413 } else if let Some(array) = partner.as_any().downcast_ref::<BinaryArray>() {
414 Ok(array
415 .iter()
416 .map(|v| v.map(|v| Literal::binary(v.to_vec())))
417 .collect())
418 } else {
419 return Err(Error::new(
420 ErrorKind::DataInvalid,
421 "The partner is not a binary array",
422 ));
423 }
424 }
425 }
426 }
427}
428
/// Strategy used to pair an Arrow field with an Iceberg field.
#[derive(Clone, Copy, Debug)]
pub enum FieldMatchMode {
    /// Match on the field id obtained from the Arrow field via `get_field_id`.
    Id,
    /// Match on the field name.
    Name,
}
446
447impl FieldMatchMode {
448 pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
450 match self {
451 FieldMatchMode::Id => get_field_id(arrow_field)
452 .map(|id| id == iceberg_field.id)
453 .unwrap_or(false),
454 FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,
455 }
456 }
457}
458
/// Partner accessor that navigates Arrow arrays alongside an Iceberg type.
///
/// It implements `PartnerAccessor<ArrayRef>`.
pub struct ArrowArrayAccessor {
    /// How struct fields of the Arrow array are paired with Iceberg fields.
    match_mode: FieldMatchMode,
}
463
464impl ArrowArrayAccessor {
465 pub fn new() -> Self {
467 Self {
468 match_mode: FieldMatchMode::Id,
469 }
470 }
471
472 pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
474 Self { match_mode }
475 }
476}
477
478impl Default for ArrowArrayAccessor {
479 fn default() -> Self {
480 Self::new()
481 }
482}
483
484impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
485 fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
486 if !matches!(schema_partner.data_type(), DataType::Struct(_)) {
487 return Err(Error::new(
488 ErrorKind::DataInvalid,
489 "The schema partner is not a struct type",
490 ));
491 }
492
493 Ok(schema_partner)
494 }
495
496 fn field_partner<'a>(
497 &self,
498 struct_partner: &'a ArrayRef,
499 field: &NestedField,
500 ) -> Result<&'a ArrayRef> {
501 let struct_array = struct_partner
502 .as_any()
503 .downcast_ref::<StructArray>()
504 .ok_or_else(|| {
505 Error::new(
506 ErrorKind::DataInvalid,
507 format!(
508 "The struct partner is not a struct array, partner: {:?}",
509 struct_partner
510 ),
511 )
512 })?;
513
514 let field_pos = struct_array
515 .fields()
516 .iter()
517 .position(|arrow_field| self.match_mode.match_field(arrow_field, field))
518 .ok_or_else(|| {
519 Error::new(
520 ErrorKind::DataInvalid,
521 format!("Field id {} not found in struct array", field.id),
522 )
523 })?;
524
525 Ok(struct_array.column(field_pos))
526 }
527
528 fn list_element_partner<'a>(&self, list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
529 match list_partner.data_type() {
530 DataType::List(_) => {
531 let list_array = list_partner
532 .as_any()
533 .downcast_ref::<ListArray>()
534 .ok_or_else(|| {
535 Error::new(
536 ErrorKind::DataInvalid,
537 "The list partner is not a list array",
538 )
539 })?;
540 Ok(list_array.values())
541 }
542 DataType::LargeList(_) => {
543 let list_array = list_partner
544 .as_any()
545 .downcast_ref::<LargeListArray>()
546 .ok_or_else(|| {
547 Error::new(
548 ErrorKind::DataInvalid,
549 "The list partner is not a large list array",
550 )
551 })?;
552 Ok(list_array.values())
553 }
554 DataType::FixedSizeList(_, _) => {
555 let list_array = list_partner
556 .as_any()
557 .downcast_ref::<FixedSizeListArray>()
558 .ok_or_else(|| {
559 Error::new(
560 ErrorKind::DataInvalid,
561 "The list partner is not a fixed size list array",
562 )
563 })?;
564 Ok(list_array.values())
565 }
566 _ => Err(Error::new(
567 ErrorKind::DataInvalid,
568 "The list partner is not a list type",
569 )),
570 }
571 }
572
573 fn map_key_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
574 let map_array = map_partner
575 .as_any()
576 .downcast_ref::<MapArray>()
577 .ok_or_else(|| {
578 Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
579 })?;
580 Ok(map_array.keys())
581 }
582
583 fn map_value_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
584 let map_array = map_partner
585 .as_any()
586 .downcast_ref::<MapArray>()
587 .ok_or_else(|| {
588 Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
589 })?;
590 Ok(map_array.values())
591 }
592}
593
594pub fn arrow_struct_to_literal(
597 struct_array: &ArrayRef,
598 ty: &StructType,
599) -> Result<Vec<Option<Literal>>> {
600 visit_struct_with_partner(
601 ty,
602 struct_array,
603 &mut ArrowArrayToIcebergStructConverter,
604 &ArrowArrayAccessor::new(),
605 )
606}
607
608pub fn arrow_primitive_to_literal(
611 primitive_array: &ArrayRef,
612 ty: &Type,
613) -> Result<Vec<Option<Literal>>> {
614 visit_type_with_partner(
615 ty,
616 primitive_array,
617 &mut ArrowArrayToIcebergStructConverter,
618 &ArrowArrayAccessor::new(),
619 )
620}
621
622#[cfg(test)]
623mod test {
624 use std::collections::HashMap;
625 use std::sync::Arc;
626
627 use arrow_array::builder::{Int32Builder, ListBuilder, MapBuilder, StructBuilder};
628 use arrow_array::{
629 ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
630 Float64Array, Int32Array, Int64Array, StringArray, StructArray, Time64MicrosecondArray,
631 TimestampMicrosecondArray, TimestampNanosecondArray,
632 };
633 use arrow_schema::{DataType, Field, Fields, TimeUnit};
634 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
635
636 use super::*;
637 use crate::spec::{ListType, Literal, MapType, NestedField, PrimitiveType, StructType, Type};
638
/// Converts a struct array holding one column per supported primitive type and checks
/// every row, including a row where all fields are null.
#[test]
fn test_arrow_struct_to_iceberg_struct() {
    // Builds a nullable Arrow field carrying `id` as its Parquet field id metadata.
    let arrow_field = |name: &str, data_type: DataType, id: i32| {
        Arc::new(
            Field::new(name, data_type, true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                id.to_string(),
            )])),
        )
    };
    // Builds an optional Iceberg field of a primitive type.
    let iceberg_field = |id: i32, name: &str, ty: PrimitiveType| {
        Arc::new(NestedField::optional(id, name, Type::Primitive(ty)))
    };

    let decimal_array = Decimal128Array::from(vec![Some(1000), Some(2000), None])
        .with_precision_and_scale(10, 2)
        .unwrap();

    let struct_array = Arc::new(StructArray::from(vec![
        (
            arrow_field("bool_field", DataType::Boolean, 0),
            Arc::new(BooleanArray::from(vec![Some(true), Some(false), None])) as ArrayRef,
        ),
        (
            arrow_field("int32_field", DataType::Int32, 2),
            Arc::new(Int32Array::from(vec![Some(3), Some(4), None])) as ArrayRef,
        ),
        (
            arrow_field("int64_field", DataType::Int64, 3),
            Arc::new(Int64Array::from(vec![Some(5), Some(6), None])) as ArrayRef,
        ),
        (
            arrow_field("float32_field", DataType::Float32, 4),
            Arc::new(Float32Array::from(vec![Some(1.1), Some(2.2), None])) as ArrayRef,
        ),
        (
            arrow_field("float64_field", DataType::Float64, 5),
            Arc::new(Float64Array::from(vec![Some(3.3), Some(4.4), None])) as ArrayRef,
        ),
        (
            arrow_field("decimal_field", DataType::Decimal128(10, 2), 6),
            Arc::new(decimal_array) as ArrayRef,
        ),
        (
            arrow_field("date_field", DataType::Date32, 7),
            Arc::new(Date32Array::from(vec![Some(18628), Some(18629), None])) as ArrayRef,
        ),
        (
            arrow_field("time_field", DataType::Time64(TimeUnit::Microsecond), 8),
            Arc::new(Time64MicrosecondArray::from(vec![
                Some(123456789),
                Some(987654321),
                None,
            ])) as ArrayRef,
        ),
        (
            arrow_field(
                "timestamp_micro_field",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                9,
            ),
            Arc::new(TimestampMicrosecondArray::from(vec![
                Some(1622548800000000),
                Some(1622635200000000),
                None,
            ])) as ArrayRef,
        ),
        (
            arrow_field(
                "timestamp_nano_field",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                10,
            ),
            Arc::new(TimestampNanosecondArray::from(vec![
                Some(1622548800000000000),
                Some(1622635200000000000),
                None,
            ])) as ArrayRef,
        ),
        (
            arrow_field("string_field", DataType::Utf8, 11),
            Arc::new(StringArray::from(vec![Some("a"), Some("b"), None])) as ArrayRef,
        ),
        (
            arrow_field("binary_field", DataType::Binary, 12),
            Arc::new(BinaryArray::from(vec![
                Some(b"abc".as_ref()),
                Some(b"def".as_ref()),
                None,
            ])) as ArrayRef,
        ),
    ])) as ArrayRef;

    // Ids and names mirror the Arrow fields above.
    let iceberg_struct_type = StructType::new(vec![
        iceberg_field(0, "bool_field", PrimitiveType::Boolean),
        iceberg_field(2, "int32_field", PrimitiveType::Int),
        iceberg_field(3, "int64_field", PrimitiveType::Long),
        iceberg_field(4, "float32_field", PrimitiveType::Float),
        iceberg_field(5, "float64_field", PrimitiveType::Double),
        iceberg_field(6, "decimal_field", PrimitiveType::Decimal {
            precision: 10,
            scale: 2,
        }),
        iceberg_field(7, "date_field", PrimitiveType::Date),
        iceberg_field(8, "time_field", PrimitiveType::Time),
        iceberg_field(9, "timestamp_micro_field", PrimitiveType::Timestamp),
        iceberg_field(10, "timestamp_nano_field", PrimitiveType::TimestampNs),
        iceberg_field(11, "string_field", PrimitiveType::String),
        iceberg_field(12, "binary_field", PrimitiveType::Binary),
    ]);

    let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();

    assert_eq!(result, vec![
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::bool(true)),
            Some(Literal::int(3)),
            Some(Literal::long(5)),
            Some(Literal::float(1.1)),
            Some(Literal::double(3.3)),
            Some(Literal::decimal(1000)),
            Some(Literal::date(18628)),
            Some(Literal::time(123456789)),
            Some(Literal::timestamp(1622548800000000)),
            Some(Literal::timestamp_nano(1622548800000000000)),
            Some(Literal::string("a".to_string())),
            Some(Literal::binary(b"abc".to_vec())),
        ]))),
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::bool(false)),
            Some(Literal::int(4)),
            Some(Literal::long(6)),
            Some(Literal::float(2.2)),
            Some(Literal::double(4.4)),
            Some(Literal::decimal(2000)),
            Some(Literal::date(18629)),
            Some(Literal::time(987654321)),
            Some(Literal::timestamp(1622635200000000)),
            Some(Literal::timestamp_nano(1622635200000000000)),
            Some(Literal::string("b".to_string())),
            Some(Literal::binary(b"def".to_vec())),
        ]))),
        Some(Literal::Struct(Struct::from_iter(vec![
            None, None, None, None, None, None, None, None, None, None, None, None,
        ]))),
    ]);
}
880
/// A null struct row must convert to `None`, while null fields inside a valid row
/// stay `None` within the struct literal.
#[test]
fn test_nullable_struct() {
    // Tags an Arrow field with a Parquet field id.
    let int_field = |name: &str, id: &str| {
        Field::new(name, DataType::Int32, true).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    };

    // Each entry is (value of `a`, value of `b`, whether the struct row is valid).
    let rows: [(Option<i32>, Option<i32>, bool); 3] = [
        (None, None, true),
        (Some(1), None, true),
        (Some(1), Some(1), false),
    ];

    let mut builder = StructBuilder::from_fields(
        Fields::from(vec![int_field("a", "0"), int_field("b", "1")]),
        3,
    );
    for (a, b, is_valid) in rows {
        builder
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        builder
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_option(b);
        builder.append(is_valid);
    }
    let struct_array = Arc::new(builder.finish()) as ArrayRef;

    let int_type = || Type::Primitive(PrimitiveType::Int);
    let iceberg_struct_type = StructType::new(vec![
        Arc::new(NestedField::optional(0, "a", int_type())),
        Arc::new(NestedField::optional(1, "b", int_type())),
    ]);

    let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
    let expected = vec![
        Some(Literal::Struct(Struct::from_iter(vec![None, None]))),
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::int(1)),
            None,
        ]))),
        None,
    ];
    assert_eq!(result, expected);
}
959
/// A struct without any field converts to an empty result, even for a 3-row array.
#[test]
fn test_empty_struct() {
    let no_fields = StructType::new(vec![]);
    let three_null_rows = Arc::new(StructArray::new_null(Fields::empty(), 3)) as ArrayRef;

    let result = arrow_struct_to_literal(&three_null_rows, &no_fields).unwrap();

    assert!(result.is_empty());
}
967
/// Looks up a nested struct and then one of its children through the accessor in
/// id-matching mode.
#[test]
fn test_find_field_by_id() {
    // Tags an Arrow field with a Parquet field id.
    let tagged = |field: Field, id: &str| {
        field.with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    };

    let field_a_def = tagged(Field::new("field_a", DataType::Int32, true), "1");
    let field_b_def = tagged(Field::new("field_b", DataType::Utf8, true), "2");
    let nested_type = DataType::Struct(Fields::from(vec![
        field_a_def.clone(),
        field_b_def.clone(),
    ]));

    // Inner struct: field_a (id 1) and field_b (id 2).
    let nested_struct_array = Arc::new(StructArray::from(vec![
        (
            Arc::new(field_a_def),
            Arc::new(Int32Array::from(vec![Some(42), Some(43), None])) as ArrayRef,
        ),
        (
            Arc::new(field_b_def),
            Arc::new(StringArray::from(vec![
                Some("value1"),
                Some("value2"),
                None,
            ])) as ArrayRef,
        ),
    ])) as ArrayRef;

    // Outer struct: nested_struct (id 3) and field_c (id 4).
    let struct_array = Arc::new(StructArray::from(vec![
        (
            Arc::new(tagged(Field::new("nested_struct", nested_type, true), "3")),
            nested_struct_array,
        ),
        (
            Arc::new(tagged(Field::new("field_c", DataType::Int32, true), "4")),
            Arc::new(Int32Array::from(vec![Some(100), Some(200), None])) as ArrayRef,
        ),
    ])) as ArrayRef;

    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id);

    let nested_field = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(NestedField::optional(
                1,
                "field_a",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::optional(
                2,
                "field_b",
                Type::Primitive(PrimitiveType::String),
            )),
        ])),
    );
    let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));

    // First resolve the nested struct, then field_a inside it.
    let nested_partner = accessor
        .field_partner(&struct_array, &nested_field)
        .unwrap();
    let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();

    let int_array = field_a_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(int_array.value(0), 42);
    assert_eq!(int_array.value(1), 43);
    assert!(int_array.is_null(2));
}
1067
/// Looks up a nested struct and then one of its children through the accessor in
/// name-matching mode; the Arrow fields carry no field id metadata.
#[test]
fn test_find_field_by_name() {
    let field_a_def = Field::new("field_a", DataType::Int32, true);
    let field_b_def = Field::new("field_b", DataType::Utf8, true);
    let nested_type = DataType::Struct(Fields::from(vec![
        field_a_def.clone(),
        field_b_def.clone(),
    ]));

    // Inner struct: field_a and field_b.
    let nested_struct_array = Arc::new(StructArray::from(vec![
        (
            Arc::new(field_a_def),
            Arc::new(Int32Array::from(vec![Some(42), Some(43), None])) as ArrayRef,
        ),
        (
            Arc::new(field_b_def),
            Arc::new(StringArray::from(vec![
                Some("value1"),
                Some("value2"),
                None,
            ])) as ArrayRef,
        ),
    ])) as ArrayRef;

    // Outer struct: nested_struct and field_c.
    let struct_array = Arc::new(StructArray::from(vec![
        (
            Arc::new(Field::new("nested_struct", nested_type, true)),
            nested_struct_array,
        ),
        (
            Arc::new(Field::new("field_c", DataType::Int32, true)),
            Arc::new(Int32Array::from(vec![Some(100), Some(200), None])) as ArrayRef,
        ),
    ])) as ArrayRef;

    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name);

    let nested_field = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(NestedField::optional(
                1,
                "field_a",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::optional(
                2,
                "field_b",
                Type::Primitive(PrimitiveType::String),
            )),
        ])),
    );
    let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));

    // First resolve the nested struct, then field_a inside it.
    let nested_partner = accessor
        .field_partner(&struct_array, &nested_field)
        .unwrap();
    let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();

    let int_array = field_a_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(int_array.value(0), 42);
    assert_eq!(int_array.value(1), 43);
    assert!(int_array.is_null(2));
}
1144
#[test]
fn test_complex_nested() {
    // Converts a two-row Arrow struct array with three nested columns into
    // Iceberg literals and compares against a hand-written expectation:
    //   A: list<struct<a1: int, a2: int>>
    //   B: list<map<int, int>>
    //   C: list<list<int>>

    // Builds an Arrow field carrying the given id under PARQUET_FIELD_ID_META_KEY.
    fn field_with_id(name: &str, data_type: DataType, nullable: bool, id: &str) -> Field {
        Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    }

    // Appends one row to column A (index 0): a list holding one struct per (a1, a2) pair.
    fn push_struct_items(top: &mut StructBuilder, items: &[(i32, i32)]) {
        let list = top
            .field_builder::<ListBuilder<StructBuilder>>(0)
            .unwrap();
        for &(a1, a2) in items {
            let entry = list.values();
            entry
                .field_builder::<Int32Builder>(0)
                .unwrap()
                .append_value(a1);
            entry
                .field_builder::<Int32Builder>(1)
                .unwrap()
                .append_value(a2);
            entry.append(true);
        }
        list.append(true);
    }

    // Appends one row to column B (index 1): a list holding one map per slice of (key, value).
    fn push_maps(top: &mut StructBuilder, maps: &[&[(i32, i32)]]) {
        let list = top
            .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
            .unwrap();
        for entries in maps {
            let map = list.values();
            for &(key, value) in entries.iter() {
                map.keys().append_value(key);
                map.values().append_value(value);
            }
            map.append(true).unwrap();
        }
        list.append(true);
    }

    // Appends one row to column C (index 2): a list holding one inner list per slice of ints.
    fn push_int_lists(top: &mut StructBuilder, lists: &[&[i32]]) {
        let outer = top
            .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
            .unwrap();
        for ints in lists {
            let inner = outer.values();
            for &value in ints.iter() {
                inner.values().append_value(value);
            }
            inner.append(true);
        }
        outer.append(true);
    }

    // Expected-value helpers: each wraps its input in the matching `Literal` variant.
    fn struct_of(fields: Vec<Option<Literal>>) -> Option<Literal> {
        Some(Literal::Struct(Struct::from_iter(fields)))
    }

    fn list_of(items: Vec<Option<Literal>>) -> Option<Literal> {
        Some(Literal::List(items))
    }

    fn int_pair(first: i32, second: i32) -> Option<Literal> {
        struct_of(vec![Some(Literal::int(first)), Some(Literal::int(second))])
    }

    fn int_map(entries: &[(i32, i32)]) -> Option<Literal> {
        let pairs: Vec<(Literal, Option<Literal>)> = entries
            .iter()
            .map(|&(key, value)| (Literal::int(key), Some(Literal::int(value))))
            .collect();
        Some(Literal::Map(Map::from_iter(pairs)))
    }

    fn int_list(values: &[i32]) -> Option<Literal> {
        list_of(values.iter().map(|&v| Some(Literal::int(v))).collect())
    }

    // ---- Iceberg schema -------------------------------------------------
    let int_type = || Type::Primitive(PrimitiveType::Int);
    // A required list field whose required element is named "item".
    let required_list = |id: i32, name: &str, item_id: i32, item_type: Type| {
        Arc::new(NestedField::required(
            id,
            name,
            Type::List(ListType::new(Arc::new(NestedField::required(
                item_id, "item", item_type,
            )))),
        ))
    };

    let a_item_type = Type::Struct(StructType::new(vec![
        Arc::new(NestedField::required(2, "a1", int_type())),
        Arc::new(NestedField::required(3, "a2", int_type())),
    ]));
    let b_item_type = Type::Map(MapType::new(
        NestedField::optional(6, "keys", int_type()).into(),
        NestedField::optional(7, "values", int_type()).into(),
    ));
    let c_item_type = Type::List(ListType::new(Arc::new(NestedField::optional(
        10,
        "item",
        int_type(),
    ))));

    let struct_type = StructType::new(vec![
        required_list(0, "A", 1, a_item_type),
        required_list(4, "B", 5, b_item_type),
        required_list(8, "C", 9, c_item_type),
    ]);

    // ---- Arrow input ----------------------------------------------------
    let struct_array = {
        // Fields of the struct stored inside column A; shared by the child
        // builder and by the declared type of the top-level field.
        let a_item_fields = vec![
            field_with_id("a1", DataType::Int32, false, "2"),
            field_with_id("a2", DataType::Int32, false, "3"),
        ];
        let a_item_builder = StructBuilder::new(a_item_fields.clone(), vec![
            Box::new(Int32Builder::new()),
            Box::new(Int32Builder::new()),
        ]);

        let a_type = DataType::List(Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(a_item_fields)),
            true,
        )));

        let map_entries = Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("keys", DataType::Int32, false),
                Field::new("values", DataType::Int32, true),
            ])),
            false,
        );
        let b_type = DataType::List(Arc::new(Field::new(
            "item",
            DataType::Map(Arc::new(map_entries), false),
            true,
        )));

        let c_type = DataType::List(Arc::new(Field::new(
            "item",
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
            true,
        )));

        let mut top_builder = StructBuilder::new(
            Fields::from(vec![
                field_with_id("A", a_type, false, "0"),
                field_with_id("B", b_type, false, "4"),
                field_with_id("C", c_type, false, "8"),
            ]),
            vec![
                Box::new(ListBuilder::new(a_item_builder)),
                Box::new(ListBuilder::new(MapBuilder::new(
                    None,
                    Int32Builder::new(),
                    Int32Builder::new(),
                ))),
                Box::new(ListBuilder::new(ListBuilder::new(Int32Builder::new()))),
            ],
        );

        // First row.
        push_struct_items(&mut top_builder, &[(10, 20), (11, 21)]);
        push_maps(&mut top_builder, &[&[(1, 100), (3, 300)], &[(2, 200)]]);
        push_int_lists(&mut top_builder, &[&[100, 101, 102], &[200, 201]]);
        top_builder.append(true);

        // Second row.
        push_struct_items(&mut top_builder, &[(12, 22), (13, 23)]);
        push_maps(&mut top_builder, &[&[(3, 300)]]);
        push_int_lists(&mut top_builder, &[&[300, 301, 302], &[400, 401]]);
        top_builder.append(true);

        Arc::new(top_builder.finish()) as ArrayRef
    };

    // ---- Conversion and expectation -------------------------------------
    let result = arrow_struct_to_literal(&struct_array, &struct_type).unwrap();

    let expected = vec![
        struct_of(vec![
            list_of(vec![int_pair(10, 20), int_pair(11, 21)]),
            list_of(vec![int_map(&[(1, 100), (3, 300)]), int_map(&[(2, 200)])]),
            list_of(vec![int_list(&[100, 101, 102]), int_list(&[200, 201])]),
        ]),
        struct_of(vec![
            list_of(vec![int_pair(12, 22), int_pair(13, 23)]),
            list_of(vec![int_map(&[(3, 300)])]),
            list_of(vec![int_list(&[300, 301, 302]), int_list(&[400, 401])]),
        ]),
    ];
    assert_eq!(result, expected);
}
1472}