Skip to main content

mz_arrow_util/
builder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// We need to allow the std::collections::HashMap type since it is directly used as a type
11// parameter to the arrow Field::with_metadata method.
12#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20    DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
30pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
31const EXTENSION_PREFIX: &str = "materialize.v1.";
32
/// Incrementally builds an Arrow record batch from Materialize [`Row`]s,
/// keeping one typed builder per output column.
pub struct ArrowBuilder {
    /// One builder per output column, in schema order.
    columns: Vec<ArrowColumn>,
    /// A crude estimate of the size of the data in the builder
    /// based on the size of the rows added to it.
    row_size_bytes: usize,
    /// The original schema, if provided. Used to preserve metadata in to_record_batch().
    original_schema: Option<Arc<Schema>>,
}
41
42/// Converts a RelationDesc to an Arrow Schema.
43pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44    let mut fields = vec![];
45    let mut errs = vec![];
46    let mut seen_names = BTreeSet::new();
47    let mut all_names = BTreeSet::new();
48    for col_name in desc.iter_names() {
49        all_names.insert(col_name.to_string());
50    }
51    for (col_name, col_type) in desc.iter() {
52        let mut col_name = col_name.to_string();
53        // If we allow columns with the same name we encounter two issues:
54        // 1. The arrow crate will accidentally reuse the same buffers for the columns
55        // 2. Many parquet readers will error when trying to read the file metadata
56        // Instead we append a number to the end of the column name for any duplicates.
57        // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
58
59        if seen_names.contains(&col_name) {
60            let mut new_col_name = col_name.clone();
61            let mut e = 2;
62            while all_names.contains(&new_col_name) {
63                new_col_name = format!("{}{}", col_name, e);
64                e += 1;
65            }
66            col_name = new_col_name;
67        }
68
69        seen_names.insert(col_name.clone());
70        all_names.insert(col_name.clone());
71        match scalar_to_arrow_datatype(&col_type.scalar_type) {
72            Ok((data_type, extension_type_name)) => {
73                fields.push(field_with_typename(
74                    &col_name,
75                    data_type,
76                    col_type.nullable,
77                    &extension_type_name,
78                ));
79            }
80            Err(err) => errs.push(err.to_string()),
81        }
82    }
83    if !errs.is_empty() {
84        anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
85    }
86    Ok(Schema::new(fields))
87}
88
89impl ArrowBuilder {
90    /// Helper to validate that a RelationDesc can be encoded into Arrow.
91    pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
92        let mut errs = vec![];
93        for (col_name, col_type) in desc.iter() {
94            match scalar_to_arrow_datatype(&col_type.scalar_type) {
95                Ok(_) => {}
96                Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
97            }
98        }
99        if !errs.is_empty() {
100            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
101        }
102        Ok(())
103    }
104
105    /// Initializes a new ArrowBuilder with the schema of the provided RelationDesc.
106    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
107    /// the number of values that can be appended to each column before reallocating.
108    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
109    /// Errors if the relation contains an unimplemented type.
110    pub fn new(
111        desc: &RelationDesc,
112        item_capacity: usize,
113        data_capacity: usize,
114    ) -> Result<Self, anyhow::Error> {
115        let schema = desc_to_schema(desc)?;
116        let mut columns = vec![];
117        for field in schema.fields() {
118            columns.push(ArrowColumn::new(
119                field.name().clone(),
120                field.is_nullable(),
121                field.data_type().clone(),
122                typename_from_field(field)?,
123                item_capacity,
124                data_capacity,
125            )?);
126        }
127        Ok(Self {
128            columns,
129            row_size_bytes: 0,
130            original_schema: None,
131        })
132    }
133
134    /// Initializes a new ArrowBuilder with a pre-built Arrow Schema.
135    /// This is useful when you need to preserve schema metadata (e.g., field IDs for Iceberg).
136    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
137    /// the number of values that can be appended to each column before reallocating.
138    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
139    /// Errors if the schema contains an unimplemented type.
140    pub fn new_with_schema(
141        schema: Arc<Schema>,
142        item_capacity: usize,
143        data_capacity: usize,
144    ) -> Result<Self, anyhow::Error> {
145        let mut columns = vec![];
146        for field in schema.fields() {
147            columns.push(ArrowColumn::new(
148                field.name().clone(),
149                field.is_nullable(),
150                field.data_type().clone(),
151                typename_from_field(field)?,
152                item_capacity,
153                data_capacity,
154            )?);
155        }
156        Ok(Self {
157            columns,
158            row_size_bytes: 0,
159            original_schema: Some(schema),
160        })
161    }
162
163    /// Returns a copy of the schema of the ArrowBuilder.
164    pub fn schema(&self) -> Schema {
165        Schema::new(
166            self.columns
167                .iter()
168                .map(Into::<Field>::into)
169                .collect::<Vec<_>>(),
170        )
171    }
172
173    /// Converts the ArrowBuilder into an arrow RecordBatch.
174    pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
175        let mut arrays = vec![];
176        let mut fields: Vec<Field> = vec![];
177        for mut col in self.columns.into_iter() {
178            arrays.push(col.finish());
179            fields.push((&col).into());
180        }
181
182        // If we have an original schema, use it to preserve metadata (e.g., field IDs)
183        let schema = if let Some(original_schema) = self.original_schema {
184            original_schema
185        } else {
186            Arc::new(Schema::new(fields))
187        };
188
189        RecordBatch::try_new(schema, arrays)
190    }
191
192    /// Appends a row to the builder.
193    /// Errors if the row contains an unimplemented or out-of-range value.
194    pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
195        for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
196            col.append_datum(datum)?;
197        }
198        self.row_size_bytes += row.byte_len();
199        Ok(())
200    }
201
202    pub fn row_size_bytes(&self) -> usize {
203        self.row_size_bytes
204    }
205}
206
/// Return the appropriate Arrow DataType for the given SqlScalarType, plus a string
/// that should be used as part of the Arrow 'Extension Type' name for fields using
/// this type: <https://arrow.apache.org/docs/format/Columnar.html#extension-types>
///
/// The extension name is returned without the `materialize.v1.` prefix; callers
/// attach it via `field_with_typename`. Errors for types with no Arrow mapping.
fn scalar_to_arrow_datatype(
    scalar_type: &SqlScalarType,
) -> Result<(DataType, String), anyhow::Error> {
    let (data_type, extension_name) = match scalar_type {
        SqlScalarType::Bool => (DataType::Boolean, "boolean"),
        SqlScalarType::Int16 => (DataType::Int16, "smallint"),
        SqlScalarType::Int32 => (DataType::Int32, "integer"),
        SqlScalarType::Int64 => (DataType::Int64, "bigint"),
        SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
        SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
        SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
        SqlScalarType::Float32 => (DataType::Float32, "real"),
        SqlScalarType::Float64 => (DataType::Float64, "double"),
        SqlScalarType::Date => (DataType::Date32, "date"),
        // The resolution of our time and timestamp types is microseconds, which is lucky
        // since the original parquet 'ConvertedType's support microsecond resolution but not
        // nanosecond resolution. The newer parquet 'LogicalType's support nanosecond resolution,
        // but many readers don't support them yet.
        SqlScalarType::Time => (
            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
            "time",
        ),
        SqlScalarType::Timestamp { .. } => (
            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
            "timestamp",
        ),
        SqlScalarType::TimestampTz { .. } => (
            DataType::Timestamp(
                arrow::datatypes::TimeUnit::Microsecond,
                // When appending values we always use UTC timestamps, and setting this to a non-empty
                // value allows readers to know that tz-aware timestamps can be compared directly.
                Some("+00:00".into()),
            ),
            "timestamptz",
        ),
        SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
        // For Char/VarChar: a declared max length below i32::MAX fits in Utf8's
        // 32-bit offsets; unbounded (None) or larger lengths use LargeUtf8
        // (64-bit offsets).
        SqlScalarType::Char { length } => {
            if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        SqlScalarType::VarChar { max_length } => {
            if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
                (DataType::Utf8, "text")
            } else {
                (DataType::LargeUtf8, "text")
            }
        }
        SqlScalarType::String => (DataType::LargeUtf8, "text"),
        // Parquet does have a UUID 'Logical Type' in parquet format 2.4+, but there is no arrow
        // UUID type, so we match the format (a 16-byte fixed-length binary array) ourselves.
        SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
        // Parquet does have a JSON 'Logical Type' in parquet format 2.4+, but there is no arrow
        // JSON type, so for now we represent JSON as 'large' utf8-encoded strings.
        SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
        SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
        SqlScalarType::Numeric { max_scale } => {
            // Materialize allows 39 digits of precision for numeric values, but allows
            // arbitrary scales among those values. e.g. 1e38 and 1e-39 are both valid in
            // the same column. However, Arrow/Parquet only allows static declaration of both
            // the precision and the scale. To represent the full range of values of a numeric
            // column, we would need 78-digits to store all possible values. Arrow's Decimal256
            // type can only support 76 digits, so we are unable to represent the entire range.

            // Instead of representing the full possible range, we instead try to represent most
            // values in the most-compatible way. We use a Decimal128 type which can handle 38
            // digits of precision and has more compatibility with other parquet readers than
            // Decimal256. We use Arrow's default scale of 10 if max-scale is not set. We will
            // error if we encounter a value that is too large to represent, and if that happens
            // a user can choose to cast the column to a string to represent the value.
            match max_scale {
                Some(scale) => {
                    let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
                    if scale <= DECIMAL128_MAX_SCALE {
                        (
                            DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
                            "numeric",
                        )
                    } else {
                        anyhow::bail!("Numeric max scale {} out of range", scale)
                    }
                }
                None => (
                    DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
                    "numeric",
                ),
            }
        }
        // arrow::datatypes::IntervalUnit::MonthDayNano is not yet implemented in the arrow parquet writer
        // https://github.com/apache/arrow-rs/blob/0d031cc8aa81296cb1bdfedea7a7cb4ec6aa54ea/parquet/src/arrow/arrow_writer/mod.rs#L859
        // SqlScalarType::Interval => DataType::Interval(arrow::datatypes::IntervalUnit::DayTime)
        SqlScalarType::Array(inner) => {
            // Postgres / MZ Arrays are weird, since they can be multi-dimensional but this is not
            // enforced in the type system, so can change per-value.
            // We use a struct type with two fields - one containing the array elements as a list
            // and the other containing the number of dimensions the array represents. Since arrays
            // are not allowed to be ragged, the number of elements in each dimension is the same.
            let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let inner_field = field_with_typename("item", inner_type, true, &inner_name);
            let list_field = Arc::new(field_with_typename(
                "items",
                DataType::List(inner_field.into()),
                false,
                "array_items",
            ));
            let dims_field = Arc::new(field_with_typename(
                "dimensions",
                DataType::UInt8,
                false,
                "array_dimensions",
            ));
            (DataType::Struct([list_field, dims_field].into()), "array")
        }
        SqlScalarType::List {
            element_type,
            custom_id: _,
        } => {
            let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
            // TODO: Document these field names in our copy-to-s3 docs
            let field = field_with_typename("item", inner_type, true, &inner_name);
            (DataType::List(field.into()), "list")
        }
        SqlScalarType::Map {
            value_type,
            custom_id: _,
        } => {
            let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
            // Arrow maps are represented as an 'entries' struct with 'keys' and 'values' fields.
            // Map keys are always non-null strings; only values carry a typename.
            let field_names = MapFieldNames::default();
            let struct_type = DataType::Struct(
                vec![
                    Field::new(&field_names.key, DataType::Utf8, false),
                    field_with_typename(&field_names.value, value_type, true, &value_name),
                ]
                .into(),
            );
            (
                DataType::Map(
                    Field::new(&field_names.entry, struct_type, false).into(),
                    false,
                ),
                "map",
            )
        }
        SqlScalarType::Record {
            fields,
            custom_id: _,
        } => {
            // Records are represented as Arrow Structs with one field per record field.
            // At runtime, records are stored as Datum::List with field values in order.
            let mut arrow_fields = Vec::with_capacity(fields.len());
            for (field_name, field_type) in fields.iter() {
                let (inner_type, inner_extension_name) =
                    scalar_to_arrow_datatype(&field_type.scalar_type)?;
                let field = field_with_typename(
                    field_name.as_str(),
                    inner_type,
                    field_type.nullable,
                    &inner_extension_name,
                );
                arrow_fields.push(Arc::new(field));
            }
            (DataType::Struct(arrow_fields.into()), "record")
        }
        _ => anyhow::bail!("{:?} unimplemented", scalar_type),
    };
    Ok((data_type, extension_name.to_lowercase()))
}
381
/// Constructs the [`ColBuilder`] variant matching `data_type`.
///
/// `item_capacity` pre-sizes the number of values each builder can hold before
/// reallocating; `data_capacity` pre-sizes the byte buffers of the
/// variable-length (string/binary) builders. Nested types (List/Struct/Map)
/// recurse through [`ArrowColumn::new`] for their inner builders.
/// Errors on data types with no corresponding builder implementation.
fn builder_for_datatype(
    data_type: &DataType,
    item_capacity: usize,
    data_capacity: usize,
) -> Result<ColBuilder, anyhow::Error> {
    let builder = match &data_type {
        DataType::Boolean => {
            ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
        }
        DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
        DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
        DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
        DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
        DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
        DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
        DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
        DataType::Float32 => {
            ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
        }
        DataType::Float64 => {
            ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
        }
        DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
        DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
            ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
                item_capacity,
            ))
        }
        DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
            ColBuilder::TimestampMicrosecondBuilder(
                TimestampMicrosecondBuilder::with_capacity(item_capacity)
                    .with_timezone_opt(timezone.clone()),
            )
        }
        DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
            FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
        ),
        DataType::Utf8 => {
            ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
        }
        DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
            item_capacity,
            data_capacity,
        )),
        DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
            Decimal128Builder::with_capacity(item_capacity)
                .with_precision_and_scale(*precision, *scale)?,
        ),
        DataType::List(field) => {
            // Recursively build the element builder, then attach the original
            // field so the list's metadata (typename) is preserved.
            let inner_col_builder = ArrowColumn::new(
                field.name().clone(),
                field.is_nullable(),
                field.data_type().clone(),
                typename_from_field(field)?,
                item_capacity,
                data_capacity,
            )?;
            ColBuilder::ListBuilder(Box::new(
                ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
            ))
        }
        DataType::Struct(fields) => {
            // One recursively-built ArrowColumn per struct field, passed to
            // StructBuilder as boxed dynamic builders.
            let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
            for field in fields {
                let inner_col_builder = ArrowColumn::new(
                    field.name().clone(),
                    field.is_nullable(),
                    field.data_type().clone(),
                    typename_from_field(field)?,
                    item_capacity,
                    data_capacity,
                )?;
                field_builders.push(Box::new(inner_col_builder));
            }
            ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
        }
        DataType::Map(entries_field, _sorted) => {
            // Map entries must be a two-field struct (key, value); keys are
            // strings, values recurse through ArrowColumn::new.
            let entries_field = entries_field.as_ref();
            if let DataType::Struct(fields) = entries_field.data_type() {
                if fields.len() != 2 {
                    anyhow::bail!(
                        "Expected map entries to have 2 fields, found {}",
                        fields.len()
                    )
                }
                let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
                let value_field = &fields[1];
                let value_builder = ArrowColumn::new(
                    value_field.name().clone(),
                    value_field.is_nullable(),
                    value_field.data_type().clone(),
                    typename_from_field(value_field)?,
                    item_capacity,
                    data_capacity,
                )?;
                ColBuilder::MapBuilder(Box::new(
                    MapBuilder::with_capacity(
                        Some(MapFieldNames::default()),
                        key_builder,
                        value_builder,
                        item_capacity,
                    )
                    .with_values_field(Arc::clone(value_field)),
                ))
            } else {
                anyhow::bail!("Expected map entries to be a struct")
            }
        }
        _ => anyhow::bail!("{:?} unimplemented", data_type),
    };
    Ok(builder)
}
498
/// A single in-progress output column: its Arrow field description plus the
/// typed builder accumulating its values.
#[derive(Debug)]
struct ArrowColumn {
    // Column (field) name.
    field_name: String,
    // Whether the field may contain nulls.
    nullable: bool,
    // The Arrow data type of the column.
    data_type: DataType,
    // The materialize 'type name' stored as extension metadata
    // (without the `materialize.v1.` prefix).
    extension_type_name: String,
    // The builder accumulating this column's values.
    inner: ColBuilder,
}
507
508impl From<&ArrowColumn> for Field {
509    fn from(col: &ArrowColumn) -> Self {
510        field_with_typename(
511            &col.field_name,
512            col.data_type.clone(),
513            col.nullable,
514            &col.extension_type_name,
515        )
516    }
517}
518
519/// Create a Field and include the materialize 'type name' as an extension in the metadata.
520fn field_with_typename(
521    name: &str,
522    data_type: DataType,
523    nullable: bool,
524    extension_type_name: &str,
525) -> Field {
526    Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
527        ARROW_EXTENSION_NAME_KEY.to_string(),
528        format!("{}{}", EXTENSION_PREFIX, extension_type_name),
529    )]))
530}
531
532/// Extract the materialize 'type name' from the metadata of a Field.
533/// Returns an error if the field doesn't have extension metadata.
534fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
535    let metadata = field.metadata();
536    let extension_name = metadata
537        .get(ARROW_EXTENSION_NAME_KEY)
538        .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
539    extension_name
540        .strip_prefix(EXTENSION_PREFIX)
541        .map(|s| s.to_string())
542        .ok_or_else(|| {
543            anyhow::anyhow!(
544                "Field '{}' extension name '{}' missing expected prefix '{}'",
545                field.name(),
546                extension_name,
547                EXTENSION_PREFIX
548            )
549        })
550}
551
552impl ArrowColumn {
553    fn new(
554        field_name: String,
555        nullable: bool,
556        data_type: DataType,
557        extension_type_name: String,
558        item_capacity: usize,
559        data_capacity: usize,
560    ) -> Result<Self, anyhow::Error> {
561        Ok(Self {
562            inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
563            field_name,
564            nullable,
565            data_type,
566            extension_type_name,
567        })
568    }
569}
570
/// Generates the [`ColBuilder`] enum plus its `append_null` method and the
/// [`ArrayBuilder`] impl for [`ArrowColumn`], with one enum variant per
/// builder type passed to the macro (plus the hand-written nested variants).
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// An enum wrapper for all arrow builder types that we support. Used to store
        /// a builder for each column and avoid dynamic dispatch and downcasting
        /// when appending data.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// ListBuilder & MapBuilder are handled separately from the other builder types
            /// since they use generic parameters for the inner types, and are boxed to avoid
            /// recursive type definitions.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// StructBuilder is handled separately since its `append_null()` method must be
            /// overridden to both append nulls to all field builders and to append a null to
            /// the struct. It's unclear why `arrow-rs` implemented this differently than
            /// ListBuilder and MapBuilder.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // MapBuilder has no `append_null`; `append(false)` records
                    // an entry that is not valid, i.e. a null map value.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Keep each field builder's length in sync with the
                        // struct's validity buffer (see the variant doc above).
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        /// Implement the ArrayBuilder trait for ArrowColumn so that we can use an ArrowColumn as
        /// an inner-builder type in an [`arrow::array::builder::GenericListBuilder`]
        /// and an [`arrow::array::builder::StructBuilder`] and re-use our methods for appending
        /// data to the column.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
