mz_arrow_util/
builder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// We need to allow the std::collections::HashMap type since it is directly used as a type
11// parameter to the arrow Field::with_metadata method.
12#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20    DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
30pub struct ArrowBuilder {
31    columns: Vec<ArrowColumn>,
32    /// A crude estimate of the size of the data in the builder
33    /// based on the size of the rows added to it.
34    row_size_bytes: usize,
35}
36
37impl ArrowBuilder {
38    /// Helper to validate that a RelationDesc can be encoded into Arrow.
39    pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
40        let mut errs = vec![];
41        for (col_name, col_type) in desc.iter() {
42            match scalar_to_arrow_datatype(&col_type.scalar_type) {
43                Ok(_) => {}
44                Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
45            }
46        }
47        if !errs.is_empty() {
48            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
49        }
50        Ok(())
51    }
52
53    /// Initializes a new ArrowBuilder with the schema of the provided RelationDesc.
54    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
55    /// the number of values that can be appended to each column before reallocating.
56    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
57    /// Errors if the relation contains an unimplemented type.
58    pub fn new(
59        desc: &RelationDesc,
60        item_capacity: usize,
61        data_capacity: usize,
62    ) -> Result<Self, anyhow::Error> {
63        let mut columns = vec![];
64        let mut errs = vec![];
65        let mut seen_names = BTreeMap::new();
66        for (col_name, col_type) in desc.iter() {
67            let mut col_name = col_name.to_string();
68            // If we allow columns with the same name we encounter two issues:
69            // 1. The arrow crate will accidentally reuse the same buffers for the columns
70            // 2. Many parquet readers will error when trying to read the file metadata
71            // Instead we append a number to the end of the column name for any duplicates.
72            // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
73            seen_names
74                .entry(col_name.clone())
75                .and_modify(|e: &mut u32| {
76                    *e += 1;
77                    col_name += &e.to_string();
78                })
79                .or_insert(1);
80            match scalar_to_arrow_datatype(&col_type.scalar_type) {
81                Ok((data_type, extension_type_name)) => {
82                    columns.push(ArrowColumn::new(
83                        col_name,
84                        col_type.nullable,
85                        data_type,
86                        extension_type_name,
87                        item_capacity,
88                        data_capacity,
89                    )?);
90                }
91                Err(err) => errs.push(err.to_string()),
92            }
93        }
94        if !errs.is_empty() {
95            anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
96        }
97        Ok(Self {
98            columns,
99            row_size_bytes: 0,
100        })
101    }
102
103    /// Returns a copy of the schema of the ArrowBuilder.
104    pub fn schema(&self) -> Schema {
105        Schema::new(
106            self.columns
107                .iter()
108                .map(Into::<Field>::into)
109                .collect::<Vec<_>>(),
110        )
111    }
112
113    /// Converts the ArrowBuilder into an arrow RecordBatch.
114    pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
115        let mut arrays = vec![];
116        let mut fields: Vec<Field> = vec![];
117        for mut col in self.columns.into_iter() {
118            arrays.push(col.finish());
119            fields.push((&col).into());
120        }
121        RecordBatch::try_new(Schema::new(fields).into(), arrays)
122    }
123
124    /// Appends a row to the builder.
125    /// Errors if the row contains an unimplemented or out-of-range value.
126    pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
127        for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
128            col.append_datum(datum)?;
129        }
130        self.row_size_bytes += row.byte_len();
131        Ok(())
132    }
133
134    pub fn row_size_bytes(&self) -> usize {
135        self.row_size_bytes
136    }
137}
138
139/// Return the appropriate Arrow DataType for the given SqlScalarType, plus a string
140/// that should be used as part of the Arrow 'Extension Type' name for fields using
141/// this type: <https://arrow.apache.org/docs/format/Columnar.html#extension-types>
142fn scalar_to_arrow_datatype(
143    scalar_type: &SqlScalarType,
144) -> Result<(DataType, String), anyhow::Error> {
145    let (data_type, extension_name) = match scalar_type {
146        SqlScalarType::Bool => (DataType::Boolean, "boolean"),
147        SqlScalarType::Int16 => (DataType::Int16, "smallint"),
148        SqlScalarType::Int32 => (DataType::Int32, "integer"),
149        SqlScalarType::Int64 => (DataType::Int64, "bigint"),
150        SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
151        SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
152        SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
153        SqlScalarType::Float32 => (DataType::Float32, "real"),
154        SqlScalarType::Float64 => (DataType::Float64, "double"),
155        SqlScalarType::Date => (DataType::Date32, "date"),
156        // The resolution of our time and timestamp types is microseconds, which is lucky
157        // since the original parquet 'ConvertedType's support microsecond resolution but not
158        // nanosecond resolution. The newer parquet 'LogicalType's support nanosecond resolution,
159        // but many readers don't support them yet.
160        SqlScalarType::Time => (
161            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
162            "time",
163        ),
164        SqlScalarType::Timestamp { .. } => (
165            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
166            "timestamp",
167        ),
168        SqlScalarType::TimestampTz { .. } => (
169            DataType::Timestamp(
170                arrow::datatypes::TimeUnit::Microsecond,
171                // When appending values we always use UTC timestamps, and setting this to a non-empty
172                // value allows readers to know that tz-aware timestamps can be compared directly.
173                Some("+00:00".into()),
174            ),
175            "timestamptz",
176        ),
177        SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
178        SqlScalarType::Char { length } => {
179            if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
180                (DataType::Utf8, "text")
181            } else {
182                (DataType::LargeUtf8, "text")
183            }
184        }
185        SqlScalarType::VarChar { max_length } => {
186            if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
187                (DataType::Utf8, "text")
188            } else {
189                (DataType::LargeUtf8, "text")
190            }
191        }
192        SqlScalarType::String => (DataType::LargeUtf8, "text"),
193        // Parquet does have a UUID 'Logical Type' in parquet format 2.4+, but there is no arrow
194        // UUID type, so we match the format (a 16-byte fixed-length binary array) ourselves.
195        SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
196        // Parquet does have a JSON 'Logical Type' in parquet format 2.4+, but there is no arrow
197        // JSON type, so for now we represent JSON as 'large' utf8-encoded strings.
198        SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
199        SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
200        SqlScalarType::Numeric { max_scale } => {
201            // Materialize allows 39 digits of precision for numeric values, but allows
202            // arbitrary scales among those values. e.g. 1e38 and 1e-39 are both valid in
203            // the same column. However, Arrow/Parquet only allows static declaration of both
204            // the precision and the scale. To represent the full range of values of a numeric
205            // column, we would need 78-digits to store all possible values. Arrow's Decimal256
206            // type can only support 76 digits, so we are be unable to represent the entire range.
207
208            // Instead of representing the full possible range, we instead try to represent most
209            // values in the most-compatible way. We use a Decimal128 type which can handle 38
210            // digits of precision and has more compatibility with other parquet readers than
211            // Decimal256. We use Arrow's default scale of 10 if max-scale is not set. We will
212            // error if we encounter a value that is too large to represent, and if that happens
213            // a user can choose to cast the column to a string to represent the value.
214            match max_scale {
215                Some(scale) => {
216                    let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
217                    if scale <= DECIMAL128_MAX_SCALE {
218                        (
219                            DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
220                            "numeric",
221                        )
222                    } else {
223                        anyhow::bail!("Numeric max scale {} out of range", scale)
224                    }
225                }
226                None => (
227                    DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
228                    "numeric",
229                ),
230            }
231        }
232        // arrow::datatypes::IntervalUnit::MonthDayNano is not yet implemented in the arrow parquet writer
233        // https://github.com/apache/arrow-rs/blob/0d031cc8aa81296cb1bdfedea7a7cb4ec6aa54ea/parquet/src/arrow/arrow_writer/mod.rs#L859
234        // SqlScalarType::Interval => DataType::Interval(arrow::datatypes::IntervalUnit::DayTime)
235        SqlScalarType::Array(inner) => {
236            // Postgres / MZ Arrays are weird, since they can be multi-dimensional but this is not
237            // enforced in the type system, so can change per-value.
238            // We use a struct type with two fields - one containing the array elements as a list
239            // and the other containing the number of dimensions the array represents. Since arrays
240            // are not allowed to be ragged, the number of elements in each dimension is the same.
241            let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
242            // TODO: Document these field names in our copy-to-s3 docs
243            let inner_field = field_with_typename("item", inner_type, true, &inner_name);
244            let list_field = Arc::new(field_with_typename(
245                "items",
246                DataType::List(inner_field.into()),
247                false,
248                "array_items",
249            ));
250            let dims_field = Arc::new(field_with_typename(
251                "dimensions",
252                DataType::UInt8,
253                false,
254                "array_dimensions",
255            ));
256            (DataType::Struct([list_field, dims_field].into()), "array")
257        }
258        SqlScalarType::List {
259            element_type,
260            custom_id: _,
261        } => {
262            let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
263            // TODO: Document these field names in our copy-to-s3 docs
264            let field = field_with_typename("item", inner_type, true, &inner_name);
265            (DataType::List(field.into()), "list")
266        }
267        SqlScalarType::Map {
268            value_type,
269            custom_id: _,
270        } => {
271            let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
272            // Arrow maps are represented as an 'entries' struct with 'keys' and 'values' fields.
273            let field_names = MapFieldNames::default();
274            let struct_type = DataType::Struct(
275                vec![
276                    Field::new(&field_names.key, DataType::Utf8, false),
277                    field_with_typename(&field_names.value, value_type, true, &value_name),
278                ]
279                .into(),
280            );
281            (
282                DataType::Map(
283                    Field::new(&field_names.entry, struct_type, false).into(),
284                    false,
285                ),
286                "map",
287            )
288        }
289        _ => anyhow::bail!("{:?} unimplemented", scalar_type),
290    };
291    Ok((data_type, extension_name.to_lowercase()))
292}
293
294fn builder_for_datatype(
295    data_type: &DataType,
296    item_capacity: usize,
297    data_capacity: usize,
298) -> Result<ColBuilder, anyhow::Error> {
299    let builder = match &data_type {
300        DataType::Boolean => {
301            ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
302        }
303        DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
304        DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
305        DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
306        DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
307        DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
308        DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
309        DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
310        DataType::Float32 => {
311            ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
312        }
313        DataType::Float64 => {
314            ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
315        }
316        DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
317        DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
318            ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
319                item_capacity,
320            ))
321        }
322        DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
323            ColBuilder::TimestampMicrosecondBuilder(
324                TimestampMicrosecondBuilder::with_capacity(item_capacity)
325                    .with_timezone_opt(timezone.clone()),
326            )
327        }
328        DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
329            item_capacity,
330            data_capacity,
331        )),
332        DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
333            FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
334        ),
335        DataType::Utf8 => {
336            ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
337        }
338        DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
339            item_capacity,
340            data_capacity,
341        )),
342        DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
343            Decimal128Builder::with_capacity(item_capacity)
344                .with_precision_and_scale(*precision, *scale)?,
345        ),
346        DataType::List(field) => {
347            let inner_col_builder = ArrowColumn::new(
348                field.name().clone(),
349                field.is_nullable(),
350                field.data_type().clone(),
351                typename_from_field(field)?,
352                item_capacity,
353                data_capacity,
354            )?;
355            ColBuilder::ListBuilder(Box::new(
356                ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
357            ))
358        }
359        DataType::Struct(fields) => {
360            let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
361            for field in fields {
362                let inner_col_builder = ArrowColumn::new(
363                    field.name().clone(),
364                    field.is_nullable(),
365                    field.data_type().clone(),
366                    typename_from_field(field)?,
367                    item_capacity,
368                    data_capacity,
369                )?;
370                field_builders.push(Box::new(inner_col_builder));
371            }
372            ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
373        }
374        DataType::Map(entries_field, _sorted) => {
375            let entries_field = entries_field.as_ref();
376            if let DataType::Struct(fields) = entries_field.data_type() {
377                if fields.len() != 2 {
378                    anyhow::bail!(
379                        "Expected map entries to have 2 fields, found {}",
380                        fields.len()
381                    )
382                }
383                let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
384                let value_field = &fields[1];
385                let value_builder = ArrowColumn::new(
386                    value_field.name().clone(),
387                    value_field.is_nullable(),
388                    value_field.data_type().clone(),
389                    typename_from_field(value_field)?,
390                    item_capacity,
391                    data_capacity,
392                )?;
393                ColBuilder::MapBuilder(Box::new(
394                    MapBuilder::with_capacity(
395                        Some(MapFieldNames::default()),
396                        key_builder,
397                        value_builder,
398                        item_capacity,
399                    )
400                    .with_values_field(Arc::clone(value_field)),
401                ))
402            } else {
403                anyhow::bail!("Expected map entries to be a struct")
404            }
405        }
406        _ => anyhow::bail!("{:?} unimplemented", data_type),
407    };
408    Ok(builder)
409}
410
411#[derive(Debug)]
412struct ArrowColumn {
413    field_name: String,
414    nullable: bool,
415    data_type: DataType,
416    extension_type_name: String,
417    inner: ColBuilder,
418}
419
420impl From<&ArrowColumn> for Field {
421    fn from(col: &ArrowColumn) -> Self {
422        field_with_typename(
423            &col.field_name,
424            col.data_type.clone(),
425            col.nullable,
426            &col.extension_type_name,
427        )
428    }
429}
430
431/// Create a Field and include the materialize 'type name' as an extension in the metadata.
432fn field_with_typename(
433    name: &str,
434    data_type: DataType,
435    nullable: bool,
436    extension_type_name: &str,
437) -> Field {
438    Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
439        "ARROW:extension:name".to_string(),
440        format!("materialize.v1.{}", extension_type_name),
441    )]))
442}
443
444/// Extract the materialize 'type name' from the metadata of a Field.
445fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
446    let metadata = field.metadata();
447    let extension_name = metadata
448        .get("ARROW:extension:name")
449        .ok_or_else(|| anyhow::anyhow!("Missing extension name in metadata"))?;
450    if let Some(name) = extension_name.strip_prefix("materialize.v1") {
451        Ok(name.to_string())
452    } else {
453        anyhow::bail!("Extension name {} does not match expected", extension_name,)
454    }
455}
456
457impl ArrowColumn {
458    fn new(
459        field_name: String,
460        nullable: bool,
461        data_type: DataType,
462        extension_type_name: String,
463        item_capacity: usize,
464        data_capacity: usize,
465    ) -> Result<Self, anyhow::Error> {
466        Ok(Self {
467            inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
468            field_name,
469            nullable,
470            data_type,
471            extension_type_name,
472        })
473    }
474}
475
/// Generates `ColBuilder` with one variant per listed builder type (plus the nested
/// List/Map/Struct variants). It also generates `ColBuilder::append_null` and the
/// `ArrayBuilder` implementation for `ArrowColumn`, both of which dispatch over those variants.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// Enum wrapper over every arrow builder type supported here. Each column stores one
        /// of these, so appending data needs neither dynamic dispatch nor downcasting.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// ListBuilder and MapBuilder are listed separately from the other builder types
            /// because they take generic parameters for their inner builders. They are boxed
            /// to avoid a recursive type definition.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// StructBuilder is listed separately because appending a null struct must also
            /// append a null to every child builder, in addition to the struct's own null
            /// (see `ColBuilder::append_null`). arrow-rs handles this differently than it
            /// does for ListBuilder and MapBuilder.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            /// Appends a single null entry to this builder.
            fn append_null(&mut self) {
                match self {
                    $(
                        Self::$x(b) => b.append_null(),
                    )*
                    Self::ListBuilder(b) => b.append_null(),
                    Self::MapBuilder(b) => b.append(false).unwrap(),
                    Self::StructBuilder(sb) => {
                        // Keep every child column as long as the struct itself.
                        for idx in 0..sb.num_fields() {
                            let child: &mut ArrowColumn = sb.field_builder(idx).unwrap();
                            child.inner.append_null();
                        }
                        sb.append_null();
                    }
                }
            }
        }

        /// Lets an ArrowColumn serve as the inner builder of an
        /// [`arrow::array::builder::GenericListBuilder`] or an
        /// [`arrow::array::builder::StructBuilder`], so nested values reuse the same
        /// append logic as top-level columns.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(b) => b.len(),
                    )*
                    ColBuilder::ListBuilder(b) => b.len(),
                    ColBuilder::MapBuilder(b) => b.len(),
                    ColBuilder::StructBuilder(b) => b.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(b) => Arc::new(b.finish()),
                    )*
                    ColBuilder::ListBuilder(b) => Arc::new(b.finish()),
                    ColBuilder::MapBuilder(b) => Arc::new(b.finish()),
                    ColBuilder::StructBuilder(b) => Arc::new(b.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(b) => Arc::new(b.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(b) => Arc::new(b.finish_cloned()),
                    ColBuilder::MapBuilder(b) => Arc::new(b.finish_cloned()),
                    ColBuilder::StructBuilder(b) => Arc::new(b.finish_cloned()),
                }
            }
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
564
565make_col_builder!(
566    BooleanBuilder,
567    Int16Builder,
568    Int32Builder,
569    Int64Builder,
570    UInt8Builder,
571    UInt16Builder,
572    UInt32Builder,
573    UInt64Builder,
574    Float32Builder,
575    Float64Builder,
576    Date32Builder,
577    Time64MicrosecondBuilder,
578    TimestampMicrosecondBuilder,
579    LargeBinaryBuilder,
580    FixedSizeBinaryBuilder,
581    StringBuilder,
582    LargeStringBuilder,
583    Decimal128Builder
584);
585
586impl ArrowColumn {
587    fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
588        match (&mut self.inner, datum) {
589            (s, Datum::Null) => s.append_null(),
590            (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
591            (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
592            (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
593            (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
594            (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
595            (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
596            (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
597            (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
598            (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
599            (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
600            (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
601            (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
602                builder.append_value(d.unix_epoch_days())
603            }
604            (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
605                let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
606                    * 1_000_000
607                    + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
608                builder.append_value(micros_since_midnight)
609            }
610            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
611                builder.append_value(ts.and_utc().timestamp_micros())
612            }
613            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
614                builder.append_value(ts.timestamp_micros())
615            }
616            (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
617            (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
618                builder.append_value(val.as_bytes())?
619            }
620            (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
621            (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
622                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
623            }
624            (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
625            (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
626                builder.append_value(ts.into())
627            }
628            (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
629                if dec.0.is_special() {
630                    anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
631                }
632                if let DataType::Decimal128(precision, scale) = self.data_type {
633                    if dec.0.digits() > precision.into() {
634                        anyhow::bail!(
635                            "Decimal value {} out of range for column with precision {}",
636                            dec,
637                            precision
638                        )
639                    }
640
641                    // Get the signed-coefficient represented as an i128, and the exponent such that
642                    // the number should equal coefficient*10^exponent.
643                    let coefficient: i128 = dec.0.coefficient()?;
644                    let exponent = dec.0.exponent();
645
646                    // Convert the value to use the scale of the column (add 0's to align the decimal
647                    // point correctly). This is done by multiplying the coefficient by
648                    // 10^(scale + exponent).
649                    let scale_diff = i32::from(scale) + exponent;
650                    // If the scale_diff is negative, we know there aren't enough digits in our
651                    // column's scale to represent this value.
652                    let scale_diff = u32::try_from(scale_diff).map_err(|_| {
653                        anyhow::anyhow!(
654                            "cannot represent decimal value {} in column with scale {}",
655                            dec,
656                            scale
657                        )
658                    })?;
659
660                    let value = coefficient
661                        .checked_mul(10_i128.pow(scale_diff))
662                        .ok_or_else(|| {
663                            anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
664                        })?;
665
666                    builder.append_value(value)
667                } else {
668                    anyhow::bail!("Expected Decimal128 data type")
669                }
670            }
671            (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
672                // We've received an array datum which we know is represented as an Arrow struct
673                // with two fields: the list of elements and the number of dimensions
674                let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
675                if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
676                    let inner_builder = list_builder.values();
677                    for datum in arr.elements().into_iter() {
678                        inner_builder.append_datum(datum)?;
679                    }
680                    list_builder.append(true);
681                } else {
682                    anyhow::bail!(
683                        "Expected ListBuilder for StructBuilder with Array datum: {:?}",
684                        struct_builder
685                    )
686                }
687                let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
688                if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
689                    dims_builder.append_value(arr.dims().ndims());
690                } else {
691                    anyhow::bail!(
692                        "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
693                        struct_builder
694                    )
695                }
696                struct_builder.append(true)
697            }
698            (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
699                let inner_builder = list_builder.values();
700                for datum in list.into_iter() {
701                    inner_builder.append_datum(datum)?;
702                }
703                list_builder.append(true)
704            }
705            (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
706                for (key, value) in map.iter() {
707                    builder.keys().append_value(key);
708                    builder.values().append_datum(value)?;
709                }
710                builder.append(true).unwrap()
711            }
712            (_builder, datum) => {
713                anyhow::bail!("Datum {:?} does not match builder", datum)
714            }
715        }
716        Ok(())
717    }
718}