// Allows `std::collections::HashMap`, which `field_with_typename` passes to arrow's
// `Field::with_metadata` (that API takes a std `HashMap`). Presumably the type is otherwise
// banned by the project's clippy `disallowed_types` configuration — TODO confirm.
#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20 DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Field-metadata key under which each field records its extension type name (the key the
/// Arrow format uses for extension types).
pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
/// Prefix prepended to every Materialize extension type name stored under
/// [`ARROW_EXTENSION_NAME_KEY`], e.g. `materialize.v1.text`.
const EXTENSION_PREFIX: &str = "materialize.v1.";
32
/// Accumulates Materialize rows into per-column Arrow builders and turns them into a
/// [`RecordBatch`].
pub struct ArrowBuilder {
    /// One builder per output column, in schema field order.
    columns: Vec<ArrowColumn>,
    /// Running total of `Row::byte_len` for every row fully appended by `add_row`.
    row_size_bytes: usize,
    /// Schema supplied to `new_with_schema`, reused verbatim by `to_record_batch`; `None` when
    /// the schema was derived from a `RelationDesc`.
    original_schema: Option<Arc<Schema>>,
}
41
42pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44 let mut fields = vec![];
45 let mut errs = vec![];
46 let mut seen_names = BTreeMap::new();
47 for (col_name, col_type) in desc.iter() {
48 let mut col_name = col_name.to_string();
49 seen_names
55 .entry(col_name.clone())
56 .and_modify(|e: &mut u32| {
57 *e += 1;
58 col_name += &e.to_string();
59 })
60 .or_insert(1);
61 match scalar_to_arrow_datatype(&col_type.scalar_type) {
62 Ok((data_type, extension_type_name)) => {
63 fields.push(field_with_typename(
64 &col_name,
65 data_type,
66 col_type.nullable,
67 &extension_type_name,
68 ));
69 }
70 Err(err) => errs.push(err.to_string()),
71 }
72 }
73 if !errs.is_empty() {
74 anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
75 }
76 Ok(Schema::new(fields))
77}
78
79impl ArrowBuilder {
80 pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
82 let mut errs = vec![];
83 for (col_name, col_type) in desc.iter() {
84 match scalar_to_arrow_datatype(&col_type.scalar_type) {
85 Ok(_) => {}
86 Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
87 }
88 }
89 if !errs.is_empty() {
90 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
91 }
92 Ok(())
93 }
94
95 pub fn new(
101 desc: &RelationDesc,
102 item_capacity: usize,
103 data_capacity: usize,
104 ) -> Result<Self, anyhow::Error> {
105 let schema = desc_to_schema(desc)?;
106 let mut columns = vec![];
107 for field in schema.fields() {
108 columns.push(ArrowColumn::new(
109 field.name().clone(),
110 field.is_nullable(),
111 field.data_type().clone(),
112 typename_from_field(field)?,
113 item_capacity,
114 data_capacity,
115 )?);
116 }
117 Ok(Self {
118 columns,
119 row_size_bytes: 0,
120 original_schema: None,
121 })
122 }
123
124 pub fn new_with_schema(
131 schema: Arc<Schema>,
132 item_capacity: usize,
133 data_capacity: usize,
134 ) -> Result<Self, anyhow::Error> {
135 let mut columns = vec![];
136 for field in schema.fields() {
137 columns.push(ArrowColumn::new(
138 field.name().clone(),
139 field.is_nullable(),
140 field.data_type().clone(),
141 typename_from_field(field)?,
142 item_capacity,
143 data_capacity,
144 )?);
145 }
146 Ok(Self {
147 columns,
148 row_size_bytes: 0,
149 original_schema: Some(schema),
150 })
151 }
152
153 pub fn schema(&self) -> Schema {
155 Schema::new(
156 self.columns
157 .iter()
158 .map(Into::<Field>::into)
159 .collect::<Vec<_>>(),
160 )
161 }
162
163 pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
165 let mut arrays = vec![];
166 let mut fields: Vec<Field> = vec![];
167 for mut col in self.columns.into_iter() {
168 arrays.push(col.finish());
169 fields.push((&col).into());
170 }
171
172 let schema = if let Some(original_schema) = self.original_schema {
174 original_schema
175 } else {
176 Arc::new(Schema::new(fields))
177 };
178
179 RecordBatch::try_new(schema, arrays)
180 }
181
182 pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
185 for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
186 col.append_datum(datum)?;
187 }
188 self.row_size_bytes += row.byte_len();
189 Ok(())
190 }
191
192 pub fn row_size_bytes(&self) -> usize {
193 self.row_size_bytes
194 }
195}
196
197fn scalar_to_arrow_datatype(
201 scalar_type: &SqlScalarType,
202) -> Result<(DataType, String), anyhow::Error> {
203 let (data_type, extension_name) = match scalar_type {
204 SqlScalarType::Bool => (DataType::Boolean, "boolean"),
205 SqlScalarType::Int16 => (DataType::Int16, "smallint"),
206 SqlScalarType::Int32 => (DataType::Int32, "integer"),
207 SqlScalarType::Int64 => (DataType::Int64, "bigint"),
208 SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
209 SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
210 SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
211 SqlScalarType::Float32 => (DataType::Float32, "real"),
212 SqlScalarType::Float64 => (DataType::Float64, "double"),
213 SqlScalarType::Date => (DataType::Date32, "date"),
214 SqlScalarType::Time => (
219 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
220 "time",
221 ),
222 SqlScalarType::Timestamp { .. } => (
223 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
224 "timestamp",
225 ),
226 SqlScalarType::TimestampTz { .. } => (
227 DataType::Timestamp(
228 arrow::datatypes::TimeUnit::Microsecond,
229 Some("+00:00".into()),
232 ),
233 "timestamptz",
234 ),
235 SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
236 SqlScalarType::Char { length } => {
237 if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
238 (DataType::Utf8, "text")
239 } else {
240 (DataType::LargeUtf8, "text")
241 }
242 }
243 SqlScalarType::VarChar { max_length } => {
244 if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
245 (DataType::Utf8, "text")
246 } else {
247 (DataType::LargeUtf8, "text")
248 }
249 }
250 SqlScalarType::String => (DataType::LargeUtf8, "text"),
251 SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
254 SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
257 SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
258 SqlScalarType::Numeric { max_scale } => {
259 match max_scale {
273 Some(scale) => {
274 let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
275 if scale <= DECIMAL128_MAX_SCALE {
276 (
277 DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
278 "numeric",
279 )
280 } else {
281 anyhow::bail!("Numeric max scale {} out of range", scale)
282 }
283 }
284 None => (
285 DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
286 "numeric",
287 ),
288 }
289 }
290 SqlScalarType::Array(inner) => {
294 let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
300 let inner_field = field_with_typename("item", inner_type, true, &inner_name);
302 let list_field = Arc::new(field_with_typename(
303 "items",
304 DataType::List(inner_field.into()),
305 false,
306 "array_items",
307 ));
308 let dims_field = Arc::new(field_with_typename(
309 "dimensions",
310 DataType::UInt8,
311 false,
312 "array_dimensions",
313 ));
314 (DataType::Struct([list_field, dims_field].into()), "array")
315 }
316 SqlScalarType::List {
317 element_type,
318 custom_id: _,
319 } => {
320 let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
321 let field = field_with_typename("item", inner_type, true, &inner_name);
323 (DataType::List(field.into()), "list")
324 }
325 SqlScalarType::Map {
326 value_type,
327 custom_id: _,
328 } => {
329 let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
330 let field_names = MapFieldNames::default();
332 let struct_type = DataType::Struct(
333 vec![
334 Field::new(&field_names.key, DataType::Utf8, false),
335 field_with_typename(&field_names.value, value_type, true, &value_name),
336 ]
337 .into(),
338 );
339 (
340 DataType::Map(
341 Field::new(&field_names.entry, struct_type, false).into(),
342 false,
343 ),
344 "map",
345 )
346 }
347 SqlScalarType::Record {
348 fields,
349 custom_id: _,
350 } => {
351 let mut arrow_fields = Vec::with_capacity(fields.len());
354 for (field_name, field_type) in fields.iter() {
355 let (inner_type, inner_extension_name) =
356 scalar_to_arrow_datatype(&field_type.scalar_type)?;
357 let field = field_with_typename(
358 field_name.as_str(),
359 inner_type,
360 field_type.nullable,
361 &inner_extension_name,
362 );
363 arrow_fields.push(Arc::new(field));
364 }
365 (DataType::Struct(arrow_fields.into()), "record")
366 }
367 _ => anyhow::bail!("{:?} unimplemented", scalar_type),
368 };
369 Ok((data_type, extension_name.to_lowercase()))
370}
371
372fn builder_for_datatype(
373 data_type: &DataType,
374 item_capacity: usize,
375 data_capacity: usize,
376) -> Result<ColBuilder, anyhow::Error> {
377 let builder = match &data_type {
378 DataType::Boolean => {
379 ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
380 }
381 DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
382 DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
383 DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
384 DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
385 DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
386 DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
387 DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
388 DataType::Float32 => {
389 ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
390 }
391 DataType::Float64 => {
392 ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
393 }
394 DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
395 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
396 ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
397 item_capacity,
398 ))
399 }
400 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
401 ColBuilder::TimestampMicrosecondBuilder(
402 TimestampMicrosecondBuilder::with_capacity(item_capacity)
403 .with_timezone_opt(timezone.clone()),
404 )
405 }
406 DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
407 item_capacity,
408 data_capacity,
409 )),
410 DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
411 FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
412 ),
413 DataType::Utf8 => {
414 ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
415 }
416 DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
417 item_capacity,
418 data_capacity,
419 )),
420 DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
421 Decimal128Builder::with_capacity(item_capacity)
422 .with_precision_and_scale(*precision, *scale)?,
423 ),
424 DataType::List(field) => {
425 let inner_col_builder = ArrowColumn::new(
426 field.name().clone(),
427 field.is_nullable(),
428 field.data_type().clone(),
429 typename_from_field(field)?,
430 item_capacity,
431 data_capacity,
432 )?;
433 ColBuilder::ListBuilder(Box::new(
434 ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
435 ))
436 }
437 DataType::Struct(fields) => {
438 let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
439 for field in fields {
440 let inner_col_builder = ArrowColumn::new(
441 field.name().clone(),
442 field.is_nullable(),
443 field.data_type().clone(),
444 typename_from_field(field)?,
445 item_capacity,
446 data_capacity,
447 )?;
448 field_builders.push(Box::new(inner_col_builder));
449 }
450 ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
451 }
452 DataType::Map(entries_field, _sorted) => {
453 let entries_field = entries_field.as_ref();
454 if let DataType::Struct(fields) = entries_field.data_type() {
455 if fields.len() != 2 {
456 anyhow::bail!(
457 "Expected map entries to have 2 fields, found {}",
458 fields.len()
459 )
460 }
461 let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
462 let value_field = &fields[1];
463 let value_builder = ArrowColumn::new(
464 value_field.name().clone(),
465 value_field.is_nullable(),
466 value_field.data_type().clone(),
467 typename_from_field(value_field)?,
468 item_capacity,
469 data_capacity,
470 )?;
471 ColBuilder::MapBuilder(Box::new(
472 MapBuilder::with_capacity(
473 Some(MapFieldNames::default()),
474 key_builder,
475 value_builder,
476 item_capacity,
477 )
478 .with_values_field(Arc::clone(value_field)),
479 ))
480 } else {
481 anyhow::bail!("Expected map entries to be a struct")
482 }
483 }
484 _ => anyhow::bail!("{:?} unimplemented", data_type),
485 };
486 Ok(builder)
487}
488
/// A single Arrow column under construction, together with the field description needed to
/// rebuild its [`Field`].
#[derive(Debug)]
struct ArrowColumn {
    /// Name of the Arrow field.
    field_name: String,
    /// Whether the field is nullable.
    nullable: bool,
    /// Arrow type of the column; also consulted when appending (decimal precision/scale).
    data_type: DataType,
    /// Extension type name without the `materialize.v1.` prefix (e.g. `"jsonb"`); it selects
    /// special handling in `append_datum`.
    extension_type_name: String,
    /// The concrete Arrow builder that receives values.
    inner: ColBuilder,
}
497
498impl From<&ArrowColumn> for Field {
499 fn from(col: &ArrowColumn) -> Self {
500 field_with_typename(
501 &col.field_name,
502 col.data_type.clone(),
503 col.nullable,
504 &col.extension_type_name,
505 )
506 }
507}
508
509fn field_with_typename(
511 name: &str,
512 data_type: DataType,
513 nullable: bool,
514 extension_type_name: &str,
515) -> Field {
516 Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
517 ARROW_EXTENSION_NAME_KEY.to_string(),
518 format!("{}{}", EXTENSION_PREFIX, extension_type_name),
519 )]))
520}
521
522fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
525 let metadata = field.metadata();
526 let extension_name = metadata
527 .get(ARROW_EXTENSION_NAME_KEY)
528 .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
529 extension_name
530 .strip_prefix(EXTENSION_PREFIX)
531 .map(|s| s.to_string())
532 .ok_or_else(|| {
533 anyhow::anyhow!(
534 "Field '{}' extension name '{}' missing expected prefix '{}'",
535 field.name(),
536 extension_name,
537 EXTENSION_PREFIX
538 )
539 })
540}
541
542impl ArrowColumn {
543 fn new(
544 field_name: String,
545 nullable: bool,
546 data_type: DataType,
547 extension_type_name: String,
548 item_capacity: usize,
549 data_capacity: usize,
550 ) -> Result<Self, anyhow::Error> {
551 Ok(Self {
552 inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
553 field_name,
554 nullable,
555 data_type,
556 extension_type_name,
557 })
558 }
559}
560
/// Defines `ColBuilder`, an enum with one variant per concrete Arrow builder used by this
/// module. It also generates the per-variant dispatch for `ColBuilder::append_null` and the
/// `ArrayBuilder` impl for `ArrowColumn`.
///
/// Each identifier passed in names an Arrow builder type and is used both as the variant name
/// and as its payload type. The nested builders (list, map, struct) are written out by hand
/// because their payload types differ from their names.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            // Nested builders hold `ArrowColumn` children, so child values go through the same
            // datum-appending logic as top-level columns.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            // Appends a null slot to the wrapped builder.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // `append(false)` closes the current (empty) map slot as null.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // A null struct still gets a (null) slot in every child, so all
                        // children stay the same length as the struct itself.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        // Lets `ArrowColumn` act as a child builder of arrow's nested builders. The `as_any*`
        // methods return the `ArrowColumn` itself, which is what allows
        // `StructBuilder::field_builder::<ArrowColumn>` to downcast back to it.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
