1#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20 DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Field-metadata key under which each field's extension type name is stored
/// (the key the Arrow format reserves for extension types).
pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
/// Prefix prepended to every Materialize type name stored under
/// [`ARROW_EXTENSION_NAME_KEY`] (e.g. `materialize.v1.jsonb`).
const EXTENSION_PREFIX: &str = "materialize.v1.";
32
/// Accumulates [`Row`]s column by column and converts them into an arrow
/// [`RecordBatch`].
pub struct ArrowBuilder {
    /// One builder per column, in schema order.
    columns: Vec<ArrowColumn>,
    /// Running sum of `Row::byte_len` over every row added so far. This is a
    /// rough estimate of the data volume, not the size of the arrow buffers.
    row_size_bytes: usize,
    /// The schema passed to `ArrowBuilder::new_with_schema`, if any. When set,
    /// it is attached verbatim to the produced record batch so any metadata it
    /// carries is preserved.
    original_schema: Option<Arc<Schema>>,
}
41
42pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44 let mut fields = vec![];
45 let mut errs = vec![];
46 let mut seen_names = BTreeSet::new();
47 let mut all_names = BTreeSet::new();
48 for col_name in desc.iter_names() {
49 all_names.insert(col_name.to_string());
50 }
51 for (col_name, col_type) in desc.iter() {
52 let mut col_name = col_name.to_string();
53 if seen_names.contains(&col_name) {
60 let mut new_col_name = col_name.clone();
61 let mut e = 2;
62 while all_names.contains(&new_col_name) {
63 new_col_name = format!("{}{}", col_name, e);
64 e += 1;
65 }
66 col_name = new_col_name;
67 }
68
69 seen_names.insert(col_name.clone());
70 all_names.insert(col_name.clone());
71 match scalar_to_arrow_datatype(&col_type.scalar_type) {
72 Ok((data_type, extension_type_name)) => {
73 fields.push(field_with_typename(
74 &col_name,
75 data_type,
76 col_type.nullable,
77 &extension_type_name,
78 ));
79 }
80 Err(err) => errs.push(err.to_string()),
81 }
82 }
83 if !errs.is_empty() {
84 anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
85 }
86 Ok(Schema::new(fields))
87}
88
89impl ArrowBuilder {
90 pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
92 let mut errs = vec![];
93 for (col_name, col_type) in desc.iter() {
94 match scalar_to_arrow_datatype(&col_type.scalar_type) {
95 Ok(_) => {}
96 Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
97 }
98 }
99 if !errs.is_empty() {
100 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
101 }
102 Ok(())
103 }
104
105 pub fn new(
111 desc: &RelationDesc,
112 item_capacity: usize,
113 data_capacity: usize,
114 ) -> Result<Self, anyhow::Error> {
115 let schema = desc_to_schema(desc)?;
116 let mut columns = vec![];
117 for field in schema.fields() {
118 columns.push(ArrowColumn::new(
119 field.name().clone(),
120 field.is_nullable(),
121 field.data_type().clone(),
122 typename_from_field(field)?,
123 item_capacity,
124 data_capacity,
125 )?);
126 }
127 Ok(Self {
128 columns,
129 row_size_bytes: 0,
130 original_schema: None,
131 })
132 }
133
134 pub fn new_with_schema(
141 schema: Arc<Schema>,
142 item_capacity: usize,
143 data_capacity: usize,
144 ) -> Result<Self, anyhow::Error> {
145 let mut columns = vec![];
146 for field in schema.fields() {
147 columns.push(ArrowColumn::new(
148 field.name().clone(),
149 field.is_nullable(),
150 field.data_type().clone(),
151 typename_from_field(field)?,
152 item_capacity,
153 data_capacity,
154 )?);
155 }
156 Ok(Self {
157 columns,
158 row_size_bytes: 0,
159 original_schema: Some(schema),
160 })
161 }
162
163 pub fn schema(&self) -> Schema {
165 Schema::new(
166 self.columns
167 .iter()
168 .map(Into::<Field>::into)
169 .collect::<Vec<_>>(),
170 )
171 }
172
173 pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
175 let mut arrays = vec![];
176 let mut fields: Vec<Field> = vec![];
177 for mut col in self.columns.into_iter() {
178 arrays.push(col.finish());
179 fields.push((&col).into());
180 }
181
182 let schema = if let Some(original_schema) = self.original_schema {
184 original_schema
185 } else {
186 Arc::new(Schema::new(fields))
187 };
188
189 RecordBatch::try_new(schema, arrays)
190 }
191
192 pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
195 for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
196 col.append_datum(datum)?;
197 }
198 self.row_size_bytes += row.byte_len();
199 Ok(())
200 }
201
202 pub fn row_size_bytes(&self) -> usize {
203 self.row_size_bytes
204 }
205}
206
207fn scalar_to_arrow_datatype(
211 scalar_type: &SqlScalarType,
212) -> Result<(DataType, String), anyhow::Error> {
213 let (data_type, extension_name) = match scalar_type {
214 SqlScalarType::Bool => (DataType::Boolean, "boolean"),
215 SqlScalarType::Int16 => (DataType::Int16, "smallint"),
216 SqlScalarType::Int32 => (DataType::Int32, "integer"),
217 SqlScalarType::Int64 => (DataType::Int64, "bigint"),
218 SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
219 SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
220 SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
221 SqlScalarType::Float32 => (DataType::Float32, "real"),
222 SqlScalarType::Float64 => (DataType::Float64, "double"),
223 SqlScalarType::Date => (DataType::Date32, "date"),
224 SqlScalarType::Time => (
229 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
230 "time",
231 ),
232 SqlScalarType::Timestamp { .. } => (
233 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
234 "timestamp",
235 ),
236 SqlScalarType::TimestampTz { .. } => (
237 DataType::Timestamp(
238 arrow::datatypes::TimeUnit::Microsecond,
239 Some("+00:00".into()),
242 ),
243 "timestamptz",
244 ),
245 SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
246 SqlScalarType::Char { length } => {
247 if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
248 (DataType::Utf8, "text")
249 } else {
250 (DataType::LargeUtf8, "text")
251 }
252 }
253 SqlScalarType::VarChar { max_length } => {
254 if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
255 (DataType::Utf8, "text")
256 } else {
257 (DataType::LargeUtf8, "text")
258 }
259 }
260 SqlScalarType::String => (DataType::LargeUtf8, "text"),
261 SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
264 SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
267 SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
268 SqlScalarType::Numeric { max_scale } => {
269 match max_scale {
283 Some(scale) => {
284 let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
285 if scale <= DECIMAL128_MAX_SCALE {
286 (
287 DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
288 "numeric",
289 )
290 } else {
291 anyhow::bail!("Numeric max scale {} out of range", scale)
292 }
293 }
294 None => (
295 DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
296 "numeric",
297 ),
298 }
299 }
300 SqlScalarType::Array(inner) => {
304 let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
310 let inner_field = field_with_typename("item", inner_type, true, &inner_name);
312 let list_field = Arc::new(field_with_typename(
313 "items",
314 DataType::List(inner_field.into()),
315 false,
316 "array_items",
317 ));
318 let dims_field = Arc::new(field_with_typename(
319 "dimensions",
320 DataType::UInt8,
321 false,
322 "array_dimensions",
323 ));
324 (DataType::Struct([list_field, dims_field].into()), "array")
325 }
326 SqlScalarType::List {
327 element_type,
328 custom_id: _,
329 } => {
330 let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
331 let field = field_with_typename("item", inner_type, true, &inner_name);
333 (DataType::List(field.into()), "list")
334 }
335 SqlScalarType::Map {
336 value_type,
337 custom_id: _,
338 } => {
339 let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
340 let field_names = MapFieldNames::default();
342 let struct_type = DataType::Struct(
343 vec![
344 Field::new(&field_names.key, DataType::Utf8, false),
345 field_with_typename(&field_names.value, value_type, true, &value_name),
346 ]
347 .into(),
348 );
349 (
350 DataType::Map(
351 Field::new(&field_names.entry, struct_type, false).into(),
352 false,
353 ),
354 "map",
355 )
356 }
357 SqlScalarType::Record {
358 fields,
359 custom_id: _,
360 } => {
361 let mut arrow_fields = Vec::with_capacity(fields.len());
364 for (field_name, field_type) in fields.iter() {
365 let (inner_type, inner_extension_name) =
366 scalar_to_arrow_datatype(&field_type.scalar_type)?;
367 let field = field_with_typename(
368 field_name.as_str(),
369 inner_type,
370 field_type.nullable,
371 &inner_extension_name,
372 );
373 arrow_fields.push(Arc::new(field));
374 }
375 (DataType::Struct(arrow_fields.into()), "record")
376 }
377 _ => anyhow::bail!("{:?} unimplemented", scalar_type),
378 };
379 Ok((data_type, extension_name.to_lowercase()))
380}
381
382fn builder_for_datatype(
383 data_type: &DataType,
384 item_capacity: usize,
385 data_capacity: usize,
386) -> Result<ColBuilder, anyhow::Error> {
387 let builder = match &data_type {
388 DataType::Boolean => {
389 ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
390 }
391 DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
392 DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
393 DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
394 DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
395 DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
396 DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
397 DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
398 DataType::Float32 => {
399 ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
400 }
401 DataType::Float64 => {
402 ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
403 }
404 DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
405 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
406 ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
407 item_capacity,
408 ))
409 }
410 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
411 ColBuilder::TimestampMicrosecondBuilder(
412 TimestampMicrosecondBuilder::with_capacity(item_capacity)
413 .with_timezone_opt(timezone.clone()),
414 )
415 }
416 DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
417 item_capacity,
418 data_capacity,
419 )),
420 DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
421 FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
422 ),
423 DataType::Utf8 => {
424 ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
425 }
426 DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
427 item_capacity,
428 data_capacity,
429 )),
430 DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
431 Decimal128Builder::with_capacity(item_capacity)
432 .with_precision_and_scale(*precision, *scale)?,
433 ),
434 DataType::List(field) => {
435 let inner_col_builder = ArrowColumn::new(
436 field.name().clone(),
437 field.is_nullable(),
438 field.data_type().clone(),
439 typename_from_field(field)?,
440 item_capacity,
441 data_capacity,
442 )?;
443 ColBuilder::ListBuilder(Box::new(
444 ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
445 ))
446 }
447 DataType::Struct(fields) => {
448 let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
449 for field in fields {
450 let inner_col_builder = ArrowColumn::new(
451 field.name().clone(),
452 field.is_nullable(),
453 field.data_type().clone(),
454 typename_from_field(field)?,
455 item_capacity,
456 data_capacity,
457 )?;
458 field_builders.push(Box::new(inner_col_builder));
459 }
460 ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
461 }
462 DataType::Map(entries_field, _sorted) => {
463 let entries_field = entries_field.as_ref();
464 if let DataType::Struct(fields) = entries_field.data_type() {
465 if fields.len() != 2 {
466 anyhow::bail!(
467 "Expected map entries to have 2 fields, found {}",
468 fields.len()
469 )
470 }
471 let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
472 let value_field = &fields[1];
473 let value_builder = ArrowColumn::new(
474 value_field.name().clone(),
475 value_field.is_nullable(),
476 value_field.data_type().clone(),
477 typename_from_field(value_field)?,
478 item_capacity,
479 data_capacity,
480 )?;
481 ColBuilder::MapBuilder(Box::new(
482 MapBuilder::with_capacity(
483 Some(MapFieldNames::default()),
484 key_builder,
485 value_builder,
486 item_capacity,
487 )
488 .with_values_field(Arc::clone(value_field)),
489 ))
490 } else {
491 anyhow::bail!("Expected map entries to be a struct")
492 }
493 }
494 _ => anyhow::bail!("{:?} unimplemented", data_type),
495 };
496 Ok(builder)
497}
498
/// One arrow column under construction: its field description plus the
/// builder that accumulates its values.
#[derive(Debug)]
struct ArrowColumn {
    /// Field name used when describing the column as an arrow [`Field`].
    field_name: String,
    /// Whether the field is declared nullable.
    nullable: bool,
    /// Arrow type of the column. For decimals, `append_datum` also reads the
    /// precision and scale from here.
    data_type: DataType,
    /// Materialize type name (without [`EXTENSION_PREFIX`]) written to the
    /// field's extension metadata. `append_datum` checks it for `"jsonb"` to
    /// serialize JSON values.
    extension_type_name: String,
    /// The builder matching `data_type`.
    inner: ColBuilder,
}
507
508impl From<&ArrowColumn> for Field {
509 fn from(col: &ArrowColumn) -> Self {
510 field_with_typename(
511 &col.field_name,
512 col.data_type.clone(),
513 col.nullable,
514 &col.extension_type_name,
515 )
516 }
517}
518
519fn field_with_typename(
521 name: &str,
522 data_type: DataType,
523 nullable: bool,
524 extension_type_name: &str,
525) -> Field {
526 Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
527 ARROW_EXTENSION_NAME_KEY.to_string(),
528 format!("{}{}", EXTENSION_PREFIX, extension_type_name),
529 )]))
530}
531
532fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
535 let metadata = field.metadata();
536 let extension_name = metadata
537 .get(ARROW_EXTENSION_NAME_KEY)
538 .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
539 extension_name
540 .strip_prefix(EXTENSION_PREFIX)
541 .map(|s| s.to_string())
542 .ok_or_else(|| {
543 anyhow::anyhow!(
544 "Field '{}' extension name '{}' missing expected prefix '{}'",
545 field.name(),
546 extension_name,
547 EXTENSION_PREFIX
548 )
549 })
550}
551
552impl ArrowColumn {
553 fn new(
554 field_name: String,
555 nullable: bool,
556 data_type: DataType,
557 extension_type_name: String,
558 item_capacity: usize,
559 data_capacity: usize,
560 ) -> Result<Self, anyhow::Error> {
561 Ok(Self {
562 inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
563 field_name,
564 nullable,
565 data_type,
566 extension_type_name,
567 })
568 }
569}
570
/// Generates `ColBuilder` and the code that dispatches over it:
/// - one variant per listed arrow builder type, each named after and wrapping
///   that builder;
/// - the nested list, map and struct variants;
/// - `ColBuilder::append_null`;
/// - the [`ArrayBuilder`] implementation for [`ArrowColumn`], which forwards
///   to whichever builder the column holds.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// The concrete arrow builder backing an [`ArrowColumn`].
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// List columns, including the `items` child of array columns.
            /// Elements are appended through a nested [`ArrowColumn`].
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            /// Map columns: string keys, with values appended through a nested
            /// [`ArrowColumn`].
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// Struct columns (records and arrays). Every child is an
            /// [`ArrowColumn`].
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            /// Appends a null value, whatever the builder type.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // The unwrap assumes keys and values have been kept in step
                    // by earlier appends.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // The struct's own null does not touch its children, so
                        // append a null to each child as well. This keeps every
                        // child the same length as the struct.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        // Lets an `ArrowColumn` serve as a child builder of arrow's
        // `ListBuilder`, `MapBuilder` and `StructBuilder`, and be finished
        // directly. Every method forwards to the wrapped builder.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            // Returning `self` lets callers such as
            // `StructBuilder::field_builder::<ArrowColumn>` downcast back to
            // `ArrowColumn`.
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
659
// Instantiate `ColBuilder` with one variant per leaf arrow builder that
// `builder_for_datatype` can produce. Nested list, map and struct variants are
// added by the macro itself.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
680
681impl ArrowColumn {
682 fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
683 match (&mut self.inner, datum) {
684 (s, Datum::Null) => s.append_null(),
685 (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
686 (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
687 (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
688 (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
689 (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
690 (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
691 (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
692 (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
693 (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
694 (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
695 (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
696 (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
697 builder.append_value(d.unix_epoch_days())
698 }
699 (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
700 let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
701 * 1_000_000
702 + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
703 builder.append_value(micros_since_midnight)
704 }
705 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
706 builder.append_value(ts.and_utc().timestamp_micros())
707 }
708 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
709 builder.append_value(ts.timestamp_micros())
710 }
711 (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
712 (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
713 builder.append_value(val.as_bytes())?
714 }
715 (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
716 (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
717 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
718 }
719 (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
720 (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
721 builder.append_value(ts.into())
722 }
723 (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
724 if dec.0.is_special() {
725 anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
726 }
727 if let DataType::Decimal128(precision, scale) = self.data_type {
728 if dec.0.digits() > precision.into() {
729 anyhow::bail!(
730 "Decimal value {} out of range for column with precision {}",
731 dec,
732 precision
733 )
734 }
735
736 let coefficient: i128 = dec.0.coefficient()?;
739 let exponent = dec.0.exponent();
740
741 let scale_diff = i32::from(scale) + exponent;
745 let scale_diff = u32::try_from(scale_diff).map_err(|_| {
748 anyhow::anyhow!(
749 "cannot represent decimal value {} in column with scale {}",
750 dec,
751 scale
752 )
753 })?;
754
755 let value = coefficient
756 .checked_mul(10_i128.pow(scale_diff))
757 .ok_or_else(|| {
758 anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
759 })?;
760
761 builder.append_value(value)
762 } else {
763 anyhow::bail!("Expected Decimal128 data type")
764 }
765 }
766 (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
767 let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
770 if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
771 let inner_builder = list_builder.values();
772 for datum in arr.elements().into_iter() {
773 inner_builder.append_datum(datum)?;
774 }
775 list_builder.append(true);
776 } else {
777 anyhow::bail!(
778 "Expected ListBuilder for StructBuilder with Array datum: {:?}",
779 struct_builder
780 )
781 }
782 let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
783 if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
784 dims_builder.append_value(arr.dims().ndims());
785 } else {
786 anyhow::bail!(
787 "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
788 struct_builder
789 )
790 }
791 struct_builder.append(true)
792 }
793 (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
794 let inner_builder = list_builder.values();
795 for datum in list.into_iter() {
796 inner_builder.append_datum(datum)?;
797 }
798 list_builder.append(true)
799 }
800 (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
801 for (key, value) in map.iter() {
802 builder.keys().append_value(key);
803 builder.values().append_datum(value)?;
804 }
805 builder.append(true).unwrap()
806 }
807 (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
810 let field_count = struct_builder.num_fields();
811 for (i, datum) in list.into_iter().enumerate() {
812 if i >= field_count {
813 anyhow::bail!(
814 "Record has more elements ({}) than struct fields ({})",
815 i + 1,
816 field_count
817 );
818 }
819 let field_builder: &mut ArrowColumn = struct_builder
820 .field_builder(i)
821 .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
822 field_builder.append_datum(datum)?;
823 }
824 struct_builder.append(true);
825 }
826 (builder, datum) => {
827 anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
828 }
829 }
830 Ok(())
831 }
832}