// mz_arrow_util/builder.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// We need to allow the std::collections::HashMap type since it is directly used as a type
11// parameter to the arrow Field::with_metadata method.
12#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20    DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Metadata key under which Arrow stores a field's extension type name.
/// See <https://arrow.apache.org/docs/format/Columnar.html#extension-types>.
pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
/// Prefix prepended to the materialize type name when it is written into
/// the field metadata (and stripped again by `typename_from_field`).
const EXTENSION_PREFIX: &str = "materialize.v1.";
32
/// Accumulates rows into per-column arrow array builders so they can be
/// converted into an arrow `RecordBatch` via `to_record_batch`.
pub struct ArrowBuilder {
    /// One builder per column of the relation, in declaration order.
    columns: Vec<ArrowColumn>,
    /// A crude estimate of the size of the data in the builder
    /// based on the size of the rows added to it.
    row_size_bytes: usize,
    /// The original schema, if provided. Used to preserve metadata in to_record_batch().
    original_schema: Option<Arc<Schema>>,
}
41
42/// Converts a RelationDesc to an Arrow Schema.
43pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44    desc_to_schema_with_overrides(desc, |_| None)
45}
46
47/// Converts a RelationDesc to an Arrow Schema, with optional type overrides.
48///
49/// The `overrides` function receives a SqlScalarType and can return:
50/// - `Some((DataType, String))` to override the default mapping
51/// - `None` to use the default mapping from `scalar_to_arrow_datatype`
52///
53/// This allows destinations like Iceberg to customize type mappings for
54/// unsupported types (e.g., UInt64 -> Decimal128) while keeping the standard
55/// mapping for everything else.
56pub fn desc_to_schema_with_overrides<F>(
57    desc: &RelationDesc,
58    overrides: F,
59) -> Result<Schema, anyhow::Error>
60where
61    F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
62{
63    let mut fields = vec![];
64    let mut errs = vec![];
65    let mut seen_names = BTreeSet::new();
66    let mut all_names = BTreeSet::new();
67    for col_name in desc.iter_names() {
68        all_names.insert(col_name.to_string());
69    }
70    for (col_name, col_type) in desc.iter() {
71        let mut col_name = col_name.to_string();
72        // If we allow columns with the same name we encounter two issues:
73        // 1. The arrow crate will accidentally reuse the same buffers for the columns
74        // 2. Many parquet readers will error when trying to read the file metadata
75        // Instead we append a number to the end of the column name for any duplicates.
76        // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
77
78        if seen_names.contains(&col_name) {
79            let mut new_col_name = col_name.clone();
80            let mut e = 2;
81            while all_names.contains(&new_col_name) {
82                new_col_name = format!("{}{}", col_name, e);
83                e += 1;
84            }
85            col_name = new_col_name;
86        }
87
88        seen_names.insert(col_name.clone());
89        all_names.insert(col_name.clone());
90
91        // Use the override-aware version which applies overrides recursively
92        let result = scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides);
93
94        match result {
95            Ok((data_type, extension_type_name)) => {
96                fields.push(field_with_typename(
97                    &col_name,
98                    data_type,
99                    col_type.nullable,
100                    &extension_type_name,
101                ));
102            }
103            Err(err) => errs.push(err.to_string()),
104        }
105    }
106    if !errs.is_empty() {
107        anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
108    }
109    Ok(Schema::new(fields))
110}
111
112impl ArrowBuilder {
113    /// Helper to validate that a RelationDesc can be encoded into Arrow.
114    pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
115        let mut errs = vec![];
116        for (col_name, col_type) in desc.iter() {
117            match scalar_to_arrow_datatype(&col_type.scalar_type) {
118                Ok(_) => {}
119                Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
120            }
121        }
122        if !errs.is_empty() {
123            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
124        }
125        Ok(())
126    }
127
128    /// Helper to validate that a RelationDesc, after applying `overrides`, can
129    /// be encoded into Arrow AND converted from Arrow into parquet by
130    /// arrow-rs's `ArrowWriter`.
131    ///
132    /// Arrow encodability is a superset of parquet encodability — some Arrow
133    /// types are not (yet) implemented by arrow-rs's parquet writer. Callers
134    /// that write parquet must reject these up-front: copy-to-s3 writes an
135    /// `INCOMPLETE` sentinel during preflight and does not clean it up on a
136    /// failed upload, so a runtime failure inside the parquet writer leaves
137    /// the path in a state that blocks subsequent copies.
138    ///
139    /// Pass the same `overrides` you pass to [`desc_to_schema_with_overrides`]
140    /// — otherwise this check will reject types your sink is going to remap.
141    /// To add a new banned arrow datatype, extend `parquet_incompatible_type`.
142    pub fn validate_desc_for_parquet<F>(
143        desc: &RelationDesc,
144        overrides: F,
145    ) -> Result<(), anyhow::Error>
146    where
147        F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
148    {
149        let mut errs = vec![];
150        for (col_name, col_type) in desc.iter() {
151            let dt =
152                match scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides) {
153                    Ok((dt, _)) => dt,
154                    Err(_) => {
155                        errs.push(format!("{}: {:?}", col_name, col_type.scalar_type));
156                        continue;
157                    }
158                };
159            if let Some(reason) = parquet_incompatible_type(&dt) {
160                errs.push(format!(
161                    "{}: {:?} ({})",
162                    col_name, col_type.scalar_type, reason
163                ));
164            }
165        }
166        if !errs.is_empty() {
167            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
168        }
169        Ok(())
170    }
171
172    /// Initializes a new ArrowBuilder with the schema of the provided RelationDesc.
173    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
174    /// the number of values that can be appended to each column before reallocating.
175    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
176    /// Errors if the relation contains an unimplemented type.
177    pub fn new(
178        desc: &RelationDesc,
179        item_capacity: usize,
180        data_capacity: usize,
181    ) -> Result<Self, anyhow::Error> {
182        let schema = desc_to_schema(desc)?;
183        let mut columns = vec![];
184        for field in schema.fields() {
185            columns.push(ArrowColumn::new(
186                field.name().clone(),
187                field.is_nullable(),
188                field.data_type().clone(),
189                typename_from_field(field)?,
190                item_capacity,
191                data_capacity,
192            )?);
193        }
194        Ok(Self {
195            columns,
196            row_size_bytes: 0,
197            original_schema: None,
198        })
199    }
200
201    /// Initializes a new ArrowBuilder with a pre-built Arrow Schema.
202    /// This is useful when you need to preserve schema metadata (e.g., field IDs for Iceberg).
203    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
204    /// the number of values that can be appended to each column before reallocating.
205    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
206    /// Errors if the schema contains an unimplemented type.
207    pub fn new_with_schema(
208        schema: Arc<Schema>,
209        item_capacity: usize,
210        data_capacity: usize,
211    ) -> Result<Self, anyhow::Error> {
212        let mut columns = vec![];
213        for field in schema.fields() {
214            columns.push(ArrowColumn::new(
215                field.name().clone(),
216                field.is_nullable(),
217                field.data_type().clone(),
218                typename_from_field(field)?,
219                item_capacity,
220                data_capacity,
221            )?);
222        }
223        Ok(Self {
224            columns,
225            row_size_bytes: 0,
226            original_schema: Some(schema),
227        })
228    }
229
230    /// Returns a copy of the schema of the ArrowBuilder.
231    pub fn schema(&self) -> Schema {
232        Schema::new(
233            self.columns
234                .iter()
235                .map(Into::<Field>::into)
236                .collect::<Vec<_>>(),
237        )
238    }
239
240    /// Converts the ArrowBuilder into an arrow RecordBatch.
241    pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
242        let mut arrays = vec![];
243        let mut fields: Vec<Field> = vec![];
244        for mut col in self.columns.into_iter() {
245            arrays.push(col.finish());
246            fields.push((&col).into());
247        }
248
249        // If we have an original schema, use it to preserve metadata (e.g., field IDs)
250        let schema = if let Some(original_schema) = self.original_schema {
251            original_schema
252        } else {
253            Arc::new(Schema::new(fields))
254        };
255
256        RecordBatch::try_new(schema, arrays)
257    }
258
259    /// Appends a row to the builder.
260    /// Errors if the row contains an unimplemented or out-of-range value.
261    pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
262        for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
263            col.append_datum(datum)?;
264        }
265        self.row_size_bytes += row.byte_len();
266        Ok(())
267    }
268
269    pub fn row_size_bytes(&self) -> usize {
270        self.row_size_bytes
271    }
272}
273
274/// Return the appropriate Arrow DataType for the given SqlScalarType, plus a string
275/// that should be used as part of the Arrow 'Extension Type' name for fields using
276/// this type: <https://arrow.apache.org/docs/format/Columnar.html#extension-types>
277fn scalar_to_arrow_datatype(
278    scalar_type: &SqlScalarType,
279) -> Result<(DataType, String), anyhow::Error> {
280    scalar_to_arrow_datatype_impl(scalar_type, &|_| None)
281}
282
283/// Returns a description of why this Arrow [`DataType`] cannot be written to
284/// parquet by arrow-rs's `ArrowWriter`, or `None` if the type is supported.
285/// Recurses into composite types so a banned type is rejected even if it is
286/// nested inside a list/struct/map.
287///
288/// This is an allowlist mirroring what arrow-rs's parquet writer accepts in
289/// its `get_arrow_column_writer` and `write_leaf` paths. To support a new
290/// leaf type, add an arm to the allowlist match below — but only after
291/// verifying that arrow-rs's parquet writer can actually emit it.
292fn parquet_incompatible_type(dt: &DataType) -> Option<&'static str> {
293    use arrow::datatypes::IntervalUnit;
294    match dt {
295        DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
296            parquet_incompatible_type(f.data_type())
297        }
298        DataType::Map(f, _) => parquet_incompatible_type(f.data_type()),
299        DataType::Struct(fields) => fields
300            .iter()
301            .find_map(|f| parquet_incompatible_type(f.data_type())),
302
303        // Allowlist of supported leaf types.
304        DataType::Null
305        | DataType::Boolean
306        | DataType::Int8
307        | DataType::Int16
308        | DataType::Int32
309        | DataType::Int64
310        | DataType::UInt8
311        | DataType::UInt16
312        | DataType::UInt32
313        | DataType::UInt64
314        | DataType::Float16
315        | DataType::Float32
316        | DataType::Float64
317        | DataType::Date32
318        | DataType::Date64
319        | DataType::Time32(_)
320        | DataType::Time64(_)
321        | DataType::Timestamp(_, _)
322        | DataType::Duration(_)
323        | DataType::Interval(IntervalUnit::YearMonth | IntervalUnit::DayTime)
324        | DataType::Utf8
325        | DataType::LargeUtf8
326        | DataType::Utf8View
327        | DataType::Binary
328        | DataType::LargeBinary
329        | DataType::BinaryView
330        | DataType::FixedSizeBinary(_)
331        | DataType::Decimal32(_, _)
332        | DataType::Decimal64(_, _)
333        | DataType::Decimal128(_, _)
334        | DataType::Decimal256(_, _) => None,
335
336        // Fail closed: anything not on the allowlist is rejected.
337        _ => Some("unsupported arrow datatype"),
338    }
339}
340
341/// Return the appropriate Arrow DataType for the given SqlScalarType, with optional
342/// type overrides. The override function is called for each SqlScalarType (including
343/// nested types) and can return Some((DataType, extension_name)) to override the
344/// default mapping.
345fn scalar_to_arrow_datatype_with_overrides<F>(
346    scalar_type: &SqlScalarType,
347    overrides: &F,
348) -> Result<(DataType, String), anyhow::Error>
349where
350    F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
351{
352    scalar_to_arrow_datatype_impl(scalar_type, overrides)
353}
354
/// Core implementation of scalar-to-arrow type conversion with override support.
///
/// Returns the arrow [`DataType`] plus the materialize extension type name
/// (lowercased before returning) for `scalar_type`. `overrides` is consulted
/// first, and again at every level of recursion for composite types, so
/// nested element/value/field types can be remapped too. Errors on scalar
/// types with no arrow mapping, and on numeric max scales that exceed
/// `DECIMAL128_MAX_SCALE`.
fn scalar_to_arrow_datatype_impl<F>(
    scalar_type: &SqlScalarType,
    overrides: &F,
) -> Result<(DataType, String), anyhow::Error>
where
    F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
{
    // Check for override first
    if let Some(result) = overrides(scalar_type) {
        return Ok(result);
    }
    let (data_type, extension_name) = match scalar_type {
        SqlScalarType::Bool => (DataType::Boolean, "boolean"),
        SqlScalarType::Int16 => (DataType::Int16, "smallint"),
        SqlScalarType::Int32 => (DataType::Int32, "integer"),
        SqlScalarType::Int64 => (DataType::Int64, "bigint"),
        SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
        SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
        SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
        SqlScalarType::Oid => (DataType::UInt32, "oid"),
        SqlScalarType::Float32 => (DataType::Float32, "real"),
        SqlScalarType::Float64 => (DataType::Float64, "double"),
        SqlScalarType::Date => (DataType::Date32, "date"),
        // The resolution of our time and timestamp types is microseconds, which is lucky
        // since the original parquet 'ConvertedType's support microsecond resolution but not
        // nanosecond resolution. The newer parquet 'LogicalType's support nanosecond resolution,
        // but many readers don't support them yet.
        SqlScalarType::Time => (
            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
            "time",
        ),
        SqlScalarType::Timestamp { .. } => (
            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
            "timestamp",
        ),
        SqlScalarType::TimestampTz { .. } => (
            DataType::Timestamp(
                arrow::datatypes::TimeUnit::Microsecond,
                // When appending values we always use UTC timestamps, and setting this to a non-empty
                // value allows readers to know that tz-aware timestamps can be compared directly.
                Some("+00:00".into()),
            ),
            "timestamptz",
        ),
        SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
        // (Var)Char columns with a declared length below i32::MAX fit within
        // Utf8's 32-bit offsets; unbounded ones fall back to LargeUtf8.
        SqlScalarType::Char { length } => {
            if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        SqlScalarType::VarChar { max_length } => {
            if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        SqlScalarType::String => (DataType::LargeUtf8, "text"),
        // Parquet does have a UUID 'Logical Type' in parquet format 2.4+, but there is no arrow
        // UUID type, so we match the format (a 16-byte fixed-length binary array) ourselves.
        SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
        // Parquet does have a JSON 'Logical Type' in parquet format 2.4+, but there is no arrow
        // JSON type, so for now we represent JSON as 'large' utf8-encoded strings.
        SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
        SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
        SqlScalarType::Numeric { max_scale } => {
            // Materialize allows 39 digits of precision for numeric values, but allows
            // arbitrary scales among those values. e.g. 1e38 and 1e-39 are both valid in
            // the same column. However, Arrow/Parquet only allows static declaration of both
            // the precision and the scale. To represent the full range of values of a numeric
            // column, we would need 78-digits to store all possible values. Arrow's Decimal256
            // type can only support 76 digits, so we are unable to represent the entire range.

            // Instead of representing the full possible range, we instead try to represent most
            // values in the most-compatible way. We use a Decimal128 type which can handle 38
            // digits of precision and has more compatibility with other parquet readers than
            // Decimal256. We use Arrow's default scale of 10 if max-scale is not set. We will
            // error if we encounter a value that is too large to represent, and if that happens
            // a user can choose to cast the column to a string to represent the value.
            match max_scale {
                Some(scale) => {
                    let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
                    if scale <= DECIMAL128_MAX_SCALE {
                        (
                            DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
                            "numeric",
                        )
                    } else {
                        anyhow::bail!("Numeric max scale {} out of range", scale)
                    }
                }
                None => (
                    DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
                    "numeric",
                ),
            }
        }
        SqlScalarType::Interval => (
            DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano),
            "interval",
        ),
        SqlScalarType::Array(inner) => {
            // Postgres / MZ Arrays are weird, since they can be multi-dimensional but this is not
            // enforced in the type system, so can change per-value.
            // We use a struct type with two fields - one containing the array elements as a list
            // and the other containing the number of dimensions the array represents. Since arrays
            // are not allowed to be ragged, the number of elements in each dimension is the same.
            let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(inner, overrides)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let inner_field = field_with_typename("item", inner_type, true, &inner_name);
            let list_field = Arc::new(field_with_typename(
                "items",
                DataType::List(inner_field.into()),
                false,
                "array_items",
            ));
            let dims_field = Arc::new(field_with_typename(
                "dimensions",
                DataType::UInt8,
                false,
                "array_dimensions",
            ));
            (DataType::Struct([list_field, dims_field].into()), "array")
        }
        SqlScalarType::List {
            element_type,
            custom_id: _,
        } => {
            let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let field = field_with_typename("item", inner_type, true, &inner_name);
            (DataType::List(field.into()), "list")
        }
        SqlScalarType::Map {
            value_type,
            custom_id: _,
        } => {
            let (value_type, value_name) = scalar_to_arrow_datatype_impl(value_type, overrides)?;
            // Arrow maps are represented as an 'entries' struct with 'keys' and 'values' fields.
            // Map keys here are always non-nullable Utf8 strings.
            let field_names = MapFieldNames::default();
            let struct_type = DataType::Struct(
                vec![
                    Field::new(&field_names.key, DataType::Utf8, false),
                    field_with_typename(&field_names.value, value_type, true, &value_name),
                ]
                .into(),
            );
            (
                DataType::Map(
                    Field::new(&field_names.entry, struct_type, false).into(),
                    false,
                ),
                "map",
            )
        }
        SqlScalarType::Record {
            fields,
            custom_id: _,
        } => {
            // Records are represented as Arrow Structs with one field per record field.
            // At runtime, records are stored as Datum::List with field values in order.
            let mut arrow_fields = Vec::with_capacity(fields.len());
            for (field_name, field_type) in fields.iter() {
                let (inner_type, inner_extension_name) =
                    scalar_to_arrow_datatype_impl(&field_type.scalar_type, overrides)?;
                let field = field_with_typename(
                    field_name.as_str(),
                    inner_type,
                    field_type.nullable,
                    &inner_extension_name,
                );
                arrow_fields.push(Arc::new(field));
            }
            (DataType::Struct(arrow_fields.into()), "record")
        }
        SqlScalarType::Range { element_type } => {
            // Ranges are represented as Arrow Structs with 5 fields:
            //   - lower: nullable element value (the bound datum, null = infinite)
            //   - upper: nullable element value (the bound datum, null = infinite)
            //   - lower_inclusive: bool
            //   - upper_inclusive: bool
            //   - empty: bool
            let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
            let lower_field = Arc::new(field_with_typename(
                "lower",
                inner_type.clone(),
                true,
                &inner_name,
            ));
            let upper_field = Arc::new(field_with_typename("upper", inner_type, true, &inner_name));
            let lower_inclusive_field = Arc::new(field_with_typename(
                "lower_inclusive",
                DataType::Boolean,
                false,
                "boolean",
            ));
            let upper_inclusive_field = Arc::new(field_with_typename(
                "upper_inclusive",
                DataType::Boolean,
                false,
                "boolean",
            ));
            let empty_field = Arc::new(field_with_typename(
                "empty",
                DataType::Boolean,
                false,
                "boolean",
            ));
            (
                DataType::Struct(
                    [
                        lower_field,
                        upper_field,
                        lower_inclusive_field,
                        upper_inclusive_field,
                        empty_field,
                    ]
                    .into(),
                ),
                "range",
            )
        }
        _ => anyhow::bail!("{:?} unimplemented", scalar_type),
    };
    Ok((data_type, extension_name.to_lowercase()))
}
584
/// Constructs the [`ColBuilder`] matching `data_type`.
///
/// `item_capacity` pre-sizes the number of values each builder can hold
/// before reallocating; `data_capacity` pre-sizes the byte buffers of the
/// variable-width (string/binary) builders. Errors on any [`DataType`] this
/// module does not know how to build. Note that only microsecond-resolution
/// temporal types are handled, matching the mappings produced by
/// `scalar_to_arrow_datatype_impl`.
fn builder_for_datatype(
    data_type: &DataType,
    item_capacity: usize,
    data_capacity: usize,
) -> Result<ColBuilder, anyhow::Error> {
    let builder = match &data_type {
        DataType::Boolean => {
            ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
        }
        DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
        DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
        DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
        DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
        DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
        DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
        DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
        DataType::Float32 => {
            ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
        }
        DataType::Float64 => {
            ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
        }
        DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
        DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
            ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
                item_capacity,
            ))
        }
        DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
            // Propagate the (possibly absent) timezone so the built array's
            // type matches the declared field type exactly.
            ColBuilder::TimestampMicrosecondBuilder(
                TimestampMicrosecondBuilder::with_capacity(item_capacity)
                    .with_timezone_opt(timezone.clone()),
            )
        }
        DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
            FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
        ),
        DataType::Utf8 => {
            ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
        }
        DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
            Decimal128Builder::with_capacity(item_capacity)
                .with_precision_and_scale(*precision, *scale)?,
        ),
        DataType::List(field) => {
            // Recursively build the element column, then wrap it; `with_field`
            // keeps the original field (name/nullability/metadata) intact.
            let inner_col_builder = ArrowColumn::new(
                field.name().clone(),
                field.is_nullable(),
                field.data_type().clone(),
                typename_from_field(field)?,
                item_capacity,
                data_capacity,
            )?;
            ColBuilder::ListBuilder(Box::new(
                ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
            ))
        }
        DataType::Struct(fields) => {
            // One recursively-built column per struct field, in declared order.
            let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
            for field in fields {
                let inner_col_builder = ArrowColumn::new(
                    field.name().clone(),
                    field.is_nullable(),
                    field.data_type().clone(),
                    typename_from_field(field)?,
                    item_capacity,
                    data_capacity,
                )?;
                field_builders.push(Box::new(inner_col_builder));
            }
            ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
        }
        DataType::Map(entries_field, _sorted) => {
            // Maps are built from their 'entries' struct: field 0 is the key
            // (always built as Utf8 here), field 1 is the value.
            let entries_field = entries_field.as_ref();
            if let DataType::Struct(fields) = entries_field.data_type() {
                if fields.len() != 2 {
                    anyhow::bail!(
                        "Expected map entries to have 2 fields, found {}",
                        fields.len()
                    )
                }
                let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
                let value_field = &fields[1];
                let value_builder = ArrowColumn::new(
                    value_field.name().clone(),
                    value_field.is_nullable(),
                    value_field.data_type().clone(),
                    typename_from_field(value_field)?,
                    item_capacity,
                    data_capacity,
                )?;
                ColBuilder::MapBuilder(Box::new(
                    MapBuilder::with_capacity(
                        Some(MapFieldNames::default()),
                        key_builder,
                        value_builder,
                        item_capacity,
                    )
                    .with_values_field(Arc::clone(value_field)),
                ))
            } else {
                anyhow::bail!("Expected map entries to be a struct")
            }
        }
        DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano) => {
            ColBuilder::IntervalMonthDayNanoBuilder(IntervalMonthDayNanoBuilder::with_capacity(
                item_capacity,
            ))
        }
        _ => anyhow::bail!("{:?} unimplemented", data_type),
    };
    Ok(builder)
}
706
/// A single column's state: the arrow builder plus everything needed to
/// reconstruct its arrow `Field` (name, nullability, datatype, and the
/// materialize extension-type name stored in the field metadata).
#[derive(Debug)]
struct ArrowColumn {
    /// Name of the column as it appears in the schema.
    field_name: String,
    /// Whether the column may contain null values.
    nullable: bool,
    /// The arrow datatype this column's builder produces.
    data_type: DataType,
    /// Materialize type name (without the extension prefix) recorded in the
    /// field metadata under `ARROW_EXTENSION_NAME_KEY`.
    extension_type_name: String,
    /// The arrow builder values are appended into.
    inner: ColBuilder,
}
715
716impl From<&ArrowColumn> for Field {
717    fn from(col: &ArrowColumn) -> Self {
718        field_with_typename(
719            &col.field_name,
720            col.data_type.clone(),
721            col.nullable,
722            &col.extension_type_name,
723        )
724    }
725}
726
727/// Create a Field and include the materialize 'type name' as an extension in the metadata.
728fn field_with_typename(
729    name: &str,
730    data_type: DataType,
731    nullable: bool,
732    extension_type_name: &str,
733) -> Field {
734    Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
735        ARROW_EXTENSION_NAME_KEY.to_string(),
736        format!("{}{}", EXTENSION_PREFIX, extension_type_name),
737    )]))
738}
739
740/// Extract the materialize 'type name' from the metadata of a Field.
741/// Returns an error if the field doesn't have extension metadata.
742fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
743    let metadata = field.metadata();
744    let extension_name = metadata
745        .get(ARROW_EXTENSION_NAME_KEY)
746        .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
747    extension_name
748        .strip_prefix(EXTENSION_PREFIX)
749        .map(|s| s.to_string())
750        .ok_or_else(|| {
751            anyhow::anyhow!(
752                "Field '{}' extension name '{}' missing expected prefix '{}'",
753                field.name(),
754                extension_name,
755                EXTENSION_PREFIX
756            )
757        })
758}
759
760impl ArrowColumn {
761    fn new(
762        field_name: String,
763        nullable: bool,
764        data_type: DataType,
765        extension_type_name: String,
766        item_capacity: usize,
767        data_capacity: usize,
768    ) -> Result<Self, anyhow::Error> {
769        Ok(Self {
770            inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
771            field_name,
772            nullable,
773            data_type,
774            extension_type_name,
775        })
776    }
777}
778
/// Generates the `ColBuilder` enum with one variant per listed builder type,
/// plus hand-written variants for List/Map/Struct builders, and implements
/// `ColBuilder::append_null` and [`ArrayBuilder`] for `ArrowColumn` by
/// delegating to each variant's inner builder.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// An enum wrapper for all arrow builder types that we support. Used to store
        /// a builder for each column and avoid dynamic dispatch and downcasting
        /// when appending data.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// ListBuilder & MapBuilder are handled separately from the other builder
            /// types since they use generic parameters for the inner types, and are
            /// boxed to avoid recursive type definitions.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// StructBuilder is handled separately since its `append_null()` method must be
            /// overridden to both append nulls to all field builders and to append a null to
            /// the struct. It's unclear why `arrow-rs` implemented this differently than
            /// ListBuilder and MapBuilder.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            /// Append a null value to this column's builder.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // MapBuilder represents a null entry by appending with
                    // validity `false`; `append` is fallible, hence the unwrap.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Each field builder must stay in lockstep with the
                        // struct's row count, so append a null to every field
                        // before marking the struct row itself null.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        /// Implement the ArrayBuilder trait for ArrowColumn so that we can use an ArrowColumn as
        /// an inner-builder type in an [`arrow::array::builder::GenericListBuilder`]
        /// and an [`arrow::array::builder::StructBuilder`] and re-use our methods for appending
        /// data to the column.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
