1#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20 DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Prefix prepended to every extension type name written under a field's
/// `ARROW:extension:name` metadata key, and stripped again when the name is read back.
const EXTENSION_PREFIX: &str = "materialize.v1.";
31
/// Accumulates rows column-by-column and converts them into an Arrow `RecordBatch`.
pub struct ArrowBuilder {
    /// One column builder per field of the schema, in field order.
    columns: Vec<ArrowColumn>,
    /// Running sum of `Row::byte_len()` over every row passed to `add_row`.
    row_size_bytes: usize,
    /// The schema handed to `new_with_schema`, if any. When present, `to_record_batch`
    /// attaches it to the batch as-is instead of deriving a schema from `columns`.
    original_schema: Option<Arc<Schema>>,
}
40
41pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
43 let mut fields = vec![];
44 let mut errs = vec![];
45 let mut seen_names = BTreeMap::new();
46 for (col_name, col_type) in desc.iter() {
47 let mut col_name = col_name.to_string();
48 seen_names
54 .entry(col_name.clone())
55 .and_modify(|e: &mut u32| {
56 *e += 1;
57 col_name += &e.to_string();
58 })
59 .or_insert(1);
60 match scalar_to_arrow_datatype(&col_type.scalar_type) {
61 Ok((data_type, extension_type_name)) => {
62 fields.push(field_with_typename(
63 &col_name,
64 data_type,
65 col_type.nullable,
66 &extension_type_name,
67 ));
68 }
69 Err(err) => errs.push(err.to_string()),
70 }
71 }
72 if !errs.is_empty() {
73 anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
74 }
75 Ok(Schema::new(fields))
76}
77
78impl ArrowBuilder {
79 pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
81 let mut errs = vec![];
82 for (col_name, col_type) in desc.iter() {
83 match scalar_to_arrow_datatype(&col_type.scalar_type) {
84 Ok(_) => {}
85 Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
86 }
87 }
88 if !errs.is_empty() {
89 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
90 }
91 Ok(())
92 }
93
94 pub fn new(
100 desc: &RelationDesc,
101 item_capacity: usize,
102 data_capacity: usize,
103 ) -> Result<Self, anyhow::Error> {
104 let schema = desc_to_schema(desc)?;
105 let mut columns = vec![];
106 for field in schema.fields() {
107 columns.push(ArrowColumn::new(
108 field.name().clone(),
109 field.is_nullable(),
110 field.data_type().clone(),
111 typename_from_field(field)?,
112 item_capacity,
113 data_capacity,
114 )?);
115 }
116 Ok(Self {
117 columns,
118 row_size_bytes: 0,
119 original_schema: None,
120 })
121 }
122
123 pub fn new_with_schema(
130 schema: Arc<Schema>,
131 item_capacity: usize,
132 data_capacity: usize,
133 ) -> Result<Self, anyhow::Error> {
134 let mut columns = vec![];
135 for field in schema.fields() {
136 columns.push(ArrowColumn::new(
137 field.name().clone(),
138 field.is_nullable(),
139 field.data_type().clone(),
140 typename_from_field(field)?,
141 item_capacity,
142 data_capacity,
143 )?);
144 }
145 Ok(Self {
146 columns,
147 row_size_bytes: 0,
148 original_schema: Some(schema),
149 })
150 }
151
152 pub fn schema(&self) -> Schema {
154 Schema::new(
155 self.columns
156 .iter()
157 .map(Into::<Field>::into)
158 .collect::<Vec<_>>(),
159 )
160 }
161
162 pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
164 let mut arrays = vec![];
165 let mut fields: Vec<Field> = vec![];
166 for mut col in self.columns.into_iter() {
167 arrays.push(col.finish());
168 fields.push((&col).into());
169 }
170
171 let schema = if let Some(original_schema) = self.original_schema {
173 original_schema
174 } else {
175 Arc::new(Schema::new(fields))
176 };
177
178 RecordBatch::try_new(schema, arrays)
179 }
180
181 pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
184 for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
185 col.append_datum(datum)?;
186 }
187 self.row_size_bytes += row.byte_len();
188 Ok(())
189 }
190
191 pub fn row_size_bytes(&self) -> usize {
192 self.row_size_bytes
193 }
194}
195
196fn scalar_to_arrow_datatype(
200 scalar_type: &SqlScalarType,
201) -> Result<(DataType, String), anyhow::Error> {
202 let (data_type, extension_name) = match scalar_type {
203 SqlScalarType::Bool => (DataType::Boolean, "boolean"),
204 SqlScalarType::Int16 => (DataType::Int16, "smallint"),
205 SqlScalarType::Int32 => (DataType::Int32, "integer"),
206 SqlScalarType::Int64 => (DataType::Int64, "bigint"),
207 SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
208 SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
209 SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
210 SqlScalarType::Float32 => (DataType::Float32, "real"),
211 SqlScalarType::Float64 => (DataType::Float64, "double"),
212 SqlScalarType::Date => (DataType::Date32, "date"),
213 SqlScalarType::Time => (
218 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
219 "time",
220 ),
221 SqlScalarType::Timestamp { .. } => (
222 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
223 "timestamp",
224 ),
225 SqlScalarType::TimestampTz { .. } => (
226 DataType::Timestamp(
227 arrow::datatypes::TimeUnit::Microsecond,
228 Some("+00:00".into()),
231 ),
232 "timestamptz",
233 ),
234 SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
235 SqlScalarType::Char { length } => {
236 if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
237 (DataType::Utf8, "text")
238 } else {
239 (DataType::LargeUtf8, "text")
240 }
241 }
242 SqlScalarType::VarChar { max_length } => {
243 if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
244 (DataType::Utf8, "text")
245 } else {
246 (DataType::LargeUtf8, "text")
247 }
248 }
249 SqlScalarType::String => (DataType::LargeUtf8, "text"),
250 SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
253 SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
256 SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
257 SqlScalarType::Numeric { max_scale } => {
258 match max_scale {
272 Some(scale) => {
273 let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
274 if scale <= DECIMAL128_MAX_SCALE {
275 (
276 DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
277 "numeric",
278 )
279 } else {
280 anyhow::bail!("Numeric max scale {} out of range", scale)
281 }
282 }
283 None => (
284 DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
285 "numeric",
286 ),
287 }
288 }
289 SqlScalarType::Array(inner) => {
293 let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
299 let inner_field = field_with_typename("item", inner_type, true, &inner_name);
301 let list_field = Arc::new(field_with_typename(
302 "items",
303 DataType::List(inner_field.into()),
304 false,
305 "array_items",
306 ));
307 let dims_field = Arc::new(field_with_typename(
308 "dimensions",
309 DataType::UInt8,
310 false,
311 "array_dimensions",
312 ));
313 (DataType::Struct([list_field, dims_field].into()), "array")
314 }
315 SqlScalarType::List {
316 element_type,
317 custom_id: _,
318 } => {
319 let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
320 let field = field_with_typename("item", inner_type, true, &inner_name);
322 (DataType::List(field.into()), "list")
323 }
324 SqlScalarType::Map {
325 value_type,
326 custom_id: _,
327 } => {
328 let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
329 let field_names = MapFieldNames::default();
331 let struct_type = DataType::Struct(
332 vec![
333 Field::new(&field_names.key, DataType::Utf8, false),
334 field_with_typename(&field_names.value, value_type, true, &value_name),
335 ]
336 .into(),
337 );
338 (
339 DataType::Map(
340 Field::new(&field_names.entry, struct_type, false).into(),
341 false,
342 ),
343 "map",
344 )
345 }
346 _ => anyhow::bail!("{:?} unimplemented", scalar_type),
347 };
348 Ok((data_type, extension_name.to_lowercase()))
349}
350
351fn builder_for_datatype(
352 data_type: &DataType,
353 item_capacity: usize,
354 data_capacity: usize,
355) -> Result<ColBuilder, anyhow::Error> {
356 let builder = match &data_type {
357 DataType::Boolean => {
358 ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
359 }
360 DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
361 DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
362 DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
363 DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
364 DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
365 DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
366 DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
367 DataType::Float32 => {
368 ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
369 }
370 DataType::Float64 => {
371 ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
372 }
373 DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
374 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
375 ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
376 item_capacity,
377 ))
378 }
379 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
380 ColBuilder::TimestampMicrosecondBuilder(
381 TimestampMicrosecondBuilder::with_capacity(item_capacity)
382 .with_timezone_opt(timezone.clone()),
383 )
384 }
385 DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
386 item_capacity,
387 data_capacity,
388 )),
389 DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
390 FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
391 ),
392 DataType::Utf8 => {
393 ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
394 }
395 DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
396 item_capacity,
397 data_capacity,
398 )),
399 DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
400 Decimal128Builder::with_capacity(item_capacity)
401 .with_precision_and_scale(*precision, *scale)?,
402 ),
403 DataType::List(field) => {
404 let inner_col_builder = ArrowColumn::new(
405 field.name().clone(),
406 field.is_nullable(),
407 field.data_type().clone(),
408 typename_from_field(field)?,
409 item_capacity,
410 data_capacity,
411 )?;
412 ColBuilder::ListBuilder(Box::new(
413 ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
414 ))
415 }
416 DataType::Struct(fields) => {
417 let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
418 for field in fields {
419 let inner_col_builder = ArrowColumn::new(
420 field.name().clone(),
421 field.is_nullable(),
422 field.data_type().clone(),
423 typename_from_field(field)?,
424 item_capacity,
425 data_capacity,
426 )?;
427 field_builders.push(Box::new(inner_col_builder));
428 }
429 ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
430 }
431 DataType::Map(entries_field, _sorted) => {
432 let entries_field = entries_field.as_ref();
433 if let DataType::Struct(fields) = entries_field.data_type() {
434 if fields.len() != 2 {
435 anyhow::bail!(
436 "Expected map entries to have 2 fields, found {}",
437 fields.len()
438 )
439 }
440 let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
441 let value_field = &fields[1];
442 let value_builder = ArrowColumn::new(
443 value_field.name().clone(),
444 value_field.is_nullable(),
445 value_field.data_type().clone(),
446 typename_from_field(value_field)?,
447 item_capacity,
448 data_capacity,
449 )?;
450 ColBuilder::MapBuilder(Box::new(
451 MapBuilder::with_capacity(
452 Some(MapFieldNames::default()),
453 key_builder,
454 value_builder,
455 item_capacity,
456 )
457 .with_values_field(Arc::clone(value_field)),
458 ))
459 } else {
460 anyhow::bail!("Expected map entries to be a struct")
461 }
462 }
463 _ => anyhow::bail!("{:?} unimplemented", data_type),
464 };
465 Ok(builder)
466}
467
/// A single column under construction: the field description plus the builder that
/// accumulates its values.
#[derive(Debug)]
struct ArrowColumn {
    /// Name of the Arrow field this column produces.
    field_name: String,
    /// Whether the produced field is declared nullable.
    nullable: bool,
    /// Arrow data type of the produced field; also selects which builder `inner` holds.
    data_type: DataType,
    /// Extension type name without the `EXTENSION_PREFIX`; `append_datum` checks it for
    /// `"jsonb"` to decide how datums are written to a large string column.
    extension_type_name: String,
    /// The concrete Arrow builder receiving this column's values.
    inner: ColBuilder,
}
476
477impl From<&ArrowColumn> for Field {
478 fn from(col: &ArrowColumn) -> Self {
479 field_with_typename(
480 &col.field_name,
481 col.data_type.clone(),
482 col.nullable,
483 &col.extension_type_name,
484 )
485 }
486}
487
488fn field_with_typename(
490 name: &str,
491 data_type: DataType,
492 nullable: bool,
493 extension_type_name: &str,
494) -> Field {
495 Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
496 "ARROW:extension:name".to_string(),
497 format!("{}{}", EXTENSION_PREFIX, extension_type_name),
498 )]))
499}
500
501fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
503 let metadata = field.metadata();
504 let extension_name = metadata
505 .get("ARROW:extension:name")
506 .ok_or_else(|| anyhow::anyhow!("Missing extension name in metadata"))?;
507 if let Some(name) = extension_name.strip_prefix(EXTENSION_PREFIX) {
508 Ok(name.to_string())
509 } else {
510 anyhow::bail!("Extension name {} does not match expected", extension_name,)
511 }
512}
513
514impl ArrowColumn {
515 fn new(
516 field_name: String,
517 nullable: bool,
518 data_type: DataType,
519 extension_type_name: String,
520 item_capacity: usize,
521 data_capacity: usize,
522 ) -> Result<Self, anyhow::Error> {
523 Ok(Self {
524 inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
525 field_name,
526 nullable,
527 data_type,
528 extension_type_name,
529 })
530 }
531}
532
// Generates the `ColBuilder` enum, its `append_null` method, and the `ArrayBuilder`
// implementation for `ArrowColumn`.
//
// Each identifier passed to the macro becomes a variant named after, and wrapping, the
// builder type of the same name. The nested builders (list, map, struct) are added as
// fixed variants because they need dedicated handling.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// The concrete Arrow builder backing an `ArrowColumn`.
        #[derive(Debug)]
        enum ColBuilder {
            // One variant per builder type listed in the macro invocation.
            $(
                $x($x),
            )*
            // List whose element values are collected by a nested `ArrowColumn`.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            // Map with string keys and values collected by a nested `ArrowColumn`.
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            // Struct builder; `append_null` accesses its children as `ArrowColumn`s.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            /// Appends a null entry to whichever builder this variant holds.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // A map entry is closed with `append(false)`; the result is unwrapped,
                    // so an error here panics.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Append a null to every child before marking the struct slot
                        // itself null, so each child also gains one entry.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        // Lets an `ArrowColumn` be used as the child builder of list, map, and struct
        // builders by delegating every operation to the wrapped `ColBuilder`.
        impl ArrayBuilder for ArrowColumn {
            /// Number of entries appended to the inner builder so far.
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            /// Calls `finish` on the inner builder and returns the resulting array.
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            /// Calls `finish_cloned` on the inner builder, which only needs `&self`.
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            // The `Any` accessors expose the `ArrowColumn` itself (not the inner builder),
            // which is what allows downcasting back to `ArrowColumn`.
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
621
// Instantiate `ColBuilder` with every non-nested builder type used by
// `builder_for_datatype`; the list, map, and struct variants are added by the macro itself.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
642
643impl ArrowColumn {
644 fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
645 match (&mut self.inner, datum) {
646 (s, Datum::Null) => s.append_null(),
647 (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
648 (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
649 (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
650 (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
651 (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
652 (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
653 (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
654 (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
655 (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
656 (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
657 (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
658 (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
659 builder.append_value(d.unix_epoch_days())
660 }
661 (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
662 let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
663 * 1_000_000
664 + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
665 builder.append_value(micros_since_midnight)
666 }
667 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
668 builder.append_value(ts.and_utc().timestamp_micros())
669 }
670 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
671 builder.append_value(ts.timestamp_micros())
672 }
673 (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
674 (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
675 builder.append_value(val.as_bytes())?
676 }
677 (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
678 (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
679 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
680 }
681 (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
682 (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
683 builder.append_value(ts.into())
684 }
685 (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
686 if dec.0.is_special() {
687 anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
688 }
689 if let DataType::Decimal128(precision, scale) = self.data_type {
690 if dec.0.digits() > precision.into() {
691 anyhow::bail!(
692 "Decimal value {} out of range for column with precision {}",
693 dec,
694 precision
695 )
696 }
697
698 let coefficient: i128 = dec.0.coefficient()?;
701 let exponent = dec.0.exponent();
702
703 let scale_diff = i32::from(scale) + exponent;
707 let scale_diff = u32::try_from(scale_diff).map_err(|_| {
710 anyhow::anyhow!(
711 "cannot represent decimal value {} in column with scale {}",
712 dec,
713 scale
714 )
715 })?;
716
717 let value = coefficient
718 .checked_mul(10_i128.pow(scale_diff))
719 .ok_or_else(|| {
720 anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
721 })?;
722
723 builder.append_value(value)
724 } else {
725 anyhow::bail!("Expected Decimal128 data type")
726 }
727 }
728 (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
729 let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
732 if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
733 let inner_builder = list_builder.values();
734 for datum in arr.elements().into_iter() {
735 inner_builder.append_datum(datum)?;
736 }
737 list_builder.append(true);
738 } else {
739 anyhow::bail!(
740 "Expected ListBuilder for StructBuilder with Array datum: {:?}",
741 struct_builder
742 )
743 }
744 let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
745 if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
746 dims_builder.append_value(arr.dims().ndims());
747 } else {
748 anyhow::bail!(
749 "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
750 struct_builder
751 )
752 }
753 struct_builder.append(true)
754 }
755 (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
756 let inner_builder = list_builder.values();
757 for datum in list.into_iter() {
758 inner_builder.append_datum(datum)?;
759 }
760 list_builder.append(true)
761 }
762 (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
763 for (key, value) in map.iter() {
764 builder.keys().append_value(key);
765 builder.values().append_datum(value)?;
766 }
767 builder.append(true).unwrap()
768 }
769 (builder, datum) => {
770 anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
771 }
772 }
773 Ok(())
774 }
775}