Skip to main content

mz_arrow_util/
builder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// We need to allow the std::collections::HashMap type since it is directly used as a type
11// parameter to the arrow Field::with_metadata method.
12#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20    DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Metadata key under which arrow stores a field's extension-type name.
/// See <https://arrow.apache.org/docs/format/Columnar.html#extension-types>.
pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
/// Prefix applied to all materialize extension-type names written by this module.
const EXTENSION_PREFIX: &str = "materialize.v1.";
32
/// Accumulates [`Row`]s into per-column arrow builders and converts the result
/// into an arrow `RecordBatch` via [`ArrowBuilder::to_record_batch`].
pub struct ArrowBuilder {
    /// One in-progress arrow array builder per output column, in schema order.
    columns: Vec<ArrowColumn>,
    /// A crude estimate of the size of the data in the builder
    /// based on the size of the rows added to it.
    row_size_bytes: usize,
    /// The original schema, if provided. Used to preserve metadata in to_record_batch().
    original_schema: Option<Arc<Schema>>,
}
41
42/// Converts a RelationDesc to an Arrow Schema.
43pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44    let mut fields = vec![];
45    let mut errs = vec![];
46    let mut seen_names = BTreeMap::new();
47    for (col_name, col_type) in desc.iter() {
48        let mut col_name = col_name.to_string();
49        // If we allow columns with the same name we encounter two issues:
50        // 1. The arrow crate will accidentally reuse the same buffers for the columns
51        // 2. Many parquet readers will error when trying to read the file metadata
52        // Instead we append a number to the end of the column name for any duplicates.
53        // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
54        seen_names
55            .entry(col_name.clone())
56            .and_modify(|e: &mut u32| {
57                *e += 1;
58                col_name += &e.to_string();
59            })
60            .or_insert(1);
61        match scalar_to_arrow_datatype(&col_type.scalar_type) {
62            Ok((data_type, extension_type_name)) => {
63                fields.push(field_with_typename(
64                    &col_name,
65                    data_type,
66                    col_type.nullable,
67                    &extension_type_name,
68                ));
69            }
70            Err(err) => errs.push(err.to_string()),
71        }
72    }
73    if !errs.is_empty() {
74        anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
75    }
76    Ok(Schema::new(fields))
77}
78
79impl ArrowBuilder {
80    /// Helper to validate that a RelationDesc can be encoded into Arrow.
81    pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
82        let mut errs = vec![];
83        for (col_name, col_type) in desc.iter() {
84            match scalar_to_arrow_datatype(&col_type.scalar_type) {
85                Ok(_) => {}
86                Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
87            }
88        }
89        if !errs.is_empty() {
90            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
91        }
92        Ok(())
93    }
94
95    /// Initializes a new ArrowBuilder with the schema of the provided RelationDesc.
96    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
97    /// the number of values that can be appended to each column before reallocating.
98    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
99    /// Errors if the relation contains an unimplemented type.
100    pub fn new(
101        desc: &RelationDesc,
102        item_capacity: usize,
103        data_capacity: usize,
104    ) -> Result<Self, anyhow::Error> {
105        let schema = desc_to_schema(desc)?;
106        let mut columns = vec![];
107        for field in schema.fields() {
108            columns.push(ArrowColumn::new(
109                field.name().clone(),
110                field.is_nullable(),
111                field.data_type().clone(),
112                typename_from_field(field)?,
113                item_capacity,
114                data_capacity,
115            )?);
116        }
117        Ok(Self {
118            columns,
119            row_size_bytes: 0,
120            original_schema: None,
121        })
122    }
123
124    /// Initializes a new ArrowBuilder with a pre-built Arrow Schema.
125    /// This is useful when you need to preserve schema metadata (e.g., field IDs for Iceberg).
126    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
127    /// the number of values that can be appended to each column before reallocating.
128    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
129    /// Errors if the schema contains an unimplemented type.
130    pub fn new_with_schema(
131        schema: Arc<Schema>,
132        item_capacity: usize,
133        data_capacity: usize,
134    ) -> Result<Self, anyhow::Error> {
135        let mut columns = vec![];
136        for field in schema.fields() {
137            columns.push(ArrowColumn::new(
138                field.name().clone(),
139                field.is_nullable(),
140                field.data_type().clone(),
141                typename_from_field(field)?,
142                item_capacity,
143                data_capacity,
144            )?);
145        }
146        Ok(Self {
147            columns,
148            row_size_bytes: 0,
149            original_schema: Some(schema),
150        })
151    }
152
153    /// Returns a copy of the schema of the ArrowBuilder.
154    pub fn schema(&self) -> Schema {
155        Schema::new(
156            self.columns
157                .iter()
158                .map(Into::<Field>::into)
159                .collect::<Vec<_>>(),
160        )
161    }
162
163    /// Converts the ArrowBuilder into an arrow RecordBatch.
164    pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
165        let mut arrays = vec![];
166        let mut fields: Vec<Field> = vec![];
167        for mut col in self.columns.into_iter() {
168            arrays.push(col.finish());
169            fields.push((&col).into());
170        }
171
172        // If we have an original schema, use it to preserve metadata (e.g., field IDs)
173        let schema = if let Some(original_schema) = self.original_schema {
174            original_schema
175        } else {
176            Arc::new(Schema::new(fields))
177        };
178
179        RecordBatch::try_new(schema, arrays)
180    }
181
182    /// Appends a row to the builder.
183    /// Errors if the row contains an unimplemented or out-of-range value.
184    pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
185        for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
186            col.append_datum(datum)?;
187        }
188        self.row_size_bytes += row.byte_len();
189        Ok(())
190    }
191
192    pub fn row_size_bytes(&self) -> usize {
193        self.row_size_bytes
194    }
195}
196
/// Return the appropriate Arrow DataType for the given SqlScalarType, plus a string
/// that should be used as part of the Arrow 'Extension Type' name for fields using
/// this type: <https://arrow.apache.org/docs/format/Columnar.html#extension-types>
///
/// Errors if the scalar type has no arrow mapping implemented here.
fn scalar_to_arrow_datatype(
    scalar_type: &SqlScalarType,
) -> Result<(DataType, String), anyhow::Error> {
    let (data_type, extension_name) = match scalar_type {
        SqlScalarType::Bool => (DataType::Boolean, "boolean"),
        SqlScalarType::Int16 => (DataType::Int16, "smallint"),
        SqlScalarType::Int32 => (DataType::Int32, "integer"),
        SqlScalarType::Int64 => (DataType::Int64, "bigint"),
        SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
        SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
        SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
        SqlScalarType::Float32 => (DataType::Float32, "real"),
        SqlScalarType::Float64 => (DataType::Float64, "double"),
        SqlScalarType::Date => (DataType::Date32, "date"),
        // The resolution of our time and timestamp types is microseconds, which is lucky
        // since the original parquet 'ConvertedType's support microsecond resolution but not
        // nanosecond resolution. The newer parquet 'LogicalType's support nanosecond resolution,
        // but many readers don't support them yet.
        SqlScalarType::Time => (
            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
            "time",
        ),
        SqlScalarType::Timestamp { .. } => (
            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
            "timestamp",
        ),
        SqlScalarType::TimestampTz { .. } => (
            DataType::Timestamp(
                arrow::datatypes::TimeUnit::Microsecond,
                // When appending values we always use UTC timestamps, and setting this to a non-empty
                // value allows readers to know that tz-aware timestamps can be compared directly.
                Some("+00:00".into()),
            ),
            "timestamptz",
        ),
        SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
        // For Char/VarChar: use the 32-bit-offset Utf8 type only when a declared
        // max length guarantees values fit; otherwise fall back to LargeUtf8
        // (64-bit offsets), which is also used when no length is declared.
        SqlScalarType::Char { length } => {
            if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        SqlScalarType::VarChar { max_length } => {
            if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        SqlScalarType::String => (DataType::LargeUtf8, "text"),
        // Parquet does have a UUID 'Logical Type' in parquet format 2.4+, but there is no arrow
        // UUID type, so we match the format (a 16-byte fixed-length binary array) ourselves.
        SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
        // Parquet does have a JSON 'Logical Type' in parquet format 2.4+, but there is no arrow
        // JSON type, so for now we represent JSON as 'large' utf8-encoded strings.
        SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
        SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
        SqlScalarType::Numeric { max_scale } => {
            // Materialize allows 39 digits of precision for numeric values, but allows
            // arbitrary scales among those values. e.g. 1e38 and 1e-39 are both valid in
            // the same column. However, Arrow/Parquet only allows static declaration of both
            // the precision and the scale. To represent the full range of values of a numeric
            // column, we would need 78-digits to store all possible values. Arrow's Decimal256
            // type can only support 76 digits, so we are unable to represent the entire range.

            // Instead of representing the full possible range, we instead try to represent most
            // values in the most-compatible way. We use a Decimal128 type which can handle 38
            // digits of precision and has more compatibility with other parquet readers than
            // Decimal256. We use Arrow's default scale of 10 if max-scale is not set. We will
            // error if we encounter a value that is too large to represent, and if that happens
            // a user can choose to cast the column to a string to represent the value.
            match max_scale {
                Some(scale) => {
                    let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
                    if scale <= DECIMAL128_MAX_SCALE {
                        (
                            DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
                            "numeric",
                        )
                    } else {
                        anyhow::bail!("Numeric max scale {} out of range", scale)
                    }
                }
                None => (
                    DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
                    "numeric",
                ),
            }
        }
        // arrow::datatypes::IntervalUnit::MonthDayNano is not yet implemented in the arrow parquet writer
        // https://github.com/apache/arrow-rs/blob/0d031cc8aa81296cb1bdfedea7a7cb4ec6aa54ea/parquet/src/arrow/arrow_writer/mod.rs#L859
        // SqlScalarType::Interval => DataType::Interval(arrow::datatypes::IntervalUnit::DayTime)
        SqlScalarType::Array(inner) => {
            // Postgres / MZ Arrays are weird, since they can be multi-dimensional but this is not
            // enforced in the type system, so can change per-value.
            // We use a struct type with two fields - one containing the array elements as a list
            // and the other containing the number of dimensions the array represents. Since arrays
            // are not allowed to be ragged, the number of elements in each dimension is the same.
            let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let inner_field = field_with_typename("item", inner_type, true, &inner_name);
            let list_field = Arc::new(field_with_typename(
                "items",
                DataType::List(inner_field.into()),
                false,
                "array_items",
            ));
            let dims_field = Arc::new(field_with_typename(
                "dimensions",
                DataType::UInt8,
                false,
                "array_dimensions",
            ));
            (DataType::Struct([list_field, dims_field].into()), "array")
        }
        SqlScalarType::List {
            element_type,
            custom_id: _,
        } => {
            let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let field = field_with_typename("item", inner_type, true, &inner_name);
            (DataType::List(field.into()), "list")
        }
        SqlScalarType::Map {
            value_type,
            custom_id: _,
        } => {
            let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
            // Arrow maps are represented as an 'entries' struct with 'keys' and 'values' fields.
            let field_names = MapFieldNames::default();
            let struct_type = DataType::Struct(
                vec![
                    // Map keys are always non-null strings.
                    Field::new(&field_names.key, DataType::Utf8, false),
                    field_with_typename(&field_names.value, value_type, true, &value_name),
                ]
                .into(),
            );
            (
                DataType::Map(
                    Field::new(&field_names.entry, struct_type, false).into(),
                    false,
                ),
                "map",
            )
        }
        SqlScalarType::Record {
            fields,
            custom_id: _,
        } => {
            // Records are represented as Arrow Structs with one field per record field.
            // At runtime, records are stored as Datum::List with field values in order.
            let mut arrow_fields = Vec::with_capacity(fields.len());
            for (field_name, field_type) in fields.iter() {
                let (inner_type, inner_extension_name) =
                    scalar_to_arrow_datatype(&field_type.scalar_type)?;
                let field = field_with_typename(
                    field_name.as_str(),
                    inner_type,
                    field_type.nullable,
                    &inner_extension_name,
                );
                arrow_fields.push(Arc::new(field));
            }
            (DataType::Struct(arrow_fields.into()), "record")
        }
        _ => anyhow::bail!("{:?} unimplemented", scalar_type),
    };
    // Extension names are already lowercase by convention; normalize defensively.
    Ok((data_type, extension_name.to_lowercase()))
}
371
/// Constructs the [`ColBuilder`] variant matching `data_type`, pre-sized for
/// `item_capacity` items and (for variable-width string/binary types)
/// `data_capacity` bytes of value data.
///
/// Errors if `data_type` is not one of the types produced by
/// `scalar_to_arrow_datatype`.
fn builder_for_datatype(
    data_type: &DataType,
    item_capacity: usize,
    data_capacity: usize,
) -> Result<ColBuilder, anyhow::Error> {
    let builder = match &data_type {
        DataType::Boolean => {
            ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
        }
        DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
        DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
        DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
        DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
        DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
        DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
        DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
        DataType::Float32 => {
            ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
        }
        DataType::Float64 => {
            ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
        }
        DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
        DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
            ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
                item_capacity,
            ))
        }
        // Covers both `timestamp` (no tz) and `timestamptz` (tz = "+00:00").
        DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
            ColBuilder::TimestampMicrosecondBuilder(
                TimestampMicrosecondBuilder::with_capacity(item_capacity)
                    .with_timezone_opt(timezone.clone()),
            )
        }
        DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
            FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
        ),
        DataType::Utf8 => {
            ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
        }
        DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
            Decimal128Builder::with_capacity(item_capacity)
                .with_precision_and_scale(*precision, *scale)?,
        ),
        // Nested types recurse through `ArrowColumn::new` so the inner builders
        // carry the same field metadata as top-level columns.
        DataType::List(field) => {
            let inner_col_builder = ArrowColumn::new(
                field.name().clone(),
                field.is_nullable(),
                field.data_type().clone(),
                typename_from_field(field)?,
                item_capacity,
                data_capacity,
            )?;
            ColBuilder::ListBuilder(Box::new(
                ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
            ))
        }
        DataType::Struct(fields) => {
            let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
            for field in fields {
                let inner_col_builder = ArrowColumn::new(
                    field.name().clone(),
                    field.is_nullable(),
                    field.data_type().clone(),
                    typename_from_field(field)?,
                    item_capacity,
                    data_capacity,
                )?;
                field_builders.push(Box::new(inner_col_builder));
            }
            ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
        }
        DataType::Map(entries_field, _sorted) => {
            let entries_field = entries_field.as_ref();
            if let DataType::Struct(fields) = entries_field.data_type() {
                if fields.len() != 2 {
                    anyhow::bail!(
                        "Expected map entries to have 2 fields, found {}",
                        fields.len()
                    )
                }
                // Keys are always Utf8 (see `scalar_to_arrow_datatype`'s Map arm);
                // fields[1] is the values field per the MapFieldNames layout.
                let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
                let value_field = &fields[1];
                let value_builder = ArrowColumn::new(
                    value_field.name().clone(),
                    value_field.is_nullable(),
                    value_field.data_type().clone(),
                    typename_from_field(value_field)?,
                    item_capacity,
                    data_capacity,
                )?;
                ColBuilder::MapBuilder(Box::new(
                    MapBuilder::with_capacity(
                        Some(MapFieldNames::default()),
                        key_builder,
                        value_builder,
                        item_capacity,
                    )
                    .with_values_field(Arc::clone(value_field)),
                ))
            } else {
                anyhow::bail!("Expected map entries to be a struct")
            }
        }
        _ => anyhow::bail!("{:?} unimplemented", data_type),
    };
    Ok(builder)
}
488
/// A single in-progress output column: the arrow field description plus the
/// builder accumulating its values.
#[derive(Debug)]
struct ArrowColumn {
    /// Name of the arrow field for this column.
    field_name: String,
    /// Whether the field may contain nulls.
    nullable: bool,
    /// The arrow datatype of the field.
    data_type: DataType,
    /// The materialize type name stored as extension metadata on the field
    /// (without the `materialize.v1.` prefix).
    extension_type_name: String,
    /// The underlying arrow builder values are appended to.
    inner: ColBuilder,
}
497
498impl From<&ArrowColumn> for Field {
499    fn from(col: &ArrowColumn) -> Self {
500        field_with_typename(
501            &col.field_name,
502            col.data_type.clone(),
503            col.nullable,
504            &col.extension_type_name,
505        )
506    }
507}
508
509/// Create a Field and include the materialize 'type name' as an extension in the metadata.
510fn field_with_typename(
511    name: &str,
512    data_type: DataType,
513    nullable: bool,
514    extension_type_name: &str,
515) -> Field {
516    Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
517        ARROW_EXTENSION_NAME_KEY.to_string(),
518        format!("{}{}", EXTENSION_PREFIX, extension_type_name),
519    )]))
520}
521
522/// Extract the materialize 'type name' from the metadata of a Field.
523/// Returns an error if the field doesn't have extension metadata.
524fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
525    let metadata = field.metadata();
526    let extension_name = metadata
527        .get(ARROW_EXTENSION_NAME_KEY)
528        .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
529    extension_name
530        .strip_prefix(EXTENSION_PREFIX)
531        .map(|s| s.to_string())
532        .ok_or_else(|| {
533            anyhow::anyhow!(
534                "Field '{}' extension name '{}' missing expected prefix '{}'",
535                field.name(),
536                extension_name,
537                EXTENSION_PREFIX
538            )
539        })
540}
541
542impl ArrowColumn {
543    fn new(
544        field_name: String,
545        nullable: bool,
546        data_type: DataType,
547        extension_type_name: String,
548        item_capacity: usize,
549        data_capacity: usize,
550    ) -> Result<Self, anyhow::Error> {
551        Ok(Self {
552            inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
553            field_name,
554            nullable,
555            data_type,
556            extension_type_name,
557        })
558    }
559}
560
/// Generates the `ColBuilder` enum plus its `append_null` method and the
/// `ArrayBuilder` impl for `ArrowColumn`, emitting one match arm per flat
/// builder type passed to the macro; the nested List/Map/Struct variants are
/// declared explicitly inside.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// An enum wrapper for all arrow builder types that we support. Used to store
        /// a builder for each column and avoid dynamic dispatch and downcasting
        /// when appending data.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// ListBuilder & MapBuilder are handled separately from other builder types since they
            /// use generic parameters for the inner types, and are boxed to avoid recursive
            /// type definitions.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// StructBuilder is handled separately since its `append_null()` method must be
            /// overridden to both append nulls to all field builders and to append a null to
            /// the struct. It's unclear why `arrow-rs` implemented this differently than
            /// ListBuilder and MapBuilder.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // `append(false)` records a null map entry; it only errors on an
                    // internal key/value length mismatch, hence the unwrap.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Append a null to every field builder first (see the variant
                        // doc above), then mark the struct slot itself as null.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        /// Implement the ArrayBuilder trait for ArrowColumn so that we can use an ArrowColumn as
        /// an inner-builder type in an [`arrow::array::builder::GenericListBuilder`]
        /// and an [`arrow::array::builder::StructBuilder`] and re-use our methods for appending
        /// data to the column.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
