mz_arrow_util/
builder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// We need to allow the std::collections::HashMap type since it is directly used as a type
11// parameter to the arrow Field::with_metadata method.
12#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20    DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use mz_ore::cast::CastFrom;
26use mz_repr::adt::jsonb::JsonbRef;
27use mz_repr::{Datum, RelationDesc, Row, ScalarType};
28
/// Incrementally builds an arrow [`RecordBatch`] from Materialize [`Row`]s,
/// holding one [`ArrowColumn`] (a wrapped arrow array builder) per column of
/// the source relation.
pub struct ArrowBuilder {
    /// One column builder per column, in the order of the `RelationDesc`
    /// this builder was constructed from.
    columns: Vec<ArrowColumn>,
    /// A crude estimate of the size of the data in the builder
    /// based on the size of the rows added to it.
    row_size_bytes: usize,
}
35
36impl ArrowBuilder {
37    /// Helper to validate that a RelationDesc can be encoded into Arrow.
38    pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
39        let mut errs = vec![];
40        for (col_name, col_type) in desc.iter() {
41            match scalar_to_arrow_datatype(&col_type.scalar_type) {
42                Ok(_) => {}
43                Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
44            }
45        }
46        if !errs.is_empty() {
47            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
48        }
49        Ok(())
50    }
51
52    /// Initializes a new ArrowBuilder with the schema of the provided RelationDesc.
53    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
54    /// the number of values that can be appended to each column before reallocating.
55    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
56    /// Errors if the relation contains an unimplemented type.
57    pub fn new(
58        desc: &RelationDesc,
59        item_capacity: usize,
60        data_capacity: usize,
61    ) -> Result<Self, anyhow::Error> {
62        let mut columns = vec![];
63        let mut errs = vec![];
64        let mut seen_names = BTreeMap::new();
65        for (col_name, col_type) in desc.iter() {
66            let mut col_name = col_name.to_string();
67            // If we allow columns with the same name we encounter two issues:
68            // 1. The arrow crate will accidentally reuse the same buffers for the columns
69            // 2. Many parquet readers will error when trying to read the file metadata
70            // Instead we append a number to the end of the column name for any duplicates.
71            // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
72            seen_names
73                .entry(col_name.clone())
74                .and_modify(|e: &mut u32| {
75                    *e += 1;
76                    col_name += &e.to_string();
77                })
78                .or_insert(1);
79            match scalar_to_arrow_datatype(&col_type.scalar_type) {
80                Ok((data_type, extension_type_name)) => {
81                    columns.push(ArrowColumn::new(
82                        col_name,
83                        col_type.nullable,
84                        data_type,
85                        extension_type_name,
86                        item_capacity,
87                        data_capacity,
88                    )?);
89                }
90                Err(err) => errs.push(err.to_string()),
91            }
92        }
93        if !errs.is_empty() {
94            anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
95        }
96        Ok(Self {
97            columns,
98            row_size_bytes: 0,
99        })
100    }
101
102    /// Returns a copy of the schema of the ArrowBuilder.
103    pub fn schema(&self) -> Schema {
104        Schema::new(
105            self.columns
106                .iter()
107                .map(Into::<Field>::into)
108                .collect::<Vec<_>>(),
109        )
110    }
111
112    /// Converts the ArrowBuilder into an arrow RecordBatch.
113    pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
114        let mut arrays = vec![];
115        let mut fields: Vec<Field> = vec![];
116        for mut col in self.columns.into_iter() {
117            arrays.push(col.finish());
118            fields.push((&col).into());
119        }
120        RecordBatch::try_new(Schema::new(fields).into(), arrays)
121    }
122
123    /// Appends a row to the builder.
124    /// Errors if the row contains an unimplemented or out-of-range value.
125    pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
126        for (col, datum) in self.columns.iter_mut().zip(row.iter()) {
127            col.append_datum(datum)?;
128        }
129        self.row_size_bytes += row.byte_len();
130        Ok(())
131    }
132
133    pub fn row_size_bytes(&self) -> usize {
134        self.row_size_bytes
135    }
136}
137
/// Return the appropriate Arrow DataType for the given ScalarType, plus a string
/// that should be used as part of the Arrow 'Extension Type' name for fields using
/// this type: <https://arrow.apache.org/docs/format/Columnar.html#extension-types>
///
/// Errors if the scalar type has no arrow representation implemented.
fn scalar_to_arrow_datatype(scalar_type: &ScalarType) -> Result<(DataType, String), anyhow::Error> {
    let (data_type, extension_name) = match scalar_type {
        ScalarType::Bool => (DataType::Boolean, "boolean"),
        ScalarType::Int16 => (DataType::Int16, "smallint"),
        ScalarType::Int32 => (DataType::Int32, "integer"),
        ScalarType::Int64 => (DataType::Int64, "bigint"),
        ScalarType::UInt16 => (DataType::UInt16, "uint2"),
        ScalarType::UInt32 => (DataType::UInt32, "uint4"),
        ScalarType::UInt64 => (DataType::UInt64, "uint8"),
        ScalarType::Float32 => (DataType::Float32, "real"),
        ScalarType::Float64 => (DataType::Float64, "double"),
        ScalarType::Date => (DataType::Date32, "date"),
        // The resolution of our time and timestamp types is microseconds, which is lucky
        // since the original parquet 'ConvertedType's support microsecond resolution but not
        // nanosecond resolution. The newer parquet 'LogicalType's support nanosecond resolution,
        // but many readers don't support them yet.
        ScalarType::Time => (
            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
            "time",
        ),
        ScalarType::Timestamp { .. } => (
            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
            "timestamp",
        ),
        ScalarType::TimestampTz { .. } => (
            DataType::Timestamp(
                arrow::datatypes::TimeUnit::Microsecond,
                // When appending values we always use UTC timestamps, and setting this to a non-empty
                // value allows readers to know that tz-aware timestamps can be compared directly.
                Some("+00:00".into()),
            ),
            "timestamptz",
        ),
        ScalarType::Bytes => (DataType::LargeBinary, "bytea"),
        ScalarType::Char { length } => {
            // Utf8 uses 32-bit offsets, so it is only safe when the declared max
            // length fits in an i32; otherwise fall back to LargeUtf8 (64-bit offsets).
            if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        ScalarType::VarChar { max_length } => {
            // Same 32-bit-offset consideration as `Char` above.
            if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        ScalarType::String => (DataType::LargeUtf8, "text"),
        // Parquet does have a UUID 'Logical Type' in parquet format 2.4+, but there is no arrow
        // UUID type, so we match the format (a 16-byte fixed-length binary array) ourselves.
        ScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
        // Parquet does have a JSON 'Logical Type' in parquet format 2.4+, but there is no arrow
        // JSON type, so for now we represent JSON as 'large' utf8-encoded strings.
        ScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
        ScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
        ScalarType::Numeric { max_scale } => {
            // Materialize allows 39 digits of precision for numeric values, but allows
            // arbitrary scales among those values. e.g. 1e38 and 1e-39 are both valid in
            // the same column. However, Arrow/Parquet only allows static declaration of both
            // the precision and the scale. To represent the full range of values of a numeric
            // column, we would need 78-digits to store all possible values. Arrow's Decimal256
            // type can only support 76 digits, so we are be unable to represent the entire range.

            // Instead of representing the full possible range, we instead try to represent most
            // values in the most-compatible way. We use a Decimal128 type which can handle 38
            // digits of precision and has more compatibility with other parquet readers than
            // Decimal256. We use Arrow's default scale of 10 if max-scale is not set. We will
            // error if we encounter a value that is too large to represent, and if that happens
            // a user can choose to cast the column to a string to represent the value.
            match max_scale {
                Some(scale) => {
                    let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
                    if scale <= DECIMAL128_MAX_SCALE {
                        (
                            DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
                            "numeric",
                        )
                    } else {
                        anyhow::bail!("Numeric max scale {} out of range", scale)
                    }
                }
                None => (
                    DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
                    "numeric",
                ),
            }
        }
        // arrow::datatypes::IntervalUnit::MonthDayNano is not yet implemented in the arrow parquet writer
        // https://github.com/apache/arrow-rs/blob/0d031cc8aa81296cb1bdfedea7a7cb4ec6aa54ea/parquet/src/arrow/arrow_writer/mod.rs#L859
        // ScalarType::Interval => DataType::Interval(arrow::datatypes::IntervalUnit::DayTime)
        ScalarType::Array(inner) => {
            // Postgres / MZ Arrays are weird, since they can be multi-dimensional but this is not
            // enforced in the type system, so can change per-value.
            // We use a struct type with two fields - one containing the array elements as a list
            // and the other containing the number of dimensions the array represents. Since arrays
            // are not allowed to be ragged, the number of elements in each dimension is the same.
            let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let inner_field = field_with_typename("item", inner_type, true, &inner_name);
            let list_field = Arc::new(field_with_typename(
                "items",
                DataType::List(inner_field.into()),
                false,
                "array_items",
            ));
            let dims_field = Arc::new(field_with_typename(
                "dimensions",
                DataType::UInt8,
                false,
                "array_dimensions",
            ));
            (DataType::Struct([list_field, dims_field].into()), "array")
        }
        ScalarType::List {
            element_type,
            custom_id: _,
        } => {
            let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let field = field_with_typename("item", inner_type, true, &inner_name);
            (DataType::List(field.into()), "list")
        }
        ScalarType::Map {
            value_type,
            custom_id: _,
        } => {
            let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
            // Arrow maps are represented as an 'entries' struct with 'keys' and 'values' fields.
            let field_names = MapFieldNames::default();
            let struct_type = DataType::Struct(
                vec![
                    // MZ map keys are always strings and may not be null.
                    Field::new(&field_names.key, DataType::Utf8, false),
                    field_with_typename(&field_names.value, value_type, true, &value_name),
                ]
                .into(),
            );
            (
                DataType::Map(
                    Field::new(&field_names.entry, struct_type, false).into(),
                    false,
                ),
                "map",
            )
        }
        _ => anyhow::bail!("{:?} unimplemented", scalar_type),
    };
    // All extension names above are already lowercase; normalize defensively.
    Ok((data_type, extension_name.to_lowercase()))
}
290
/// Construct the [`ColBuilder`] variant matching the given arrow `data_type`.
///
/// `item_capacity` pre-sizes the number of values each builder can hold before
/// reallocating; `data_capacity` pre-sizes the byte buffers of the
/// variable-length (string/binary) builders. For nested types (list, struct,
/// map) the same capacities are applied recursively to the inner builders.
///
/// Errors if no builder is implemented for `data_type` (i.e. it is not one of
/// the types produced by [`scalar_to_arrow_datatype`]).
fn builder_for_datatype(
    data_type: &DataType,
    item_capacity: usize,
    data_capacity: usize,
) -> Result<ColBuilder, anyhow::Error> {
    let builder = match &data_type {
        DataType::Boolean => {
            ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
        }
        DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
        DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
        DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
        DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
        DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
        DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
        DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
        DataType::Float32 => {
            ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
        }
        DataType::Float64 => {
            ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
        }
        DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
        DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
            ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
                item_capacity,
            ))
        }
        DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
            // Carry the (optional) timezone through so the finished array's
            // datatype matches the declared field type.
            ColBuilder::TimestampMicrosecondBuilder(
                TimestampMicrosecondBuilder::with_capacity(item_capacity)
                    .with_timezone_opt(timezone.clone()),
            )
        }
        DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
            FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
        ),
        DataType::Utf8 => {
            ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
        }
        DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
            Decimal128Builder::with_capacity(item_capacity)
                .with_precision_and_scale(*precision, *scale)?,
        ),
        DataType::List(field) => {
            // Build a nested ArrowColumn for the element type so the list's
            // inner builder reuses all the append logic of this module.
            let inner_col_builder = ArrowColumn::new(
                field.name().clone(),
                field.is_nullable(),
                field.data_type().clone(),
                typename_from_field(field)?,
                item_capacity,
                data_capacity,
            )?;
            ColBuilder::ListBuilder(Box::new(
                ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
            ))
        }
        DataType::Struct(fields) => {
            // One nested ArrowColumn builder per struct field.
            let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
            for field in fields {
                let inner_col_builder = ArrowColumn::new(
                    field.name().clone(),
                    field.is_nullable(),
                    field.data_type().clone(),
                    typename_from_field(field)?,
                    item_capacity,
                    data_capacity,
                )?;
                field_builders.push(Box::new(inner_col_builder));
            }
            ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
        }
        DataType::Map(entries_field, _sorted) => {
            let entries_field = entries_field.as_ref();
            // Map entries must be a (key, value) struct; keys are always
            // strings (see `scalar_to_arrow_datatype`), so only the value
            // field needs a nested ArrowColumn builder.
            if let DataType::Struct(fields) = entries_field.data_type() {
                if fields.len() != 2 {
                    anyhow::bail!(
                        "Expected map entries to have 2 fields, found {}",
                        fields.len()
                    )
                }
                let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
                let value_field = &fields[1];
                let value_builder = ArrowColumn::new(
                    value_field.name().clone(),
                    value_field.is_nullable(),
                    value_field.data_type().clone(),
                    typename_from_field(value_field)?,
                    item_capacity,
                    data_capacity,
                )?;
                ColBuilder::MapBuilder(Box::new(
                    MapBuilder::with_capacity(
                        Some(MapFieldNames::default()),
                        key_builder,
                        value_builder,
                        item_capacity,
                    )
                    .with_values_field(Arc::clone(value_field)),
                ))
            } else {
                anyhow::bail!("Expected map entries to be a struct")
            }
        }
        _ => anyhow::bail!("{:?} unimplemented", data_type),
    };
    Ok(builder)
}
407
/// A single column being built: an arrow array builder plus the metadata
/// needed to reconstruct the column's [`Field`] (see the
/// `From<&ArrowColumn> for Field` impl below).
#[derive(Debug)]
struct ArrowColumn {
    /// The column/field name.
    field_name: String,
    /// Whether the field may contain nulls.
    nullable: bool,
    /// The arrow datatype of the column.
    data_type: DataType,
    /// The materialize 'type name' embedded in the field metadata by
    /// [`field_with_typename`]; also consulted when appending datums
    /// (e.g. to detect jsonb columns).
    extension_type_name: String,
    /// The underlying arrow builder holding the appended values.
    inner: ColBuilder,
}
416
417impl From<&ArrowColumn> for Field {
418    fn from(col: &ArrowColumn) -> Self {
419        field_with_typename(
420            &col.field_name,
421            col.data_type.clone(),
422            col.nullable,
423            &col.extension_type_name,
424        )
425    }
426}
427
428/// Create a Field and include the materialize 'type name' as an extension in the metadata.
429fn field_with_typename(
430    name: &str,
431    data_type: DataType,
432    nullable: bool,
433    extension_type_name: &str,
434) -> Field {
435    Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
436        "ARROW:extension:name".to_string(),
437        format!("materialize.v1.{}", extension_type_name),
438    )]))
439}
440
441/// Extract the materialize 'type name' from the metadata of a Field.
442fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
443    let metadata = field.metadata();
444    let extension_name = metadata
445        .get("ARROW:extension:name")
446        .ok_or_else(|| anyhow::anyhow!("Missing extension name in metadata"))?;
447    if let Some(name) = extension_name.strip_prefix("materialize.v1") {
448        Ok(name.to_string())
449    } else {
450        anyhow::bail!("Extension name {} does not match expected", extension_name,)
451    }
452}
453
454impl ArrowColumn {
455    fn new(
456        field_name: String,
457        nullable: bool,
458        data_type: DataType,
459        extension_type_name: String,
460        item_capacity: usize,
461        data_capacity: usize,
462    ) -> Result<Self, anyhow::Error> {
463        Ok(Self {
464            inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
465            field_name,
466            nullable,
467            data_type,
468            extension_type_name,
469        })
470    }
471}
472
/// Generates the `ColBuilder` enum and the `ArrayBuilder` impl for
/// `ArrowColumn`, with one enum variant (and one match arm in each method)
/// per primitive builder type passed to the macro.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// An enum wrapper for all arrow builder types that we support. Used to store
        /// a builder for each column and avoid dynamic dispatch and downcasting
        /// when appending data.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// ListBuilder & MapBuilder are handled separately than other builder types since they
            /// use generic parameters for the inner types, and are boxed to avoid recursive
            /// type definitions.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// StructBuilder is handled separately since its `append_null()` method must be
            /// overridden to both append nulls to all field builders and to append a null to
            /// the struct. It's unclear why `arrow-rs` implemented this differently than
            /// ListBuilder and MapBuilder.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            /// Append a null value to this column (recursing into nested
            /// builders where necessary).
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // MapBuilder has no `append_null`; appending an entry with
                    // `false` marks the map value itself as null.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Keep all field builders aligned with the struct's
                        // own null buffer (see variant doc comment above).
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        /// Implement the ArrayBuilder trait for ArrowColumn so that we can use an ArrowColumn as
        /// an inner-builder type in an [`arrow::array::builder::GenericListBuilder`]
        /// and an [`arrow::array::builder::StructBuilder`] and re-use our methods for appending
        /// data to the column.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
