Skip to main content

mz_arrow_util/
builder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// We need to allow the std::collections::HashMap type since it is directly used as a type
11// parameter to the arrow Field::with_metadata method.
12#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20    DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Field-metadata key under which the extension type name of a field is stored.
pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
/// Prefix prepended to a type name to form the extension type name written
/// into field metadata.
const EXTENSION_PREFIX: &str = "materialize.v1.";
32
/// Accumulates rows into per-column builders and converts them into an Arrow
/// `RecordBatch`.
pub struct ArrowBuilder {
    /// One column builder per schema field, in schema order.
    columns: Vec<ArrowColumn>,
    /// A crude estimate of the size of the data in the builder
    /// based on the size of the rows added to it.
    row_size_bytes: usize,
    /// The original schema, if provided. Used to preserve metadata in to_record_batch().
    original_schema: Option<Arc<Schema>>,
}
41
42/// Converts a RelationDesc to an Arrow Schema.
43pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44    desc_to_schema_with_overrides(desc, |_| None)
45}
46
47/// Converts a RelationDesc to an Arrow Schema, with optional type overrides.
48///
49/// The `overrides` function receives a SqlScalarType and can return:
50/// - `Some((DataType, String))` to override the default mapping
51/// - `None` to use the default mapping from `scalar_to_arrow_datatype`
52///
53/// This allows destinations like Iceberg to customize type mappings for
54/// unsupported types (e.g., UInt64 -> Decimal128) while keeping the standard
55/// mapping for everything else.
56pub fn desc_to_schema_with_overrides<F>(
57    desc: &RelationDesc,
58    overrides: F,
59) -> Result<Schema, anyhow::Error>
60where
61    F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
62{
63    let mut fields = vec![];
64    let mut errs = vec![];
65    let mut seen_names = BTreeSet::new();
66    let mut all_names = BTreeSet::new();
67    for col_name in desc.iter_names() {
68        all_names.insert(col_name.to_string());
69    }
70    for (col_name, col_type) in desc.iter() {
71        let mut col_name = col_name.to_string();
72        // If we allow columns with the same name we encounter two issues:
73        // 1. The arrow crate will accidentally reuse the same buffers for the columns
74        // 2. Many parquet readers will error when trying to read the file metadata
75        // Instead we append a number to the end of the column name for any duplicates.
76        // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
77
78        if seen_names.contains(&col_name) {
79            let mut new_col_name = col_name.clone();
80            let mut e = 2;
81            while all_names.contains(&new_col_name) {
82                new_col_name = format!("{}{}", col_name, e);
83                e += 1;
84            }
85            col_name = new_col_name;
86        }
87
88        seen_names.insert(col_name.clone());
89        all_names.insert(col_name.clone());
90
91        // Use the override-aware version which applies overrides recursively
92        let result = scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides);
93
94        match result {
95            Ok((data_type, extension_type_name)) => {
96                fields.push(field_with_typename(
97                    &col_name,
98                    data_type,
99                    col_type.nullable,
100                    &extension_type_name,
101                ));
102            }
103            Err(err) => errs.push(err.to_string()),
104        }
105    }
106    if !errs.is_empty() {
107        anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
108    }
109    Ok(Schema::new(fields))
110}
111
112impl ArrowBuilder {
113    /// Helper to validate that a RelationDesc can be encoded into Arrow.
114    pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
115        let mut errs = vec![];
116        for (col_name, col_type) in desc.iter() {
117            match scalar_to_arrow_datatype(&col_type.scalar_type) {
118                Ok(_) => {}
119                Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
120            }
121        }
122        if !errs.is_empty() {
123            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
124        }
125        Ok(())
126    }
127
128    /// Helper to validate that a RelationDesc, after applying `overrides`, can
129    /// be encoded into Arrow AND converted from Arrow into parquet by
130    /// arrow-rs's `ArrowWriter`.
131    ///
132    /// Arrow encodability is a superset of parquet encodability — some Arrow
133    /// types are not (yet) implemented by arrow-rs's parquet writer. Callers
134    /// that write parquet must reject these up-front: copy-to-s3 writes an
135    /// `INCOMPLETE` sentinel during preflight and does not clean it up on a
136    /// failed upload, so a runtime failure inside the parquet writer leaves
137    /// the path in a state that blocks subsequent copies.
138    ///
139    /// Pass the same `overrides` you pass to [`desc_to_schema_with_overrides`]
140    /// — otherwise this check will reject types your sink is going to remap.
141    /// To add a new banned arrow datatype, extend `parquet_incompatible_type`.
142    pub fn validate_desc_for_parquet<F>(
143        desc: &RelationDesc,
144        overrides: F,
145    ) -> Result<(), anyhow::Error>
146    where
147        F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
148    {
149        let mut errs = vec![];
150        for (col_name, col_type) in desc.iter() {
151            let dt =
152                match scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides) {
153                    Ok((dt, _)) => dt,
154                    Err(_) => {
155                        errs.push(format!("{}: {:?}", col_name, col_type.scalar_type));
156                        continue;
157                    }
158                };
159            if let Some(reason) = parquet_incompatible_type(&dt) {
160                errs.push(format!(
161                    "{}: {:?} ({})",
162                    col_name, col_type.scalar_type, reason
163                ));
164            }
165        }
166        if !errs.is_empty() {
167            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
168        }
169        Ok(())
170    }
171
172    /// Initializes a new ArrowBuilder with the schema of the provided RelationDesc.
173    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
174    /// the number of values that can be appended to each column before reallocating.
175    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
176    /// Errors if the relation contains an unimplemented type.
177    pub fn new(
178        desc: &RelationDesc,
179        item_capacity: usize,
180        data_capacity: usize,
181    ) -> Result<Self, anyhow::Error> {
182        let schema = desc_to_schema(desc)?;
183        let mut columns = vec![];
184        for field in schema.fields() {
185            columns.push(ArrowColumn::new(
186                field.name().clone(),
187                field.is_nullable(),
188                field.data_type().clone(),
189                typename_from_field(field)?,
190                item_capacity,
191                data_capacity,
192            )?);
193        }
194        Ok(Self {
195            columns,
196            row_size_bytes: 0,
197            original_schema: None,
198        })
199    }
200
201    /// Initializes a new ArrowBuilder with a pre-built Arrow Schema.
202    /// This is useful when you need to preserve schema metadata (e.g., field IDs for Iceberg).
203    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
204    /// the number of values that can be appended to each column before reallocating.
205    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
206    /// Errors if the schema contains an unimplemented type.
207    pub fn new_with_schema(
208        schema: Arc<Schema>,
209        item_capacity: usize,
210        data_capacity: usize,
211    ) -> Result<Self, anyhow::Error> {
212        let mut columns = vec![];
213        for field in schema.fields() {
214            columns.push(ArrowColumn::new(
215                field.name().clone(),
216                field.is_nullable(),
217                field.data_type().clone(),
218                typename_from_field(field)?,
219                item_capacity,
220                data_capacity,
221            )?);
222        }
223        Ok(Self {
224            columns,
225            row_size_bytes: 0,
226            original_schema: Some(schema),
227        })
228    }
229
230    /// Returns a copy of the schema of the ArrowBuilder.
231    pub fn schema(&self) -> Schema {
232        Schema::new(
233            self.columns
234                .iter()
235                .map(Into::<Field>::into)
236                .collect::<Vec<_>>(),
237        )
238    }
239
240    /// Converts the ArrowBuilder into an arrow RecordBatch.
241    pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
242        let mut arrays = vec![];
243        let mut fields: Vec<Field> = vec![];
244        for mut col in self.columns.into_iter() {
245            arrays.push(col.finish());
246            fields.push((&col).into());
247        }
248
249        // If we have an original schema, use it to preserve metadata (e.g., field IDs)
250        let schema = if let Some(original_schema) = self.original_schema {
251            original_schema
252        } else {
253            Arc::new(Schema::new(fields))
254        };
255
256        RecordBatch::try_new(schema, arrays)
257    }
258
259    /// Appends a row to the builder.
260    /// Errors if the row contains an unimplemented or out-of-range value.
261    pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
262        for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
263            col.append_datum(datum)?;
264        }
265        self.row_size_bytes += row.byte_len();
266        Ok(())
267    }
268
269    pub fn row_size_bytes(&self) -> usize {
270        self.row_size_bytes
271    }
272}
273
274/// Return the appropriate Arrow DataType for the given SqlScalarType, plus a string
275/// that should be used as part of the Arrow 'Extension Type' name for fields using
276/// this type: <https://arrow.apache.org/docs/format/Columnar.html#extension-types>
277fn scalar_to_arrow_datatype(
278    scalar_type: &SqlScalarType,
279) -> Result<(DataType, String), anyhow::Error> {
280    scalar_to_arrow_datatype_impl(scalar_type, &|_| None)
281}
282
283/// Returns a description of why this Arrow [`DataType`] cannot be written to
284/// parquet by arrow-rs's `ArrowWriter`, or `None` if the type is supported.
285/// Recurses into composite types so a banned type is rejected even if it is
286/// nested inside a list/struct/map.
287///
288/// This is an allowlist mirroring what arrow-rs's parquet writer accepts in
289/// its `get_arrow_column_writer` and `write_leaf` paths. To support a new
290/// leaf type, add an arm to the allowlist match below — but only after
291/// verifying that arrow-rs's parquet writer can actually emit it.
292fn parquet_incompatible_type(dt: &DataType) -> Option<&'static str> {
293    use arrow::datatypes::IntervalUnit;
294    match dt {
295        DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
296            parquet_incompatible_type(f.data_type())
297        }
298        DataType::Map(f, _) => parquet_incompatible_type(f.data_type()),
299        DataType::Struct(fields) => fields
300            .iter()
301            .find_map(|f| parquet_incompatible_type(f.data_type())),
302
303        // Allowlist of supported leaf types.
304        DataType::Null
305        | DataType::Boolean
306        | DataType::Int8
307        | DataType::Int16
308        | DataType::Int32
309        | DataType::Int64
310        | DataType::UInt8
311        | DataType::UInt16
312        | DataType::UInt32
313        | DataType::UInt64
314        | DataType::Float16
315        | DataType::Float32
316        | DataType::Float64
317        | DataType::Date32
318        | DataType::Date64
319        | DataType::Time32(_)
320        | DataType::Time64(_)
321        | DataType::Timestamp(_, _)
322        | DataType::Duration(_)
323        | DataType::Interval(IntervalUnit::YearMonth | IntervalUnit::DayTime)
324        | DataType::Utf8
325        | DataType::LargeUtf8
326        | DataType::Utf8View
327        | DataType::Binary
328        | DataType::LargeBinary
329        | DataType::BinaryView
330        | DataType::FixedSizeBinary(_)
331        | DataType::Decimal32(_, _)
332        | DataType::Decimal64(_, _)
333        | DataType::Decimal128(_, _)
334        | DataType::Decimal256(_, _) => None,
335
336        // Fail closed: anything not on the allowlist is rejected.
337        _ => Some("unsupported arrow datatype"),
338    }
339}
340
341/// Return the appropriate Arrow DataType for the given SqlScalarType, with optional
342/// type overrides. The override function is called for each SqlScalarType (including
343/// nested types) and can return Some((DataType, extension_name)) to override the
344/// default mapping.
345fn scalar_to_arrow_datatype_with_overrides<F>(
346    scalar_type: &SqlScalarType,
347    overrides: &F,
348) -> Result<(DataType, String), anyhow::Error>
349where
350    F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
351{
352    scalar_to_arrow_datatype_impl(scalar_type, overrides)
353}
354
355/// Core implementation of scalar-to-arrow type conversion with override support.
356fn scalar_to_arrow_datatype_impl<F>(
357    scalar_type: &SqlScalarType,
358    overrides: &F,
359) -> Result<(DataType, String), anyhow::Error>
360where
361    F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
362{
363    // Check for override first
364    if let Some(result) = overrides(scalar_type) {
365        return Ok(result);
366    }
367    let (data_type, extension_name) = match scalar_type {
368        SqlScalarType::Bool => (DataType::Boolean, "boolean"),
369        SqlScalarType::Int16 => (DataType::Int16, "smallint"),
370        SqlScalarType::Int32 => (DataType::Int32, "integer"),
371        SqlScalarType::Int64 => (DataType::Int64, "bigint"),
372        SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
373        SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
374        SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
375        SqlScalarType::Oid => (DataType::UInt32, "oid"),
376        SqlScalarType::Float32 => (DataType::Float32, "real"),
377        SqlScalarType::Float64 => (DataType::Float64, "double"),
378        SqlScalarType::Date => (DataType::Date32, "date"),
379        // The resolution of our time and timestamp types is microseconds, which is lucky
380        // since the original parquet 'ConvertedType's support microsecond resolution but not
381        // nanosecond resolution. The newer parquet 'LogicalType's support nanosecond resolution,
382        // but many readers don't support them yet.
383        SqlScalarType::Time => (
384            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
385            "time",
386        ),
387        SqlScalarType::Timestamp { .. } => (
388            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
389            "timestamp",
390        ),
391        SqlScalarType::TimestampTz { .. } => (
392            DataType::Timestamp(
393                arrow::datatypes::TimeUnit::Microsecond,
394                // When appending values we always use UTC timestamps, and setting this to a non-empty
395                // value allows readers to know that tz-aware timestamps can be compared directly.
396                Some("+00:00".into()),
397            ),
398            "timestamptz",
399        ),
400        SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
401        SqlScalarType::Char { length } => {
402            if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
403                (DataType::Utf8, "text")
404            } else {
405                (DataType::LargeUtf8, "text")
406            }
407        }
408        SqlScalarType::VarChar { max_length } => {
409            if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
410                (DataType::Utf8, "text")
411            } else {
412                (DataType::LargeUtf8, "text")
413            }
414        }
415        SqlScalarType::String => (DataType::LargeUtf8, "text"),
416        // Parquet does have a UUID 'Logical Type' in parquet format 2.4+, but there is no arrow
417        // UUID type, so we match the format (a 16-byte fixed-length binary array) ourselves.
418        SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
419        // Parquet does have a JSON 'Logical Type' in parquet format 2.4+, but there is no arrow
420        // JSON type, so for now we represent JSON as 'large' utf8-encoded strings.
421        SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
422        SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
423        SqlScalarType::Numeric { max_scale } => {
424            // Materialize allows 39 digits of precision for numeric values, but allows
425            // arbitrary scales among those values. e.g. 1e38 and 1e-39 are both valid in
426            // the same column. However, Arrow/Parquet only allows static declaration of both
427            // the precision and the scale. To represent the full range of values of a numeric
428            // column, we would need 78-digits to store all possible values. Arrow's Decimal256
429            // type can only support 76 digits, so we are be unable to represent the entire range.
430
431            // Instead of representing the full possible range, we instead try to represent most
432            // values in the most-compatible way. We use a Decimal128 type which can handle 38
433            // digits of precision and has more compatibility with other parquet readers than
434            // Decimal256. We use Arrow's default scale of 10 if max-scale is not set. We will
435            // error if we encounter a value that is too large to represent, and if that happens
436            // a user can choose to cast the column to a string to represent the value.
437            match max_scale {
438                Some(scale) => {
439                    let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
440                    if scale <= DECIMAL128_MAX_SCALE {
441                        (
442                            DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
443                            "numeric",
444                        )
445                    } else {
446                        anyhow::bail!("Numeric max scale {} out of range", scale)
447                    }
448                }
449                None => (
450                    DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
451                    "numeric",
452                ),
453            }
454        }
455        SqlScalarType::Interval => (
456            DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano),
457            "interval",
458        ),
459        SqlScalarType::Array(inner) => {
460            // Postgres / MZ Arrays are weird, since they can be multi-dimensional but this is not
461            // enforced in the type system, so can change per-value.
462            // We use a struct type with two fields - one containing the array elements as a list
463            // and the other containing the number of dimensions the array represents. Since arrays
464            // are not allowed to be ragged, the number of elements in each dimension is the same.
465            let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(inner, overrides)?;
466            // TODO: Document these field names in our copy-to-s3 docs
467            let inner_field = field_with_typename("item", inner_type, true, &inner_name);
468            let list_field = Arc::new(field_with_typename(
469                "items",
470                DataType::List(inner_field.into()),
471                false,
472                "array_items",
473            ));
474            let dims_field = Arc::new(field_with_typename(
475                "dimensions",
476                DataType::UInt8,
477                false,
478                "array_dimensions",
479            ));
480            (DataType::Struct([list_field, dims_field].into()), "array")
481        }
482        SqlScalarType::List {
483            element_type,
484            custom_id: _,
485        } => {
486            let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
487            // TODO: Document these field names in our copy-to-s3 docs
488            let field = field_with_typename("item", inner_type, true, &inner_name);
489            (DataType::List(field.into()), "list")
490        }
491        SqlScalarType::Map {
492            value_type,
493            custom_id: _,
494        } => {
495            let (value_type, value_name) = scalar_to_arrow_datatype_impl(value_type, overrides)?;
496            // Arrow maps are represented as an 'entries' struct with 'keys' and 'values' fields.
497            let field_names = MapFieldNames::default();
498            let struct_type = DataType::Struct(
499                vec![
500                    Field::new(&field_names.key, DataType::Utf8, false),
501                    field_with_typename(&field_names.value, value_type, true, &value_name),
502                ]
503                .into(),
504            );
505            (
506                DataType::Map(
507                    Field::new(&field_names.entry, struct_type, false).into(),
508                    false,
509                ),
510                "map",
511            )
512        }
513        SqlScalarType::Record {
514            fields,
515            custom_id: _,
516        } => {
517            // Records are represented as Arrow Structs with one field per record field.
518            // At runtime, records are stored as Datum::List with field values in order.
519            let mut arrow_fields = Vec::with_capacity(fields.len());
520            for (field_name, field_type) in fields.iter() {
521                let (inner_type, inner_extension_name) =
522                    scalar_to_arrow_datatype_impl(&field_type.scalar_type, overrides)?;
523                let field = field_with_typename(
524                    field_name.as_str(),
525                    inner_type,
526                    field_type.nullable,
527                    &inner_extension_name,
528                );
529                arrow_fields.push(Arc::new(field));
530            }
531            (DataType::Struct(arrow_fields.into()), "record")
532        }
533        SqlScalarType::Range { element_type } => {
534            // Ranges are represented as Arrow Structs with 5 fields:
535            //   - lower: nullable element value (the bound datum, null = infinite)
536            //   - upper: nullable element value (the bound datum, null = infinite)
537            //   - lower_inclusive: bool
538            //   - upper_inclusive: bool
539            //   - empty: bool
540            let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
541            let lower_field = Arc::new(field_with_typename(
542                "lower",
543                inner_type.clone(),
544                true,
545                &inner_name,
546            ));
547            let upper_field = Arc::new(field_with_typename("upper", inner_type, true, &inner_name));
548            let lower_inclusive_field = Arc::new(field_with_typename(
549                "lower_inclusive",
550                DataType::Boolean,
551                false,
552                "boolean",
553            ));
554            let upper_inclusive_field = Arc::new(field_with_typename(
555                "upper_inclusive",
556                DataType::Boolean,
557                false,
558                "boolean",
559            ));
560            let empty_field = Arc::new(field_with_typename(
561                "empty",
562                DataType::Boolean,
563                false,
564                "boolean",
565            ));
566            (
567                DataType::Struct(
568                    [
569                        lower_field,
570                        upper_field,
571                        lower_inclusive_field,
572                        upper_inclusive_field,
573                        empty_field,
574                    ]
575                    .into(),
576                ),
577                "range",
578            )
579        }
580        _ => anyhow::bail!("{:?} unimplemented", scalar_type),
581    };
582    Ok((data_type, extension_name.to_lowercase()))
583}
584
585fn builder_for_datatype(
586    data_type: &DataType,
587    item_capacity: usize,
588    data_capacity: usize,
589) -> Result<ColBuilder, anyhow::Error> {
590    let builder = match &data_type {
591        DataType::Boolean => {
592            ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
593        }
594        DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
595        DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
596        DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
597        DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
598        DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
599        DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
600        DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
601        DataType::Float32 => {
602            ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
603        }
604        DataType::Float64 => {
605            ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
606        }
607        DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
608        DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
609            ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
610                item_capacity,
611            ))
612        }
613        DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
614            ColBuilder::TimestampMicrosecondBuilder(
615                TimestampMicrosecondBuilder::with_capacity(item_capacity)
616                    .with_timezone_opt(timezone.clone()),
617            )
618        }
619        DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
620            item_capacity,
621            data_capacity,
622        )),
623        DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
624            FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
625        ),
626        DataType::Utf8 => {
627            ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
628        }
629        DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
630            item_capacity,
631            data_capacity,
632        )),
633        DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
634            Decimal128Builder::with_capacity(item_capacity)
635                .with_precision_and_scale(*precision, *scale)?,
636        ),
637        DataType::List(field) => {
638            let inner_col_builder = ArrowColumn::new(
639                field.name().clone(),
640                field.is_nullable(),
641                field.data_type().clone(),
642                typename_from_field(field)?,
643                item_capacity,
644                data_capacity,
645            )?;
646            ColBuilder::ListBuilder(Box::new(
647                ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
648            ))
649        }
650        DataType::Struct(fields) => {
651            let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
652            for field in fields {
653                let inner_col_builder = ArrowColumn::new(
654                    field.name().clone(),
655                    field.is_nullable(),
656                    field.data_type().clone(),
657                    typename_from_field(field)?,
658                    item_capacity,
659                    data_capacity,
660                )?;
661                field_builders.push(Box::new(inner_col_builder));
662            }
663            ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
664        }
665        DataType::Map(entries_field, _sorted) => {
666            let entries_field = entries_field.as_ref();
667            if let DataType::Struct(fields) = entries_field.data_type() {
668                if fields.len() != 2 {
669                    anyhow::bail!(
670                        "Expected map entries to have 2 fields, found {}",
671                        fields.len()
672                    )
673                }
674                let key_field = &fields[0];
675                let value_field = &fields[1];
676                let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
677                let value_builder = ArrowColumn::new(
678                    value_field.name().clone(),
679                    value_field.is_nullable(),
680                    value_field.data_type().clone(),
681                    typename_from_field(value_field)?,
682                    item_capacity,
683                    data_capacity,
684                )?;
685                // Use the names from the schema's entries struct rather than
686                // arrow-rs's defaults (`entries`/`keys`/`values`) — when the
687                // schema came from Iceberg (`key_value`/`key`/`value`) the
688                // RecordBatch validation rejects the mismatched DataType.
689                let field_names = MapFieldNames {
690                    entry: entries_field.name().clone(),
691                    key: key_field.name().clone(),
692                    value: value_field.name().clone(),
693                };
694                // Forward both inner fields so any metadata the schema set
695                // (e.g. Iceberg's PARQUET:field_id) survives onto the
696                // MapArray's nested fields; otherwise RecordBatch::try_new
697                // rejects the batch as schema-mismatched.
698                ColBuilder::MapBuilder(Box::new(
699                    MapBuilder::with_capacity(
700                        Some(field_names),
701                        key_builder,
702                        value_builder,
703                        item_capacity,
704                    )
705                    .with_keys_field(Arc::clone(key_field))
706                    .with_values_field(Arc::clone(value_field)),
707                ))
708            } else {
709                anyhow::bail!("Expected map entries to be a struct")
710            }
711        }
712        DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano) => {
713            ColBuilder::IntervalMonthDayNanoBuilder(IntervalMonthDayNanoBuilder::with_capacity(
714                item_capacity,
715            ))
716        }
717        _ => anyhow::bail!("{:?} unimplemented", data_type),
718    };
719    Ok(builder)
720}
721
722#[derive(Debug)]
723struct ArrowColumn {
724    field_name: String,
725    nullable: bool,
726    data_type: DataType,
727    extension_type_name: String,
728    inner: ColBuilder,
729}
730
731impl From<&ArrowColumn> for Field {
732    fn from(col: &ArrowColumn) -> Self {
733        field_with_typename(
734            &col.field_name,
735            col.data_type.clone(),
736            col.nullable,
737            &col.extension_type_name,
738        )
739    }
740}
741
742/// Create a Field and include the materialize 'type name' as an extension in the metadata.
743fn field_with_typename(
744    name: &str,
745    data_type: DataType,
746    nullable: bool,
747    extension_type_name: &str,
748) -> Field {
749    Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
750        ARROW_EXTENSION_NAME_KEY.to_string(),
751        format!("{}{}", EXTENSION_PREFIX, extension_type_name),
752    )]))
753}
754
755/// Extract the materialize 'type name' from the metadata of a Field.
756/// Returns an error if the field doesn't have extension metadata.
757fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
758    let metadata = field.metadata();
759    let extension_name = metadata
760        .get(ARROW_EXTENSION_NAME_KEY)
761        .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
762    extension_name
763        .strip_prefix(EXTENSION_PREFIX)
764        .map(|s| s.to_string())
765        .ok_or_else(|| {
766            anyhow::anyhow!(
767                "Field '{}' extension name '{}' missing expected prefix '{}'",
768                field.name(),
769                extension_name,
770                EXTENSION_PREFIX
771            )
772        })
773}
774
775impl ArrowColumn {
776    fn new(
777        field_name: String,
778        nullable: bool,
779        data_type: DataType,
780        extension_type_name: String,
781        item_capacity: usize,
782        data_capacity: usize,
783    ) -> Result<Self, anyhow::Error> {
784        Ok(Self {
785            inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
786            field_name,
787            nullable,
788            data_type,
789            extension_type_name,
790        })
791    }
792}
793
794macro_rules! make_col_builder {
795    ($($x:ident), *) => {
796        /// An enum wrapper for all arrow builder types that we support. Used to store
797        /// a builder for each column and avoid dynamic dispatch and downcasting
798        /// when appending data.
799        #[derive(Debug)]
800        enum ColBuilder {
801            $(
802                $x($x),
803            )*
804            /// ListBuilder & MapBuilder are handled separately than other builder types since they
805            /// uses generic parameters for the inner types, and are boxed to avoid recursive
806            /// type definitions.
807            ListBuilder(Box<ListBuilder<ArrowColumn>>),
808            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
809            /// StructBuilder is handled separately since its `append_null()` method must be
810            /// overriden to both append nulls to all field builders and to append a null to
811            /// the struct. It's unclear why `arrow-rs` implemented this differently than
812            /// ListBuilder and MapBuilder.
813            StructBuilder(StructBuilder),
814        }
815
816        impl ColBuilder {
817            fn append_null(&mut self) {
818                match self {
819                    $(
820                        ColBuilder::$x(builder) => builder.append_null(),
821                    )*
822                    ColBuilder::ListBuilder(builder) => builder.append_null(),
823                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
824                    ColBuilder::StructBuilder(builder) => {
825                        for i in 0..builder.num_fields() {
826                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
827                            field_builder.inner.append_null();
828                        }
829                        builder.append_null();
830                    }
831                }
832            }
833        }
834
835        /// Implement the ArrayBuilder trait for ArrowColumn so that we can use an ArrowColumn as
836        /// an inner-builder type in an [`arrow::array::builder::GenericListBuilder`]
837        /// and an [`arrow::array::builder::StructBuilder`] and re-use our methods for appending
838        /// data to the column.
839        impl ArrayBuilder for ArrowColumn {
840            fn len(&self) -> usize {
841                match &self.inner {
842                    $(
843                        ColBuilder::$x(builder) => builder.len(),
844                    )*
845                    ColBuilder::ListBuilder(builder) => builder.len(),
846                    ColBuilder::MapBuilder(builder) => builder.len(),
847                    ColBuilder::StructBuilder(builder) => builder.len(),
848                }
849            }
850            fn finish(&mut self) -> ArrayRef {
851                match &mut self.inner {
852                    $(
853                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
854                    )*
855                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
856                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
857                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
858                }
859            }
860            fn finish_cloned(&self) -> ArrayRef {
861                match &self.inner {
862                    $(
863                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
864                    )*
865                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
866                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
867                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
868                }
869            }
870            fn as_any(&self) -> &(dyn Any + 'static) {
871                self
872            }
873            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
874                self
875            }
876            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
877                self
878            }
879        }
880    };
881}
882
883make_col_builder!(
884    BooleanBuilder,
885    Int16Builder,
886    Int32Builder,
887    Int64Builder,
888    UInt8Builder,
889    UInt16Builder,
890    UInt32Builder,
891    UInt64Builder,
892    Float32Builder,
893    Float64Builder,
894    Date32Builder,
895    Time64MicrosecondBuilder,
896    TimestampMicrosecondBuilder,
897    LargeBinaryBuilder,
898    FixedSizeBinaryBuilder,
899    StringBuilder,
900    LargeStringBuilder,
901    Decimal128Builder,
902    IntervalMonthDayNanoBuilder
903);
904
905impl ArrowColumn {
906    fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
907        match (&mut self.inner, datum) {
908            (s, Datum::Null) => s.append_null(),
909            (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
910            (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
911            (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
912            (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
913            (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
914            (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
915            (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
916            (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
917            (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
918            (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
919            (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
920            (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
921                builder.append_value(d.unix_epoch_days())
922            }
923            (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
924                let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
925                    * 1_000_000
926                    + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
927                builder.append_value(micros_since_midnight)
928            }
929            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
930                builder.append_value(ts.and_utc().timestamp_micros())
931            }
932            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
933                builder.append_value(ts.timestamp_micros())
934            }
935            (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
936            (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
937                builder.append_value(val.as_bytes())?
938            }
939            (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
940            (ColBuilder::StringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
941                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
942            }
943            (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
944                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
945            }
946            (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
947            (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
948                builder.append_value(ts.into())
949            }
950            // Lossless unsigned-to-signed promotions for destinations that don't
951            // support unsigned types (e.g., Iceberg).
952            (ColBuilder::Int32Builder(builder), Datum::UInt16(i)) => {
953                builder.append_value(i32::from(i))
954            }
955            // Lossless signed-to-signed widening for destinations that don't
956            // support narrow integers (e.g., Iceberg has no smallint).
957            (ColBuilder::Int32Builder(builder), Datum::Int16(i)) => {
958                builder.append_value(i32::from(i))
959            }
960            (ColBuilder::Int64Builder(builder), Datum::UInt32(i)) => {
961                builder.append_value(i64::from(i))
962            }
963            (ColBuilder::Decimal128Builder(builder), Datum::UInt64(i)) => {
964                builder.append_value(i128::from(i))
965            }
966            (ColBuilder::Decimal128Builder(builder), Datum::MzTimestamp(ts)) => {
967                builder.append_value(i128::from(u64::from(ts)))
968            }
969            // Interval-to-string conversion for destinations that don't support
970            // interval types natively (e.g., Iceberg).
971            (ColBuilder::StringBuilder(builder), Datum::Interval(iv)) => {
972                builder.append_value(iv.to_string())
973            }
974            (ColBuilder::LargeStringBuilder(builder), Datum::Interval(iv)) => {
975                builder.append_value(iv.to_string())
976            }
977            (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
978                if dec.0.is_special() {
979                    anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
980                }
981                if let DataType::Decimal128(precision, scale) = self.data_type {
982                    if dec.0.digits() > precision.into() {
983                        anyhow::bail!(
984                            "Decimal value {} out of range for column with precision {}",
985                            dec,
986                            precision
987                        )
988                    }
989
990                    // Get the signed-coefficient represented as an i128, and the exponent such that
991                    // the number should equal coefficient*10^exponent.
992                    let coefficient: i128 = dec.0.coefficient()?;
993                    let exponent = dec.0.exponent();
994
995                    // Convert the value to use the scale of the column (add 0's to align the decimal
996                    // point correctly). This is done by multiplying the coefficient by
997                    // 10^(scale + exponent).
998                    let scale_diff = i32::from(scale) + exponent;
999                    // If the scale_diff is negative, we know there aren't enough digits in our
1000                    // column's scale to represent this value.
1001                    let scale_diff = u32::try_from(scale_diff).map_err(|_| {
1002                        anyhow::anyhow!(
1003                            "cannot represent decimal value {} in column with scale {}",
1004                            dec,
1005                            scale
1006                        )
1007                    })?;
1008
1009                    let value = coefficient
1010                        .checked_mul(10_i128.pow(scale_diff))
1011                        .ok_or_else(|| {
1012                            anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
1013                        })?;
1014
1015                    builder.append_value(value)
1016                } else {
1017                    anyhow::bail!("Expected Decimal128 data type")
1018                }
1019            }
1020            (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
1021                // We've received an array datum which we know is represented as an Arrow struct
1022                // with two fields: the list of elements and the number of dimensions
1023                let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
1024                if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
1025                    let inner_builder = list_builder.values();
1026                    for datum in arr.elements().into_iter() {
1027                        inner_builder.append_datum(datum)?;
1028                    }
1029                    list_builder.append(true);
1030                } else {
1031                    anyhow::bail!(
1032                        "Expected ListBuilder for StructBuilder with Array datum: {:?}",
1033                        struct_builder
1034                    )
1035                }
1036                let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
1037                if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
1038                    dims_builder.append_value(arr.dims().ndims());
1039                } else {
1040                    anyhow::bail!(
1041                        "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
1042                        struct_builder
1043                    )
1044                }
1045                struct_builder.append(true)
1046            }
1047            (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
1048                let inner_builder = list_builder.values();
1049                for datum in list.into_iter() {
1050                    inner_builder.append_datum(datum)?;
1051                }
1052                list_builder.append(true)
1053            }
1054            (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
1055                for (key, value) in map.iter() {
1056                    builder.keys().append_value(key);
1057                    builder.values().append_datum(value)?;
1058                }
1059                builder.append(true).unwrap()
1060            }
1061            // Records are stored as Datum::List at runtime but written to a StructBuilder.
1062            // Arrays use Datum::Array (handled above), so Datum::List with StructBuilder is a record.
1063            (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
1064                let field_count = struct_builder.num_fields();
1065                for (i, datum) in list.into_iter().enumerate() {
1066                    if i >= field_count {
1067                        anyhow::bail!(
1068                            "Record has more elements ({}) than struct fields ({})",
1069                            i + 1,
1070                            field_count
1071                        );
1072                    }
1073                    let field_builder: &mut ArrowColumn = struct_builder
1074                        .field_builder(i)
1075                        .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
1076                    field_builder.append_datum(datum)?;
1077                }
1078                struct_builder.append(true);
1079            }
1080            (ColBuilder::StructBuilder(struct_builder), Datum::Range(range)) => {
1081                // Ranges are represented as a struct with 5 fields:
1082                //   0: lower (nullable element value)
1083                //   1: upper (nullable element value)
1084                //   2: lower_inclusive (bool)
1085                //   3: upper_inclusive (bool)
1086                //   4: empty (bool)
1087                match range.inner {
1088                    None => {
1089                        // Empty range: null bounds, inclusive=false, empty=true
1090                        struct_builder
1091                            .field_builder::<ArrowColumn>(0)
1092                            .unwrap()
1093                            .append_datum(Datum::Null)?;
1094                        struct_builder
1095                            .field_builder::<ArrowColumn>(1)
1096                            .unwrap()
1097                            .append_datum(Datum::Null)?;
1098                        struct_builder
1099                            .field_builder::<ArrowColumn>(2)
1100                            .unwrap()
1101                            .append_datum(Datum::False)?;
1102                        struct_builder
1103                            .field_builder::<ArrowColumn>(3)
1104                            .unwrap()
1105                            .append_datum(Datum::False)?;
1106                        struct_builder
1107                            .field_builder::<ArrowColumn>(4)
1108                            .unwrap()
1109                            .append_datum(Datum::True)?;
1110                    }
1111                    Some(inner) => {
1112                        struct_builder
1113                            .field_builder::<ArrowColumn>(0)
1114                            .unwrap()
1115                            .append_datum(
1116                                inner.lower.bound.map(|n| n.datum()).unwrap_or(Datum::Null),
1117                            )?;
1118                        struct_builder
1119                            .field_builder::<ArrowColumn>(1)
1120                            .unwrap()
1121                            .append_datum(
1122                                inner.upper.bound.map(|n| n.datum()).unwrap_or(Datum::Null),
1123                            )?;
1124                        struct_builder
1125                            .field_builder::<ArrowColumn>(2)
1126                            .unwrap()
1127                            .append_datum(if inner.lower.inclusive {
1128                                Datum::True
1129                            } else {
1130                                Datum::False
1131                            })?;
1132                        struct_builder
1133                            .field_builder::<ArrowColumn>(3)
1134                            .unwrap()
1135                            .append_datum(if inner.upper.inclusive {
1136                                Datum::True
1137                            } else {
1138                                Datum::False
1139                            })?;
1140                        struct_builder
1141                            .field_builder::<ArrowColumn>(4)
1142                            .unwrap()
1143                            .append_datum(Datum::False)?;
1144                    }
1145                }
1146                // Mark the struct row as non-null. StructBuilder tracks per-row
1147                // validity independently from the child builders, so this call
1148                // is required once per outer row regardless of the field values.
1149                struct_builder.append(true);
1150            }
1151            (ColBuilder::IntervalMonthDayNanoBuilder(builder), Datum::Interval(iv)) => {
1152                let nanos = iv.micros.checked_mul(1_000).ok_or_else(|| {
1153                    anyhow::anyhow!(
1154                        "interval microseconds {} overflow i64 nanoseconds",
1155                        iv.micros
1156                    )
1157                })?;
1158                builder.append_value(arrow::datatypes::IntervalMonthDayNano::new(
1159                    iv.months, iv.days, nanos,
1160                ));
1161            }
1162            (builder, datum) => {
1163                anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
1164            }
1165        }
1166        Ok(())
1167    }
1168}