1#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20 DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Field-metadata key under which each field's extension type name is stored (the key the
/// Arrow specification reserves for extension type names).
pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
/// Namespace prepended to every Materialize extension type name written into field metadata
/// (see `field_with_typename`) and stripped again when reading it back (see
/// `typename_from_field`).
const EXTENSION_PREFIX: &str = "materialize.v1.";
32
/// Accumulates Materialize [`Row`]s column-by-column and converts them into an Arrow
/// [`RecordBatch`].
pub struct ArrowBuilder {
    /// One builder per output column, in schema order.
    columns: Vec<ArrowColumn>,
    /// Running total of `Row::byte_len()` over every row passed to `add_row`.
    row_size_bytes: usize,
    /// Schema supplied to `new_with_schema`, if any. When present, `to_record_batch` uses it
    /// verbatim instead of rebuilding a schema from the columns.
    original_schema: Option<Arc<Schema>>,
}
41
42pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44 desc_to_schema_with_overrides(desc, |_| None)
45}
46
47pub fn desc_to_schema_with_overrides<F>(
57 desc: &RelationDesc,
58 overrides: F,
59) -> Result<Schema, anyhow::Error>
60where
61 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
62{
63 let mut fields = vec![];
64 let mut errs = vec![];
65 let mut seen_names = BTreeSet::new();
66 let mut all_names = BTreeSet::new();
67 for col_name in desc.iter_names() {
68 all_names.insert(col_name.to_string());
69 }
70 for (col_name, col_type) in desc.iter() {
71 let mut col_name = col_name.to_string();
72 if seen_names.contains(&col_name) {
79 let mut new_col_name = col_name.clone();
80 let mut e = 2;
81 while all_names.contains(&new_col_name) {
82 new_col_name = format!("{}{}", col_name, e);
83 e += 1;
84 }
85 col_name = new_col_name;
86 }
87
88 seen_names.insert(col_name.clone());
89 all_names.insert(col_name.clone());
90
91 let result = scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides);
93
94 match result {
95 Ok((data_type, extension_type_name)) => {
96 fields.push(field_with_typename(
97 &col_name,
98 data_type,
99 col_type.nullable,
100 &extension_type_name,
101 ));
102 }
103 Err(err) => errs.push(err.to_string()),
104 }
105 }
106 if !errs.is_empty() {
107 anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
108 }
109 Ok(Schema::new(fields))
110}
111
112impl ArrowBuilder {
113 pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
115 let mut errs = vec![];
116 for (col_name, col_type) in desc.iter() {
117 match scalar_to_arrow_datatype(&col_type.scalar_type) {
118 Ok(_) => {}
119 Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
120 }
121 }
122 if !errs.is_empty() {
123 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
124 }
125 Ok(())
126 }
127
128 pub fn new(
134 desc: &RelationDesc,
135 item_capacity: usize,
136 data_capacity: usize,
137 ) -> Result<Self, anyhow::Error> {
138 let schema = desc_to_schema(desc)?;
139 let mut columns = vec![];
140 for field in schema.fields() {
141 columns.push(ArrowColumn::new(
142 field.name().clone(),
143 field.is_nullable(),
144 field.data_type().clone(),
145 typename_from_field(field)?,
146 item_capacity,
147 data_capacity,
148 )?);
149 }
150 Ok(Self {
151 columns,
152 row_size_bytes: 0,
153 original_schema: None,
154 })
155 }
156
157 pub fn new_with_schema(
164 schema: Arc<Schema>,
165 item_capacity: usize,
166 data_capacity: usize,
167 ) -> Result<Self, anyhow::Error> {
168 let mut columns = vec![];
169 for field in schema.fields() {
170 columns.push(ArrowColumn::new(
171 field.name().clone(),
172 field.is_nullable(),
173 field.data_type().clone(),
174 typename_from_field(field)?,
175 item_capacity,
176 data_capacity,
177 )?);
178 }
179 Ok(Self {
180 columns,
181 row_size_bytes: 0,
182 original_schema: Some(schema),
183 })
184 }
185
186 pub fn schema(&self) -> Schema {
188 Schema::new(
189 self.columns
190 .iter()
191 .map(Into::<Field>::into)
192 .collect::<Vec<_>>(),
193 )
194 }
195
196 pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
198 let mut arrays = vec![];
199 let mut fields: Vec<Field> = vec![];
200 for mut col in self.columns.into_iter() {
201 arrays.push(col.finish());
202 fields.push((&col).into());
203 }
204
205 let schema = if let Some(original_schema) = self.original_schema {
207 original_schema
208 } else {
209 Arc::new(Schema::new(fields))
210 };
211
212 RecordBatch::try_new(schema, arrays)
213 }
214
215 pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
218 for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
219 col.append_datum(datum)?;
220 }
221 self.row_size_bytes += row.byte_len();
222 Ok(())
223 }
224
225 pub fn row_size_bytes(&self) -> usize {
226 self.row_size_bytes
227 }
228}
229
230fn scalar_to_arrow_datatype(
234 scalar_type: &SqlScalarType,
235) -> Result<(DataType, String), anyhow::Error> {
236 scalar_to_arrow_datatype_impl(scalar_type, &|_| None)
237}
238
239fn scalar_to_arrow_datatype_with_overrides<F>(
244 scalar_type: &SqlScalarType,
245 overrides: &F,
246) -> Result<(DataType, String), anyhow::Error>
247where
248 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
249{
250 scalar_to_arrow_datatype_impl(scalar_type, overrides)
251}
252
253fn scalar_to_arrow_datatype_impl<F>(
255 scalar_type: &SqlScalarType,
256 overrides: &F,
257) -> Result<(DataType, String), anyhow::Error>
258where
259 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
260{
261 if let Some(result) = overrides(scalar_type) {
263 return Ok(result);
264 }
265 let (data_type, extension_name) = match scalar_type {
266 SqlScalarType::Bool => (DataType::Boolean, "boolean"),
267 SqlScalarType::Int16 => (DataType::Int16, "smallint"),
268 SqlScalarType::Int32 => (DataType::Int32, "integer"),
269 SqlScalarType::Int64 => (DataType::Int64, "bigint"),
270 SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
271 SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
272 SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
273 SqlScalarType::Oid => (DataType::UInt32, "oid"),
274 SqlScalarType::Float32 => (DataType::Float32, "real"),
275 SqlScalarType::Float64 => (DataType::Float64, "double"),
276 SqlScalarType::Date => (DataType::Date32, "date"),
277 SqlScalarType::Time => (
282 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
283 "time",
284 ),
285 SqlScalarType::Timestamp { .. } => (
286 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
287 "timestamp",
288 ),
289 SqlScalarType::TimestampTz { .. } => (
290 DataType::Timestamp(
291 arrow::datatypes::TimeUnit::Microsecond,
292 Some("+00:00".into()),
295 ),
296 "timestamptz",
297 ),
298 SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
299 SqlScalarType::Char { length } => {
300 if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
301 (DataType::Utf8, "text")
302 } else {
303 (DataType::LargeUtf8, "text")
304 }
305 }
306 SqlScalarType::VarChar { max_length } => {
307 if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
308 (DataType::Utf8, "text")
309 } else {
310 (DataType::LargeUtf8, "text")
311 }
312 }
313 SqlScalarType::String => (DataType::LargeUtf8, "text"),
314 SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
317 SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
320 SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
321 SqlScalarType::Numeric { max_scale } => {
322 match max_scale {
336 Some(scale) => {
337 let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
338 if scale <= DECIMAL128_MAX_SCALE {
339 (
340 DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
341 "numeric",
342 )
343 } else {
344 anyhow::bail!("Numeric max scale {} out of range", scale)
345 }
346 }
347 None => (
348 DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
349 "numeric",
350 ),
351 }
352 }
353 SqlScalarType::Array(inner) => {
357 let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(inner, overrides)?;
363 let inner_field = field_with_typename("item", inner_type, true, &inner_name);
365 let list_field = Arc::new(field_with_typename(
366 "items",
367 DataType::List(inner_field.into()),
368 false,
369 "array_items",
370 ));
371 let dims_field = Arc::new(field_with_typename(
372 "dimensions",
373 DataType::UInt8,
374 false,
375 "array_dimensions",
376 ));
377 (DataType::Struct([list_field, dims_field].into()), "array")
378 }
379 SqlScalarType::List {
380 element_type,
381 custom_id: _,
382 } => {
383 let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
384 let field = field_with_typename("item", inner_type, true, &inner_name);
386 (DataType::List(field.into()), "list")
387 }
388 SqlScalarType::Map {
389 value_type,
390 custom_id: _,
391 } => {
392 let (value_type, value_name) = scalar_to_arrow_datatype_impl(value_type, overrides)?;
393 let field_names = MapFieldNames::default();
395 let struct_type = DataType::Struct(
396 vec![
397 Field::new(&field_names.key, DataType::Utf8, false),
398 field_with_typename(&field_names.value, value_type, true, &value_name),
399 ]
400 .into(),
401 );
402 (
403 DataType::Map(
404 Field::new(&field_names.entry, struct_type, false).into(),
405 false,
406 ),
407 "map",
408 )
409 }
410 SqlScalarType::Record {
411 fields,
412 custom_id: _,
413 } => {
414 let mut arrow_fields = Vec::with_capacity(fields.len());
417 for (field_name, field_type) in fields.iter() {
418 let (inner_type, inner_extension_name) =
419 scalar_to_arrow_datatype_impl(&field_type.scalar_type, overrides)?;
420 let field = field_with_typename(
421 field_name.as_str(),
422 inner_type,
423 field_type.nullable,
424 &inner_extension_name,
425 );
426 arrow_fields.push(Arc::new(field));
427 }
428 (DataType::Struct(arrow_fields.into()), "record")
429 }
430 _ => anyhow::bail!("{:?} unimplemented", scalar_type),
431 };
432 Ok((data_type, extension_name.to_lowercase()))
433}
434
435fn builder_for_datatype(
436 data_type: &DataType,
437 item_capacity: usize,
438 data_capacity: usize,
439) -> Result<ColBuilder, anyhow::Error> {
440 let builder = match &data_type {
441 DataType::Boolean => {
442 ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
443 }
444 DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
445 DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
446 DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
447 DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
448 DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
449 DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
450 DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
451 DataType::Float32 => {
452 ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
453 }
454 DataType::Float64 => {
455 ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
456 }
457 DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
458 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
459 ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
460 item_capacity,
461 ))
462 }
463 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
464 ColBuilder::TimestampMicrosecondBuilder(
465 TimestampMicrosecondBuilder::with_capacity(item_capacity)
466 .with_timezone_opt(timezone.clone()),
467 )
468 }
469 DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
470 item_capacity,
471 data_capacity,
472 )),
473 DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
474 FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
475 ),
476 DataType::Utf8 => {
477 ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
478 }
479 DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
480 item_capacity,
481 data_capacity,
482 )),
483 DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
484 Decimal128Builder::with_capacity(item_capacity)
485 .with_precision_and_scale(*precision, *scale)?,
486 ),
487 DataType::List(field) => {
488 let inner_col_builder = ArrowColumn::new(
489 field.name().clone(),
490 field.is_nullable(),
491 field.data_type().clone(),
492 typename_from_field(field)?,
493 item_capacity,
494 data_capacity,
495 )?;
496 ColBuilder::ListBuilder(Box::new(
497 ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
498 ))
499 }
500 DataType::Struct(fields) => {
501 let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
502 for field in fields {
503 let inner_col_builder = ArrowColumn::new(
504 field.name().clone(),
505 field.is_nullable(),
506 field.data_type().clone(),
507 typename_from_field(field)?,
508 item_capacity,
509 data_capacity,
510 )?;
511 field_builders.push(Box::new(inner_col_builder));
512 }
513 ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
514 }
515 DataType::Map(entries_field, _sorted) => {
516 let entries_field = entries_field.as_ref();
517 if let DataType::Struct(fields) = entries_field.data_type() {
518 if fields.len() != 2 {
519 anyhow::bail!(
520 "Expected map entries to have 2 fields, found {}",
521 fields.len()
522 )
523 }
524 let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
525 let value_field = &fields[1];
526 let value_builder = ArrowColumn::new(
527 value_field.name().clone(),
528 value_field.is_nullable(),
529 value_field.data_type().clone(),
530 typename_from_field(value_field)?,
531 item_capacity,
532 data_capacity,
533 )?;
534 ColBuilder::MapBuilder(Box::new(
535 MapBuilder::with_capacity(
536 Some(MapFieldNames::default()),
537 key_builder,
538 value_builder,
539 item_capacity,
540 )
541 .with_values_field(Arc::clone(value_field)),
542 ))
543 } else {
544 anyhow::bail!("Expected map entries to be a struct")
545 }
546 }
547 _ => anyhow::bail!("{:?} unimplemented", data_type),
548 };
549 Ok(builder)
550}
551
/// A single Arrow column under construction: the field description it will be finished
/// with, plus the concrete builder accumulating its values.
#[derive(Debug)]
struct ArrowColumn {
    /// Name of the Arrow field.
    field_name: String,
    /// Whether the Arrow field is nullable.
    nullable: bool,
    /// Arrow data type the builder was created for.
    data_type: DataType,
    /// Materialize extension type name without the `EXTENSION_PREFIX` (e.g. `"jsonb"`);
    /// `append_datum` also uses it to pick the encoding for some datums.
    extension_type_name: String,
    /// Builder holding the appended values.
    inner: ColBuilder,
}
560
561impl From<&ArrowColumn> for Field {
562 fn from(col: &ArrowColumn) -> Self {
563 field_with_typename(
564 &col.field_name,
565 col.data_type.clone(),
566 col.nullable,
567 &col.extension_type_name,
568 )
569 }
570}
571
572fn field_with_typename(
574 name: &str,
575 data_type: DataType,
576 nullable: bool,
577 extension_type_name: &str,
578) -> Field {
579 Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
580 ARROW_EXTENSION_NAME_KEY.to_string(),
581 format!("{}{}", EXTENSION_PREFIX, extension_type_name),
582 )]))
583}
584
585fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
588 let metadata = field.metadata();
589 let extension_name = metadata
590 .get(ARROW_EXTENSION_NAME_KEY)
591 .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
592 extension_name
593 .strip_prefix(EXTENSION_PREFIX)
594 .map(|s| s.to_string())
595 .ok_or_else(|| {
596 anyhow::anyhow!(
597 "Field '{}' extension name '{}' missing expected prefix '{}'",
598 field.name(),
599 extension_name,
600 EXTENSION_PREFIX
601 )
602 })
603}
604
605impl ArrowColumn {
606 fn new(
607 field_name: String,
608 nullable: bool,
609 data_type: DataType,
610 extension_type_name: String,
611 item_capacity: usize,
612 data_capacity: usize,
613 ) -> Result<Self, anyhow::Error> {
614 Ok(Self {
615 inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
616 field_name,
617 nullable,
618 data_type,
619 extension_type_name,
620 })
621 }
622}
623
/// Generates the `ColBuilder` enum plus the code that dispatches over it.
///
/// Each identifier passed in names an arrow builder type and becomes a variant of the same
/// name wrapping that type. The three nested builders (list, map, struct) are added by hand
/// because their payloads differ from the simple `$x($x)` shape.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// The concrete builder behind an `ArrowColumn`.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            // List and map builders hold `ArrowColumn` children, which themselves contain a
            // `ColBuilder`; the `Box` presumably breaks that recursive type (the struct
            // builder's children are already boxed trait objects).
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            /// Appends a null value to whichever builder this is.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // `append(false)` records a null map entry; the unwrap assumes keys and
                    // values are always in sync at this point.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Keep every child the same length as the struct by giving each one
                        // a null slot before marking the struct itself null.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        // Lets an `ArrowColumn` serve as the child builder of arrow's list, map and struct
        // builders by forwarding to the wrapped concrete builder.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            // The `Any` accessors return the `ArrowColumn` itself, which is what lets
            // `StructBuilder::field_builder::<ArrowColumn>` downcast children.
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
712
// The flat (non-nested) arrow builders that `builder_for_datatype` can produce; each one
// becomes a `ColBuilder` variant of the same name.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
733
734impl ArrowColumn {
735 fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
736 match (&mut self.inner, datum) {
737 (s, Datum::Null) => s.append_null(),
738 (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
739 (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
740 (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
741 (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
742 (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
743 (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
744 (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
745 (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
746 (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
747 (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
748 (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
749 (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
750 builder.append_value(d.unix_epoch_days())
751 }
752 (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
753 let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
754 * 1_000_000
755 + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
756 builder.append_value(micros_since_midnight)
757 }
758 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
759 builder.append_value(ts.and_utc().timestamp_micros())
760 }
761 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
762 builder.append_value(ts.timestamp_micros())
763 }
764 (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
765 (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
766 builder.append_value(val.as_bytes())?
767 }
768 (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
769 (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
770 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
771 }
772 (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
773 (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
774 builder.append_value(ts.into())
775 }
776 (ColBuilder::Int32Builder(builder), Datum::UInt16(i)) => {
779 builder.append_value(i32::from(i))
780 }
781 (ColBuilder::Int64Builder(builder), Datum::UInt32(i)) => {
782 builder.append_value(i64::from(i))
783 }
784 (ColBuilder::Decimal128Builder(builder), Datum::UInt64(i)) => {
785 builder.append_value(i128::from(i))
786 }
787 (ColBuilder::Decimal128Builder(builder), Datum::MzTimestamp(ts)) => {
788 builder.append_value(i128::from(u64::from(ts)))
789 }
790 (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
791 if dec.0.is_special() {
792 anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
793 }
794 if let DataType::Decimal128(precision, scale) = self.data_type {
795 if dec.0.digits() > precision.into() {
796 anyhow::bail!(
797 "Decimal value {} out of range for column with precision {}",
798 dec,
799 precision
800 )
801 }
802
803 let coefficient: i128 = dec.0.coefficient()?;
806 let exponent = dec.0.exponent();
807
808 let scale_diff = i32::from(scale) + exponent;
812 let scale_diff = u32::try_from(scale_diff).map_err(|_| {
815 anyhow::anyhow!(
816 "cannot represent decimal value {} in column with scale {}",
817 dec,
818 scale
819 )
820 })?;
821
822 let value = coefficient
823 .checked_mul(10_i128.pow(scale_diff))
824 .ok_or_else(|| {
825 anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
826 })?;
827
828 builder.append_value(value)
829 } else {
830 anyhow::bail!("Expected Decimal128 data type")
831 }
832 }
833 (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
834 let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
837 if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
838 let inner_builder = list_builder.values();
839 for datum in arr.elements().into_iter() {
840 inner_builder.append_datum(datum)?;
841 }
842 list_builder.append(true);
843 } else {
844 anyhow::bail!(
845 "Expected ListBuilder for StructBuilder with Array datum: {:?}",
846 struct_builder
847 )
848 }
849 let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
850 if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
851 dims_builder.append_value(arr.dims().ndims());
852 } else {
853 anyhow::bail!(
854 "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
855 struct_builder
856 )
857 }
858 struct_builder.append(true)
859 }
860 (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
861 let inner_builder = list_builder.values();
862 for datum in list.into_iter() {
863 inner_builder.append_datum(datum)?;
864 }
865 list_builder.append(true)
866 }
867 (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
868 for (key, value) in map.iter() {
869 builder.keys().append_value(key);
870 builder.values().append_datum(value)?;
871 }
872 builder.append(true).unwrap()
873 }
874 (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
877 let field_count = struct_builder.num_fields();
878 for (i, datum) in list.into_iter().enumerate() {
879 if i >= field_count {
880 anyhow::bail!(
881 "Record has more elements ({}) than struct fields ({})",
882 i + 1,
883 field_count
884 );
885 }
886 let field_builder: &mut ArrowColumn = struct_builder
887 .field_builder(i)
888 .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
889 field_builder.append_datum(datum)?;
890 }
891 struct_builder.append(true);
892 }
893 (builder, datum) => {
894 anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
895 }
896 }
897 Ok(())
898 }
899}