// mz_arrow_util/builder.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// We need to allow the std::collections::HashMap type since it is directly used as a type
11// parameter to the arrow Field::with_metadata method.
12#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20    DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
30pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
31const EXTENSION_PREFIX: &str = "materialize.v1.";
32
/// Incrementally builds an arrow `RecordBatch` out of materialize `Row`s,
/// keeping one in-progress `ArrowColumn` builder per column of the relation.
pub struct ArrowBuilder {
    /// One builder per output column, in the order the schema's fields were
    /// iterated at construction time.
    columns: Vec<ArrowColumn>,
    /// A crude estimate of the size of the data in the builder
    /// based on the size of the rows added to it.
    row_size_bytes: usize,
    /// The original schema, if provided. Used to preserve metadata in to_record_batch().
    original_schema: Option<Arc<Schema>>,
}
41
42/// Converts a RelationDesc to an Arrow Schema.
43pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44    desc_to_schema_with_overrides(desc, |_| None)
45}
46
47/// Converts a RelationDesc to an Arrow Schema, with optional type overrides.
48///
49/// The `overrides` function receives a SqlScalarType and can return:
50/// - `Some((DataType, String))` to override the default mapping
51/// - `None` to use the default mapping from `scalar_to_arrow_datatype`
52///
53/// This allows destinations like Iceberg to customize type mappings for
54/// unsupported types (e.g., UInt64 -> Decimal128) while keeping the standard
55/// mapping for everything else.
56pub fn desc_to_schema_with_overrides<F>(
57    desc: &RelationDesc,
58    overrides: F,
59) -> Result<Schema, anyhow::Error>
60where
61    F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
62{
63    let mut fields = vec![];
64    let mut errs = vec![];
65    let mut seen_names = BTreeSet::new();
66    let mut all_names = BTreeSet::new();
67    for col_name in desc.iter_names() {
68        all_names.insert(col_name.to_string());
69    }
70    for (col_name, col_type) in desc.iter() {
71        let mut col_name = col_name.to_string();
72        // If we allow columns with the same name we encounter two issues:
73        // 1. The arrow crate will accidentally reuse the same buffers for the columns
74        // 2. Many parquet readers will error when trying to read the file metadata
75        // Instead we append a number to the end of the column name for any duplicates.
76        // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
77
78        if seen_names.contains(&col_name) {
79            let mut new_col_name = col_name.clone();
80            let mut e = 2;
81            while all_names.contains(&new_col_name) {
82                new_col_name = format!("{}{}", col_name, e);
83                e += 1;
84            }
85            col_name = new_col_name;
86        }
87
88        seen_names.insert(col_name.clone());
89        all_names.insert(col_name.clone());
90
91        // Use the override-aware version which applies overrides recursively
92        let result = scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides);
93
94        match result {
95            Ok((data_type, extension_type_name)) => {
96                fields.push(field_with_typename(
97                    &col_name,
98                    data_type,
99                    col_type.nullable,
100                    &extension_type_name,
101                ));
102            }
103            Err(err) => errs.push(err.to_string()),
104        }
105    }
106    if !errs.is_empty() {
107        anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
108    }
109    Ok(Schema::new(fields))
110}
111
112impl ArrowBuilder {
113    /// Helper to validate that a RelationDesc can be encoded into Arrow.
114    pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
115        let mut errs = vec![];
116        for (col_name, col_type) in desc.iter() {
117            match scalar_to_arrow_datatype(&col_type.scalar_type) {
118                Ok(_) => {}
119                Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
120            }
121        }
122        if !errs.is_empty() {
123            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
124        }
125        Ok(())
126    }
127
128    /// Initializes a new ArrowBuilder with the schema of the provided RelationDesc.
129    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
130    /// the number of values that can be appended to each column before reallocating.
131    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
132    /// Errors if the relation contains an unimplemented type.
133    pub fn new(
134        desc: &RelationDesc,
135        item_capacity: usize,
136        data_capacity: usize,
137    ) -> Result<Self, anyhow::Error> {
138        let schema = desc_to_schema(desc)?;
139        let mut columns = vec![];
140        for field in schema.fields() {
141            columns.push(ArrowColumn::new(
142                field.name().clone(),
143                field.is_nullable(),
144                field.data_type().clone(),
145                typename_from_field(field)?,
146                item_capacity,
147                data_capacity,
148            )?);
149        }
150        Ok(Self {
151            columns,
152            row_size_bytes: 0,
153            original_schema: None,
154        })
155    }
156
157    /// Initializes a new ArrowBuilder with a pre-built Arrow Schema.
158    /// This is useful when you need to preserve schema metadata (e.g., field IDs for Iceberg).
159    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
160    /// the number of values that can be appended to each column before reallocating.
161    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
162    /// Errors if the schema contains an unimplemented type.
163    pub fn new_with_schema(
164        schema: Arc<Schema>,
165        item_capacity: usize,
166        data_capacity: usize,
167    ) -> Result<Self, anyhow::Error> {
168        let mut columns = vec![];
169        for field in schema.fields() {
170            columns.push(ArrowColumn::new(
171                field.name().clone(),
172                field.is_nullable(),
173                field.data_type().clone(),
174                typename_from_field(field)?,
175                item_capacity,
176                data_capacity,
177            )?);
178        }
179        Ok(Self {
180            columns,
181            row_size_bytes: 0,
182            original_schema: Some(schema),
183        })
184    }
185
186    /// Returns a copy of the schema of the ArrowBuilder.
187    pub fn schema(&self) -> Schema {
188        Schema::new(
189            self.columns
190                .iter()
191                .map(Into::<Field>::into)
192                .collect::<Vec<_>>(),
193        )
194    }
195
196    /// Converts the ArrowBuilder into an arrow RecordBatch.
197    pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
198        let mut arrays = vec![];
199        let mut fields: Vec<Field> = vec![];
200        for mut col in self.columns.into_iter() {
201            arrays.push(col.finish());
202            fields.push((&col).into());
203        }
204
205        // If we have an original schema, use it to preserve metadata (e.g., field IDs)
206        let schema = if let Some(original_schema) = self.original_schema {
207            original_schema
208        } else {
209            Arc::new(Schema::new(fields))
210        };
211
212        RecordBatch::try_new(schema, arrays)
213    }
214
215    /// Appends a row to the builder.
216    /// Errors if the row contains an unimplemented or out-of-range value.
217    pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
218        for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
219            col.append_datum(datum)?;
220        }
221        self.row_size_bytes += row.byte_len();
222        Ok(())
223    }
224
225    pub fn row_size_bytes(&self) -> usize {
226        self.row_size_bytes
227    }
228}
229
230/// Return the appropriate Arrow DataType for the given SqlScalarType, plus a string
231/// that should be used as part of the Arrow 'Extension Type' name for fields using
232/// this type: <https://arrow.apache.org/docs/format/Columnar.html#extension-types>
233fn scalar_to_arrow_datatype(
234    scalar_type: &SqlScalarType,
235) -> Result<(DataType, String), anyhow::Error> {
236    scalar_to_arrow_datatype_impl(scalar_type, &|_| None)
237}
238
/// Return the appropriate Arrow DataType for the given SqlScalarType, with optional
/// type overrides. The override function is called for each SqlScalarType (including
/// nested types) and can return Some((DataType, extension_name)) to override the
/// default mapping.
///
/// This is a thin wrapper over [`scalar_to_arrow_datatype_impl`], which
/// performs the actual (recursive) conversion.
fn scalar_to_arrow_datatype_with_overrides<F>(
    scalar_type: &SqlScalarType,
    overrides: &F,
) -> Result<(DataType, String), anyhow::Error>
where
    F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
{
    scalar_to_arrow_datatype_impl(scalar_type, overrides)
}
252
/// Core implementation of scalar-to-arrow type conversion with override support.
///
/// Returns the arrow `DataType` for `scalar_type` together with the materialize
/// 'type name' (always lowercased) that callers embed in the field's Arrow
/// extension-type metadata. The `overrides` callback is consulted before the
/// default mapping, both for the top-level type and — via the recursive calls
/// below — for every nested element/value/field type.
fn scalar_to_arrow_datatype_impl<F>(
    scalar_type: &SqlScalarType,
    overrides: &F,
) -> Result<(DataType, String), anyhow::Error>
where
    F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
{
    // Check for override first
    if let Some(result) = overrides(scalar_type) {
        return Ok(result);
    }
    let (data_type, extension_name) = match scalar_type {
        SqlScalarType::Bool => (DataType::Boolean, "boolean"),
        SqlScalarType::Int16 => (DataType::Int16, "smallint"),
        SqlScalarType::Int32 => (DataType::Int32, "integer"),
        SqlScalarType::Int64 => (DataType::Int64, "bigint"),
        SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
        SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
        SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
        SqlScalarType::Oid => (DataType::UInt32, "oid"),
        SqlScalarType::Float32 => (DataType::Float32, "real"),
        SqlScalarType::Float64 => (DataType::Float64, "double"),
        SqlScalarType::Date => (DataType::Date32, "date"),
        // The resolution of our time and timestamp types is microseconds, which is lucky
        // since the original parquet 'ConvertedType's support microsecond resolution but not
        // nanosecond resolution. The newer parquet 'LogicalType's support nanosecond resolution,
        // but many readers don't support them yet.
        SqlScalarType::Time => (
            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
            "time",
        ),
        SqlScalarType::Timestamp { .. } => (
            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
            "timestamp",
        ),
        SqlScalarType::TimestampTz { .. } => (
            DataType::Timestamp(
                arrow::datatypes::TimeUnit::Microsecond,
                // When appending values we always use UTC timestamps, and setting this to a non-empty
                // value allows readers to know that tz-aware timestamps can be compared directly.
                Some("+00:00".into()),
            ),
            "timestamptz",
        ),
        SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
        SqlScalarType::Char { length } => {
            // Use 32-bit-offset Utf8 when the declared length is known to fit;
            // otherwise fall back to LargeUtf8 (64-bit offsets).
            if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        SqlScalarType::VarChar { max_length } => {
            if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        SqlScalarType::String => (DataType::LargeUtf8, "text"),
        // Parquet does have a UUID 'Logical Type' in parquet format 2.4+, but there is no arrow
        // UUID type, so we match the format (a 16-byte fixed-length binary array) ourselves.
        SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
        // Parquet does have a JSON 'Logical Type' in parquet format 2.4+, but there is no arrow
        // JSON type, so for now we represent JSON as 'large' utf8-encoded strings.
        SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
        SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
        SqlScalarType::Numeric { max_scale } => {
            // Materialize allows 39 digits of precision for numeric values, but allows
            // arbitrary scales among those values. e.g. 1e38 and 1e-39 are both valid in
            // the same column. However, Arrow/Parquet only allows static declaration of both
            // the precision and the scale. To represent the full range of values of a numeric
            // column, we would need 78-digits to store all possible values. Arrow's Decimal256
            // type can only support 76 digits, so we are unable to represent the entire range.

            // Instead of representing the full possible range, we try to represent most
            // values in the most-compatible way. We use a Decimal128 type which can handle 38
            // digits of precision and has more compatibility with other parquet readers than
            // Decimal256. We use Arrow's default scale of 10 if max-scale is not set. We will
            // error if we encounter a value that is too large to represent, and if that happens
            // a user can choose to cast the column to a string to represent the value.
            match max_scale {
                Some(scale) => {
                    let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
                    if scale <= DECIMAL128_MAX_SCALE {
                        (
                            DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
                            "numeric",
                        )
                    } else {
                        anyhow::bail!("Numeric max scale {} out of range", scale)
                    }
                }
                None => (
                    DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
                    "numeric",
                ),
            }
        }
        // arrow::datatypes::IntervalUnit::MonthDayNano is not yet implemented in the arrow parquet writer
        // https://github.com/apache/arrow-rs/blob/0d031cc8aa81296cb1bdfedea7a7cb4ec6aa54ea/parquet/src/arrow/arrow_writer/mod.rs#L859
        // SqlScalarType::Interval => DataType::Interval(arrow::datatypes::IntervalUnit::DayTime)
        SqlScalarType::Array(inner) => {
            // Postgres / MZ Arrays are weird, since they can be multi-dimensional but this is not
            // enforced in the type system, so can change per-value.
            // We use a struct type with two fields - one containing the array elements as a list
            // and the other containing the number of dimensions the array represents. Since arrays
            // are not allowed to be ragged, the number of elements in each dimension is the same.
            let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(inner, overrides)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let inner_field = field_with_typename("item", inner_type, true, &inner_name);
            let list_field = Arc::new(field_with_typename(
                "items",
                DataType::List(inner_field.into()),
                false,
                "array_items",
            ));
            let dims_field = Arc::new(field_with_typename(
                "dimensions",
                DataType::UInt8,
                false,
                "array_dimensions",
            ));
            (DataType::Struct([list_field, dims_field].into()), "array")
        }
        SqlScalarType::List {
            element_type,
            custom_id: _,
        } => {
            let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let field = field_with_typename("item", inner_type, true, &inner_name);
            (DataType::List(field.into()), "list")
        }
        SqlScalarType::Map {
            value_type,
            custom_id: _,
        } => {
            let (value_type, value_name) = scalar_to_arrow_datatype_impl(value_type, overrides)?;
            // Arrow maps are represented as an 'entries' struct with 'keys' and 'values' fields.
            let field_names = MapFieldNames::default();
            let struct_type = DataType::Struct(
                vec![
                    Field::new(&field_names.key, DataType::Utf8, false),
                    field_with_typename(&field_names.value, value_type, true, &value_name),
                ]
                .into(),
            );
            (
                DataType::Map(
                    Field::new(&field_names.entry, struct_type, false).into(),
                    false,
                ),
                "map",
            )
        }
        SqlScalarType::Record {
            fields,
            custom_id: _,
        } => {
            // Records are represented as Arrow Structs with one field per record field.
            // At runtime, records are stored as Datum::List with field values in order.
            let mut arrow_fields = Vec::with_capacity(fields.len());
            for (field_name, field_type) in fields.iter() {
                let (inner_type, inner_extension_name) =
                    scalar_to_arrow_datatype_impl(&field_type.scalar_type, overrides)?;
                let field = field_with_typename(
                    field_name.as_str(),
                    inner_type,
                    field_type.nullable,
                    &inner_extension_name,
                );
                arrow_fields.push(Arc::new(field));
            }
            (DataType::Struct(arrow_fields.into()), "record")
        }
        _ => anyhow::bail!("{:?} unimplemented", scalar_type),
    };
    Ok((data_type, extension_name.to_lowercase()))
}
434
/// Construct the [`ColBuilder`] that accumulates values of the given arrow
/// [`DataType`].
///
/// `item_capacity` pre-sizes the number of values each builder can hold before
/// reallocating; `data_capacity` pre-sizes the byte buffers of the
/// variable-length (string and binary) builders. Errors on datatypes not
/// implemented here, or if a decimal precision/scale is rejected by arrow.
fn builder_for_datatype(
    data_type: &DataType,
    item_capacity: usize,
    data_capacity: usize,
) -> Result<ColBuilder, anyhow::Error> {
    let builder = match &data_type {
        DataType::Boolean => {
            ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
        }
        DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
        DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
        DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
        DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
        DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
        DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
        DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
        DataType::Float32 => {
            ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
        }
        DataType::Float64 => {
            ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
        }
        DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
        DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
            ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
                item_capacity,
            ))
        }
        DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
            ColBuilder::TimestampMicrosecondBuilder(
                TimestampMicrosecondBuilder::with_capacity(item_capacity)
                    .with_timezone_opt(timezone.clone()),
            )
        }
        DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
            FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
        ),
        DataType::Utf8 => {
            ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
        }
        DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
            Decimal128Builder::with_capacity(item_capacity)
                .with_precision_and_scale(*precision, *scale)?,
        ),
        // Nested types below recursively build an `ArrowColumn` for their
        // inner field(s), so the same append logic is reused at every level.
        DataType::List(field) => {
            let inner_col_builder = ArrowColumn::new(
                field.name().clone(),
                field.is_nullable(),
                field.data_type().clone(),
                typename_from_field(field)?,
                item_capacity,
                data_capacity,
            )?;
            // `with_field` keeps the original child field (name, nullability,
            // metadata) rather than the builder's default child field.
            ColBuilder::ListBuilder(Box::new(
                ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
            ))
        }
        DataType::Struct(fields) => {
            let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
            for field in fields {
                let inner_col_builder = ArrowColumn::new(
                    field.name().clone(),
                    field.is_nullable(),
                    field.data_type().clone(),
                    typename_from_field(field)?,
                    item_capacity,
                    data_capacity,
                )?;
                field_builders.push(Box::new(inner_col_builder));
            }
            ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
        }
        DataType::Map(entries_field, _sorted) => {
            let entries_field = entries_field.as_ref();
            if let DataType::Struct(fields) = entries_field.data_type() {
                if fields.len() != 2 {
                    anyhow::bail!(
                        "Expected map entries to have 2 fields, found {}",
                        fields.len()
                    )
                }
                // Map keys are always plain strings (see the Map arm of
                // `scalar_to_arrow_datatype_impl`); only the value field needs
                // a recursive `ArrowColumn`.
                let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
                let value_field = &fields[1];
                let value_builder = ArrowColumn::new(
                    value_field.name().clone(),
                    value_field.is_nullable(),
                    value_field.data_type().clone(),
                    typename_from_field(value_field)?,
                    item_capacity,
                    data_capacity,
                )?;
                ColBuilder::MapBuilder(Box::new(
                    MapBuilder::with_capacity(
                        Some(MapFieldNames::default()),
                        key_builder,
                        value_builder,
                        item_capacity,
                    )
                    .with_values_field(Arc::clone(value_field)),
                ))
            } else {
                anyhow::bail!("Expected map entries to be a struct")
            }
        }
        _ => anyhow::bail!("{:?} unimplemented", data_type),
    };
    Ok(builder)
}
551
/// A single in-progress arrow column: the field description (name, type,
/// nullability, extension-type name) together with the builder accumulating
/// its values.
#[derive(Debug)]
struct ArrowColumn {
    /// The arrow field name for this column.
    field_name: String,
    /// Whether the column accepts null values.
    nullable: bool,
    /// The arrow datatype the `inner` builder produces.
    data_type: DataType,
    /// The materialize 'type name' stored in the field's extension metadata,
    /// without the `materialize.v1.` prefix (see `field_with_typename`).
    extension_type_name: String,
    /// The wrapped arrow builder that values are appended to.
    inner: ColBuilder,
}
560
561impl From<&ArrowColumn> for Field {
562    fn from(col: &ArrowColumn) -> Self {
563        field_with_typename(
564            &col.field_name,
565            col.data_type.clone(),
566            col.nullable,
567            &col.extension_type_name,
568        )
569    }
570}
571
572/// Create a Field and include the materialize 'type name' as an extension in the metadata.
573fn field_with_typename(
574    name: &str,
575    data_type: DataType,
576    nullable: bool,
577    extension_type_name: &str,
578) -> Field {
579    Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
580        ARROW_EXTENSION_NAME_KEY.to_string(),
581        format!("{}{}", EXTENSION_PREFIX, extension_type_name),
582    )]))
583}
584
585/// Extract the materialize 'type name' from the metadata of a Field.
586/// Returns an error if the field doesn't have extension metadata.
587fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
588    let metadata = field.metadata();
589    let extension_name = metadata
590        .get(ARROW_EXTENSION_NAME_KEY)
591        .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
592    extension_name
593        .strip_prefix(EXTENSION_PREFIX)
594        .map(|s| s.to_string())
595        .ok_or_else(|| {
596            anyhow::anyhow!(
597                "Field '{}' extension name '{}' missing expected prefix '{}'",
598                field.name(),
599                extension_name,
600                EXTENSION_PREFIX
601            )
602        })
603}
604
605impl ArrowColumn {
606    fn new(
607        field_name: String,
608        nullable: bool,
609        data_type: DataType,
610        extension_type_name: String,
611        item_capacity: usize,
612        data_capacity: usize,
613    ) -> Result<Self, anyhow::Error> {
614        Ok(Self {
615            inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
616            field_name,
617            nullable,
618            data_type,
619            extension_type_name,
620        })
621    }
622}
623
/// Generates the `ColBuilder` enum plus an [`ArrayBuilder`] impl for
/// [`ArrowColumn`], with one enum variant per flat builder type passed in.
/// Nested builders (list, map, struct) are declared directly inside the macro.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// An enum wrapper for all arrow builder types that we support. Used to store
        /// a builder for each column and avoid dynamic dispatch and downcasting
        /// when appending data.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// ListBuilder & MapBuilder are handled separately from other builder types since they
            /// use generic parameters for the inner types, and are boxed to avoid recursive
            /// type definitions.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// StructBuilder is handled separately since its `append_null()` method must be
            /// overridden to both append nulls to all field builders and to append a null to
            /// the struct. It's unclear why `arrow-rs` implemented this differently than
            /// ListBuilder and MapBuilder.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // `append(false)` marks the current map slot as null.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Keep all field builders in sync with the struct's own
                        // validity buffer (see the enum doc comment above).
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        /// Implement the ArrayBuilder trait for ArrowColumn so that we can use an ArrowColumn as
        /// an inner-builder type in an [`arrow::array::builder::GenericListBuilder`]
        /// and an [`arrow::array::builder::StructBuilder`] and re-use our methods for appending
        /// data to the column.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
