mz_arrow_util/
builder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// We need to allow the std::collections::HashMap type since it is directly used as a type
11// parameter to the arrow Field::with_metadata method.
12#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20    DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Prefix prepended to every Materialize type name stored under the `ARROW:extension:name`
/// field metadata key (e.g. `materialize.v1.jsonb`).
const EXTENSION_PREFIX: &str = "materialize.v1.";
31
/// Accumulates Materialize rows into one Arrow array builder per column and converts the
/// result into an Arrow [`RecordBatch`].
pub struct ArrowBuilder {
    /// One builder per output column, in schema field order.
    columns: Vec<ArrowColumn>,
    /// A crude estimate of the size of the data in the builder
    /// based on the size of the rows added to it.
    row_size_bytes: usize,
    /// The original schema, if provided. Used to preserve metadata in to_record_batch().
    original_schema: Option<Arc<Schema>>,
}
40
41/// Converts a RelationDesc to an Arrow Schema.
42pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
43    let mut fields = vec![];
44    let mut errs = vec![];
45    let mut seen_names = BTreeMap::new();
46    for (col_name, col_type) in desc.iter() {
47        let mut col_name = col_name.to_string();
48        // If we allow columns with the same name we encounter two issues:
49        // 1. The arrow crate will accidentally reuse the same buffers for the columns
50        // 2. Many parquet readers will error when trying to read the file metadata
51        // Instead we append a number to the end of the column name for any duplicates.
52        // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
53        seen_names
54            .entry(col_name.clone())
55            .and_modify(|e: &mut u32| {
56                *e += 1;
57                col_name += &e.to_string();
58            })
59            .or_insert(1);
60        match scalar_to_arrow_datatype(&col_type.scalar_type) {
61            Ok((data_type, extension_type_name)) => {
62                fields.push(field_with_typename(
63                    &col_name,
64                    data_type,
65                    col_type.nullable,
66                    &extension_type_name,
67                ));
68            }
69            Err(err) => errs.push(err.to_string()),
70        }
71    }
72    if !errs.is_empty() {
73        anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
74    }
75    Ok(Schema::new(fields))
76}
77
78impl ArrowBuilder {
79    /// Helper to validate that a RelationDesc can be encoded into Arrow.
80    pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
81        let mut errs = vec![];
82        for (col_name, col_type) in desc.iter() {
83            match scalar_to_arrow_datatype(&col_type.scalar_type) {
84                Ok(_) => {}
85                Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
86            }
87        }
88        if !errs.is_empty() {
89            anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
90        }
91        Ok(())
92    }
93
94    /// Initializes a new ArrowBuilder with the schema of the provided RelationDesc.
95    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
96    /// the number of values that can be appended to each column before reallocating.
97    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
98    /// Errors if the relation contains an unimplemented type.
99    pub fn new(
100        desc: &RelationDesc,
101        item_capacity: usize,
102        data_capacity: usize,
103    ) -> Result<Self, anyhow::Error> {
104        let schema = desc_to_schema(desc)?;
105        let mut columns = vec![];
106        for field in schema.fields() {
107            columns.push(ArrowColumn::new(
108                field.name().clone(),
109                field.is_nullable(),
110                field.data_type().clone(),
111                typename_from_field(field)?,
112                item_capacity,
113                data_capacity,
114            )?);
115        }
116        Ok(Self {
117            columns,
118            row_size_bytes: 0,
119            original_schema: None,
120        })
121    }
122
123    /// Initializes a new ArrowBuilder with a pre-built Arrow Schema.
124    /// This is useful when you need to preserve schema metadata (e.g., field IDs for Iceberg).
125    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
126    /// the number of values that can be appended to each column before reallocating.
127    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
128    /// Errors if the schema contains an unimplemented type.
129    pub fn new_with_schema(
130        schema: Arc<Schema>,
131        item_capacity: usize,
132        data_capacity: usize,
133    ) -> Result<Self, anyhow::Error> {
134        let mut columns = vec![];
135        for field in schema.fields() {
136            columns.push(ArrowColumn::new(
137                field.name().clone(),
138                field.is_nullable(),
139                field.data_type().clone(),
140                typename_from_field(field)?,
141                item_capacity,
142                data_capacity,
143            )?);
144        }
145        Ok(Self {
146            columns,
147            row_size_bytes: 0,
148            original_schema: Some(schema),
149        })
150    }
151
152    /// Returns a copy of the schema of the ArrowBuilder.
153    pub fn schema(&self) -> Schema {
154        Schema::new(
155            self.columns
156                .iter()
157                .map(Into::<Field>::into)
158                .collect::<Vec<_>>(),
159        )
160    }
161
162    /// Converts the ArrowBuilder into an arrow RecordBatch.
163    pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
164        let mut arrays = vec![];
165        let mut fields: Vec<Field> = vec![];
166        for mut col in self.columns.into_iter() {
167            arrays.push(col.finish());
168            fields.push((&col).into());
169        }
170
171        // If we have an original schema, use it to preserve metadata (e.g., field IDs)
172        let schema = if let Some(original_schema) = self.original_schema {
173            original_schema
174        } else {
175            Arc::new(Schema::new(fields))
176        };
177
178        RecordBatch::try_new(schema, arrays)
179    }
180
181    /// Appends a row to the builder.
182    /// Errors if the row contains an unimplemented or out-of-range value.
183    pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
184        for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
185            col.append_datum(datum)?;
186        }
187        self.row_size_bytes += row.byte_len();
188        Ok(())
189    }
190
191    pub fn row_size_bytes(&self) -> usize {
192        self.row_size_bytes
193    }
194}
195
196/// Return the appropriate Arrow DataType for the given SqlScalarType, plus a string
197/// that should be used as part of the Arrow 'Extension Type' name for fields using
198/// this type: <https://arrow.apache.org/docs/format/Columnar.html#extension-types>
199fn scalar_to_arrow_datatype(
200    scalar_type: &SqlScalarType,
201) -> Result<(DataType, String), anyhow::Error> {
202    let (data_type, extension_name) = match scalar_type {
203        SqlScalarType::Bool => (DataType::Boolean, "boolean"),
204        SqlScalarType::Int16 => (DataType::Int16, "smallint"),
205        SqlScalarType::Int32 => (DataType::Int32, "integer"),
206        SqlScalarType::Int64 => (DataType::Int64, "bigint"),
207        SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
208        SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
209        SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
210        SqlScalarType::Float32 => (DataType::Float32, "real"),
211        SqlScalarType::Float64 => (DataType::Float64, "double"),
212        SqlScalarType::Date => (DataType::Date32, "date"),
213        // The resolution of our time and timestamp types is microseconds, which is lucky
214        // since the original parquet 'ConvertedType's support microsecond resolution but not
215        // nanosecond resolution. The newer parquet 'LogicalType's support nanosecond resolution,
216        // but many readers don't support them yet.
217        SqlScalarType::Time => (
218            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
219            "time",
220        ),
221        SqlScalarType::Timestamp { .. } => (
222            DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
223            "timestamp",
224        ),
225        SqlScalarType::TimestampTz { .. } => (
226            DataType::Timestamp(
227                arrow::datatypes::TimeUnit::Microsecond,
228                // When appending values we always use UTC timestamps, and setting this to a non-empty
229                // value allows readers to know that tz-aware timestamps can be compared directly.
230                Some("+00:00".into()),
231            ),
232            "timestamptz",
233        ),
234        SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
235        SqlScalarType::Char { length } => {
236            if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
237                (DataType::Utf8, "text")
238            } else {
239                (DataType::LargeUtf8, "text")
240            }
241        }
242        SqlScalarType::VarChar { max_length } => {
243            if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
244                (DataType::Utf8, "text")
245            } else {
246                (DataType::LargeUtf8, "text")
247            }
248        }
249        SqlScalarType::String => (DataType::LargeUtf8, "text"),
250        // Parquet does have a UUID 'Logical Type' in parquet format 2.4+, but there is no arrow
251        // UUID type, so we match the format (a 16-byte fixed-length binary array) ourselves.
252        SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
253        // Parquet does have a JSON 'Logical Type' in parquet format 2.4+, but there is no arrow
254        // JSON type, so for now we represent JSON as 'large' utf8-encoded strings.
255        SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
256        SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
257        SqlScalarType::Numeric { max_scale } => {
258            // Materialize allows 39 digits of precision for numeric values, but allows
259            // arbitrary scales among those values. e.g. 1e38 and 1e-39 are both valid in
260            // the same column. However, Arrow/Parquet only allows static declaration of both
261            // the precision and the scale. To represent the full range of values of a numeric
262            // column, we would need 78-digits to store all possible values. Arrow's Decimal256
263            // type can only support 76 digits, so we are be unable to represent the entire range.
264
265            // Instead of representing the full possible range, we instead try to represent most
266            // values in the most-compatible way. We use a Decimal128 type which can handle 38
267            // digits of precision and has more compatibility with other parquet readers than
268            // Decimal256. We use Arrow's default scale of 10 if max-scale is not set. We will
269            // error if we encounter a value that is too large to represent, and if that happens
270            // a user can choose to cast the column to a string to represent the value.
271            match max_scale {
272                Some(scale) => {
273                    let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
274                    if scale <= DECIMAL128_MAX_SCALE {
275                        (
276                            DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
277                            "numeric",
278                        )
279                    } else {
280                        anyhow::bail!("Numeric max scale {} out of range", scale)
281                    }
282                }
283                None => (
284                    DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
285                    "numeric",
286                ),
287            }
288        }
289        // arrow::datatypes::IntervalUnit::MonthDayNano is not yet implemented in the arrow parquet writer
290        // https://github.com/apache/arrow-rs/blob/0d031cc8aa81296cb1bdfedea7a7cb4ec6aa54ea/parquet/src/arrow/arrow_writer/mod.rs#L859
291        // SqlScalarType::Interval => DataType::Interval(arrow::datatypes::IntervalUnit::DayTime)
292        SqlScalarType::Array(inner) => {
293            // Postgres / MZ Arrays are weird, since they can be multi-dimensional but this is not
294            // enforced in the type system, so can change per-value.
295            // We use a struct type with two fields - one containing the array elements as a list
296            // and the other containing the number of dimensions the array represents. Since arrays
297            // are not allowed to be ragged, the number of elements in each dimension is the same.
298            let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
299            // TODO: Document these field names in our copy-to-s3 docs
300            let inner_field = field_with_typename("item", inner_type, true, &inner_name);
301            let list_field = Arc::new(field_with_typename(
302                "items",
303                DataType::List(inner_field.into()),
304                false,
305                "array_items",
306            ));
307            let dims_field = Arc::new(field_with_typename(
308                "dimensions",
309                DataType::UInt8,
310                false,
311                "array_dimensions",
312            ));
313            (DataType::Struct([list_field, dims_field].into()), "array")
314        }
315        SqlScalarType::List {
316            element_type,
317            custom_id: _,
318        } => {
319            let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
320            // TODO: Document these field names in our copy-to-s3 docs
321            let field = field_with_typename("item", inner_type, true, &inner_name);
322            (DataType::List(field.into()), "list")
323        }
324        SqlScalarType::Map {
325            value_type,
326            custom_id: _,
327        } => {
328            let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
329            // Arrow maps are represented as an 'entries' struct with 'keys' and 'values' fields.
330            let field_names = MapFieldNames::default();
331            let struct_type = DataType::Struct(
332                vec![
333                    Field::new(&field_names.key, DataType::Utf8, false),
334                    field_with_typename(&field_names.value, value_type, true, &value_name),
335                ]
336                .into(),
337            );
338            (
339                DataType::Map(
340                    Field::new(&field_names.entry, struct_type, false).into(),
341                    false,
342                ),
343                "map",
344            )
345        }
346        _ => anyhow::bail!("{:?} unimplemented", scalar_type),
347    };
348    Ok((data_type, extension_name.to_lowercase()))
349}
350
351fn builder_for_datatype(
352    data_type: &DataType,
353    item_capacity: usize,
354    data_capacity: usize,
355) -> Result<ColBuilder, anyhow::Error> {
356    let builder = match &data_type {
357        DataType::Boolean => {
358            ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
359        }
360        DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
361        DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
362        DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
363        DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
364        DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
365        DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
366        DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
367        DataType::Float32 => {
368            ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
369        }
370        DataType::Float64 => {
371            ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
372        }
373        DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
374        DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
375            ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
376                item_capacity,
377            ))
378        }
379        DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
380            ColBuilder::TimestampMicrosecondBuilder(
381                TimestampMicrosecondBuilder::with_capacity(item_capacity)
382                    .with_timezone_opt(timezone.clone()),
383            )
384        }
385        DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
386            item_capacity,
387            data_capacity,
388        )),
389        DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
390            FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
391        ),
392        DataType::Utf8 => {
393            ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
394        }
395        DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
396            item_capacity,
397            data_capacity,
398        )),
399        DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
400            Decimal128Builder::with_capacity(item_capacity)
401                .with_precision_and_scale(*precision, *scale)?,
402        ),
403        DataType::List(field) => {
404            let inner_col_builder = ArrowColumn::new(
405                field.name().clone(),
406                field.is_nullable(),
407                field.data_type().clone(),
408                typename_from_field(field)?,
409                item_capacity,
410                data_capacity,
411            )?;
412            ColBuilder::ListBuilder(Box::new(
413                ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
414            ))
415        }
416        DataType::Struct(fields) => {
417            let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
418            for field in fields {
419                let inner_col_builder = ArrowColumn::new(
420                    field.name().clone(),
421                    field.is_nullable(),
422                    field.data_type().clone(),
423                    typename_from_field(field)?,
424                    item_capacity,
425                    data_capacity,
426                )?;
427                field_builders.push(Box::new(inner_col_builder));
428            }
429            ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
430        }
431        DataType::Map(entries_field, _sorted) => {
432            let entries_field = entries_field.as_ref();
433            if let DataType::Struct(fields) = entries_field.data_type() {
434                if fields.len() != 2 {
435                    anyhow::bail!(
436                        "Expected map entries to have 2 fields, found {}",
437                        fields.len()
438                    )
439                }
440                let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
441                let value_field = &fields[1];
442                let value_builder = ArrowColumn::new(
443                    value_field.name().clone(),
444                    value_field.is_nullable(),
445                    value_field.data_type().clone(),
446                    typename_from_field(value_field)?,
447                    item_capacity,
448                    data_capacity,
449                )?;
450                ColBuilder::MapBuilder(Box::new(
451                    MapBuilder::with_capacity(
452                        Some(MapFieldNames::default()),
453                        key_builder,
454                        value_builder,
455                        item_capacity,
456                    )
457                    .with_values_field(Arc::clone(value_field)),
458                ))
459            } else {
460                anyhow::bail!("Expected map entries to be a struct")
461            }
462        }
463        _ => anyhow::bail!("{:?} unimplemented", data_type),
464    };
465    Ok(builder)
466}
467
/// A single output column: the description of its Arrow field plus the builder that
/// accumulates its values.
#[derive(Debug)]
struct ArrowColumn {
    /// Name of the Arrow field.
    field_name: String,
    /// Whether the Arrow field is nullable.
    nullable: bool,
    /// Arrow type of the column; `inner` is created from this type.
    data_type: DataType,
    /// Materialize type name (without `EXTENSION_PREFIX`) stored in the field's extension
    /// metadata. Also consulted when appending values (e.g. `"jsonb"` columns are serialized
    /// as JSON text).
    extension_type_name: String,
    /// The concrete Arrow builder holding this column's values.
    inner: ColBuilder,
}
476
477impl From<&ArrowColumn> for Field {
478    fn from(col: &ArrowColumn) -> Self {
479        field_with_typename(
480            &col.field_name,
481            col.data_type.clone(),
482            col.nullable,
483            &col.extension_type_name,
484        )
485    }
486}
487
488/// Create a Field and include the materialize 'type name' as an extension in the metadata.
489fn field_with_typename(
490    name: &str,
491    data_type: DataType,
492    nullable: bool,
493    extension_type_name: &str,
494) -> Field {
495    Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
496        "ARROW:extension:name".to_string(),
497        format!("{}{}", EXTENSION_PREFIX, extension_type_name),
498    )]))
499}
500
501/// Extract the materialize 'type name' from the metadata of a Field.
502fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
503    let metadata = field.metadata();
504    let extension_name = metadata
505        .get("ARROW:extension:name")
506        .ok_or_else(|| anyhow::anyhow!("Missing extension name in metadata"))?;
507    if let Some(name) = extension_name.strip_prefix(EXTENSION_PREFIX) {
508        Ok(name.to_string())
509    } else {
510        anyhow::bail!("Extension name {} does not match expected", extension_name,)
511    }
512}
513
514impl ArrowColumn {
515    fn new(
516        field_name: String,
517        nullable: bool,
518        data_type: DataType,
519        extension_type_name: String,
520        item_capacity: usize,
521        data_capacity: usize,
522    ) -> Result<Self, anyhow::Error> {
523        Ok(Self {
524            inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
525            field_name,
526            nullable,
527            data_type,
528            extension_type_name,
529        })
530    }
531}
532
// Generates two items from the list of builder type names passed in:
// - the `ColBuilder` enum, with one variant per listed type (each variant wraps a value of
//   the type with the same name) plus the nested List/Map/Struct variants;
// - an `ArrayBuilder` impl for `ArrowColumn` that dispatches to whichever builder is active.
// Every listed type must provide `append_null`, `len`, `finish` and `finish_cloned`, since
// the generated match arms call them on each variant.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// An enum wrapper for all arrow builder types that we support. Used to store
        /// a builder for each column and avoid dynamic dispatch and downcasting
        /// when appending data.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// ListBuilder & MapBuilder are handled separately from other builder types since they
            /// use generic parameters for the inner types, and are boxed to avoid recursive
            /// type definitions.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// StructBuilder is handled separately since its `append_null()` method must be
            /// overridden to both append nulls to all field builders and to append a null to
            /// the struct. It's unclear why `arrow-rs` implemented this differently than
            /// ListBuilder and MapBuilder.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            /// Appends a single null value to the wrapped builder.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // `append(false)` finishes the current map slot and marks it null.
                    // NOTE(review): the `unwrap` assumes the key and value builders are in
                    // sync at this point — confirm against arrow-rs `MapBuilder::append`.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Keep every child the same length as the struct: append a null to
                        // each child before marking the struct slot itself null. Children are
                        // always `ArrowColumn`s (see `builder_for_datatype`), so the downcast
                        // is expected to succeed.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        /// Implement the ArrayBuilder trait for ArrowColumn so that we can use an ArrowColumn as
        /// an inner-builder type in an [`arrow::array::builder::GenericListBuilder`]
        /// and an [`arrow::array::builder::StructBuilder`] and re-use our methods for appending
        /// data to the column.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