561
// Instantiate `ColBuilder` and the `ArrayBuilder` impl for every primitive
// arrow builder we support; the List/Map/Struct builders are declared
// explicitly inside the macro body above.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
582
impl ArrowColumn {
    /// Append a single datum to this column's builder.
    ///
    /// Errors if the datum variant does not match the builder type, or if a
    /// value cannot be represented in the column's arrow type (e.g. a numeric
    /// value outside the column's Decimal128 precision/scale).
    fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
        match (&mut self.inner, datum) {
            // A null datum is valid for any builder type.
            (s, Datum::Null) => s.append_null(),
            (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
            (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
            (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
            (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
            (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
            (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
            (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
            (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
            (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
            (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
            (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
            (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
                builder.append_value(d.unix_epoch_days())
            }
            (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
                // Convert the time-of-day to microseconds since midnight
                // (whole seconds scaled up, plus the nanosecond part
                // truncated to microseconds).
                let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
                    * 1_000_000
                    + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
                builder.append_value(micros_since_midnight)
            }
            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
                builder.append_value(ts.and_utc().timestamp_micros())
            }
            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
                builder.append_value(ts.timestamp_micros())
            }
            (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
            (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
                builder.append_value(val.as_bytes())?
            }
            (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
            // A jsonb column stores any datum serialized to its JSON text
            // representation; this arm must come before the plain-string arm
            // so jsonb strings are quoted/escaped as JSON.
            (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
            }
            (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
            (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
                builder.append_value(ts.into())
            }
            (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
                if dec.0.is_special() {
                    anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
                }
                if let DataType::Decimal128(precision, scale) = self.data_type {
                    if dec.0.digits() > precision.into() {
                        anyhow::bail!(
                            "Decimal value {} out of range for column with precision {}",
                            dec,
                            precision
                        )
                    }

                    // Get the signed-coefficient represented as an i128, and the exponent such that
                    // the number should equal coefficient*10^exponent.
                    let coefficient: i128 = dec.0.coefficient()?;
                    let exponent = dec.0.exponent();

                    // Convert the value to use the scale of the column (add 0's to align the decimal
                    // point correctly). This is done by multiplying the coefficient by
                    // 10^(scale + exponent).
                    let scale_diff = i32::from(scale) + exponent;
                    // If the scale_diff is negative, we know there aren't enough digits in our
                    // column's scale to represent this value.
                    let scale_diff = u32::try_from(scale_diff).map_err(|_| {
                        anyhow::anyhow!(
                            "cannot represent decimal value {} in column with scale {}",
                            dec,
                            scale
                        )
                    })?;

                    let value = coefficient
                        .checked_mul(10_i128.pow(scale_diff))
                        .ok_or_else(|| {
                            anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
                        })?;

                    builder.append_value(value)
                } else {
                    anyhow::bail!("Expected Decimal128 data type")
                }
            }
            (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
                // We've received an array datum which we know is represented as an Arrow struct
                // with two fields: the list of elements and the number of dimensions
                let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
                if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
                    let inner_builder = list_builder.values();
                    for datum in arr.elements().into_iter() {
                        inner_builder.append_datum(datum)?;
                    }
                    list_builder.append(true);
                } else {
                    anyhow::bail!(
                        "Expected ListBuilder for StructBuilder with Array datum: {:?}",
                        struct_builder
                    )
                }
                let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
                if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
                    dims_builder.append_value(arr.dims().ndims());
                } else {
                    anyhow::bail!(
                        "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
                        struct_builder
                    )
                }
                struct_builder.append(true)
            }
            (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
                // Append each element to the inner builder, then close out
                // one (non-null) list entry.
                let inner_builder = list_builder.values();
                for datum in list.into_iter() {
                    inner_builder.append_datum(datum)?;
                }
                list_builder.append(true)
            }
            (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
                for (key, value) in map.iter() {
                    builder.keys().append_value(key);
                    builder.values().append_datum(value)?;
                }
                builder.append(true).unwrap()
            }
            // Any other combination is a builder/datum type mismatch.
            (_builder, datum) => {
                anyhow::bail!("Datum {:?} does not match builder", datum)
            }
        }
        Ok(())
    }
}