712
// Instantiate `ColBuilder`/`ArrowColumn` support for every "flat" (non-nested)
// arrow builder type we use. For each type listed here the macro emits a
// `ColBuilder` enum variant of the same name plus the corresponding
// `len`/`finish`/`finish_cloned` plumbing; the nested List/Map/Struct builders
// are handled by hand-written arms inside the macro definition above.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
733
734impl ArrowColumn {
735    fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
736        match (&mut self.inner, datum) {
737            (s, Datum::Null) => s.append_null(),
738            (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
739            (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
740            (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
741            (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
742            (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
743            (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
744            (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
745            (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
746            (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
747            (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
748            (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
749            (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
750                builder.append_value(d.unix_epoch_days())
751            }
752            (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
753                let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
754                    * 1_000_000
755                    + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
756                builder.append_value(micros_since_midnight)
757            }
758            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
759                builder.append_value(ts.and_utc().timestamp_micros())
760            }
761            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
762                builder.append_value(ts.timestamp_micros())
763            }
764            (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
765            (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
766                builder.append_value(val.as_bytes())?
767            }
768            (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
769            (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
770                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
771            }
772            (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
773            (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
774                builder.append_value(ts.into())
775            }
776            // Lossless unsigned-to-signed promotions for destinations that don't
777            // support unsigned types (e.g., Iceberg).
778            (ColBuilder::Int32Builder(builder), Datum::UInt16(i)) => {
779                builder.append_value(i32::from(i))
780            }
781            (ColBuilder::Int64Builder(builder), Datum::UInt32(i)) => {
782                builder.append_value(i64::from(i))
783            }
784            (ColBuilder::Decimal128Builder(builder), Datum::UInt64(i)) => {
785                builder.append_value(i128::from(i))
786            }
787            (ColBuilder::Decimal128Builder(builder), Datum::MzTimestamp(ts)) => {
788                builder.append_value(i128::from(u64::from(ts)))
789            }
790            (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
791                if dec.0.is_special() {
792                    anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
793                }
794                if let DataType::Decimal128(precision, scale) = self.data_type {
795                    if dec.0.digits() > precision.into() {
796                        anyhow::bail!(
797                            "Decimal value {} out of range for column with precision {}",
798                            dec,
799                            precision
800                        )
801                    }
802
803                    // Get the signed-coefficient represented as an i128, and the exponent such that
804                    // the number should equal coefficient*10^exponent.
805                    let coefficient: i128 = dec.0.coefficient()?;
806                    let exponent = dec.0.exponent();
807
808                    // Convert the value to use the scale of the column (add 0's to align the decimal
809                    // point correctly). This is done by multiplying the coefficient by
810                    // 10^(scale + exponent).
811                    let scale_diff = i32::from(scale) + exponent;
812                    // If the scale_diff is negative, we know there aren't enough digits in our
813                    // column's scale to represent this value.
814                    let scale_diff = u32::try_from(scale_diff).map_err(|_| {
815                        anyhow::anyhow!(
816                            "cannot represent decimal value {} in column with scale {}",
817                            dec,
818                            scale
819                        )
820                    })?;
821
822                    let value = coefficient
823                        .checked_mul(10_i128.pow(scale_diff))
824                        .ok_or_else(|| {
825                            anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
826                        })?;
827
828                    builder.append_value(value)
829                } else {
830                    anyhow::bail!("Expected Decimal128 data type")
831                }
832            }
833            (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
834                // We've received an array datum which we know is represented as an Arrow struct
835                // with two fields: the list of elements and the number of dimensions
836                let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
837                if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
838                    let inner_builder = list_builder.values();
839                    for datum in arr.elements().into_iter() {
840                        inner_builder.append_datum(datum)?;
841                    }
842                    list_builder.append(true);
843                } else {
844                    anyhow::bail!(
845                        "Expected ListBuilder for StructBuilder with Array datum: {:?}",
846                        struct_builder
847                    )
848                }
849                let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
850                if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
851                    dims_builder.append_value(arr.dims().ndims());
852                } else {
853                    anyhow::bail!(
854                        "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
855                        struct_builder
856                    )
857                }
858                struct_builder.append(true)
859            }
860            (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
861                let inner_builder = list_builder.values();
862                for datum in list.into_iter() {
863                    inner_builder.append_datum(datum)?;
864                }
865                list_builder.append(true)
866            }
867            (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
868                for (key, value) in map.iter() {
869                    builder.keys().append_value(key);
870                    builder.values().append_datum(value)?;
871                }
872                builder.append(true).unwrap()
873            }
874            // Records are stored as Datum::List at runtime but written to a StructBuilder.
875            // Arrays use Datum::Array (handled above), so Datum::List with StructBuilder is a record.
876            (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
877                let field_count = struct_builder.num_fields();
878                for (i, datum) in list.into_iter().enumerate() {
879                    if i >= field_count {
880                        anyhow::bail!(
881                            "Record has more elements ({}) than struct fields ({})",
882                            i + 1,
883                            field_count
884                        );
885                    }
886                    let field_builder: &mut ArrowColumn = struct_builder
887                        .field_builder(i)
888                        .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
889                    field_builder.append_datum(datum)?;
890                }
891                struct_builder.append(true);
892            }
893            (builder, datum) => {
894                anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
895            }
896        }
897        Ok(())
898    }
899}