mz_arrow_util/
builder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// We need to allow the std::collections::HashMap type since it is directly used as a type
11// parameter to the arrow Field::with_metadata method.
12#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20    DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, ScalarType};
29
30pub struct ArrowBuilder {
31    columns: Vec<ArrowColumn>,
32    /// A crude estimate of the size of the data in the builder
33    /// based on the size of the rows added to it.
34    row_size_bytes: usize,
35}
36
37impl ArrowBuilder {
38    /// Helper to validate that a RelationDesc can be encoded into Arrow.
39    pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
40        let mut errs = vec![];
41        for (col_name, col_type) in desc.iter() {
42            match scalar_to_arrow_datatype(&col_type.scalar_type) {
43                Ok(_) => {}
44                Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
45            }
46        }
47        if !errs.is_empty() {
48            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
49        }
50        Ok(())
51    }
52
53    /// Initializes a new ArrowBuilder with the schema of the provided RelationDesc.
54    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
55    /// the number of values that can be appended to each column before reallocating.
56    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
57    /// Errors if the relation contains an unimplemented type.
58    pub fn new(
59        desc: &RelationDesc,
60        item_capacity: usize,
61        data_capacity: usize,
62    ) -> Result<Self, anyhow::Error> {
63        let mut columns = vec![];
64        let mut errs = vec![];
65        let mut seen_names = BTreeMap::new();
66        for (col_name, col_type) in desc.iter() {
67            let mut col_name = col_name.to_string();
68            // If we allow columns with the same name we encounter two issues:
69            // 1. The arrow crate will accidentally reuse the same buffers for the columns
70            // 2. Many parquet readers will error when trying to read the file metadata
71            // Instead we append a number to the end of the column name for any duplicates.
72            // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
73            seen_names
74                .entry(col_name.clone())
75                .and_modify(|e: &mut u32| {
76                    *e += 1;
77                    col_name += &e.to_string();
78                })
79                .or_insert(1);
80            match scalar_to_arrow_datatype(&col_type.scalar_type) {
81                Ok((data_type, extension_type_name)) => {
82                    columns.push(ArrowColumn::new(
83                        col_name,
84                        col_type.nullable,
85                        data_type,
86                        extension_type_name,
87                        item_capacity,
88                        data_capacity,
89                    )?);
90                }
91                Err(err) => errs.push(err.to_string()),
92            }
93        }
94        if !errs.is_empty() {
95            anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
96        }
97        Ok(Self {
98            columns,
99            row_size_bytes: 0,
100        })
101    }
102
103    /// Returns a copy of the schema of the ArrowBuilder.
104    pub fn schema(&self) -> Schema {
105        Schema::new(
106            self.columns
107                .iter()
108                .map(Into::<Field>::into)
109                .collect::<Vec<_>>(),
110        )
111    }
112
113    /// Converts the ArrowBuilder into an arrow RecordBatch.
114    pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
115        let mut arrays = vec![];
116        let mut fields: Vec<Field> = vec![];
117        for mut col in self.columns.into_iter() {
118            arrays.push(col.finish());
119            fields.push((&col).into());
120        }
121        RecordBatch::try_new(Schema::new(fields).into(), arrays)
122    }
123
124    /// Appends a row to the builder.
125    /// Errors if the row contains an unimplemented or out-of-range value.
126    pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
127        for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
128            col.append_datum(datum)?;
129        }
130        self.row_size_bytes += row.byte_len();
131        Ok(())
132    }
133
134    pub fn row_size_bytes(&self) -> usize {
135        self.row_size_bytes
136    }
137}
138
139/// Return the appropriate Arrow DataType for the given ScalarType, plus a string
140/// that should be used as part of the Arrow 'Extension Type' name for fields using
141/// this type: <https://arrow.apache.org/docs/format/Columnar.html#extension-types>
142fn scalar_to_arrow_datatype(scalar_type: &ScalarType) -> Result<(DataType, String), anyhow::Error> {
143    let (data_type, extension_name) = match scalar_type {
144        ScalarType::Bool => (DataType::Boolean, "boolean"),
145        ScalarType::Int16 => (DataType::Int16, "smallint"),
146        ScalarType::Int32 => (DataType::Int32, "integer"),
147        ScalarType::Int64 => (DataType::Int64, "bigint"),
148        ScalarType::UInt16 => (DataType::UInt16, "uint2"),
149        ScalarType::UInt32 => (DataType::UInt32, "uint4"),
150        ScalarType::UInt64 => (DataType::UInt64, "uint8"),
151        ScalarType::Float32 => (DataType::Float32, "real"),
152        ScalarType::Float64 => (DataType::Float64, "double"),
153        ScalarType::Date => (DataType::Date32, "date"),
154        // The resolution of our time and timestamp types is microseconds, which is lucky
155        // since the original parquet 'ConvertedType's support microsecond resolution but not
156        // nanosecond resolution. The newer parquet 'LogicalType's support nanosecond resolution,
157        // but many readers don't support them yet.
158        ScalarType::Time => (
159            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
160            "time",
161        ),
162        ScalarType::Timestamp { .. } => (
163            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
164            "timestamp",
165        ),
166        ScalarType::TimestampTz { .. } => (
167            DataType::Timestamp(
168                arrow::datatypes::TimeUnit::Microsecond,
169                // When appending values we always use UTC timestamps, and setting this to a non-empty
170                // value allows readers to know that tz-aware timestamps can be compared directly.
171                Some("+00:00".into()),
172            ),
173            "timestamptz",
174        ),
175        ScalarType::Bytes => (DataType::LargeBinary, "bytea"),
176        ScalarType::Char { length } => {
177            if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
178                (DataType::Utf8, "text")
179            } else {
180                (DataType::LargeUtf8, "text")
181            }
182        }
183        ScalarType::VarChar { max_length } => {
184            if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
185                (DataType::Utf8, "text")
186            } else {
187                (DataType::LargeUtf8, "text")
188            }
189        }
190        ScalarType::String => (DataType::LargeUtf8, "text"),
191        // Parquet does have a UUID 'Logical Type' in parquet format 2.4+, but there is no arrow
192        // UUID type, so we match the format (a 16-byte fixed-length binary array) ourselves.
193        ScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
194        // Parquet does have a JSON 'Logical Type' in parquet format 2.4+, but there is no arrow
195        // JSON type, so for now we represent JSON as 'large' utf8-encoded strings.
196        ScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
197        ScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
198        ScalarType::Numeric { max_scale } => {
199            // Materialize allows 39 digits of precision for numeric values, but allows
200            // arbitrary scales among those values. e.g. 1e38 and 1e-39 are both valid in
201            // the same column. However, Arrow/Parquet only allows static declaration of both
202            // the precision and the scale. To represent the full range of values of a numeric
203            // column, we would need 78-digits to store all possible values. Arrow's Decimal256
204            // type can only support 76 digits, so we are be unable to represent the entire range.
205
206            // Instead of representing the full possible range, we instead try to represent most
207            // values in the most-compatible way. We use a Decimal128 type which can handle 38
208            // digits of precision and has more compatibility with other parquet readers than
209            // Decimal256. We use Arrow's default scale of 10 if max-scale is not set. We will
210            // error if we encounter a value that is too large to represent, and if that happens
211            // a user can choose to cast the column to a string to represent the value.
212            match max_scale {
213                Some(scale) => {
214                    let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
215                    if scale <= DECIMAL128_MAX_SCALE {
216                        (
217                            DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
218                            "numeric",
219                        )
220                    } else {
221                        anyhow::bail!("Numeric max scale {} out of range", scale)
222                    }
223                }
224                None => (
225                    DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
226                    "numeric",
227                ),
228            }
229        }
230        // arrow::datatypes::IntervalUnit::MonthDayNano is not yet implemented in the arrow parquet writer
231        // https://github.com/apache/arrow-rs/blob/0d031cc8aa81296cb1bdfedea7a7cb4ec6aa54ea/parquet/src/arrow/arrow_writer/mod.rs#L859
232        // ScalarType::Interval => DataType::Interval(arrow::datatypes::IntervalUnit::DayTime)
233        ScalarType::Array(inner) => {
234            // Postgres / MZ Arrays are weird, since they can be multi-dimensional but this is not
235            // enforced in the type system, so can change per-value.
236            // We use a struct type with two fields - one containing the array elements as a list
237            // and the other containing the number of dimensions the array represents. Since arrays
238            // are not allowed to be ragged, the number of elements in each dimension is the same.
239            let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
240            // TODO: Document these field names in our copy-to-s3 docs
241            let inner_field = field_with_typename("item", inner_type, true, &inner_name);
242            let list_field = Arc::new(field_with_typename(
243                "items",
244                DataType::List(inner_field.into()),
245                false,
246                "array_items",
247            ));
248            let dims_field = Arc::new(field_with_typename(
249                "dimensions",
250                DataType::UInt8,
251                false,
252                "array_dimensions",
253            ));
254            (DataType::Struct([list_field, dims_field].into()), "array")
255        }
256        ScalarType::List {
257            element_type,
258            custom_id: _,
259        } => {
260            let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
261            // TODO: Document these field names in our copy-to-s3 docs
262            let field = field_with_typename("item", inner_type, true, &inner_name);
263            (DataType::List(field.into()), "list")
264        }
265        ScalarType::Map {
266            value_type,
267            custom_id: _,
268        } => {
269            let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
270            // Arrow maps are represented as an 'entries' struct with 'keys' and 'values' fields.
271            let field_names = MapFieldNames::default();
272            let struct_type = DataType::Struct(
273                vec![
274                    Field::new(&field_names.key, DataType::Utf8, false),
275                    field_with_typename(&field_names.value, value_type, true, &value_name),
276                ]
277                .into(),
278            );
279            (
280                DataType::Map(
281                    Field::new(&field_names.entry, struct_type, false).into(),
282                    false,
283                ),
284                "map",
285            )
286        }
287        _ => anyhow::bail!("{:?} unimplemented", scalar_type),
288    };
289    Ok((data_type, extension_name.to_lowercase()))
290}
291
292fn builder_for_datatype(
293    data_type: &DataType,
294    item_capacity: usize,
295    data_capacity: usize,
296) -> Result<ColBuilder, anyhow::Error> {
297    let builder = match &data_type {
298        DataType::Boolean => {
299            ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
300        }
301        DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
302        DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
303        DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
304        DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
305        DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
306        DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
307        DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
308        DataType::Float32 => {
309            ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
310        }
311        DataType::Float64 => {
312            ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
313        }
314        DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
315        DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
316            ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
317                item_capacity,
318            ))
319        }
320        DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
321            ColBuilder::TimestampMicrosecondBuilder(
322                TimestampMicrosecondBuilder::with_capacity(item_capacity)
323                    .with_timezone_opt(timezone.clone()),
324            )
325        }
326        DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
327            item_capacity,
328            data_capacity,
329        )),
330        DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
331            FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
332        ),
333        DataType::Utf8 => {
334            ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
335        }
336        DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
337            item_capacity,
338            data_capacity,
339        )),
340        DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
341            Decimal128Builder::with_capacity(item_capacity)
342                .with_precision_and_scale(*precision, *scale)?,
343        ),
344        DataType::List(field) => {
345            let inner_col_builder = ArrowColumn::new(
346                field.name().clone(),
347                field.is_nullable(),
348                field.data_type().clone(),
349                typename_from_field(field)?,
350                item_capacity,
351                data_capacity,
352            )?;
353            ColBuilder::ListBuilder(Box::new(
354                ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
355            ))
356        }
357        DataType::Struct(fields) => {
358            let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
359            for field in fields {
360                let inner_col_builder = ArrowColumn::new(
361                    field.name().clone(),
362                    field.is_nullable(),
363                    field.data_type().clone(),
364                    typename_from_field(field)?,
365                    item_capacity,
366                    data_capacity,
367                )?;
368                field_builders.push(Box::new(inner_col_builder));
369            }
370            ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
371        }
372        DataType::Map(entries_field, _sorted) => {
373            let entries_field = entries_field.as_ref();
374            if let DataType::Struct(fields) = entries_field.data_type() {
375                if fields.len() != 2 {
376                    anyhow::bail!(
377                        "Expected map entries to have 2 fields, found {}",
378                        fields.len()
379                    )
380                }
381                let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
382                let value_field = &fields[1];
383                let value_builder = ArrowColumn::new(
384                    value_field.name().clone(),
385                    value_field.is_nullable(),
386                    value_field.data_type().clone(),
387                    typename_from_field(value_field)?,
388                    item_capacity,
389                    data_capacity,
390                )?;
391                ColBuilder::MapBuilder(Box::new(
392                    MapBuilder::with_capacity(
393                        Some(MapFieldNames::default()),
394                        key_builder,
395                        value_builder,
396                        item_capacity,
397                    )
398                    .with_values_field(Arc::clone(value_field)),
399                ))
400            } else {
401                anyhow::bail!("Expected map entries to be a struct")
402            }
403        }
404        _ => anyhow::bail!("{:?} unimplemented", data_type),
405    };
406    Ok(builder)
407}
408
/// One output column: the description of its Arrow [`Field`] (name, nullability, data
/// type and Materialize type name) plus the builder that accumulates its values.
#[derive(Debug)]
struct ArrowColumn {
    /// Name used for the column's Arrow field.
    field_name: String,
    /// Whether the column's Arrow field is declared nullable.
    nullable: bool,
    /// Arrow data type of the column; `inner` is created from it by `builder_for_datatype`.
    data_type: DataType,
    /// Materialize type name (e.g. `"jsonb"`). It is written to the field's
    /// `ARROW:extension:name` metadata as `materialize.v1.<name>`, and is also compared
    /// against `"jsonb"` when appending datums.
    extension_type_name: String,
    /// Type-specific Arrow builder that receives the appended values.
    inner: ColBuilder,
}
417
418impl From<&ArrowColumn> for Field {
419    fn from(col: &ArrowColumn) -> Self {
420        field_with_typename(
421            &col.field_name,
422            col.data_type.clone(),
423            col.nullable,
424            &col.extension_type_name,
425        )
426    }
427}
428
429/// Create a Field and include the materialize 'type name' as an extension in the metadata.
430fn field_with_typename(
431    name: &str,
432    data_type: DataType,
433    nullable: bool,
434    extension_type_name: &str,
435) -> Field {
436    Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
437        "ARROW:extension:name".to_string(),
438        format!("materialize.v1.{}", extension_type_name),
439    )]))
440}
441
442/// Extract the materialize 'type name' from the metadata of a Field.
443fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
444    let metadata = field.metadata();
445    let extension_name = metadata
446        .get("ARROW:extension:name")
447        .ok_or_else(|| anyhow::anyhow!("Missing extension name in metadata"))?;
448    if let Some(name) = extension_name.strip_prefix("materialize.v1") {
449        Ok(name.to_string())
450    } else {
451        anyhow::bail!("Extension name {} does not match expected", extension_name,)
452    }
453}
454
455impl ArrowColumn {
456    fn new(
457        field_name: String,
458        nullable: bool,
459        data_type: DataType,
460        extension_type_name: String,
461        item_capacity: usize,
462        data_capacity: usize,
463    ) -> Result<Self, anyhow::Error> {
464        Ok(Self {
465            inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
466            field_name,
467            nullable,
468            data_type,
469            extension_type_name,
470        })
471    }
472}
473
// Generates the `ColBuilder` enum with one variant per builder type passed in (each variant
// is named after, and wraps, that builder type) plus the List/Map/Struct variants that need
// special handling. It also generates `ColBuilder::append_null` and an `ArrayBuilder` impl
// for `ArrowColumn`, both of which dispatch on that enum.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// An enum wrapper for all arrow builder types that we support. Used to store
        /// a builder for each column and avoid dynamic dispatch and downcasting
        /// when appending data.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// ListBuilder & MapBuilder are handled separately from other builder types since
            /// they use generic parameters for the inner types, and are boxed to avoid recursive
            /// type definitions.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// StructBuilder is handled separately since its `append_null()` method must be
            /// overridden to both append nulls to all field builders and to append a null to
            /// the struct. It's unclear why `arrow-rs` implemented this differently than
            /// ListBuilder and MapBuilder.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            // Appends one null entry to whichever builder is wrapped.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // `append(false)` closes the current map entry as null.
                    // NOTE(review): the unwrap presumes `append` cannot fail here because no
                    // keys/values are pushed for a null entry — confirm against arrow-rs.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Push a null into every child column so the children stay aligned
                        // with the struct, then mark the struct entry itself as null.
                        // `builder_for_datatype` only ever creates `ArrowColumn` children for
                        // structs, so the downcast below is expected to succeed.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        /// Implement the ArrayBuilder trait for ArrowColumn so that we can use an ArrowColumn as
        /// an inner-builder type in an [`arrow::array::builder::GenericListBuilder`]
        /// and an [`arrow::array::builder::StructBuilder`] and re-use our methods for appending
        /// data to the column.
        impl ArrayBuilder for ArrowColumn {
            // Number of entries appended so far to the wrapped builder.
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            // Finishes the wrapped builder into a type-erased array.
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            // Same as `finish`, but via each builder's `finish_cloned` on a shared reference.
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            // The `Any` accessors expose the `ArrowColumn` itself. Presumably this is what lets
            // `field_builder::<ArrowColumn>` downcast struct children back to `ArrowColumn`.
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
562
563make_col_builder!(
564    BooleanBuilder,
565    Int16Builder,
566    Int32Builder,
567    Int64Builder,
568    UInt8Builder,
569    UInt16Builder,
570    UInt32Builder,
571    UInt64Builder,
572    Float32Builder,
573    Float64Builder,
574    Date32Builder,
575    Time64MicrosecondBuilder,
576    TimestampMicrosecondBuilder,
577    LargeBinaryBuilder,
578    FixedSizeBinaryBuilder,
579    StringBuilder,
580    LargeStringBuilder,
581    Decimal128Builder
582);
583
584impl ArrowColumn {
585    fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
586        match (&mut self.inner, datum) {
587            (s, Datum::Null) => s.append_null(),
588            (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
589            (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
590            (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
591            (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
592            (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
593            (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
594            (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
595            (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
596            (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
597            (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
598            (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
599            (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
600                builder.append_value(d.unix_epoch_days())
601            }
602            (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
603                let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
604                    * 1_000_000
605                    + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
606                builder.append_value(micros_since_midnight)
607            }
608            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
609                builder.append_value(ts.and_utc().timestamp_micros())
610            }
611            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
612                builder.append_value(ts.timestamp_micros())
613            }
614            (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
615            (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
616                builder.append_value(val.as_bytes())?
617            }
618            (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
619            (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
620                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
621            }
622            (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
623            (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
624                builder.append_value(ts.into())
625            }
626            (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
627                if dec.0.is_special() {
628                    anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
629                }
630                if let DataType::Decimal128(precision, scale) = self.data_type {
631                    if dec.0.digits() > precision.into() {
632                        anyhow::bail!(
633                            "Decimal value {} out of range for column with precision {}",
634                            dec,
635                            precision
636                        )
637                    }
638
639                    // Get the signed-coefficient represented as an i128, and the exponent such that
640                    // the number should equal coefficient*10^exponent.
641                    let coefficient: i128 = dec.0.coefficient()?;
642                    let exponent = dec.0.exponent();
643
644                    // Convert the value to use the scale of the column (add 0's to align the decimal
645                    // point correctly). This is done by multiplying the coefficient by
646                    // 10^(scale + exponent).
647                    let scale_diff = i32::from(scale) + exponent;
648                    // If the scale_diff is negative, we know there aren't enough digits in our
649                    // column's scale to represent this value.
650                    let scale_diff = u32::try_from(scale_diff).map_err(|_| {
651                        anyhow::anyhow!(
652                            "cannot represent decimal value {} in column with scale {}",
653                            dec,
654                            scale
655                        )
656                    })?;
657
658                    let value = coefficient
659                        .checked_mul(10_i128.pow(scale_diff))
660                        .ok_or_else(|| {
661                            anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
662                        })?;
663
664                    builder.append_value(value)
665                } else {
666                    anyhow::bail!("Expected Decimal128 data type")
667                }
668            }
669            (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
670                // We've received an array datum which we know is represented as an Arrow struct
671                // with two fields: the list of elements and the number of dimensions
672                let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
673                if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
674                    let inner_builder = list_builder.values();
675                    for datum in arr.elements().into_iter() {
676                        inner_builder.append_datum(datum)?;
677                    }
678                    list_builder.append(true);
679                } else {
680                    anyhow::bail!(
681                        "Expected ListBuilder for StructBuilder with Array datum: {:?}",
682                        struct_builder
683                    )
684                }
685                let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
686                if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
687                    dims_builder.append_value(arr.dims().ndims());
688                } else {
689                    anyhow::bail!(
690                        "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
691                        struct_builder
692                    )
693                }
694                struct_builder.append(true)
695            }
696            (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
697                let inner_builder = list_builder.values();
698                for datum in list.into_iter() {
699                    inner_builder.append_datum(datum)?;
700                }
701                list_builder.append(true)
702            }
703            (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
704                for (key, value) in map.iter() {
705                    builder.keys().append_value(key);
706                    builder.values().append_datum(value)?;
707                }
708                builder.append(true).unwrap()
709            }
710            (_builder, datum) => {
711                anyhow::bail!("Datum {:?} does not match builder", datum)
712            }
713        }
714        Ok(())
715    }
716}