659
// Instantiate ColBuilder and the ArrayBuilder impl for every flat
// (non-nested) arrow builder type we support; the nested List/Map/Struct
// variants are written by hand inside the macro itself.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
680
681impl ArrowColumn {
682    fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
683        match (&mut self.inner, datum) {
684            (s, Datum::Null) => s.append_null(),
685            (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
686            (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
687            (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
688            (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
689            (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
690            (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
691            (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
692            (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
693            (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
694            (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
695            (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
696            (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
697                builder.append_value(d.unix_epoch_days())
698            }
699            (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
700                let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
701                    * 1_000_000
702                    + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
703                builder.append_value(micros_since_midnight)
704            }
705            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
706                builder.append_value(ts.and_utc().timestamp_micros())
707            }
708            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
709                builder.append_value(ts.timestamp_micros())
710            }
711            (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
712            (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
713                builder.append_value(val.as_bytes())?
714            }
715            (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
716            (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
717                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
718            }
719            (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
720            (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
721                builder.append_value(ts.into())
722            }
723            (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
724                if dec.0.is_special() {
725                    anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
726                }
727                if let DataType::Decimal128(precision, scale) = self.data_type {
728                    if dec.0.digits() > precision.into() {
729                        anyhow::bail!(
730                            "Decimal value {} out of range for column with precision {}",
731                            dec,
732                            precision
733                        )
734                    }
735
736                    // Get the signed-coefficient represented as an i128, and the exponent such that
737                    // the number should equal coefficient*10^exponent.
738                    let coefficient: i128 = dec.0.coefficient()?;
739                    let exponent = dec.0.exponent();
740
741                    // Convert the value to use the scale of the column (add 0's to align the decimal
742                    // point correctly). This is done by multiplying the coefficient by
743                    // 10^(scale + exponent).
744                    let scale_diff = i32::from(scale) + exponent;
745                    // If the scale_diff is negative, we know there aren't enough digits in our
746                    // column's scale to represent this value.
747                    let scale_diff = u32::try_from(scale_diff).map_err(|_| {
748                        anyhow::anyhow!(
749                            "cannot represent decimal value {} in column with scale {}",
750                            dec,
751                            scale
752                        )
753                    })?;
754
755                    let value = coefficient
756                        .checked_mul(10_i128.pow(scale_diff))
757                        .ok_or_else(|| {
758                            anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
759                        })?;
760
761                    builder.append_value(value)
762                } else {
763                    anyhow::bail!("Expected Decimal128 data type")
764                }
765            }
766            (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
767                // We've received an array datum which we know is represented as an Arrow struct
768                // with two fields: the list of elements and the number of dimensions
769                let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
770                if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
771                    let inner_builder = list_builder.values();
772                    for datum in arr.elements().into_iter() {
773                        inner_builder.append_datum(datum)?;
774                    }
775                    list_builder.append(true);
776                } else {
777                    anyhow::bail!(
778                        "Expected ListBuilder for StructBuilder with Array datum: {:?}",
779                        struct_builder
780                    )
781                }
782                let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
783                if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
784                    dims_builder.append_value(arr.dims().ndims());
785                } else {
786                    anyhow::bail!(
787                        "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
788                        struct_builder
789                    )
790                }
791                struct_builder.append(true)
792            }
793            (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
794                let inner_builder = list_builder.values();
795                for datum in list.into_iter() {
796                    inner_builder.append_datum(datum)?;
797                }
798                list_builder.append(true)
799            }
800            (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
801                for (key, value) in map.iter() {
802                    builder.keys().append_value(key);
803                    builder.values().append_datum(value)?;
804                }
805                builder.append(true).unwrap()
806            }
807            // Records are stored as Datum::List at runtime but written to a StructBuilder.
808            // Arrays use Datum::Array (handled above), so Datum::List with StructBuilder is a record.
809            (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
810                let field_count = struct_builder.num_fields();
811                for (i, datum) in list.into_iter().enumerate() {
812                    if i >= field_count {
813                        anyhow::bail!(
814                            "Record has more elements ({}) than struct fields ({})",
815                            i + 1,
816                            field_count
817                        );
818                    }
819                    let field_builder: &mut ArrowColumn = struct_builder
820                        .field_builder(i)
821                        .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
822                    field_builder.append_datum(datum)?;
823                }
824                struct_builder.append(true);
825            }
826            (builder, datum) => {
827                anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
828            }
829        }
830        Ok(())
831    }
832}