iceberg/arrow/
value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow_array::{
21    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray,
22    FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray,
23    LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray,
24    Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
25};
26use arrow_buffer::NullBuffer;
27use arrow_schema::{DataType, FieldRef};
28use uuid::Uuid;
29
30use super::get_field_id;
31use crate::spec::{
32    ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType,
33    SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
34    visit_type_with_partner,
35};
36use crate::{Error, ErrorKind, Result};
37
38struct ArrowArrayToIcebergStructConverter;
39
40impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
41    type T = Vec<Option<Literal>>;
42
43    fn schema(
44        &mut self,
45        _schema: &crate::spec::Schema,
46        _partner: &ArrayRef,
47        value: Vec<Option<Literal>>,
48    ) -> Result<Vec<Option<Literal>>> {
49        Ok(value)
50    }
51
52    fn field(
53        &mut self,
54        field: &crate::spec::NestedFieldRef,
55        _partner: &ArrayRef,
56        value: Vec<Option<Literal>>,
57    ) -> Result<Vec<Option<Literal>>> {
58        // Make there is no null value if the field is required
59        if field.required && value.iter().any(Option::is_none) {
60            return Err(Error::new(
61                ErrorKind::DataInvalid,
62                "The field is required but has null value",
63            )
64            .with_context("field_id", field.id.to_string())
65            .with_context("field_name", &field.name));
66        }
67        Ok(value)
68    }
69
70    fn r#struct(
71        &mut self,
72        _struct: &StructType,
73        array: &ArrayRef,
74        results: Vec<Vec<Option<Literal>>>,
75    ) -> Result<Vec<Option<Literal>>> {
76        let row_len = results.first().map(|column| column.len()).unwrap_or(0);
77        if let Some(col) = results.iter().find(|col| col.len() != row_len) {
78            return Err(Error::new(
79                ErrorKind::DataInvalid,
80                "The struct columns have different row length",
81            )
82            .with_context("first col length", row_len.to_string())
83            .with_context("actual col length", col.len().to_string()));
84        }
85
86        let mut struct_literals = Vec::with_capacity(row_len);
87        let mut columns_iters = results
88            .into_iter()
89            .map(|column| column.into_iter())
90            .collect::<Vec<_>>();
91
92        for i in 0..row_len {
93            let mut literals = Vec::with_capacity(columns_iters.len());
94            for column_iter in columns_iters.iter_mut() {
95                literals.push(column_iter.next().unwrap());
96            }
97            if array.is_null(i) {
98                struct_literals.push(None);
99            } else {
100                struct_literals.push(Some(Literal::Struct(Struct::from_iter(literals))));
101            }
102        }
103
104        Ok(struct_literals)
105    }
106
107    fn list(
108        &mut self,
109        list: &ListType,
110        array: &ArrayRef,
111        elements: Vec<Option<Literal>>,
112    ) -> Result<Vec<Option<Literal>>> {
113        if list.element_field.required && elements.iter().any(Option::is_none) {
114            return Err(Error::new(
115                ErrorKind::DataInvalid,
116                "The list should not have null value",
117            ));
118        }
119        match array.data_type() {
120            DataType::List(_) => {
121                let offset = array
122                    .as_any()
123                    .downcast_ref::<ListArray>()
124                    .ok_or_else(|| {
125                        Error::new(ErrorKind::DataInvalid, "The partner is not a list array")
126                    })?
127                    .offsets();
128                // combine the result according to the offset
129                let mut result = Vec::with_capacity(offset.len() - 1);
130                for i in 0..offset.len() - 1 {
131                    let start = offset[i] as usize;
132                    let end = offset[i + 1] as usize;
133                    result.push(Some(Literal::List(elements[start..end].to_vec())));
134                }
135                Ok(result)
136            }
137            DataType::LargeList(_) => {
138                let offset = array
139                    .as_any()
140                    .downcast_ref::<LargeListArray>()
141                    .ok_or_else(|| {
142                        Error::new(
143                            ErrorKind::DataInvalid,
144                            "The partner is not a large list array",
145                        )
146                    })?
147                    .offsets();
148                // combine the result according to the offset
149                let mut result = Vec::with_capacity(offset.len() - 1);
150                for i in 0..offset.len() - 1 {
151                    let start = offset[i] as usize;
152                    let end = offset[i + 1] as usize;
153                    result.push(Some(Literal::List(elements[start..end].to_vec())));
154                }
155                Ok(result)
156            }
157            DataType::FixedSizeList(_, len) => {
158                let mut result = Vec::with_capacity(elements.len() / *len as usize);
159                for i in 0..elements.len() / *len as usize {
160                    let start = i * *len as usize;
161                    let end = (i + 1) * *len as usize;
162                    result.push(Some(Literal::List(elements[start..end].to_vec())));
163                }
164                Ok(result)
165            }
166            _ => Err(Error::new(
167                ErrorKind::DataInvalid,
168                "The partner is not a list type",
169            )),
170        }
171    }
172
173    fn map(
174        &mut self,
175        _map: &MapType,
176        partner: &ArrayRef,
177        key_values: Vec<Option<Literal>>,
178        values: Vec<Option<Literal>>,
179    ) -> Result<Vec<Option<Literal>>> {
180        // Make sure key_value and value have the same row length
181        if key_values.len() != values.len() {
182            return Err(Error::new(
183                ErrorKind::DataInvalid,
184                "The key value and value of map should have the same row length",
185            ));
186        }
187
188        let offsets = partner
189            .as_any()
190            .downcast_ref::<MapArray>()
191            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "The partner is not a map array"))?
192            .offsets();
193        // combine the result according to the offset
194        let mut result = Vec::with_capacity(offsets.len() - 1);
195        for i in 0..offsets.len() - 1 {
196            let start = offsets[i] as usize;
197            let end = offsets[i + 1] as usize;
198            let mut map = Map::new();
199            for (key, value) in key_values[start..end].iter().zip(values[start..end].iter()) {
200                map.insert(key.clone().unwrap(), value.clone());
201            }
202            result.push(Some(Literal::Map(map)));
203        }
204        Ok(result)
205    }
206
207    fn primitive(&mut self, p: &PrimitiveType, partner: &ArrayRef) -> Result<Vec<Option<Literal>>> {
208        match p {
209            PrimitiveType::Boolean => {
210                let array = partner
211                    .as_any()
212                    .downcast_ref::<BooleanArray>()
213                    .ok_or_else(|| {
214                        Error::new(ErrorKind::DataInvalid, "The partner is not a boolean array")
215                    })?;
216                Ok(array.iter().map(|v| v.map(Literal::bool)).collect())
217            }
218            PrimitiveType::Int => {
219                let array = partner
220                    .as_any()
221                    .downcast_ref::<Int32Array>()
222                    .ok_or_else(|| {
223                        Error::new(ErrorKind::DataInvalid, "The partner is not a int32 array")
224                    })?;
225                Ok(array.iter().map(|v| v.map(Literal::int)).collect())
226            }
227            PrimitiveType::Long => {
228                let array = partner
229                    .as_any()
230                    .downcast_ref::<Int64Array>()
231                    .ok_or_else(|| {
232                        Error::new(ErrorKind::DataInvalid, "The partner is not a int64 array")
233                    })?;
234                Ok(array.iter().map(|v| v.map(Literal::long)).collect())
235            }
236            PrimitiveType::Float => {
237                let array = partner
238                    .as_any()
239                    .downcast_ref::<Float32Array>()
240                    .ok_or_else(|| {
241                        Error::new(ErrorKind::DataInvalid, "The partner is not a float32 array")
242                    })?;
243                Ok(array.iter().map(|v| v.map(Literal::float)).collect())
244            }
245            PrimitiveType::Double => {
246                let array = partner
247                    .as_any()
248                    .downcast_ref::<Float64Array>()
249                    .ok_or_else(|| {
250                        Error::new(ErrorKind::DataInvalid, "The partner is not a float64 array")
251                    })?;
252                Ok(array.iter().map(|v| v.map(Literal::double)).collect())
253            }
254            PrimitiveType::Decimal { precision, scale } => {
255                let array = partner
256                    .as_any()
257                    .downcast_ref::<Decimal128Array>()
258                    .ok_or_else(|| {
259                        Error::new(
260                            ErrorKind::DataInvalid,
261                            "The partner is not a decimal128 array",
262                        )
263                    })?;
264                if let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type()
265                    && (*arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale)
266                {
267                    return Err(Error::new(
268                        ErrorKind::DataInvalid,
269                        format!(
270                            "The precision or scale ({arrow_precision},{arrow_scale}) of arrow decimal128 array is not compatible with iceberg decimal type ({precision},{scale})"
271                        ),
272                    ));
273                }
274                Ok(array.iter().map(|v| v.map(Literal::decimal)).collect())
275            }
276            PrimitiveType::Date => {
277                let array = partner
278                    .as_any()
279                    .downcast_ref::<Date32Array>()
280                    .ok_or_else(|| {
281                        Error::new(ErrorKind::DataInvalid, "The partner is not a date32 array")
282                    })?;
283                Ok(array.iter().map(|v| v.map(Literal::date)).collect())
284            }
285            PrimitiveType::Time => {
286                let array = partner
287                    .as_any()
288                    .downcast_ref::<Time64MicrosecondArray>()
289                    .ok_or_else(|| {
290                        Error::new(ErrorKind::DataInvalid, "The partner is not a time64 array")
291                    })?;
292                Ok(array.iter().map(|v| v.map(Literal::time)).collect())
293            }
294            PrimitiveType::Timestamp => {
295                let array = partner
296                    .as_any()
297                    .downcast_ref::<TimestampMicrosecondArray>()
298                    .ok_or_else(|| {
299                        Error::new(
300                            ErrorKind::DataInvalid,
301                            "The partner is not a timestamp array",
302                        )
303                    })?;
304                Ok(array.iter().map(|v| v.map(Literal::timestamp)).collect())
305            }
306            PrimitiveType::Timestamptz => {
307                let array = partner
308                    .as_any()
309                    .downcast_ref::<TimestampMicrosecondArray>()
310                    .ok_or_else(|| {
311                        Error::new(
312                            ErrorKind::DataInvalid,
313                            "The partner is not a timestamptz array",
314                        )
315                    })?;
316                Ok(array.iter().map(|v| v.map(Literal::timestamptz)).collect())
317            }
318            PrimitiveType::TimestampNs => {
319                let array = partner
320                    .as_any()
321                    .downcast_ref::<TimestampNanosecondArray>()
322                    .ok_or_else(|| {
323                        Error::new(
324                            ErrorKind::DataInvalid,
325                            "The partner is not a timestamp_ns array",
326                        )
327                    })?;
328                Ok(array
329                    .iter()
330                    .map(|v| v.map(Literal::timestamp_nano))
331                    .collect())
332            }
333            PrimitiveType::TimestamptzNs => {
334                let array = partner
335                    .as_any()
336                    .downcast_ref::<TimestampNanosecondArray>()
337                    .ok_or_else(|| {
338                        Error::new(
339                            ErrorKind::DataInvalid,
340                            "The partner is not a timestamptz_ns array",
341                        )
342                    })?;
343                Ok(array
344                    .iter()
345                    .map(|v| v.map(Literal::timestamptz_nano))
346                    .collect())
347            }
348            PrimitiveType::String => {
349                if let Some(array) = partner.as_any().downcast_ref::<LargeStringArray>() {
350                    Ok(array.iter().map(|v| v.map(Literal::string)).collect())
351                } else if let Some(array) = partner.as_any().downcast_ref::<StringArray>() {
352                    Ok(array.iter().map(|v| v.map(Literal::string)).collect())
353                } else {
354                    Err(Error::new(
355                        ErrorKind::DataInvalid,
356                        "The partner is not a string array",
357                    ))
358                }
359            }
360            PrimitiveType::Uuid => {
361                if let Some(array) = partner.as_any().downcast_ref::<FixedSizeBinaryArray>() {
362                    if array.value_length() != 16 {
363                        return Err(Error::new(
364                            ErrorKind::DataInvalid,
365                            "The partner is not a uuid array",
366                        ));
367                    }
368                    Ok(array
369                        .iter()
370                        .map(|v| {
371                            v.map(|v| {
372                                Ok(Literal::uuid(Uuid::from_bytes(v.try_into().map_err(
373                                    |_| {
374                                        Error::new(
375                                            ErrorKind::DataInvalid,
376                                            "Failed to convert binary to uuid",
377                                        )
378                                    },
379                                )?)))
380                            })
381                            .transpose()
382                        })
383                        .collect::<Result<Vec<_>>>()?)
384                } else {
385                    Err(Error::new(
386                        ErrorKind::DataInvalid,
387                        "The partner is not a uuid array",
388                    ))
389                }
390            }
391            PrimitiveType::Fixed(len) => {
392                let array = partner
393                    .as_any()
394                    .downcast_ref::<FixedSizeBinaryArray>()
395                    .ok_or_else(|| {
396                        Error::new(ErrorKind::DataInvalid, "The partner is not a fixed array")
397                    })?;
398                if array.value_length() != *len as i32 {
399                    return Err(Error::new(
400                        ErrorKind::DataInvalid,
401                        "The length of fixed size binary array is not compatible with iceberg fixed type",
402                    ));
403                }
404                Ok(array
405                    .iter()
406                    .map(|v| v.map(|v| Literal::fixed(v.iter().cloned())))
407                    .collect())
408            }
409            PrimitiveType::Binary => {
410                if let Some(array) = partner.as_any().downcast_ref::<LargeBinaryArray>() {
411                    Ok(array
412                        .iter()
413                        .map(|v| v.map(|v| Literal::binary(v.to_vec())))
414                        .collect())
415                } else if let Some(array) = partner.as_any().downcast_ref::<BinaryArray>() {
416                    Ok(array
417                        .iter()
418                        .map(|v| v.map(|v| Literal::binary(v.to_vec())))
419                        .collect())
420                } else {
421                    Err(Error::new(
422                        ErrorKind::DataInvalid,
423                        "The partner is not a binary array",
424                    ))
425                }
426            }
427        }
428    }
429}
430
431/// Defines how Arrow fields are matched with Iceberg fields when converting data.
432///
433/// This enum provides two strategies for matching fields:
434/// - `Id`: Match fields by their ID, which is stored in Arrow field metadata.
435/// - `Name`: Match fields by their name, ignoring the field ID.
436///
437/// The ID matching mode is the default and preferred approach as it's more robust
438/// against schema evolution where field names might change but IDs remain stable.
439/// The name matching mode can be useful in scenarios where field IDs are not available
440/// or when working with systems that don't preserve field IDs.
441#[derive(Clone, Copy, Debug)]
442pub enum FieldMatchMode {
443    /// Match fields by their ID stored in Arrow field metadata
444    Id,
445    /// Match fields by their name, ignoring field IDs
446    Name,
447}
448
449impl FieldMatchMode {
450    /// Determines if an Arrow field matches an Iceberg field based on the matching mode.
451    pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
452        match self {
453            FieldMatchMode::Id => get_field_id(arrow_field)
454                .map(|id| id == iceberg_field.id)
455                .unwrap_or(false),
456            FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,
457        }
458    }
459}
460
461/// Partner type representing accessing and walking arrow arrays alongside iceberg schema
462pub struct ArrowArrayAccessor {
463    match_mode: FieldMatchMode,
464}
465
466impl ArrowArrayAccessor {
467    /// Creates a new instance of ArrowArrayAccessor with the default ID matching mode
468    pub fn new() -> Self {
469        Self {
470            match_mode: FieldMatchMode::Id,
471        }
472    }
473
474    /// Creates a new instance of ArrowArrayAccessor with the specified matching mode
475    pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
476        Self { match_mode }
477    }
478}
479
480impl Default for ArrowArrayAccessor {
481    fn default() -> Self {
482        Self::new()
483    }
484}
485
486impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
487    fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
488        if !matches!(schema_partner.data_type(), DataType::Struct(_)) {
489            return Err(Error::new(
490                ErrorKind::DataInvalid,
491                "The schema partner is not a struct type",
492            ));
493        }
494
495        Ok(schema_partner)
496    }
497
498    fn field_partner<'a>(
499        &self,
500        struct_partner: &'a ArrayRef,
501        field: &NestedField,
502    ) -> Result<&'a ArrayRef> {
503        let struct_array = struct_partner
504            .as_any()
505            .downcast_ref::<StructArray>()
506            .ok_or_else(|| {
507                Error::new(
508                    ErrorKind::DataInvalid,
509                    format!(
510                        "The struct partner is not a struct array, partner: {struct_partner:?}"
511                    ),
512                )
513            })?;
514
515        let field_pos = struct_array
516            .fields()
517            .iter()
518            .position(|arrow_field| self.match_mode.match_field(arrow_field, field))
519            .ok_or_else(|| {
520                Error::new(
521                    ErrorKind::DataInvalid,
522                    format!("Field id {} not found in struct array", field.id),
523                )
524            })?;
525
526        Ok(struct_array.column(field_pos))
527    }
528
529    fn list_element_partner<'a>(&self, list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
530        match list_partner.data_type() {
531            DataType::List(_) => {
532                let list_array = list_partner
533                    .as_any()
534                    .downcast_ref::<ListArray>()
535                    .ok_or_else(|| {
536                        Error::new(
537                            ErrorKind::DataInvalid,
538                            "The list partner is not a list array",
539                        )
540                    })?;
541                Ok(list_array.values())
542            }
543            DataType::LargeList(_) => {
544                let list_array = list_partner
545                    .as_any()
546                    .downcast_ref::<LargeListArray>()
547                    .ok_or_else(|| {
548                        Error::new(
549                            ErrorKind::DataInvalid,
550                            "The list partner is not a large list array",
551                        )
552                    })?;
553                Ok(list_array.values())
554            }
555            DataType::FixedSizeList(_, _) => {
556                let list_array = list_partner
557                    .as_any()
558                    .downcast_ref::<FixedSizeListArray>()
559                    .ok_or_else(|| {
560                        Error::new(
561                            ErrorKind::DataInvalid,
562                            "The list partner is not a fixed size list array",
563                        )
564                    })?;
565                Ok(list_array.values())
566            }
567            _ => Err(Error::new(
568                ErrorKind::DataInvalid,
569                "The list partner is not a list type",
570            )),
571        }
572    }
573
574    fn map_key_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
575        let map_array = map_partner
576            .as_any()
577            .downcast_ref::<MapArray>()
578            .ok_or_else(|| {
579                Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
580            })?;
581        Ok(map_array.keys())
582    }
583
584    fn map_value_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
585        let map_array = map_partner
586            .as_any()
587            .downcast_ref::<MapArray>()
588            .ok_or_else(|| {
589                Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
590            })?;
591        Ok(map_array.values())
592    }
593}
594
595/// Convert arrow struct array to iceberg struct value array.
596/// This function will assume the schema of arrow struct array is the same as iceberg struct type.
597pub fn arrow_struct_to_literal(
598    struct_array: &ArrayRef,
599    ty: &StructType,
600) -> Result<Vec<Option<Literal>>> {
601    visit_struct_with_partner(
602        ty,
603        struct_array,
604        &mut ArrowArrayToIcebergStructConverter,
605        &ArrowArrayAccessor::new(),
606    )
607}
608
609/// Convert arrow primitive array to iceberg primitive value array.
610/// This function will assume the schema of arrow struct array is the same as iceberg struct type.
611pub fn arrow_primitive_to_literal(
612    primitive_array: &ArrayRef,
613    ty: &Type,
614) -> Result<Vec<Option<Literal>>> {
615    visit_type_with_partner(
616        ty,
617        primitive_array,
618        &mut ArrowArrayToIcebergStructConverter,
619        &ArrowArrayAccessor::new(),
620    )
621}
622
623/// Create a single-element array from a primitive literal.
624///
625/// This is used for creating constant arrays (Run-End Encoded arrays) where we need
626/// a single value that represents all rows.
627pub(crate) fn create_primitive_array_single_element(
628    data_type: &DataType,
629    prim_lit: &Option<PrimitiveLiteral>,
630) -> Result<ArrayRef> {
631    match (data_type, prim_lit) {
632        (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
633            Ok(Arc::new(BooleanArray::from(vec![*v])))
634        }
635        (DataType::Boolean, None) => Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))),
636        (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => {
637            Ok(Arc::new(Int32Array::from(vec![*v])))
638        }
639        (DataType::Int32, None) => Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None]))),
640        (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => {
641            Ok(Arc::new(Date32Array::from(vec![*v])))
642        }
643        (DataType::Date32, None) => Ok(Arc::new(Date32Array::from(vec![Option::<i32>::None]))),
644        (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => {
645            Ok(Arc::new(Int64Array::from(vec![*v])))
646        }
647        (DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None]))),
648        (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => {
649            Ok(Arc::new(Float32Array::from(vec![v.0])))
650        }
651        (DataType::Float32, None) => Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None]))),
652        (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => {
653            Ok(Arc::new(Float64Array::from(vec![v.0])))
654        }
655        (DataType::Float64, None) => Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None]))),
656        (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => {
657            Ok(Arc::new(StringArray::from(vec![v.as_str()])))
658        }
659        (DataType::Utf8, None) => Ok(Arc::new(StringArray::from(vec![Option::<&str>::None]))),
660        (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => {
661            Ok(Arc::new(BinaryArray::from_vec(vec![v.as_slice()])))
662        }
663        (DataType::Binary, None) => Ok(Arc::new(BinaryArray::from_opt_vec(vec![
664            Option::<&[u8]>::None,
665        ]))),
666        (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(v))) => {
667            let array = Decimal128Array::from(vec![{ *v }])
668                .with_precision_and_scale(*precision, *scale)
669                .map_err(|e| {
670                    Error::new(
671                        ErrorKind::DataInvalid,
672                        format!(
673                            "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
674                        ),
675                    )
676                })?;
677            Ok(Arc::new(array))
678        }
679        (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::UInt128(v))) => {
680            let array = Decimal128Array::from(vec![*v as i128])
681                .with_precision_and_scale(*precision, *scale)
682                .map_err(|e| {
683                    Error::new(
684                        ErrorKind::DataInvalid,
685                        format!(
686                            "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
687                        ),
688                    )
689                })?;
690            Ok(Arc::new(array))
691        }
692        (DataType::Decimal128(precision, scale), None) => {
693            let array = Decimal128Array::from(vec![Option::<i128>::None])
694                .with_precision_and_scale(*precision, *scale)
695                .map_err(|e| {
696                    Error::new(
697                        ErrorKind::DataInvalid,
698                        format!(
699                            "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
700                        ),
701                    )
702                })?;
703            Ok(Arc::new(array))
704        }
705        (DataType::Struct(fields), None) => {
706            // Create a single-element StructArray with nulls
707            let null_arrays: Vec<ArrayRef> = fields
708                .iter()
709                .map(|f| {
710                    // Recursively create null arrays for struct fields
711                    // For primitive fields in structs, use simple null arrays (not REE within struct)
712                    match f.data_type() {
713                        DataType::Boolean => {
714                            Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))
715                                as ArrayRef)
716                        }
717                        DataType::Int32 | DataType::Date32 => {
718                            Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None])) as ArrayRef)
719                        }
720                        DataType::Int64 => {
721                            Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None])) as ArrayRef)
722                        }
723                        DataType::Float32 => {
724                            Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None])) as ArrayRef)
725                        }
726                        DataType::Float64 => {
727                            Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None])) as ArrayRef)
728                        }
729                        DataType::Utf8 => {
730                            Ok(Arc::new(StringArray::from(vec![Option::<&str>::None])) as ArrayRef)
731                        }
732                        DataType::Binary => {
733                            Ok(
734                                Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None]))
735                                    as ArrayRef,
736                            )
737                        }
738                        _ => Err(Error::new(
739                            ErrorKind::Unexpected,
740                            format!("Unsupported struct field type: {:?}", f.data_type()),
741                        )),
742                    }
743                })
744                .collect::<Result<Vec<_>>>()?;
745            Ok(Arc::new(arrow_array::StructArray::new(
746                fields.clone(),
747                null_arrays,
748                Some(arrow_buffer::NullBuffer::new_null(1)),
749            )))
750        }
751        _ => Err(Error::new(
752            ErrorKind::Unexpected,
753            format!("Unsupported constant type combination: {data_type:?} with {prim_lit:?}"),
754        )),
755    }
756}
757
758/// Create a repeated array from a primitive literal for a given number of rows.
759///
760/// This is used for creating non-constant arrays where we need the same value
761/// repeated for each row.
762pub(crate) fn create_primitive_array_repeated(
763    data_type: &DataType,
764    prim_lit: &Option<PrimitiveLiteral>,
765    num_rows: usize,
766) -> Result<ArrayRef> {
767    Ok(match (data_type, prim_lit) {
768        (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
769            Arc::new(BooleanArray::from(vec![*value; num_rows]))
770        }
771        (DataType::Boolean, None) => {
772            let vals: Vec<Option<bool>> = vec![None; num_rows];
773            Arc::new(BooleanArray::from(vals))
774        }
775        (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
776            Arc::new(Int32Array::from(vec![*value; num_rows]))
777        }
778        (DataType::Int32, None) => {
779            let vals: Vec<Option<i32>> = vec![None; num_rows];
780            Arc::new(Int32Array::from(vals))
781        }
782        (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
783            Arc::new(Date32Array::from(vec![*value; num_rows]))
784        }
785        (DataType::Date32, None) => {
786            let vals: Vec<Option<i32>> = vec![None; num_rows];
787            Arc::new(Date32Array::from(vals))
788        }
789        (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
790            Arc::new(Int64Array::from(vec![*value; num_rows]))
791        }
792        (DataType::Int64, None) => {
793            let vals: Vec<Option<i64>> = vec![None; num_rows];
794            Arc::new(Int64Array::from(vals))
795        }
796        (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
797            Arc::new(Float32Array::from(vec![value.0; num_rows]))
798        }
799        (DataType::Float32, None) => {
800            let vals: Vec<Option<f32>> = vec![None; num_rows];
801            Arc::new(Float32Array::from(vals))
802        }
803        (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
804            Arc::new(Float64Array::from(vec![value.0; num_rows]))
805        }
806        (DataType::Float64, None) => {
807            let vals: Vec<Option<f64>> = vec![None; num_rows];
808            Arc::new(Float64Array::from(vals))
809        }
810        (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
811            Arc::new(StringArray::from(vec![value.clone(); num_rows]))
812        }
813        (DataType::Utf8, None) => {
814            let vals: Vec<Option<String>> = vec![None; num_rows];
815            Arc::new(StringArray::from(vals))
816        }
817        (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
818            Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
819        }
820        (DataType::Binary, None) => {
821            let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
822            Arc::new(BinaryArray::from_opt_vec(vals))
823        }
824        (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(value))) => {
825            Arc::new(
826                Decimal128Array::from(vec![*value; num_rows])
827                    .with_precision_and_scale(*precision, *scale)
828                    .map_err(|e| {
829                        Error::new(
830                            ErrorKind::DataInvalid,
831                            format!(
832                                "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
833                            ),
834                        )
835                    })?,
836            )
837        }
838        (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::UInt128(value))) => {
839            Arc::new(
840                Decimal128Array::from(vec![*value as i128; num_rows])
841                    .with_precision_and_scale(*precision, *scale)
842                    .map_err(|e| {
843                        Error::new(
844                            ErrorKind::DataInvalid,
845                            format!(
846                                "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
847                            ),
848                        )
849                    })?,
850            )
851        }
852        (DataType::Decimal128(precision, scale), None) => {
853            let vals: Vec<Option<i128>> = vec![None; num_rows];
854            Arc::new(
855                Decimal128Array::from(vals)
856                    .with_precision_and_scale(*precision, *scale)
857                    .map_err(|e| {
858                        Error::new(
859                            ErrorKind::DataInvalid,
860                            format!(
861                                "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
862                            ),
863                        )
864                    })?,
865            )
866        }
867        (DataType::Struct(fields), None) => {
868            // Create a StructArray filled with nulls
869            let null_arrays: Vec<ArrayRef> = fields
870                .iter()
871                .map(|field| create_primitive_array_repeated(field.data_type(), &None, num_rows))
872                .collect::<Result<Vec<_>>>()?;
873
874            Arc::new(StructArray::new(
875                fields.clone(),
876                null_arrays,
877                Some(NullBuffer::new_null(num_rows)),
878            ))
879        }
880        (DataType::Null, _) => Arc::new(arrow_array::NullArray::new(num_rows)),
881        (dt, _) => {
882            return Err(Error::new(
883                ErrorKind::Unexpected,
884                format!("unexpected target column type {dt}"),
885            ));
886        }
887    })
888}
889
890#[cfg(test)]
891mod test {
892    use std::collections::HashMap;
893    use std::sync::Arc;
894
895    use arrow_array::builder::{Int32Builder, ListBuilder, MapBuilder, StructBuilder};
896    use arrow_array::{
897        ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
898        Float64Array, Int32Array, Int64Array, StringArray, StructArray, Time64MicrosecondArray,
899        TimestampMicrosecondArray, TimestampNanosecondArray,
900    };
901    use arrow_schema::{DataType, Field, Fields, TimeUnit};
902    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
903
904    use super::*;
905    use crate::spec::{ListType, Literal, MapType, NestedField, PrimitiveType, StructType, Type};
906
907    #[test]
908    fn test_arrow_struct_to_iceberg_struct() {
909        let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]);
910        let int32_array = Int32Array::from(vec![Some(3), Some(4), None]);
911        let int64_array = Int64Array::from(vec![Some(5), Some(6), None]);
912        let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]);
913        let float64_array = Float64Array::from(vec![Some(3.3), Some(4.4), None]);
914        let decimal_array = Decimal128Array::from(vec![Some(1000), Some(2000), None])
915            .with_precision_and_scale(10, 2)
916            .unwrap();
917        let date_array = Date32Array::from(vec![Some(18628), Some(18629), None]);
918        let time_array = Time64MicrosecondArray::from(vec![Some(123456789), Some(987654321), None]);
919        let timestamp_micro_array = TimestampMicrosecondArray::from(vec![
920            Some(1622548800000000),
921            Some(1622635200000000),
922            None,
923        ]);
924        let timestamp_nano_array = TimestampNanosecondArray::from(vec![
925            Some(1622548800000000000),
926            Some(1622635200000000000),
927            None,
928        ]);
929        let string_array = StringArray::from(vec![Some("a"), Some("b"), None]);
930        let binary_array =
931            BinaryArray::from(vec![Some(b"abc".as_ref()), Some(b"def".as_ref()), None]);
932
933        let struct_array = Arc::new(StructArray::from(vec![
934            (
935                Arc::new(
936                    Field::new("bool_field", DataType::Boolean, true).with_metadata(HashMap::from(
937                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())],
938                    )),
939                ),
940                Arc::new(bool_array) as ArrayRef,
941            ),
942            (
943                Arc::new(
944                    Field::new("int32_field", DataType::Int32, true).with_metadata(HashMap::from(
945                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
946                    )),
947                ),
948                Arc::new(int32_array) as ArrayRef,
949            ),
950            (
951                Arc::new(
952                    Field::new("int64_field", DataType::Int64, true).with_metadata(HashMap::from(
953                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
954                    )),
955                ),
956                Arc::new(int64_array) as ArrayRef,
957            ),
958            (
959                Arc::new(
960                    Field::new("float32_field", DataType::Float32, true).with_metadata(
961                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
962                    ),
963                ),
964                Arc::new(float32_array) as ArrayRef,
965            ),
966            (
967                Arc::new(
968                    Field::new("float64_field", DataType::Float64, true).with_metadata(
969                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
970                    ),
971                ),
972                Arc::new(float64_array) as ArrayRef,
973            ),
974            (
975                Arc::new(
976                    Field::new("decimal_field", DataType::Decimal128(10, 2), true).with_metadata(
977                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string())]),
978                    ),
979                ),
980                Arc::new(decimal_array) as ArrayRef,
981            ),
982            (
983                Arc::new(
984                    Field::new("date_field", DataType::Date32, true).with_metadata(HashMap::from(
985                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
986                    )),
987                ),
988                Arc::new(date_array) as ArrayRef,
989            ),
990            (
991                Arc::new(
992                    Field::new("time_field", DataType::Time64(TimeUnit::Microsecond), true)
993                        .with_metadata(HashMap::from([(
994                            PARQUET_FIELD_ID_META_KEY.to_string(),
995                            "8".to_string(),
996                        )])),
997                ),
998                Arc::new(time_array) as ArrayRef,
999            ),
1000            (
1001                Arc::new(
1002                    Field::new(
1003                        "timestamp_micro_field",
1004                        DataType::Timestamp(TimeUnit::Microsecond, None),
1005                        true,
1006                    )
1007                    .with_metadata(HashMap::from([(
1008                        PARQUET_FIELD_ID_META_KEY.to_string(),
1009                        "9".to_string(),
1010                    )])),
1011                ),
1012                Arc::new(timestamp_micro_array) as ArrayRef,
1013            ),
1014            (
1015                Arc::new(
1016                    Field::new(
1017                        "timestamp_nano_field",
1018                        DataType::Timestamp(TimeUnit::Nanosecond, None),
1019                        true,
1020                    )
1021                    .with_metadata(HashMap::from([(
1022                        PARQUET_FIELD_ID_META_KEY.to_string(),
1023                        "10".to_string(),
1024                    )])),
1025                ),
1026                Arc::new(timestamp_nano_array) as ArrayRef,
1027            ),
1028            (
1029                Arc::new(
1030                    Field::new("string_field", DataType::Utf8, true).with_metadata(HashMap::from(
1031                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "11".to_string())],
1032                    )),
1033                ),
1034                Arc::new(string_array) as ArrayRef,
1035            ),
1036            (
1037                Arc::new(
1038                    Field::new("binary_field", DataType::Binary, true).with_metadata(
1039                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "12".to_string())]),
1040                    ),
1041                ),
1042                Arc::new(binary_array) as ArrayRef,
1043            ),
1044        ])) as ArrayRef;
1045
1046        let iceberg_struct_type = StructType::new(vec![
1047            Arc::new(NestedField::optional(
1048                0,
1049                "bool_field",
1050                Type::Primitive(PrimitiveType::Boolean),
1051            )),
1052            Arc::new(NestedField::optional(
1053                2,
1054                "int32_field",
1055                Type::Primitive(PrimitiveType::Int),
1056            )),
1057            Arc::new(NestedField::optional(
1058                3,
1059                "int64_field",
1060                Type::Primitive(PrimitiveType::Long),
1061            )),
1062            Arc::new(NestedField::optional(
1063                4,
1064                "float32_field",
1065                Type::Primitive(PrimitiveType::Float),
1066            )),
1067            Arc::new(NestedField::optional(
1068                5,
1069                "float64_field",
1070                Type::Primitive(PrimitiveType::Double),
1071            )),
1072            Arc::new(NestedField::optional(
1073                6,
1074                "decimal_field",
1075                Type::Primitive(PrimitiveType::Decimal {
1076                    precision: 10,
1077                    scale: 2,
1078                }),
1079            )),
1080            Arc::new(NestedField::optional(
1081                7,
1082                "date_field",
1083                Type::Primitive(PrimitiveType::Date),
1084            )),
1085            Arc::new(NestedField::optional(
1086                8,
1087                "time_field",
1088                Type::Primitive(PrimitiveType::Time),
1089            )),
1090            Arc::new(NestedField::optional(
1091                9,
1092                "timestamp_micro_field",
1093                Type::Primitive(PrimitiveType::Timestamp),
1094            )),
1095            Arc::new(NestedField::optional(
1096                10,
1097                "timestamp_nao_field",
1098                Type::Primitive(PrimitiveType::TimestampNs),
1099            )),
1100            Arc::new(NestedField::optional(
1101                11,
1102                "string_field",
1103                Type::Primitive(PrimitiveType::String),
1104            )),
1105            Arc::new(NestedField::optional(
1106                12,
1107                "binary_field",
1108                Type::Primitive(PrimitiveType::Binary),
1109            )),
1110        ]);
1111
1112        let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
1113
1114        assert_eq!(result, vec![
1115            Some(Literal::Struct(Struct::from_iter(vec![
1116                Some(Literal::bool(true)),
1117                Some(Literal::int(3)),
1118                Some(Literal::long(5)),
1119                Some(Literal::float(1.1)),
1120                Some(Literal::double(3.3)),
1121                Some(Literal::decimal(1000)),
1122                Some(Literal::date(18628)),
1123                Some(Literal::time(123456789)),
1124                Some(Literal::timestamp(1622548800000000)),
1125                Some(Literal::timestamp_nano(1622548800000000000)),
1126                Some(Literal::string("a".to_string())),
1127                Some(Literal::binary(b"abc".to_vec())),
1128            ]))),
1129            Some(Literal::Struct(Struct::from_iter(vec![
1130                Some(Literal::bool(false)),
1131                Some(Literal::int(4)),
1132                Some(Literal::long(6)),
1133                Some(Literal::float(2.2)),
1134                Some(Literal::double(4.4)),
1135                Some(Literal::decimal(2000)),
1136                Some(Literal::date(18629)),
1137                Some(Literal::time(987654321)),
1138                Some(Literal::timestamp(1622635200000000)),
1139                Some(Literal::timestamp_nano(1622635200000000000)),
1140                Some(Literal::string("b".to_string())),
1141                Some(Literal::binary(b"def".to_vec())),
1142            ]))),
1143            Some(Literal::Struct(Struct::from_iter(vec![
1144                None, None, None, None, None, None, None, None, None, None, None, None,
1145            ]))),
1146        ]);
1147    }
1148
1149    #[test]
1150    fn test_nullable_struct() {
1151        // test case that partial columns are null
1152        // [
1153        //   {a: null, b: null} // child column is null
1154        //   {a: 1, b: null},   // partial child column is null
1155        //   null               // parent column is null
1156        // ]
1157        let struct_array = {
1158            let mut builder = StructBuilder::from_fields(
1159                Fields::from(vec![
1160                    Field::new("a", DataType::Int32, true).with_metadata(HashMap::from([(
1161                        PARQUET_FIELD_ID_META_KEY.to_string(),
1162                        "0".to_string(),
1163                    )])),
1164                    Field::new("b", DataType::Int32, true).with_metadata(HashMap::from([(
1165                        PARQUET_FIELD_ID_META_KEY.to_string(),
1166                        "1".to_string(),
1167                    )])),
1168                ]),
1169                3,
1170            );
1171            builder
1172                .field_builder::<Int32Builder>(0)
1173                .unwrap()
1174                .append_null();
1175            builder
1176                .field_builder::<Int32Builder>(1)
1177                .unwrap()
1178                .append_null();
1179            builder.append(true);
1180
1181            builder
1182                .field_builder::<Int32Builder>(0)
1183                .unwrap()
1184                .append_value(1);
1185            builder
1186                .field_builder::<Int32Builder>(1)
1187                .unwrap()
1188                .append_null();
1189            builder.append(true);
1190
1191            builder
1192                .field_builder::<Int32Builder>(0)
1193                .unwrap()
1194                .append_value(1);
1195            builder
1196                .field_builder::<Int32Builder>(1)
1197                .unwrap()
1198                .append_value(1);
1199            builder.append_null();
1200
1201            Arc::new(builder.finish()) as ArrayRef
1202        };
1203
1204        let iceberg_struct_type = StructType::new(vec![
1205            Arc::new(NestedField::optional(
1206                0,
1207                "a",
1208                Type::Primitive(PrimitiveType::Int),
1209            )),
1210            Arc::new(NestedField::optional(
1211                1,
1212                "b",
1213                Type::Primitive(PrimitiveType::Int),
1214            )),
1215        ]);
1216
1217        let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
1218        assert_eq!(result, vec![
1219            Some(Literal::Struct(Struct::from_iter(vec![None, None,]))),
1220            Some(Literal::Struct(Struct::from_iter(vec![
1221                Some(Literal::int(1)),
1222                None,
1223            ]))),
1224            None,
1225        ]);
1226    }
1227
1228    #[test]
1229    fn test_empty_struct() {
1230        let struct_array = Arc::new(StructArray::new_null(Fields::empty(), 3)) as ArrayRef;
1231        let iceberg_struct_type = StructType::new(vec![]);
1232        let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
1233        assert_eq!(result, vec![None; 0]);
1234    }
1235
1236    #[test]
1237    fn test_find_field_by_id() {
1238        // Create Arrow arrays for the nested structure
1239        let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]);
1240        let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]);
1241
1242        // Create the nested struct array with field IDs in metadata
1243        let nested_struct_array =
1244            Arc::new(StructArray::from(vec![
1245                (
1246                    Arc::new(Field::new("field_a", DataType::Int32, true).with_metadata(
1247                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1248                    )),
1249                    Arc::new(field_a_array) as ArrayRef,
1250                ),
1251                (
1252                    Arc::new(Field::new("field_b", DataType::Utf8, true).with_metadata(
1253                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1254                    )),
1255                    Arc::new(field_b_array) as ArrayRef,
1256                ),
1257            ])) as ArrayRef;
1258
1259        let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]);
1260
1261        // Create the top-level struct array with field IDs in metadata
1262        let struct_array = Arc::new(StructArray::from(vec![
1263            (
1264                Arc::new(
1265                    Field::new(
1266                        "nested_struct",
1267                        DataType::Struct(Fields::from(vec![
1268                            Field::new("field_a", DataType::Int32, true).with_metadata(
1269                                HashMap::from([(
1270                                    PARQUET_FIELD_ID_META_KEY.to_string(),
1271                                    "1".to_string(),
1272                                )]),
1273                            ),
1274                            Field::new("field_b", DataType::Utf8, true).with_metadata(
1275                                HashMap::from([(
1276                                    PARQUET_FIELD_ID_META_KEY.to_string(),
1277                                    "2".to_string(),
1278                                )]),
1279                            ),
1280                        ])),
1281                        true,
1282                    )
1283                    .with_metadata(HashMap::from([(
1284                        PARQUET_FIELD_ID_META_KEY.to_string(),
1285                        "3".to_string(),
1286                    )])),
1287                ),
1288                nested_struct_array,
1289            ),
1290            (
1291                Arc::new(Field::new("field_c", DataType::Int32, true).with_metadata(
1292                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
1293                )),
1294                Arc::new(field_c_array) as ArrayRef,
1295            ),
1296        ])) as ArrayRef;
1297
1298        // Create an ArrowArrayAccessor with ID matching mode
1299        let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id);
1300
1301        // Test finding fields by ID
1302        let nested_field = NestedField::optional(
1303            3,
1304            "nested_struct",
1305            Type::Struct(StructType::new(vec![
1306                Arc::new(NestedField::optional(
1307                    1,
1308                    "field_a",
1309                    Type::Primitive(PrimitiveType::Int),
1310                )),
1311                Arc::new(NestedField::optional(
1312                    2,
1313                    "field_b",
1314                    Type::Primitive(PrimitiveType::String),
1315                )),
1316            ])),
1317        );
1318        let nested_partner = accessor
1319            .field_partner(&struct_array, &nested_field)
1320            .unwrap();
1321
1322        // Verify we can access the nested field
1323        let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
1324        let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();
1325
1326        // Verify the field has the expected value
1327        let int_array = field_a_partner
1328            .as_any()
1329            .downcast_ref::<Int32Array>()
1330            .unwrap();
1331        assert_eq!(int_array.value(0), 42);
1332        assert_eq!(int_array.value(1), 43);
1333        assert!(int_array.is_null(2));
1334    }
1335
1336    #[test]
1337    fn test_find_field_by_name() {
1338        // Create Arrow arrays for the nested structure
1339        let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]);
1340        let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]);
1341
1342        // Create the nested struct array WITHOUT field IDs in metadata
1343        let nested_struct_array = Arc::new(StructArray::from(vec![
1344            (
1345                Arc::new(Field::new("field_a", DataType::Int32, true)),
1346                Arc::new(field_a_array) as ArrayRef,
1347            ),
1348            (
1349                Arc::new(Field::new("field_b", DataType::Utf8, true)),
1350                Arc::new(field_b_array) as ArrayRef,
1351            ),
1352        ])) as ArrayRef;
1353
1354        let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]);
1355
1356        // Create the top-level struct array WITHOUT field IDs in metadata
1357        let struct_array = Arc::new(StructArray::from(vec![
1358            (
1359                Arc::new(Field::new(
1360                    "nested_struct",
1361                    DataType::Struct(Fields::from(vec![
1362                        Field::new("field_a", DataType::Int32, true),
1363                        Field::new("field_b", DataType::Utf8, true),
1364                    ])),
1365                    true,
1366                )),
1367                nested_struct_array,
1368            ),
1369            (
1370                Arc::new(Field::new("field_c", DataType::Int32, true)),
1371                Arc::new(field_c_array) as ArrayRef,
1372            ),
1373        ])) as ArrayRef;
1374
1375        // Create an ArrowArrayAccessor with Name matching mode
1376        let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name);
1377
1378        // Test finding fields by name
1379        let nested_field = NestedField::optional(
1380            3,
1381            "nested_struct",
1382            Type::Struct(StructType::new(vec![
1383                Arc::new(NestedField::optional(
1384                    1,
1385                    "field_a",
1386                    Type::Primitive(PrimitiveType::Int),
1387                )),
1388                Arc::new(NestedField::optional(
1389                    2,
1390                    "field_b",
1391                    Type::Primitive(PrimitiveType::String),
1392                )),
1393            ])),
1394        );
1395        let nested_partner = accessor
1396            .field_partner(&struct_array, &nested_field)
1397            .unwrap();
1398
1399        // Verify we can access the nested field by name
1400        let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
1401        let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();
1402
1403        // Verify the field has the expected value
1404        let int_array = field_a_partner
1405            .as_any()
1406            .downcast_ref::<Int32Array>()
1407            .unwrap();
1408        assert_eq!(int_array.value(0), 42);
1409        assert_eq!(int_array.value(1), 43);
1410        assert!(int_array.is_null(2));
1411    }
1412
1413    #[test]
1414    fn test_complex_nested() {
1415        // complex nested type for test
1416        // <
1417        //   A: list< struct(a1: int, a2: int) >,
1418        //   B: list< map<int, int> >,
1419        //   C: list< list<int> >,
1420        // >
1421        let struct_type = StructType::new(vec![
1422            Arc::new(NestedField::required(
1423                0,
1424                "A",
1425                Type::List(ListType::new(Arc::new(NestedField::required(
1426                    1,
1427                    "item",
1428                    Type::Struct(StructType::new(vec![
1429                        Arc::new(NestedField::required(
1430                            2,
1431                            "a1",
1432                            Type::Primitive(PrimitiveType::Int),
1433                        )),
1434                        Arc::new(NestedField::required(
1435                            3,
1436                            "a2",
1437                            Type::Primitive(PrimitiveType::Int),
1438                        )),
1439                    ])),
1440                )))),
1441            )),
1442            Arc::new(NestedField::required(
1443                4,
1444                "B",
1445                Type::List(ListType::new(Arc::new(NestedField::required(
1446                    5,
1447                    "item",
1448                    Type::Map(MapType::new(
1449                        NestedField::optional(6, "keys", Type::Primitive(PrimitiveType::Int))
1450                            .into(),
1451                        NestedField::optional(7, "values", Type::Primitive(PrimitiveType::Int))
1452                            .into(),
1453                    )),
1454                )))),
1455            )),
1456            Arc::new(NestedField::required(
1457                8,
1458                "C",
1459                Type::List(ListType::new(Arc::new(NestedField::required(
1460                    9,
1461                    "item",
1462                    Type::List(ListType::new(Arc::new(NestedField::optional(
1463                        10,
1464                        "item",
1465                        Type::Primitive(PrimitiveType::Int),
1466                    )))),
1467                )))),
1468            )),
1469        ]);
1470
1471        // Generate a complex nested struct array
1472        // [
1473        //   {A: [{a1: 10, a2: 20}, {a1: 11, a2: 21}], B: [{(1,100),(3,300)},{(2,200)}], C: [[100,101,102], [200,201]]},
1474        //   {A: [{a1: 12, a2: 22}, {a1: 13, a2: 23}], B: [{(3,300)},{(4,400)}], C: [[300,301,302], [400,401]]},
1475        // ]
1476        let struct_array =
1477            {
1478                let a_struct_a1_builder = Int32Builder::new();
1479                let a_struct_a2_builder = Int32Builder::new();
1480                let a_struct_builder =
1481                    StructBuilder::new(
1482                        vec![
1483                            Field::new("a1", DataType::Int32, false).with_metadata(HashMap::from(
1484                                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
1485                            )),
1486                            Field::new("a2", DataType::Int32, false).with_metadata(HashMap::from(
1487                                [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
1488                            )),
1489                        ],
1490                        vec![Box::new(a_struct_a1_builder), Box::new(a_struct_a2_builder)],
1491                    );
1492                let a_builder = ListBuilder::new(a_struct_builder);
1493
1494                let map_key_builder = Int32Builder::new();
1495                let map_value_builder = Int32Builder::new();
1496                let map_builder = MapBuilder::new(None, map_key_builder, map_value_builder);
1497                let b_builder = ListBuilder::new(map_builder);
1498
1499                let inner_list_item_builder = Int32Builder::new();
1500                let inner_list_builder = ListBuilder::new(inner_list_item_builder);
1501                let c_builder = ListBuilder::new(inner_list_builder);
1502
1503                let mut top_struct_builder = {
1504                    let a_struct_type =
1505                        DataType::Struct(Fields::from(vec![
1506                            Field::new("a1", DataType::Int32, false).with_metadata(HashMap::from(
1507                                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
1508                            )),
1509                            Field::new("a2", DataType::Int32, false).with_metadata(HashMap::from(
1510                                [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
1511                            )),
1512                        ]));
1513                    let a_type =
1514                        DataType::List(Arc::new(Field::new("item", a_struct_type.clone(), true)));
1515
1516                    let b_map_entry_struct = Field::new(
1517                        "entries",
1518                        DataType::Struct(Fields::from(vec![
1519                            Field::new("keys", DataType::Int32, false),
1520                            Field::new("values", DataType::Int32, true),
1521                        ])),
1522                        false,
1523                    );
1524                    let b_map_type =
1525                        DataType::Map(Arc::new(b_map_entry_struct), /* sorted_keys = */ false);
1526                    let b_type =
1527                        DataType::List(Arc::new(Field::new("item", b_map_type.clone(), true)));
1528
1529                    let c_inner_list_type =
1530                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
1531                    let c_type = DataType::List(Arc::new(Field::new(
1532                        "item",
1533                        c_inner_list_type.clone(),
1534                        true,
1535                    )));
1536                    StructBuilder::new(
1537                        Fields::from(vec![
1538                            Field::new("A", a_type.clone(), false).with_metadata(HashMap::from([
1539                                (PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string()),
1540                            ])),
1541                            Field::new("B", b_type.clone(), false).with_metadata(HashMap::from([
1542                                (PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()),
1543                            ])),
1544                            Field::new("C", c_type.clone(), false).with_metadata(HashMap::from([
1545                                (PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string()),
1546                            ])),
1547                        ]),
1548                        vec![
1549                            Box::new(a_builder),
1550                            Box::new(b_builder),
1551                            Box::new(c_builder),
1552                        ],
1553                    )
1554                };
1555
1556                // first row
1557                // {A: [{a1: 10, a2: 20}, {a1: 11, a2: 21}], B: [{(1,100),(3,300)},{(2,200)}], C: [[100,101,102], [200,201]]},
1558                {
1559                    let a_builder = top_struct_builder
1560                        .field_builder::<ListBuilder<StructBuilder>>(0)
1561                        .unwrap();
1562                    let struct_builder = a_builder.values();
1563                    struct_builder
1564                        .field_builder::<Int32Builder>(0)
1565                        .unwrap()
1566                        .append_value(10);
1567                    struct_builder
1568                        .field_builder::<Int32Builder>(1)
1569                        .unwrap()
1570                        .append_value(20);
1571                    struct_builder.append(true);
1572                    let struct_builder = a_builder.values();
1573                    struct_builder
1574                        .field_builder::<Int32Builder>(0)
1575                        .unwrap()
1576                        .append_value(11);
1577                    struct_builder
1578                        .field_builder::<Int32Builder>(1)
1579                        .unwrap()
1580                        .append_value(21);
1581                    struct_builder.append(true);
1582                    a_builder.append(true);
1583                }
1584                {
1585                    let b_builder = top_struct_builder
1586                        .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
1587                        .unwrap();
1588                    let map_builder = b_builder.values();
1589                    map_builder.keys().append_value(1);
1590                    map_builder.values().append_value(100);
1591                    map_builder.keys().append_value(3);
1592                    map_builder.values().append_value(300);
1593                    map_builder.append(true).unwrap();
1594
1595                    map_builder.keys().append_value(2);
1596                    map_builder.values().append_value(200);
1597                    map_builder.append(true).unwrap();
1598
1599                    b_builder.append(true);
1600                }
1601                {
1602                    let c_builder = top_struct_builder
1603                        .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
1604                        .unwrap();
1605                    let inner_list_builder = c_builder.values();
1606                    inner_list_builder.values().append_value(100);
1607                    inner_list_builder.values().append_value(101);
1608                    inner_list_builder.values().append_value(102);
1609                    inner_list_builder.append(true);
1610                    let inner_list_builder = c_builder.values();
1611                    inner_list_builder.values().append_value(200);
1612                    inner_list_builder.values().append_value(201);
1613                    inner_list_builder.append(true);
1614                    c_builder.append(true);
1615                }
1616                top_struct_builder.append(true);
1617
1618                // second row
1619                // {A: [{a1: 12, a2: 22}, {a1: 13, a2: 23}], B: [{(3,300)}], C: [[300,301,302], [400,401]]},
1620                {
1621                    let a_builder = top_struct_builder
1622                        .field_builder::<ListBuilder<StructBuilder>>(0)
1623                        .unwrap();
1624                    let struct_builder = a_builder.values();
1625                    struct_builder
1626                        .field_builder::<Int32Builder>(0)
1627                        .unwrap()
1628                        .append_value(12);
1629                    struct_builder
1630                        .field_builder::<Int32Builder>(1)
1631                        .unwrap()
1632                        .append_value(22);
1633                    struct_builder.append(true);
1634                    let struct_builder = a_builder.values();
1635                    struct_builder
1636                        .field_builder::<Int32Builder>(0)
1637                        .unwrap()
1638                        .append_value(13);
1639                    struct_builder
1640                        .field_builder::<Int32Builder>(1)
1641                        .unwrap()
1642                        .append_value(23);
1643                    struct_builder.append(true);
1644                    a_builder.append(true);
1645                }
1646                {
1647                    let b_builder = top_struct_builder
1648                        .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
1649                        .unwrap();
1650                    let map_builder = b_builder.values();
1651                    map_builder.keys().append_value(3);
1652                    map_builder.values().append_value(300);
1653                    map_builder.append(true).unwrap();
1654
1655                    b_builder.append(true);
1656                }
1657                {
1658                    let c_builder = top_struct_builder
1659                        .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
1660                        .unwrap();
1661                    let inner_list_builder = c_builder.values();
1662                    inner_list_builder.values().append_value(300);
1663                    inner_list_builder.values().append_value(301);
1664                    inner_list_builder.values().append_value(302);
1665                    inner_list_builder.append(true);
1666                    let inner_list_builder = c_builder.values();
1667                    inner_list_builder.values().append_value(400);
1668                    inner_list_builder.values().append_value(401);
1669                    inner_list_builder.append(true);
1670                    c_builder.append(true);
1671                }
1672                top_struct_builder.append(true);
1673
1674                Arc::new(top_struct_builder.finish()) as ArrayRef
1675            };
1676
1677        let result = arrow_struct_to_literal(&struct_array, &struct_type).unwrap();
1678        assert_eq!(result, vec![
1679            Some(Literal::Struct(Struct::from_iter(vec![
1680                Some(Literal::List(vec![
1681                    Some(Literal::Struct(Struct::from_iter(vec![
1682                        Some(Literal::int(10)),
1683                        Some(Literal::int(20)),
1684                    ]))),
1685                    Some(Literal::Struct(Struct::from_iter(vec![
1686                        Some(Literal::int(11)),
1687                        Some(Literal::int(21)),
1688                    ]))),
1689                ])),
1690                Some(Literal::List(vec![
1691                    Some(Literal::Map(Map::from_iter(vec![
1692                        (Literal::int(1), Some(Literal::int(100))),
1693                        (Literal::int(3), Some(Literal::int(300))),
1694                    ]))),
1695                    Some(Literal::Map(Map::from_iter(vec![(
1696                        Literal::int(2),
1697                        Some(Literal::int(200))
1698                    ),]))),
1699                ])),
1700                Some(Literal::List(vec![
1701                    Some(Literal::List(vec![
1702                        Some(Literal::int(100)),
1703                        Some(Literal::int(101)),
1704                        Some(Literal::int(102)),
1705                    ])),
1706                    Some(Literal::List(vec![
1707                        Some(Literal::int(200)),
1708                        Some(Literal::int(201)),
1709                    ])),
1710                ])),
1711            ]))),
1712            Some(Literal::Struct(Struct::from_iter(vec![
1713                Some(Literal::List(vec![
1714                    Some(Literal::Struct(Struct::from_iter(vec![
1715                        Some(Literal::int(12)),
1716                        Some(Literal::int(22)),
1717                    ]))),
1718                    Some(Literal::Struct(Struct::from_iter(vec![
1719                        Some(Literal::int(13)),
1720                        Some(Literal::int(23)),
1721                    ]))),
1722                ])),
1723                Some(Literal::List(vec![Some(Literal::Map(Map::from_iter(
1724                    vec![(Literal::int(3), Some(Literal::int(300))),]
1725                ))),])),
1726                Some(Literal::List(vec![
1727                    Some(Literal::List(vec![
1728                        Some(Literal::int(300)),
1729                        Some(Literal::int(301)),
1730                        Some(Literal::int(302)),
1731                    ])),
1732                    Some(Literal::List(vec![
1733                        Some(Literal::int(400)),
1734                        Some(Literal::int(401)),
1735                    ])),
1736                ])),
1737            ]))),
1738        ]);
1739    }
1740
1741    #[test]
1742    fn test_create_decimal_array_respects_precision() {
1743        // Decimal128Array::from() uses Arrow's default precision (38) instead of the
1744        // target precision, causing RecordBatch construction to fail when schemas don't match.
1745        let target_precision = 18u8;
1746        let target_scale = 10i8;
1747        let target_type = DataType::Decimal128(target_precision, target_scale);
1748        let value = PrimitiveLiteral::Int128(10000000000);
1749
1750        let array = create_primitive_array_single_element(&target_type, &Some(value))
1751            .expect("Failed to create decimal array");
1752
1753        match array.data_type() {
1754            DataType::Decimal128(precision, scale) => {
1755                assert_eq!(*precision, target_precision);
1756                assert_eq!(*scale, target_scale);
1757            }
1758            other => panic!("Expected Decimal128, got {other:?}"),
1759        }
1760    }
1761
1762    #[test]
1763    fn test_create_decimal_array_repeated_respects_precision() {
1764        // Ensure repeated arrays also respect target precision, not Arrow's default.
1765        let target_precision = 18u8;
1766        let target_scale = 10i8;
1767        let target_type = DataType::Decimal128(target_precision, target_scale);
1768        let value = PrimitiveLiteral::Int128(10000000000);
1769        let num_rows = 5;
1770
1771        let array = create_primitive_array_repeated(&target_type, &Some(value), num_rows)
1772            .expect("Failed to create repeated decimal array");
1773
1774        match array.data_type() {
1775            DataType::Decimal128(precision, scale) => {
1776                assert_eq!(*precision, target_precision);
1777                assert_eq!(*scale, target_scale);
1778            }
1779            other => panic!("Expected Decimal128, got {other:?}"),
1780        }
1781
1782        assert_eq!(array.len(), num_rows);
1783    }
1784}