1use std::sync::Arc;
19
20use arrow_array::{
21 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray,
22 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray,
23 LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray,
24 Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
25};
26use arrow_buffer::NullBuffer;
27use arrow_schema::{DataType, FieldRef};
28use uuid::Uuid;
29
30use super::get_field_id;
31use crate::spec::{
32 ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType,
33 SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
34 visit_type_with_partner,
35};
36use crate::{Error, ErrorKind, Result};
37
/// Schema-with-partner visitor that walks an Iceberg type together with its partner Arrow
/// array and produces one `Option<Literal>` per row of that array.
///
/// The visitor itself is stateless; all data comes from the partner arrays it is handed.
struct ArrowArrayToIcebergStructConverter;
39
40impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
41 type T = Vec<Option<Literal>>;
42
43 fn schema(
44 &mut self,
45 _schema: &crate::spec::Schema,
46 _partner: &ArrayRef,
47 value: Vec<Option<Literal>>,
48 ) -> Result<Vec<Option<Literal>>> {
49 Ok(value)
50 }
51
52 fn field(
53 &mut self,
54 field: &crate::spec::NestedFieldRef,
55 _partner: &ArrayRef,
56 value: Vec<Option<Literal>>,
57 ) -> Result<Vec<Option<Literal>>> {
58 if field.required && value.iter().any(Option::is_none) {
60 return Err(Error::new(
61 ErrorKind::DataInvalid,
62 "The field is required but has null value",
63 )
64 .with_context("field_id", field.id.to_string())
65 .with_context("field_name", &field.name));
66 }
67 Ok(value)
68 }
69
70 fn r#struct(
71 &mut self,
72 _struct: &StructType,
73 array: &ArrayRef,
74 results: Vec<Vec<Option<Literal>>>,
75 ) -> Result<Vec<Option<Literal>>> {
76 let row_len = results.first().map(|column| column.len()).unwrap_or(0);
77 if let Some(col) = results.iter().find(|col| col.len() != row_len) {
78 return Err(Error::new(
79 ErrorKind::DataInvalid,
80 "The struct columns have different row length",
81 )
82 .with_context("first col length", row_len.to_string())
83 .with_context("actual col length", col.len().to_string()));
84 }
85
86 let mut struct_literals = Vec::with_capacity(row_len);
87 let mut columns_iters = results
88 .into_iter()
89 .map(|column| column.into_iter())
90 .collect::<Vec<_>>();
91
92 for i in 0..row_len {
93 let mut literals = Vec::with_capacity(columns_iters.len());
94 for column_iter in columns_iters.iter_mut() {
95 literals.push(column_iter.next().unwrap());
96 }
97 if array.is_null(i) {
98 struct_literals.push(None);
99 } else {
100 struct_literals.push(Some(Literal::Struct(Struct::from_iter(literals))));
101 }
102 }
103
104 Ok(struct_literals)
105 }
106
107 fn list(
108 &mut self,
109 list: &ListType,
110 array: &ArrayRef,
111 elements: Vec<Option<Literal>>,
112 ) -> Result<Vec<Option<Literal>>> {
113 if list.element_field.required && elements.iter().any(Option::is_none) {
114 return Err(Error::new(
115 ErrorKind::DataInvalid,
116 "The list should not have null value",
117 ));
118 }
119 match array.data_type() {
120 DataType::List(_) => {
121 let offset = array
122 .as_any()
123 .downcast_ref::<ListArray>()
124 .ok_or_else(|| {
125 Error::new(ErrorKind::DataInvalid, "The partner is not a list array")
126 })?
127 .offsets();
128 let mut result = Vec::with_capacity(offset.len() - 1);
130 for i in 0..offset.len() - 1 {
131 let start = offset[i] as usize;
132 let end = offset[i + 1] as usize;
133 result.push(Some(Literal::List(elements[start..end].to_vec())));
134 }
135 Ok(result)
136 }
137 DataType::LargeList(_) => {
138 let offset = array
139 .as_any()
140 .downcast_ref::<LargeListArray>()
141 .ok_or_else(|| {
142 Error::new(
143 ErrorKind::DataInvalid,
144 "The partner is not a large list array",
145 )
146 })?
147 .offsets();
148 let mut result = Vec::with_capacity(offset.len() - 1);
150 for i in 0..offset.len() - 1 {
151 let start = offset[i] as usize;
152 let end = offset[i + 1] as usize;
153 result.push(Some(Literal::List(elements[start..end].to_vec())));
154 }
155 Ok(result)
156 }
157 DataType::FixedSizeList(_, len) => {
158 let mut result = Vec::with_capacity(elements.len() / *len as usize);
159 for i in 0..elements.len() / *len as usize {
160 let start = i * *len as usize;
161 let end = (i + 1) * *len as usize;
162 result.push(Some(Literal::List(elements[start..end].to_vec())));
163 }
164 Ok(result)
165 }
166 _ => Err(Error::new(
167 ErrorKind::DataInvalid,
168 "The partner is not a list type",
169 )),
170 }
171 }
172
173 fn map(
174 &mut self,
175 _map: &MapType,
176 partner: &ArrayRef,
177 key_values: Vec<Option<Literal>>,
178 values: Vec<Option<Literal>>,
179 ) -> Result<Vec<Option<Literal>>> {
180 if key_values.len() != values.len() {
182 return Err(Error::new(
183 ErrorKind::DataInvalid,
184 "The key value and value of map should have the same row length",
185 ));
186 }
187
188 let offsets = partner
189 .as_any()
190 .downcast_ref::<MapArray>()
191 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "The partner is not a map array"))?
192 .offsets();
193 let mut result = Vec::with_capacity(offsets.len() - 1);
195 for i in 0..offsets.len() - 1 {
196 let start = offsets[i] as usize;
197 let end = offsets[i + 1] as usize;
198 let mut map = Map::new();
199 for (key, value) in key_values[start..end].iter().zip(values[start..end].iter()) {
200 map.insert(key.clone().unwrap(), value.clone());
201 }
202 result.push(Some(Literal::Map(map)));
203 }
204 Ok(result)
205 }
206
207 fn primitive(&mut self, p: &PrimitiveType, partner: &ArrayRef) -> Result<Vec<Option<Literal>>> {
208 match p {
209 PrimitiveType::Boolean => {
210 let array = partner
211 .as_any()
212 .downcast_ref::<BooleanArray>()
213 .ok_or_else(|| {
214 Error::new(ErrorKind::DataInvalid, "The partner is not a boolean array")
215 })?;
216 Ok(array.iter().map(|v| v.map(Literal::bool)).collect())
217 }
218 PrimitiveType::Int => {
219 let array = partner
220 .as_any()
221 .downcast_ref::<Int32Array>()
222 .ok_or_else(|| {
223 Error::new(ErrorKind::DataInvalid, "The partner is not a int32 array")
224 })?;
225 Ok(array.iter().map(|v| v.map(Literal::int)).collect())
226 }
227 PrimitiveType::Long => {
228 let array = partner
229 .as_any()
230 .downcast_ref::<Int64Array>()
231 .ok_or_else(|| {
232 Error::new(ErrorKind::DataInvalid, "The partner is not a int64 array")
233 })?;
234 Ok(array.iter().map(|v| v.map(Literal::long)).collect())
235 }
236 PrimitiveType::Float => {
237 let array = partner
238 .as_any()
239 .downcast_ref::<Float32Array>()
240 .ok_or_else(|| {
241 Error::new(ErrorKind::DataInvalid, "The partner is not a float32 array")
242 })?;
243 Ok(array.iter().map(|v| v.map(Literal::float)).collect())
244 }
245 PrimitiveType::Double => {
246 let array = partner
247 .as_any()
248 .downcast_ref::<Float64Array>()
249 .ok_or_else(|| {
250 Error::new(ErrorKind::DataInvalid, "The partner is not a float64 array")
251 })?;
252 Ok(array.iter().map(|v| v.map(Literal::double)).collect())
253 }
254 PrimitiveType::Decimal { precision, scale } => {
255 let array = partner
256 .as_any()
257 .downcast_ref::<Decimal128Array>()
258 .ok_or_else(|| {
259 Error::new(
260 ErrorKind::DataInvalid,
261 "The partner is not a decimal128 array",
262 )
263 })?;
264 if let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type()
265 && (*arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale)
266 {
267 return Err(Error::new(
268 ErrorKind::DataInvalid,
269 format!(
270 "The precision or scale ({arrow_precision},{arrow_scale}) of arrow decimal128 array is not compatible with iceberg decimal type ({precision},{scale})"
271 ),
272 ));
273 }
274 Ok(array.iter().map(|v| v.map(Literal::decimal)).collect())
275 }
276 PrimitiveType::Date => {
277 let array = partner
278 .as_any()
279 .downcast_ref::<Date32Array>()
280 .ok_or_else(|| {
281 Error::new(ErrorKind::DataInvalid, "The partner is not a date32 array")
282 })?;
283 Ok(array.iter().map(|v| v.map(Literal::date)).collect())
284 }
285 PrimitiveType::Time => {
286 let array = partner
287 .as_any()
288 .downcast_ref::<Time64MicrosecondArray>()
289 .ok_or_else(|| {
290 Error::new(ErrorKind::DataInvalid, "The partner is not a time64 array")
291 })?;
292 Ok(array.iter().map(|v| v.map(Literal::time)).collect())
293 }
294 PrimitiveType::Timestamp => {
295 let array = partner
296 .as_any()
297 .downcast_ref::<TimestampMicrosecondArray>()
298 .ok_or_else(|| {
299 Error::new(
300 ErrorKind::DataInvalid,
301 "The partner is not a timestamp array",
302 )
303 })?;
304 Ok(array.iter().map(|v| v.map(Literal::timestamp)).collect())
305 }
306 PrimitiveType::Timestamptz => {
307 let array = partner
308 .as_any()
309 .downcast_ref::<TimestampMicrosecondArray>()
310 .ok_or_else(|| {
311 Error::new(
312 ErrorKind::DataInvalid,
313 "The partner is not a timestamptz array",
314 )
315 })?;
316 Ok(array.iter().map(|v| v.map(Literal::timestamptz)).collect())
317 }
318 PrimitiveType::TimestampNs => {
319 let array = partner
320 .as_any()
321 .downcast_ref::<TimestampNanosecondArray>()
322 .ok_or_else(|| {
323 Error::new(
324 ErrorKind::DataInvalid,
325 "The partner is not a timestamp_ns array",
326 )
327 })?;
328 Ok(array
329 .iter()
330 .map(|v| v.map(Literal::timestamp_nano))
331 .collect())
332 }
333 PrimitiveType::TimestamptzNs => {
334 let array = partner
335 .as_any()
336 .downcast_ref::<TimestampNanosecondArray>()
337 .ok_or_else(|| {
338 Error::new(
339 ErrorKind::DataInvalid,
340 "The partner is not a timestamptz_ns array",
341 )
342 })?;
343 Ok(array
344 .iter()
345 .map(|v| v.map(Literal::timestamptz_nano))
346 .collect())
347 }
348 PrimitiveType::String => {
349 if let Some(array) = partner.as_any().downcast_ref::<LargeStringArray>() {
350 Ok(array.iter().map(|v| v.map(Literal::string)).collect())
351 } else if let Some(array) = partner.as_any().downcast_ref::<StringArray>() {
352 Ok(array.iter().map(|v| v.map(Literal::string)).collect())
353 } else {
354 Err(Error::new(
355 ErrorKind::DataInvalid,
356 "The partner is not a string array",
357 ))
358 }
359 }
360 PrimitiveType::Uuid => {
361 if let Some(array) = partner.as_any().downcast_ref::<FixedSizeBinaryArray>() {
362 if array.value_length() != 16 {
363 return Err(Error::new(
364 ErrorKind::DataInvalid,
365 "The partner is not a uuid array",
366 ));
367 }
368 Ok(array
369 .iter()
370 .map(|v| {
371 v.map(|v| {
372 Ok(Literal::uuid(Uuid::from_bytes(v.try_into().map_err(
373 |_| {
374 Error::new(
375 ErrorKind::DataInvalid,
376 "Failed to convert binary to uuid",
377 )
378 },
379 )?)))
380 })
381 .transpose()
382 })
383 .collect::<Result<Vec<_>>>()?)
384 } else {
385 Err(Error::new(
386 ErrorKind::DataInvalid,
387 "The partner is not a uuid array",
388 ))
389 }
390 }
391 PrimitiveType::Fixed(len) => {
392 let array = partner
393 .as_any()
394 .downcast_ref::<FixedSizeBinaryArray>()
395 .ok_or_else(|| {
396 Error::new(ErrorKind::DataInvalid, "The partner is not a fixed array")
397 })?;
398 if array.value_length() != *len as i32 {
399 return Err(Error::new(
400 ErrorKind::DataInvalid,
401 "The length of fixed size binary array is not compatible with iceberg fixed type",
402 ));
403 }
404 Ok(array
405 .iter()
406 .map(|v| v.map(|v| Literal::fixed(v.iter().cloned())))
407 .collect())
408 }
409 PrimitiveType::Binary => {
410 if let Some(array) = partner.as_any().downcast_ref::<LargeBinaryArray>() {
411 Ok(array
412 .iter()
413 .map(|v| v.map(|v| Literal::binary(v.to_vec())))
414 .collect())
415 } else if let Some(array) = partner.as_any().downcast_ref::<BinaryArray>() {
416 Ok(array
417 .iter()
418 .map(|v| v.map(|v| Literal::binary(v.to_vec())))
419 .collect())
420 } else {
421 Err(Error::new(
422 ErrorKind::DataInvalid,
423 "The partner is not a binary array",
424 ))
425 }
426 }
427 }
428 }
429}
430
431#[derive(Clone, Copy, Debug)]
442pub enum FieldMatchMode {
443 Id,
445 Name,
447}
448
449impl FieldMatchMode {
450 pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
452 match self {
453 FieldMatchMode::Id => get_field_id(arrow_field)
454 .map(|id| id == iceberg_field.id)
455 .unwrap_or(false),
456 FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,
457 }
458 }
459}
460
461pub struct ArrowArrayAccessor {
463 match_mode: FieldMatchMode,
464}
465
466impl ArrowArrayAccessor {
467 pub fn new() -> Self {
469 Self {
470 match_mode: FieldMatchMode::Id,
471 }
472 }
473
474 pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
476 Self { match_mode }
477 }
478}
479
480impl Default for ArrowArrayAccessor {
481 fn default() -> Self {
482 Self::new()
483 }
484}
485
486impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
487 fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
488 if !matches!(schema_partner.data_type(), DataType::Struct(_)) {
489 return Err(Error::new(
490 ErrorKind::DataInvalid,
491 "The schema partner is not a struct type",
492 ));
493 }
494
495 Ok(schema_partner)
496 }
497
498 fn field_partner<'a>(
499 &self,
500 struct_partner: &'a ArrayRef,
501 field: &NestedField,
502 ) -> Result<&'a ArrayRef> {
503 let struct_array = struct_partner
504 .as_any()
505 .downcast_ref::<StructArray>()
506 .ok_or_else(|| {
507 Error::new(
508 ErrorKind::DataInvalid,
509 format!(
510 "The struct partner is not a struct array, partner: {struct_partner:?}"
511 ),
512 )
513 })?;
514
515 let field_pos = struct_array
516 .fields()
517 .iter()
518 .position(|arrow_field| self.match_mode.match_field(arrow_field, field))
519 .ok_or_else(|| {
520 Error::new(
521 ErrorKind::DataInvalid,
522 format!("Field id {} not found in struct array", field.id),
523 )
524 })?;
525
526 Ok(struct_array.column(field_pos))
527 }
528
529 fn list_element_partner<'a>(&self, list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
530 match list_partner.data_type() {
531 DataType::List(_) => {
532 let list_array = list_partner
533 .as_any()
534 .downcast_ref::<ListArray>()
535 .ok_or_else(|| {
536 Error::new(
537 ErrorKind::DataInvalid,
538 "The list partner is not a list array",
539 )
540 })?;
541 Ok(list_array.values())
542 }
543 DataType::LargeList(_) => {
544 let list_array = list_partner
545 .as_any()
546 .downcast_ref::<LargeListArray>()
547 .ok_or_else(|| {
548 Error::new(
549 ErrorKind::DataInvalid,
550 "The list partner is not a large list array",
551 )
552 })?;
553 Ok(list_array.values())
554 }
555 DataType::FixedSizeList(_, _) => {
556 let list_array = list_partner
557 .as_any()
558 .downcast_ref::<FixedSizeListArray>()
559 .ok_or_else(|| {
560 Error::new(
561 ErrorKind::DataInvalid,
562 "The list partner is not a fixed size list array",
563 )
564 })?;
565 Ok(list_array.values())
566 }
567 _ => Err(Error::new(
568 ErrorKind::DataInvalid,
569 "The list partner is not a list type",
570 )),
571 }
572 }
573
574 fn map_key_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
575 let map_array = map_partner
576 .as_any()
577 .downcast_ref::<MapArray>()
578 .ok_or_else(|| {
579 Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
580 })?;
581 Ok(map_array.keys())
582 }
583
584 fn map_value_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
585 let map_array = map_partner
586 .as_any()
587 .downcast_ref::<MapArray>()
588 .ok_or_else(|| {
589 Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
590 })?;
591 Ok(map_array.values())
592 }
593}
594
595pub fn arrow_struct_to_literal(
598 struct_array: &ArrayRef,
599 ty: &StructType,
600) -> Result<Vec<Option<Literal>>> {
601 visit_struct_with_partner(
602 ty,
603 struct_array,
604 &mut ArrowArrayToIcebergStructConverter,
605 &ArrowArrayAccessor::new(),
606 )
607}
608
609pub fn arrow_primitive_to_literal(
612 primitive_array: &ArrayRef,
613 ty: &Type,
614) -> Result<Vec<Option<Literal>>> {
615 visit_type_with_partner(
616 ty,
617 primitive_array,
618 &mut ArrowArrayToIcebergStructConverter,
619 &ArrowArrayAccessor::new(),
620 )
621}
622
623pub(crate) fn create_primitive_array_single_element(
628 data_type: &DataType,
629 prim_lit: &Option<PrimitiveLiteral>,
630) -> Result<ArrayRef> {
631 match (data_type, prim_lit) {
632 (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
633 Ok(Arc::new(BooleanArray::from(vec![*v])))
634 }
635 (DataType::Boolean, None) => Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))),
636 (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => {
637 Ok(Arc::new(Int32Array::from(vec![*v])))
638 }
639 (DataType::Int32, None) => Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None]))),
640 (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => {
641 Ok(Arc::new(Date32Array::from(vec![*v])))
642 }
643 (DataType::Date32, None) => Ok(Arc::new(Date32Array::from(vec![Option::<i32>::None]))),
644 (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => {
645 Ok(Arc::new(Int64Array::from(vec![*v])))
646 }
647 (DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None]))),
648 (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => {
649 Ok(Arc::new(Float32Array::from(vec![v.0])))
650 }
651 (DataType::Float32, None) => Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None]))),
652 (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => {
653 Ok(Arc::new(Float64Array::from(vec![v.0])))
654 }
655 (DataType::Float64, None) => Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None]))),
656 (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => {
657 Ok(Arc::new(StringArray::from(vec![v.as_str()])))
658 }
659 (DataType::Utf8, None) => Ok(Arc::new(StringArray::from(vec![Option::<&str>::None]))),
660 (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => {
661 Ok(Arc::new(BinaryArray::from_vec(vec![v.as_slice()])))
662 }
663 (DataType::Binary, None) => Ok(Arc::new(BinaryArray::from_opt_vec(vec![
664 Option::<&[u8]>::None,
665 ]))),
666 (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(v))) => {
667 let array = Decimal128Array::from(vec![{ *v }])
668 .with_precision_and_scale(*precision, *scale)
669 .map_err(|e| {
670 Error::new(
671 ErrorKind::DataInvalid,
672 format!(
673 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
674 ),
675 )
676 })?;
677 Ok(Arc::new(array))
678 }
679 (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::UInt128(v))) => {
680 let array = Decimal128Array::from(vec![*v as i128])
681 .with_precision_and_scale(*precision, *scale)
682 .map_err(|e| {
683 Error::new(
684 ErrorKind::DataInvalid,
685 format!(
686 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
687 ),
688 )
689 })?;
690 Ok(Arc::new(array))
691 }
692 (DataType::Decimal128(precision, scale), None) => {
693 let array = Decimal128Array::from(vec![Option::<i128>::None])
694 .with_precision_and_scale(*precision, *scale)
695 .map_err(|e| {
696 Error::new(
697 ErrorKind::DataInvalid,
698 format!(
699 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
700 ),
701 )
702 })?;
703 Ok(Arc::new(array))
704 }
705 (DataType::Struct(fields), None) => {
706 let null_arrays: Vec<ArrayRef> = fields
708 .iter()
709 .map(|f| {
710 match f.data_type() {
713 DataType::Boolean => {
714 Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))
715 as ArrayRef)
716 }
717 DataType::Int32 | DataType::Date32 => {
718 Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None])) as ArrayRef)
719 }
720 DataType::Int64 => {
721 Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None])) as ArrayRef)
722 }
723 DataType::Float32 => {
724 Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None])) as ArrayRef)
725 }
726 DataType::Float64 => {
727 Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None])) as ArrayRef)
728 }
729 DataType::Utf8 => {
730 Ok(Arc::new(StringArray::from(vec![Option::<&str>::None])) as ArrayRef)
731 }
732 DataType::Binary => {
733 Ok(
734 Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None]))
735 as ArrayRef,
736 )
737 }
738 _ => Err(Error::new(
739 ErrorKind::Unexpected,
740 format!("Unsupported struct field type: {:?}", f.data_type()),
741 )),
742 }
743 })
744 .collect::<Result<Vec<_>>>()?;
745 Ok(Arc::new(arrow_array::StructArray::new(
746 fields.clone(),
747 null_arrays,
748 Some(arrow_buffer::NullBuffer::new_null(1)),
749 )))
750 }
751 _ => Err(Error::new(
752 ErrorKind::Unexpected,
753 format!("Unsupported constant type combination: {data_type:?} with {prim_lit:?}"),
754 )),
755 }
756}
757
758pub(crate) fn create_primitive_array_repeated(
763 data_type: &DataType,
764 prim_lit: &Option<PrimitiveLiteral>,
765 num_rows: usize,
766) -> Result<ArrayRef> {
767 Ok(match (data_type, prim_lit) {
768 (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
769 Arc::new(BooleanArray::from(vec![*value; num_rows]))
770 }
771 (DataType::Boolean, None) => {
772 let vals: Vec<Option<bool>> = vec![None; num_rows];
773 Arc::new(BooleanArray::from(vals))
774 }
775 (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
776 Arc::new(Int32Array::from(vec![*value; num_rows]))
777 }
778 (DataType::Int32, None) => {
779 let vals: Vec<Option<i32>> = vec![None; num_rows];
780 Arc::new(Int32Array::from(vals))
781 }
782 (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
783 Arc::new(Date32Array::from(vec![*value; num_rows]))
784 }
785 (DataType::Date32, None) => {
786 let vals: Vec<Option<i32>> = vec![None; num_rows];
787 Arc::new(Date32Array::from(vals))
788 }
789 (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
790 Arc::new(Int64Array::from(vec![*value; num_rows]))
791 }
792 (DataType::Int64, None) => {
793 let vals: Vec<Option<i64>> = vec![None; num_rows];
794 Arc::new(Int64Array::from(vals))
795 }
796 (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
797 Arc::new(Float32Array::from(vec![value.0; num_rows]))
798 }
799 (DataType::Float32, None) => {
800 let vals: Vec<Option<f32>> = vec![None; num_rows];
801 Arc::new(Float32Array::from(vals))
802 }
803 (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
804 Arc::new(Float64Array::from(vec![value.0; num_rows]))
805 }
806 (DataType::Float64, None) => {
807 let vals: Vec<Option<f64>> = vec![None; num_rows];
808 Arc::new(Float64Array::from(vals))
809 }
810 (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
811 Arc::new(StringArray::from(vec![value.clone(); num_rows]))
812 }
813 (DataType::Utf8, None) => {
814 let vals: Vec<Option<String>> = vec![None; num_rows];
815 Arc::new(StringArray::from(vals))
816 }
817 (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
818 Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
819 }
820 (DataType::Binary, None) => {
821 let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
822 Arc::new(BinaryArray::from_opt_vec(vals))
823 }
824 (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(value))) => {
825 Arc::new(
826 Decimal128Array::from(vec![*value; num_rows])
827 .with_precision_and_scale(*precision, *scale)
828 .map_err(|e| {
829 Error::new(
830 ErrorKind::DataInvalid,
831 format!(
832 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
833 ),
834 )
835 })?,
836 )
837 }
838 (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::UInt128(value))) => {
839 Arc::new(
840 Decimal128Array::from(vec![*value as i128; num_rows])
841 .with_precision_and_scale(*precision, *scale)
842 .map_err(|e| {
843 Error::new(
844 ErrorKind::DataInvalid,
845 format!(
846 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
847 ),
848 )
849 })?,
850 )
851 }
852 (DataType::Decimal128(precision, scale), None) => {
853 let vals: Vec<Option<i128>> = vec![None; num_rows];
854 Arc::new(
855 Decimal128Array::from(vals)
856 .with_precision_and_scale(*precision, *scale)
857 .map_err(|e| {
858 Error::new(
859 ErrorKind::DataInvalid,
860 format!(
861 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
862 ),
863 )
864 })?,
865 )
866 }
867 (DataType::Struct(fields), None) => {
868 let null_arrays: Vec<ArrayRef> = fields
870 .iter()
871 .map(|field| create_primitive_array_repeated(field.data_type(), &None, num_rows))
872 .collect::<Result<Vec<_>>>()?;
873
874 Arc::new(StructArray::new(
875 fields.clone(),
876 null_arrays,
877 Some(NullBuffer::new_null(num_rows)),
878 ))
879 }
880 (DataType::Null, _) => Arc::new(arrow_array::NullArray::new(num_rows)),
881 (dt, _) => {
882 return Err(Error::new(
883 ErrorKind::Unexpected,
884 format!("unexpected target column type {dt}"),
885 ));
886 }
887 })
888}
889
890#[cfg(test)]
891mod test {
892 use std::collections::HashMap;
893 use std::sync::Arc;
894
895 use arrow_array::builder::{Int32Builder, ListBuilder, MapBuilder, StructBuilder};
896 use arrow_array::{
897 ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
898 Float64Array, Int32Array, Int64Array, StringArray, StructArray, Time64MicrosecondArray,
899 TimestampMicrosecondArray, TimestampNanosecondArray,
900 };
901 use arrow_schema::{DataType, Field, Fields, TimeUnit};
902 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
903
904 use super::*;
905 use crate::spec::{ListType, Literal, MapType, NestedField, PrimitiveType, StructType, Type};
906
907 #[test]
908 fn test_arrow_struct_to_iceberg_struct() {
909 let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]);
910 let int32_array = Int32Array::from(vec![Some(3), Some(4), None]);
911 let int64_array = Int64Array::from(vec![Some(5), Some(6), None]);
912 let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]);
913 let float64_array = Float64Array::from(vec![Some(3.3), Some(4.4), None]);
914 let decimal_array = Decimal128Array::from(vec![Some(1000), Some(2000), None])
915 .with_precision_and_scale(10, 2)
916 .unwrap();
917 let date_array = Date32Array::from(vec![Some(18628), Some(18629), None]);
918 let time_array = Time64MicrosecondArray::from(vec![Some(123456789), Some(987654321), None]);
919 let timestamp_micro_array = TimestampMicrosecondArray::from(vec![
920 Some(1622548800000000),
921 Some(1622635200000000),
922 None,
923 ]);
924 let timestamp_nano_array = TimestampNanosecondArray::from(vec![
925 Some(1622548800000000000),
926 Some(1622635200000000000),
927 None,
928 ]);
929 let string_array = StringArray::from(vec![Some("a"), Some("b"), None]);
930 let binary_array =
931 BinaryArray::from(vec![Some(b"abc".as_ref()), Some(b"def".as_ref()), None]);
932
933 let struct_array = Arc::new(StructArray::from(vec![
934 (
935 Arc::new(
936 Field::new("bool_field", DataType::Boolean, true).with_metadata(HashMap::from(
937 [(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())],
938 )),
939 ),
940 Arc::new(bool_array) as ArrayRef,
941 ),
942 (
943 Arc::new(
944 Field::new("int32_field", DataType::Int32, true).with_metadata(HashMap::from(
945 [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
946 )),
947 ),
948 Arc::new(int32_array) as ArrayRef,
949 ),
950 (
951 Arc::new(
952 Field::new("int64_field", DataType::Int64, true).with_metadata(HashMap::from(
953 [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
954 )),
955 ),
956 Arc::new(int64_array) as ArrayRef,
957 ),
958 (
959 Arc::new(
960 Field::new("float32_field", DataType::Float32, true).with_metadata(
961 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
962 ),
963 ),
964 Arc::new(float32_array) as ArrayRef,
965 ),
966 (
967 Arc::new(
968 Field::new("float64_field", DataType::Float64, true).with_metadata(
969 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
970 ),
971 ),
972 Arc::new(float64_array) as ArrayRef,
973 ),
974 (
975 Arc::new(
976 Field::new("decimal_field", DataType::Decimal128(10, 2), true).with_metadata(
977 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string())]),
978 ),
979 ),
980 Arc::new(decimal_array) as ArrayRef,
981 ),
982 (
983 Arc::new(
984 Field::new("date_field", DataType::Date32, true).with_metadata(HashMap::from(
985 [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
986 )),
987 ),
988 Arc::new(date_array) as ArrayRef,
989 ),
990 (
991 Arc::new(
992 Field::new("time_field", DataType::Time64(TimeUnit::Microsecond), true)
993 .with_metadata(HashMap::from([(
994 PARQUET_FIELD_ID_META_KEY.to_string(),
995 "8".to_string(),
996 )])),
997 ),
998 Arc::new(time_array) as ArrayRef,
999 ),
1000 (
1001 Arc::new(
1002 Field::new(
1003 "timestamp_micro_field",
1004 DataType::Timestamp(TimeUnit::Microsecond, None),
1005 true,
1006 )
1007 .with_metadata(HashMap::from([(
1008 PARQUET_FIELD_ID_META_KEY.to_string(),
1009 "9".to_string(),
1010 )])),
1011 ),
1012 Arc::new(timestamp_micro_array) as ArrayRef,
1013 ),
1014 (
1015 Arc::new(
1016 Field::new(
1017 "timestamp_nano_field",
1018 DataType::Timestamp(TimeUnit::Nanosecond, None),
1019 true,
1020 )
1021 .with_metadata(HashMap::from([(
1022 PARQUET_FIELD_ID_META_KEY.to_string(),
1023 "10".to_string(),
1024 )])),
1025 ),
1026 Arc::new(timestamp_nano_array) as ArrayRef,
1027 ),
1028 (
1029 Arc::new(
1030 Field::new("string_field", DataType::Utf8, true).with_metadata(HashMap::from(
1031 [(PARQUET_FIELD_ID_META_KEY.to_string(), "11".to_string())],
1032 )),
1033 ),
1034 Arc::new(string_array) as ArrayRef,
1035 ),
1036 (
1037 Arc::new(
1038 Field::new("binary_field", DataType::Binary, true).with_metadata(
1039 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "12".to_string())]),
1040 ),
1041 ),
1042 Arc::new(binary_array) as ArrayRef,
1043 ),
1044 ])) as ArrayRef;
1045
1046 let iceberg_struct_type = StructType::new(vec![
1047 Arc::new(NestedField::optional(
1048 0,
1049 "bool_field",
1050 Type::Primitive(PrimitiveType::Boolean),
1051 )),
1052 Arc::new(NestedField::optional(
1053 2,
1054 "int32_field",
1055 Type::Primitive(PrimitiveType::Int),
1056 )),
1057 Arc::new(NestedField::optional(
1058 3,
1059 "int64_field",
1060 Type::Primitive(PrimitiveType::Long),
1061 )),
1062 Arc::new(NestedField::optional(
1063 4,
1064 "float32_field",
1065 Type::Primitive(PrimitiveType::Float),
1066 )),
1067 Arc::new(NestedField::optional(
1068 5,
1069 "float64_field",
1070 Type::Primitive(PrimitiveType::Double),
1071 )),
1072 Arc::new(NestedField::optional(
1073 6,
1074 "decimal_field",
1075 Type::Primitive(PrimitiveType::Decimal {
1076 precision: 10,
1077 scale: 2,
1078 }),
1079 )),
1080 Arc::new(NestedField::optional(
1081 7,
1082 "date_field",
1083 Type::Primitive(PrimitiveType::Date),
1084 )),
1085 Arc::new(NestedField::optional(
1086 8,
1087 "time_field",
1088 Type::Primitive(PrimitiveType::Time),
1089 )),
1090 Arc::new(NestedField::optional(
1091 9,
1092 "timestamp_micro_field",
1093 Type::Primitive(PrimitiveType::Timestamp),
1094 )),
1095 Arc::new(NestedField::optional(
1096 10,
1097 "timestamp_nao_field",
1098 Type::Primitive(PrimitiveType::TimestampNs),
1099 )),
1100 Arc::new(NestedField::optional(
1101 11,
1102 "string_field",
1103 Type::Primitive(PrimitiveType::String),
1104 )),
1105 Arc::new(NestedField::optional(
1106 12,
1107 "binary_field",
1108 Type::Primitive(PrimitiveType::Binary),
1109 )),
1110 ]);
1111
1112 let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
1113
1114 assert_eq!(result, vec![
1115 Some(Literal::Struct(Struct::from_iter(vec![
1116 Some(Literal::bool(true)),
1117 Some(Literal::int(3)),
1118 Some(Literal::long(5)),
1119 Some(Literal::float(1.1)),
1120 Some(Literal::double(3.3)),
1121 Some(Literal::decimal(1000)),
1122 Some(Literal::date(18628)),
1123 Some(Literal::time(123456789)),
1124 Some(Literal::timestamp(1622548800000000)),
1125 Some(Literal::timestamp_nano(1622548800000000000)),
1126 Some(Literal::string("a".to_string())),
1127 Some(Literal::binary(b"abc".to_vec())),
1128 ]))),
1129 Some(Literal::Struct(Struct::from_iter(vec![
1130 Some(Literal::bool(false)),
1131 Some(Literal::int(4)),
1132 Some(Literal::long(6)),
1133 Some(Literal::float(2.2)),
1134 Some(Literal::double(4.4)),
1135 Some(Literal::decimal(2000)),
1136 Some(Literal::date(18629)),
1137 Some(Literal::time(987654321)),
1138 Some(Literal::timestamp(1622635200000000)),
1139 Some(Literal::timestamp_nano(1622635200000000000)),
1140 Some(Literal::string("b".to_string())),
1141 Some(Literal::binary(b"def".to_vec())),
1142 ]))),
1143 Some(Literal::Struct(Struct::from_iter(vec![
1144 None, None, None, None, None, None, None, None, None, None, None, None,
1145 ]))),
1146 ]);
1147 }
1148
#[test]
fn test_nullable_struct() {
    // Builds a nullable Int32 arrow field carrying the given parquet field id.
    let int_column = |name: &str, id: &str| {
        Field::new(name, DataType::Int32, true).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    };
    let mut rows = StructBuilder::from_fields(
        Fields::from(vec![int_column("a", "0"), int_column("b", "1")]),
        3,
    );

    // Each entry is (a, b, struct validity). The last row has non-null children
    // but the struct slot itself is null.
    for (a, b, is_valid) in [
        (None, None, true),
        (Some(1), None, true),
        (Some(1), Some(1), false),
    ] {
        rows.field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        rows.field_builder::<Int32Builder>(1)
            .unwrap()
            .append_option(b);
        rows.append(is_valid);
    }
    let struct_array: ArrayRef = Arc::new(rows.finish());

    // Iceberg side: two optional int fields whose ids match the arrow metadata.
    let optional_int =
        |id, name: &str| Arc::new(NestedField::optional(id, name, Type::Primitive(PrimitiveType::Int)));
    let iceberg_struct_type = StructType::new(vec![optional_int(0, "a"), optional_int(1, "b")]);

    let literals = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();

    // A null struct slot converts to `None`; null children inside a valid struct
    // convert to `None` entries of the struct literal.
    let expected = vec![
        Some(Literal::Struct(Struct::from_iter(vec![None, None]))),
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::int(1)),
            None,
        ]))),
        None,
    ];
    assert_eq!(literals, expected);
}
1227
#[test]
fn test_empty_struct() {
    // Iceberg struct type with no fields, paired with an arrow struct array that
    // has three (null) rows and no child columns.
    let no_fields = StructType::new(vec![]);
    let columnless_array: ArrayRef = Arc::new(StructArray::new_null(Fields::empty(), 3));

    let literals = arrow_struct_to_literal(&columnless_array, &no_fields).unwrap();

    // The conversion is expected to yield no literals at all: the struct visitor
    // takes its row count from the first child column and uses 0 when there is none.
    assert_eq!(literals, Vec::<Option<Literal>>::new());
}
1235
#[test]
fn test_find_field_by_id() {
    // Tags an arrow field with a parquet field id in its metadata.
    let with_id = |field: Field, id: &str| {
        field.with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    };

    // Arrow field definitions, reused for both the child arrays and the declared
    // type of the enclosing `nested_struct` column.
    let arrow_field_a = with_id(Field::new("field_a", DataType::Int32, true), "1");
    let arrow_field_b = with_id(Field::new("field_b", DataType::Utf8, true), "2");
    let arrow_nested = with_id(
        Field::new(
            "nested_struct",
            DataType::Struct(Fields::from(vec![
                arrow_field_a.clone(),
                arrow_field_b.clone(),
            ])),
            true,
        ),
        "3",
    );
    let arrow_field_c = with_id(Field::new("field_c", DataType::Int32, true), "4");

    let a_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(42), Some(43), None]));
    let b_values: ArrayRef = Arc::new(StringArray::from(vec![
        Some("value1"),
        Some("value2"),
        None,
    ]));
    let c_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(100), Some(200), None]));

    let inner_struct: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_field_a), a_values),
        (Arc::new(arrow_field_b), b_values),
    ]));
    let outer_struct: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_nested), inner_struct),
        (Arc::new(arrow_field_c), c_values),
    ]));

    // Iceberg fields whose ids line up with the arrow metadata above.
    let iceberg_nested = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(NestedField::optional(
                1,
                "field_a",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::optional(
                2,
                "field_b",
                Type::Primitive(PrimitiveType::String),
            )),
        ])),
    );
    let iceberg_field_a =
        NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));

    // Walk outer struct -> nested_struct -> field_a with an accessor configured
    // for `FieldMatchMode::Id`.
    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id);
    let nested_partner = accessor
        .field_partner(&outer_struct, &iceberg_nested)
        .unwrap();
    let leaf_partner = accessor
        .field_partner(nested_partner, &iceberg_field_a)
        .unwrap();

    // The resolved partner must be the original `field_a` column.
    let ints = leaf_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    for (row, expected) in [(0, 42), (1, 43)] {
        assert_eq!(ints.value(row), expected);
    }
    assert!(ints.is_null(2));
}
1335
#[test]
fn test_find_field_by_name() {
    // Arrow field definitions without any field-id metadata; they are reused for
    // both the child arrays and the declared type of `nested_struct`.
    let arrow_field_a = Field::new("field_a", DataType::Int32, true);
    let arrow_field_b = Field::new("field_b", DataType::Utf8, true);
    let arrow_nested = Field::new(
        "nested_struct",
        DataType::Struct(Fields::from(vec![
            arrow_field_a.clone(),
            arrow_field_b.clone(),
        ])),
        true,
    );
    let arrow_field_c = Field::new("field_c", DataType::Int32, true);

    let a_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(42), Some(43), None]));
    let b_values: ArrayRef = Arc::new(StringArray::from(vec![
        Some("value1"),
        Some("value2"),
        None,
    ]));
    let c_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(100), Some(200), None]));

    let inner_struct: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_field_a), a_values),
        (Arc::new(arrow_field_b), b_values),
    ]));
    let outer_struct: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_nested), inner_struct),
        (Arc::new(arrow_field_c), c_values),
    ]));

    // Iceberg fields that share names with the arrow fields above.
    let iceberg_nested = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(NestedField::optional(
                1,
                "field_a",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::optional(
                2,
                "field_b",
                Type::Primitive(PrimitiveType::String),
            )),
        ])),
    );
    let iceberg_field_a =
        NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));

    // Walk outer struct -> nested_struct -> field_a with an accessor configured
    // for `FieldMatchMode::Name`.
    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name);
    let nested_partner = accessor
        .field_partner(&outer_struct, &iceberg_nested)
        .unwrap();
    let leaf_partner = accessor
        .field_partner(nested_partner, &iceberg_field_a)
        .unwrap();

    // The resolved partner must be the original `field_a` column.
    let ints = leaf_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    for (row, expected) in [(0, 42), (1, 43)] {
        assert_eq!(ints.value(row), expected);
    }
    assert!(ints.is_null(2));
}
1412
// Converts a two-row arrow struct array whose columns are nested collections
// (list of struct, list of map, list of list) and checks the resulting literals.
#[test]
fn test_complex_nested() {
    // Iceberg type under test:
    //   A (id 0): list<struct<a1: int (id 2), a2: int (id 3)>>, element id 1
    //   B (id 4): list<map<int (id 6), int (id 7)>>, element id 5
    //   C (id 8): list<list<int (id 10)>>, element id 9
    let struct_type = StructType::new(vec![
        Arc::new(NestedField::required(
            0,
            "A",
            Type::List(ListType::new(Arc::new(NestedField::required(
                1,
                "item",
                Type::Struct(StructType::new(vec![
                    Arc::new(NestedField::required(
                        2,
                        "a1",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::required(
                        3,
                        "a2",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                ])),
            )))),
        )),
        Arc::new(NestedField::required(
            4,
            "B",
            Type::List(ListType::new(Arc::new(NestedField::required(
                5,
                "item",
                Type::Map(MapType::new(
                    NestedField::optional(6, "keys", Type::Primitive(PrimitiveType::Int))
                        .into(),
                    NestedField::optional(7, "values", Type::Primitive(PrimitiveType::Int))
                        .into(),
                )),
            )))),
        )),
        Arc::new(NestedField::required(
            8,
            "C",
            Type::List(ListType::new(Arc::new(NestedField::required(
                9,
                "item",
                Type::List(ListType::new(Arc::new(NestedField::optional(
                    10,
                    "item",
                    Type::Primitive(PrimitiveType::Int),
                )))),
            )))),
        )),
    ]);

    // Arrow data: two rows, built column by column through nested builders.
    let struct_array =
    {
        // Column A: list builder over a struct builder with two Int32 children.
        let a_struct_a1_builder = Int32Builder::new();
        let a_struct_a2_builder = Int32Builder::new();
        let a_struct_builder =
            StructBuilder::new(
                vec![
                    Field::new("a1", DataType::Int32, false).with_metadata(HashMap::from(
                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
                    )),
                    Field::new("a2", DataType::Int32, false).with_metadata(HashMap::from(
                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
                    )),
                ],
                vec![Box::new(a_struct_a1_builder), Box::new(a_struct_a2_builder)],
            );
        let a_builder = ListBuilder::new(a_struct_builder);

        // Column B: list builder over an Int32 -> Int32 map builder.
        let map_key_builder = Int32Builder::new();
        let map_value_builder = Int32Builder::new();
        let map_builder = MapBuilder::new(None, map_key_builder, map_value_builder);
        let b_builder = ListBuilder::new(map_builder);

        // Column C: list builder over a list builder of Int32.
        let inner_list_item_builder = Int32Builder::new();
        let inner_list_builder = ListBuilder::new(inner_list_item_builder);
        let c_builder = ListBuilder::new(inner_list_builder);

        // Top-level struct builder. The arrow data types are spelled out by hand
        // here; only a1, a2, A, B and C carry field-id metadata, while the
        // list "item" fields and the map "entries"/"keys"/"values" fields do not.
        let mut top_struct_builder = {
            let a_struct_type =
                DataType::Struct(Fields::from(vec![
                    Field::new("a1", DataType::Int32, false).with_metadata(HashMap::from(
                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
                    )),
                    Field::new("a2", DataType::Int32, false).with_metadata(HashMap::from(
                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
                    )),
                ]));
            let a_type =
                DataType::List(Arc::new(Field::new("item", a_struct_type.clone(), true)));

            let b_map_entry_struct = Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("keys", DataType::Int32, false),
                    Field::new("values", DataType::Int32, true),
                ])),
                false,
            );
            let b_map_type =
                DataType::Map(Arc::new(b_map_entry_struct), false);
            let b_type =
                DataType::List(Arc::new(Field::new("item", b_map_type.clone(), true)));

            let c_inner_list_type =
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
            let c_type = DataType::List(Arc::new(Field::new(
                "item",
                c_inner_list_type.clone(),
                true,
            )));
            StructBuilder::new(
                Fields::from(vec![
                    Field::new("A", a_type.clone(), false).with_metadata(HashMap::from([
                        (PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string()),
                    ])),
                    Field::new("B", b_type.clone(), false).with_metadata(HashMap::from([
                        (PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()),
                    ])),
                    Field::new("C", c_type.clone(), false).with_metadata(HashMap::from([
                        (PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string()),
                    ])),
                ]),
                vec![
                    Box::new(a_builder),
                    Box::new(b_builder),
                    Box::new(c_builder),
                ],
            )
        };

        // Row 0, column A: [{a1: 10, a2: 20}, {a1: 11, a2: 21}]
        {
            let a_builder = top_struct_builder
                .field_builder::<ListBuilder<StructBuilder>>(0)
                .unwrap();
            let struct_builder = a_builder.values();
            struct_builder
                .field_builder::<Int32Builder>(0)
                .unwrap()
                .append_value(10);
            struct_builder
                .field_builder::<Int32Builder>(1)
                .unwrap()
                .append_value(20);
            struct_builder.append(true);
            let struct_builder = a_builder.values();
            struct_builder
                .field_builder::<Int32Builder>(0)
                .unwrap()
                .append_value(11);
            struct_builder
                .field_builder::<Int32Builder>(1)
                .unwrap()
                .append_value(21);
            struct_builder.append(true);
            // Close the list slot for this row.
            a_builder.append(true);
        }
        // Row 0, column B: [{1: 100, 3: 300}, {2: 200}]
        {
            let b_builder = top_struct_builder
                .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
                .unwrap();
            let map_builder = b_builder.values();
            map_builder.keys().append_value(1);
            map_builder.values().append_value(100);
            map_builder.keys().append_value(3);
            map_builder.values().append_value(300);
            map_builder.append(true).unwrap();

            map_builder.keys().append_value(2);
            map_builder.values().append_value(200);
            map_builder.append(true).unwrap();

            b_builder.append(true);
        }
        // Row 0, column C: [[100, 101, 102], [200, 201]]
        {
            let c_builder = top_struct_builder
                .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
                .unwrap();
            let inner_list_builder = c_builder.values();
            inner_list_builder.values().append_value(100);
            inner_list_builder.values().append_value(101);
            inner_list_builder.values().append_value(102);
            inner_list_builder.append(true);
            let inner_list_builder = c_builder.values();
            inner_list_builder.values().append_value(200);
            inner_list_builder.values().append_value(201);
            inner_list_builder.append(true);
            c_builder.append(true);
        }
        // Finish row 0 as a valid (non-null) struct.
        top_struct_builder.append(true);

        // Row 1, column A: [{a1: 12, a2: 22}, {a1: 13, a2: 23}]
        {
            let a_builder = top_struct_builder
                .field_builder::<ListBuilder<StructBuilder>>(0)
                .unwrap();
            let struct_builder = a_builder.values();
            struct_builder
                .field_builder::<Int32Builder>(0)
                .unwrap()
                .append_value(12);
            struct_builder
                .field_builder::<Int32Builder>(1)
                .unwrap()
                .append_value(22);
            struct_builder.append(true);
            let struct_builder = a_builder.values();
            struct_builder
                .field_builder::<Int32Builder>(0)
                .unwrap()
                .append_value(13);
            struct_builder
                .field_builder::<Int32Builder>(1)
                .unwrap()
                .append_value(23);
            struct_builder.append(true);
            a_builder.append(true);
        }
        // Row 1, column B: [{3: 300}]
        {
            let b_builder = top_struct_builder
                .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
                .unwrap();
            let map_builder = b_builder.values();
            map_builder.keys().append_value(3);
            map_builder.values().append_value(300);
            map_builder.append(true).unwrap();

            b_builder.append(true);
        }
        // Row 1, column C: [[300, 301, 302], [400, 401]]
        {
            let c_builder = top_struct_builder
                .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
                .unwrap();
            let inner_list_builder = c_builder.values();
            inner_list_builder.values().append_value(300);
            inner_list_builder.values().append_value(301);
            inner_list_builder.values().append_value(302);
            inner_list_builder.append(true);
            let inner_list_builder = c_builder.values();
            inner_list_builder.values().append_value(400);
            inner_list_builder.values().append_value(401);
            inner_list_builder.append(true);
            c_builder.append(true);
        }
        // Finish row 1 as a valid (non-null) struct.
        top_struct_builder.append(true);

        Arc::new(top_struct_builder.finish()) as ArrayRef
    };

    // Each arrow row must convert to one struct literal holding the A, B and C
    // values in field order.
    let result = arrow_struct_to_literal(&struct_array, &struct_type).unwrap();
    assert_eq!(result, vec![
        // Row 0
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::List(vec![
                Some(Literal::Struct(Struct::from_iter(vec![
                    Some(Literal::int(10)),
                    Some(Literal::int(20)),
                ]))),
                Some(Literal::Struct(Struct::from_iter(vec![
                    Some(Literal::int(11)),
                    Some(Literal::int(21)),
                ]))),
            ])),
            Some(Literal::List(vec![
                Some(Literal::Map(Map::from_iter(vec![
                    (Literal::int(1), Some(Literal::int(100))),
                    (Literal::int(3), Some(Literal::int(300))),
                ]))),
                Some(Literal::Map(Map::from_iter(vec![(
                    Literal::int(2),
                    Some(Literal::int(200))
                ),]))),
            ])),
            Some(Literal::List(vec![
                Some(Literal::List(vec![
                    Some(Literal::int(100)),
                    Some(Literal::int(101)),
                    Some(Literal::int(102)),
                ])),
                Some(Literal::List(vec![
                    Some(Literal::int(200)),
                    Some(Literal::int(201)),
                ])),
            ])),
        ]))),
        // Row 1
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::List(vec![
                Some(Literal::Struct(Struct::from_iter(vec![
                    Some(Literal::int(12)),
                    Some(Literal::int(22)),
                ]))),
                Some(Literal::Struct(Struct::from_iter(vec![
                    Some(Literal::int(13)),
                    Some(Literal::int(23)),
                ]))),
            ])),
            Some(Literal::List(vec![Some(Literal::Map(Map::from_iter(
                vec![(Literal::int(3), Some(Literal::int(300))),]
            ))),])),
            Some(Literal::List(vec![
                Some(Literal::List(vec![
                    Some(Literal::int(300)),
                    Some(Literal::int(301)),
                    Some(Literal::int(302)),
                ])),
                Some(Literal::List(vec![
                    Some(Literal::int(400)),
                    Some(Literal::int(401)),
                ])),
            ])),
        ]))),
    ]);
}
1740
#[test]
fn test_create_decimal_array_respects_precision() {
    // Requested decimal parameters; the produced array must report exactly these.
    let (expected_precision, expected_scale) = (18u8, 10i8);
    let decimal_type = DataType::Decimal128(expected_precision, expected_scale);
    let literal = Some(PrimitiveLiteral::Int128(10000000000));

    let array = create_primitive_array_single_element(&decimal_type, &literal)
        .expect("Failed to create decimal array");

    // Pull precision and scale out of the resulting type, failing on any other type.
    let (precision, scale) = match array.data_type() {
        DataType::Decimal128(precision, scale) => (*precision, *scale),
        other => panic!("Expected Decimal128, got {other:?}"),
    };
    assert_eq!(precision, expected_precision);
    assert_eq!(scale, expected_scale);
}
1761
#[test]
fn test_create_decimal_array_repeated_respects_precision() {
    // Requested decimal parameters and row count; the produced array must report
    // exactly these.
    let (expected_precision, expected_scale) = (18u8, 10i8);
    let expected_rows = 5;
    let decimal_type = DataType::Decimal128(expected_precision, expected_scale);
    let literal = Some(PrimitiveLiteral::Int128(10000000000));

    let array = create_primitive_array_repeated(&decimal_type, &literal, expected_rows)
        .expect("Failed to create repeated decimal array");

    // Pull precision and scale out of the resulting type, failing on any other type.
    let (precision, scale) = match array.data_type() {
        DataType::Decimal128(precision, scale) => (*precision, *scale),
        other => panic!("Expected Decimal128, got {other:?}"),
    };
    assert_eq!(precision, expected_precision);
    assert_eq!(scale, expected_scale);

    assert_eq!(array.len(), expected_rows);
}
1784}