649
// Instantiates `ColBuilder` and `ArrowColumn`'s `ArrayBuilder` impl for every
// flat (non-nested) builder type we support; List/Map/Struct builders are
// declared explicitly inside the macro definition above.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
670
671impl ArrowColumn {
672    fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
673        match (&mut self.inner, datum) {
674            (s, Datum::Null) => s.append_null(),
675            (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
676            (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
677            (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
678            (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
679            (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
680            (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
681            (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
682            (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
683            (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
684            (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
685            (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
686            (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
687                builder.append_value(d.unix_epoch_days())
688            }
689            (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
690                let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
691                    * 1_000_000
692                    + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
693                builder.append_value(micros_since_midnight)
694            }
695            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
696                builder.append_value(ts.and_utc().timestamp_micros())
697            }
698            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
699                builder.append_value(ts.timestamp_micros())
700            }
701            (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
702            (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
703                builder.append_value(val.as_bytes())?
704            }
705            (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
706            (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
707                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
708            }
709            (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
710            (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
711                builder.append_value(ts.into())
712            }
713            (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
714                if dec.0.is_special() {
715                    anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
716                }
717                if let DataType::Decimal128(precision, scale) = self.data_type {
718                    if dec.0.digits() > precision.into() {
719                        anyhow::bail!(
720                            "Decimal value {} out of range for column with precision {}",
721                            dec,
722                            precision
723                        )
724                    }
725
726                    // Get the signed-coefficient represented as an i128, and the exponent such that
727                    // the number should equal coefficient*10^exponent.
728                    let coefficient: i128 = dec.0.coefficient()?;
729                    let exponent = dec.0.exponent();
730
731                    // Convert the value to use the scale of the column (add 0's to align the decimal
732                    // point correctly). This is done by multiplying the coefficient by
733                    // 10^(scale + exponent).
734                    let scale_diff = i32::from(scale) + exponent;
735                    // If the scale_diff is negative, we know there aren't enough digits in our
736                    // column's scale to represent this value.
737                    let scale_diff = u32::try_from(scale_diff).map_err(|_| {
738                        anyhow::anyhow!(
739                            "cannot represent decimal value {} in column with scale {}",
740                            dec,
741                            scale
742                        )
743                    })?;
744
745                    let value = coefficient
746                        .checked_mul(10_i128.pow(scale_diff))
747                        .ok_or_else(|| {
748                            anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
749                        })?;
750
751                    builder.append_value(value)
752                } else {
753                    anyhow::bail!("Expected Decimal128 data type")
754                }
755            }
756            (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
757                // We've received an array datum which we know is represented as an Arrow struct
758                // with two fields: the list of elements and the number of dimensions
759                let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
760                if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
761                    let inner_builder = list_builder.values();
762                    for datum in arr.elements().into_iter() {
763                        inner_builder.append_datum(datum)?;
764                    }
765                    list_builder.append(true);
766                } else {
767                    anyhow::bail!(
768                        "Expected ListBuilder for StructBuilder with Array datum: {:?}",
769                        struct_builder
770                    )
771                }
772                let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
773                if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
774                    dims_builder.append_value(arr.dims().ndims());
775                } else {
776                    anyhow::bail!(
777                        "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
778                        struct_builder
779                    )
780                }
781                struct_builder.append(true)
782            }
783            (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
784                let inner_builder = list_builder.values();
785                for datum in list.into_iter() {
786                    inner_builder.append_datum(datum)?;
787                }
788                list_builder.append(true)
789            }
790            (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
791                for (key, value) in map.iter() {
792                    builder.keys().append_value(key);
793                    builder.values().append_datum(value)?;
794                }
795                builder.append(true).unwrap()
796            }
797            // Records are stored as Datum::List at runtime but written to a StructBuilder.
798            // Arrays use Datum::Array (handled above), so Datum::List with StructBuilder is a record.
799            (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
800                let field_count = struct_builder.num_fields();
801                for (i, datum) in list.into_iter().enumerate() {
802                    if i >= field_count {
803                        anyhow::bail!(
804                            "Record has more elements ({}) than struct fields ({})",
805                            i + 1,
806                            field_count
807                        );
808                    }
809                    let field_builder: &mut ArrowColumn = struct_builder
810                        .field_builder(i)
811                        .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
812                    field_builder.append_datum(datum)?;
813                }
814                struct_builder.append(true);
815            }
816            (builder, datum) => {
817                anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
818            }
819        }
820        Ok(())
821    }
822}