649
// Instantiates `ColBuilder` and its dispatch code for every flat Arrow builder this module
// uses. The list must include each builder variant constructed in `builder_for_datatype`.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
670
671impl ArrowColumn {
672 fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
673 match (&mut self.inner, datum) {
674 (s, Datum::Null) => s.append_null(),
675 (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
676 (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
677 (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
678 (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
679 (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
680 (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
681 (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
682 (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
683 (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
684 (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
685 (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
686 (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
687 builder.append_value(d.unix_epoch_days())
688 }
689 (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
690 let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
691 * 1_000_000
692 + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
693 builder.append_value(micros_since_midnight)
694 }
695 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
696 builder.append_value(ts.and_utc().timestamp_micros())
697 }
698 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
699 builder.append_value(ts.timestamp_micros())
700 }
701 (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
702 (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
703 builder.append_value(val.as_bytes())?
704 }
705 (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
706 (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
707 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
708 }
709 (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
710 (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
711 builder.append_value(ts.into())
712 }
713 (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
714 if dec.0.is_special() {
715 anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
716 }
717 if let DataType::Decimal128(precision, scale) = self.data_type {
718 if dec.0.digits() > precision.into() {
719 anyhow::bail!(
720 "Decimal value {} out of range for column with precision {}",
721 dec,
722 precision
723 )
724 }
725
726 let coefficient: i128 = dec.0.coefficient()?;
729 let exponent = dec.0.exponent();
730
731 let scale_diff = i32::from(scale) + exponent;
735 let scale_diff = u32::try_from(scale_diff).map_err(|_| {
738 anyhow::anyhow!(
739 "cannot represent decimal value {} in column with scale {}",
740 dec,
741 scale
742 )
743 })?;
744
745 let value = coefficient
746 .checked_mul(10_i128.pow(scale_diff))
747 .ok_or_else(|| {
748 anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
749 })?;
750
751 builder.append_value(value)
752 } else {
753 anyhow::bail!("Expected Decimal128 data type")
754 }
755 }
756 (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
757 let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
760 if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
761 let inner_builder = list_builder.values();
762 for datum in arr.elements().into_iter() {
763 inner_builder.append_datum(datum)?;
764 }
765 list_builder.append(true);
766 } else {
767 anyhow::bail!(
768 "Expected ListBuilder for StructBuilder with Array datum: {:?}",
769 struct_builder
770 )
771 }
772 let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
773 if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
774 dims_builder.append_value(arr.dims().ndims());
775 } else {
776 anyhow::bail!(
777 "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
778 struct_builder
779 )
780 }
781 struct_builder.append(true)
782 }
783 (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
784 let inner_builder = list_builder.values();
785 for datum in list.into_iter() {
786 inner_builder.append_datum(datum)?;
787 }
788 list_builder.append(true)
789 }
790 (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
791 for (key, value) in map.iter() {
792 builder.keys().append_value(key);
793 builder.values().append_datum(value)?;
794 }
795 builder.append(true).unwrap()
796 }
797 (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
800 let field_count = struct_builder.num_fields();
801 for (i, datum) in list.into_iter().enumerate() {
802 if i >= field_count {
803 anyhow::bail!(
804 "Record has more elements ({}) than struct fields ({})",
805 i + 1,
806 field_count
807 );
808 }
809 let field_builder: &mut ArrowColumn = struct_builder
810 .field_builder(i)
811 .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
812 field_builder.append_datum(datum)?;
813 }
814 struct_builder.append(true);
815 }
816 (builder, datum) => {
817 anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
818 }
819 }
820 Ok(())
821 }
822}