867
// Instantiate `ColBuilder` (and the `ArrayBuilder` impl for `ArrowColumn`)
// with every "flat" arrow builder type we support; the List/Map/Struct
// variants are declared inside the macro itself since they need special
// handling.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder,
    IntervalMonthDayNanoBuilder
);
889
impl ArrowColumn {
    /// Append a single [`Datum`] to this column's inner builder.
    ///
    /// The `(builder, datum)` pair is matched against the supported
    /// combinations; beyond exact type matches this includes a handful of
    /// widening/stringifying conversions (unsigned ints into signed or
    /// decimal columns, intervals into string columns) for destinations
    /// that lack the native type. Arm order matters: earlier arms take
    /// precedence over later guard arms.
    ///
    /// # Errors
    ///
    /// Returns an error when the datum does not match the builder type, or
    /// when a value cannot be represented in the column's declared type
    /// (e.g. a numeric out of range for the decimal precision/scale).
    fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
        match (&mut self.inner, datum) {
            // Nulls are handled uniformly for every builder type.
            (s, Datum::Null) => s.append_null(),
            (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
            (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
            (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
            (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
            (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
            (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
            (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
            (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
            (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
            (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
            (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
            (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
                builder.append_value(d.unix_epoch_days())
            }
            (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
                // Convert seconds + sub-second nanos since midnight into
                // microseconds. `checked_div` by the nonzero constant 1000
                // cannot fail, so the unwrap is safe.
                let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
                    * 1_000_000
                    + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
                builder.append_value(micros_since_midnight)
            }
            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
                builder.append_value(ts.and_utc().timestamp_micros())
            }
            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
                builder.append_value(ts.timestamp_micros())
            }
            (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
            // UUIDs are stored as 16-byte fixed-size binary; append_value is
            // fallible because the builder checks the byte length.
            (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
                builder.append_value(val.as_bytes())?
            }
            // NOTE(review): arm order differs between the two string builders.
            // For StringBuilder the plain `Datum::String` arm precedes the
            // jsonb guard, so a jsonb string value is written unquoted; for
            // LargeStringBuilder the jsonb guard comes first, so a jsonb
            // string value is written as quoted/escaped JSON. Confirm whether
            // this asymmetry is intentional.
            (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
            (ColBuilder::StringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
            }
            (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
            }
            (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
            (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
                builder.append_value(ts.into())
            }
            // Lossless unsigned-to-signed promotions for destinations that don't
            // support unsigned types (e.g., Iceberg).
            (ColBuilder::Int32Builder(builder), Datum::UInt16(i)) => {
                builder.append_value(i32::from(i))
            }
            (ColBuilder::Int64Builder(builder), Datum::UInt32(i)) => {
                builder.append_value(i64::from(i))
            }
            (ColBuilder::Decimal128Builder(builder), Datum::UInt64(i)) => {
                builder.append_value(i128::from(i))
            }
            (ColBuilder::Decimal128Builder(builder), Datum::MzTimestamp(ts)) => {
                builder.append_value(i128::from(u64::from(ts)))
            }
            // Interval-to-string conversion for destinations that don't support
            // interval types natively (e.g., Iceberg).
            (ColBuilder::StringBuilder(builder), Datum::Interval(iv)) => {
                builder.append_value(iv.to_string())
            }
            (ColBuilder::LargeStringBuilder(builder), Datum::Interval(iv)) => {
                builder.append_value(iv.to_string())
            }
            (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
                // Special values (NaN, +/-Inf) have no decimal representation.
                if dec.0.is_special() {
                    anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
                }
                if let DataType::Decimal128(precision, scale) = self.data_type {
                    if dec.0.digits() > precision.into() {
                        anyhow::bail!(
                            "Decimal value {} out of range for column with precision {}",
                            dec,
                            precision
                        )
                    }

                    // Get the signed-coefficient represented as an i128, and the exponent such that
                    // the number should equal coefficient*10^exponent.
                    let coefficient: i128 = dec.0.coefficient()?;
                    let exponent = dec.0.exponent();

                    // Convert the value to use the scale of the column (add 0's to align the decimal
                    // point correctly). This is done by multiplying the coefficient by
                    // 10^(scale + exponent).
                    let scale_diff = i32::from(scale) + exponent;
                    // If the scale_diff is negative, we know there aren't enough digits in our
                    // column's scale to represent this value.
                    let scale_diff = u32::try_from(scale_diff).map_err(|_| {
                        anyhow::anyhow!(
                            "cannot represent decimal value {} in column with scale {}",
                            dec,
                            scale
                        )
                    })?;

                    let value = coefficient
                        .checked_mul(10_i128.pow(scale_diff))
                        .ok_or_else(|| {
                            anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
                        })?;

                    builder.append_value(value)
                } else {
                    anyhow::bail!("Expected Decimal128 data type")
                }
            }
            (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
                // We've received an array datum which we know is represented as an Arrow struct
                // with two fields: the list of elements and the number of dimensions
                let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
                if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
                    let inner_builder = list_builder.values();
                    for datum in arr.elements().into_iter() {
                        inner_builder.append_datum(datum)?;
                    }
                    list_builder.append(true);
                } else {
                    anyhow::bail!(
                        "Expected ListBuilder for StructBuilder with Array datum: {:?}",
                        struct_builder
                    )
                }
                let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
                if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
                    dims_builder.append_value(arr.dims().ndims());
                } else {
                    anyhow::bail!(
                        "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
                        struct_builder
                    )
                }
                struct_builder.append(true)
            }
            (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
                // Append every element to the child builder, then mark one
                // non-null list row on the outer builder.
                let inner_builder = list_builder.values();
                for datum in list.into_iter() {
                    inner_builder.append_datum(datum)?;
                }
                list_builder.append(true)
            }
            (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
                for (key, value) in map.iter() {
                    builder.keys().append_value(key);
                    builder.values().append_datum(value)?;
                }
                builder.append(true).unwrap()
            }
            // Records are stored as Datum::List at runtime but written to a StructBuilder.
            // Arrays use Datum::Array (handled above), so Datum::List with StructBuilder is a record.
            (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
                let field_count = struct_builder.num_fields();
                for (i, datum) in list.into_iter().enumerate() {
                    if i >= field_count {
                        anyhow::bail!(
                            "Record has more elements ({}) than struct fields ({})",
                            i + 1,
                            field_count
                        );
                    }
                    let field_builder: &mut ArrowColumn = struct_builder
                        .field_builder(i)
                        .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
                    field_builder.append_datum(datum)?;
                }
                struct_builder.append(true);
            }
            (ColBuilder::StructBuilder(struct_builder), Datum::Range(range)) => {
                // Ranges are represented as a struct with 5 fields:
                //   0: lower (nullable element value)
                //   1: upper (nullable element value)
                //   2: lower_inclusive (bool)
                //   3: upper_inclusive (bool)
                //   4: empty (bool)
                match range.inner {
                    None => {
                        // Empty range: null bounds, inclusive=false, empty=true
                        struct_builder
                            .field_builder::<ArrowColumn>(0)
                            .unwrap()
                            .append_datum(Datum::Null)?;
                        struct_builder
                            .field_builder::<ArrowColumn>(1)
                            .unwrap()
                            .append_datum(Datum::Null)?;
                        struct_builder
                            .field_builder::<ArrowColumn>(2)
                            .unwrap()
                            .append_datum(Datum::False)?;
                        struct_builder
                            .field_builder::<ArrowColumn>(3)
                            .unwrap()
                            .append_datum(Datum::False)?;
                        struct_builder
                            .field_builder::<ArrowColumn>(4)
                            .unwrap()
                            .append_datum(Datum::True)?;
                    }
                    Some(inner) => {
                        // Unbounded sides (bound == None) are encoded as null.
                        struct_builder
                            .field_builder::<ArrowColumn>(0)
                            .unwrap()
                            .append_datum(
                                inner.lower.bound.map(|n| n.datum()).unwrap_or(Datum::Null),
                            )?;
                        struct_builder
                            .field_builder::<ArrowColumn>(1)
                            .unwrap()
                            .append_datum(
                                inner.upper.bound.map(|n| n.datum()).unwrap_or(Datum::Null),
                            )?;
                        struct_builder
                            .field_builder::<ArrowColumn>(2)
                            .unwrap()
                            .append_datum(if inner.lower.inclusive {
                                Datum::True
                            } else {
                                Datum::False
                            })?;
                        struct_builder
                            .field_builder::<ArrowColumn>(3)
                            .unwrap()
                            .append_datum(if inner.upper.inclusive {
                                Datum::True
                            } else {
                                Datum::False
                            })?;
                        struct_builder
                            .field_builder::<ArrowColumn>(4)
                            .unwrap()
                            .append_datum(Datum::False)?;
                    }
                }
                // Mark the struct row as non-null. StructBuilder tracks per-row
                // validity independently from the child builders, so this call
                // is required once per outer row regardless of the field values.
                struct_builder.append(true);
            }
            (ColBuilder::IntervalMonthDayNanoBuilder(builder), Datum::Interval(iv)) => {
                // micros -> nanos can overflow i64; surface it as an error
                // rather than wrapping silently.
                let nanos = iv.micros.checked_mul(1_000).ok_or_else(|| {
                    anyhow::anyhow!(
                        "interval microseconds {} overflow i64 nanoseconds",
                        iv.micros
                    )
                })?;
                builder.append_value(arrow::datatypes::IntervalMonthDayNano::new(
                    iv.months, iv.days, nanos,
                ));
            }
            // Any other (builder, datum) pairing is a type mismatch.
            (builder, datum) => {
                anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
            }
        }
        Ok(())
    }
}