621
// Instantiate `ColBuilder` (and the `ArrayBuilder` impl for `ArrowColumn`) for every
// non-nested arrow builder type we support; List/Map/Struct builders are added by the macro
// itself. The macro's `$($x:ident), *` pattern does not accept a trailing comma, so the last
// entry must not be followed by one.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
642
643impl ArrowColumn {
644    fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
645        match (&mut self.inner, datum) {
646            (s, Datum::Null) => s.append_null(),
647            (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
648            (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
649            (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
650            (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
651            (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
652            (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
653            (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
654            (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
655            (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
656            (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
657            (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
658            (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
659                builder.append_value(d.unix_epoch_days())
660            }
661            (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
662                let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
663                    * 1_000_000
664                    + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
665                builder.append_value(micros_since_midnight)
666            }
667            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
668                builder.append_value(ts.and_utc().timestamp_micros())
669            }
670            (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
671                builder.append_value(ts.timestamp_micros())
672            }
673            (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
674            (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
675                builder.append_value(val.as_bytes())?
676            }
677            (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
678            (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
679                builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
680            }
681            (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
682            (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
683                builder.append_value(ts.into())
684            }
685            (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
686                if dec.0.is_special() {
687                    anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
688                }
689                if let DataType::Decimal128(precision, scale) = self.data_type {
690                    if dec.0.digits() > precision.into() {
691                        anyhow::bail!(
692                            "Decimal value {} out of range for column with precision {}",
693                            dec,
694                            precision
695                        )
696                    }
697
698                    // Get the signed-coefficient represented as an i128, and the exponent such that
699                    // the number should equal coefficient*10^exponent.
700                    let coefficient: i128 = dec.0.coefficient()?;
701                    let exponent = dec.0.exponent();
702
703                    // Convert the value to use the scale of the column (add 0's to align the decimal
704                    // point correctly). This is done by multiplying the coefficient by
705                    // 10^(scale + exponent).
706                    let scale_diff = i32::from(scale) + exponent;
707                    // If the scale_diff is negative, we know there aren't enough digits in our
708                    // column's scale to represent this value.
709                    let scale_diff = u32::try_from(scale_diff).map_err(|_| {
710                        anyhow::anyhow!(
711                            "cannot represent decimal value {} in column with scale {}",
712                            dec,
713                            scale
714                        )
715                    })?;
716
717                    let value = coefficient
718                        .checked_mul(10_i128.pow(scale_diff))
719                        .ok_or_else(|| {
720                            anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
721                        })?;
722
723                    builder.append_value(value)
724                } else {
725                    anyhow::bail!("Expected Decimal128 data type")
726                }
727            }
728            (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
729                // We've received an array datum which we know is represented as an Arrow struct
730                // with two fields: the list of elements and the number of dimensions
731                let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
732                if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
733                    let inner_builder = list_builder.values();
734                    for datum in arr.elements().into_iter() {
735                        inner_builder.append_datum(datum)?;
736                    }
737                    list_builder.append(true);
738                } else {
739                    anyhow::bail!(
740                        "Expected ListBuilder for StructBuilder with Array datum: {:?}",
741                        struct_builder
742                    )
743                }
744                let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
745                if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
746                    dims_builder.append_value(arr.dims().ndims());
747                } else {
748                    anyhow::bail!(
749                        "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
750                        struct_builder
751                    )
752                }
753                struct_builder.append(true)
754            }
755            (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
756                let inner_builder = list_builder.values();
757                for datum in list.into_iter() {
758                    inner_builder.append_datum(datum)?;
759                }
760                list_builder.append(true)
761            }
762            (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
763                for (key, value) in map.iter() {
764                    builder.keys().append_value(key);
765                    builder.values().append_datum(value)?;
766                }
767                builder.append(true).unwrap()
768            }
769            (builder, datum) => {
770                anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
771            }
772        }
773        Ok(())
774    }
775}