1#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20 DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Arrow field-metadata key under which every field produced by this module records
/// its Materialize extension type name.
pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
/// Prefix prepended to each extension type name by `field_with_typename` and stripped
/// again by `typename_from_field`.
const EXTENSION_PREFIX: &str = "materialize.v1.";
32
33pub struct ArrowBuilder {
34 columns: Vec<ArrowColumn>,
35 row_size_bytes: usize,
38 original_schema: Option<Arc<Schema>>,
40}
41
42pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44 desc_to_schema_with_overrides(desc, |_| None)
45}
46
47pub fn desc_to_schema_with_overrides<F>(
57 desc: &RelationDesc,
58 overrides: F,
59) -> Result<Schema, anyhow::Error>
60where
61 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
62{
63 let mut fields = vec![];
64 let mut errs = vec![];
65 let mut seen_names = BTreeSet::new();
66 let mut all_names = BTreeSet::new();
67 for col_name in desc.iter_names() {
68 all_names.insert(col_name.to_string());
69 }
70 for (col_name, col_type) in desc.iter() {
71 let mut col_name = col_name.to_string();
72 if seen_names.contains(&col_name) {
79 let mut new_col_name = col_name.clone();
80 let mut e = 2;
81 while all_names.contains(&new_col_name) {
82 new_col_name = format!("{}{}", col_name, e);
83 e += 1;
84 }
85 col_name = new_col_name;
86 }
87
88 seen_names.insert(col_name.clone());
89 all_names.insert(col_name.clone());
90
91 let result = scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides);
93
94 match result {
95 Ok((data_type, extension_type_name)) => {
96 fields.push(field_with_typename(
97 &col_name,
98 data_type,
99 col_type.nullable,
100 &extension_type_name,
101 ));
102 }
103 Err(err) => errs.push(err.to_string()),
104 }
105 }
106 if !errs.is_empty() {
107 anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
108 }
109 Ok(Schema::new(fields))
110}
111
112impl ArrowBuilder {
113 pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
115 let mut errs = vec![];
116 for (col_name, col_type) in desc.iter() {
117 match scalar_to_arrow_datatype(&col_type.scalar_type) {
118 Ok(_) => {}
119 Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
120 }
121 }
122 if !errs.is_empty() {
123 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
124 }
125 Ok(())
126 }
127
128 pub fn validate_desc_for_parquet<F>(
143 desc: &RelationDesc,
144 overrides: F,
145 ) -> Result<(), anyhow::Error>
146 where
147 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
148 {
149 let mut errs = vec![];
150 for (col_name, col_type) in desc.iter() {
151 let dt =
152 match scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides) {
153 Ok((dt, _)) => dt,
154 Err(_) => {
155 errs.push(format!("{}: {:?}", col_name, col_type.scalar_type));
156 continue;
157 }
158 };
159 if let Some(reason) = parquet_incompatible_type(&dt) {
160 errs.push(format!(
161 "{}: {:?} ({})",
162 col_name, col_type.scalar_type, reason
163 ));
164 }
165 }
166 if !errs.is_empty() {
167 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
168 }
169 Ok(())
170 }
171
172 pub fn new(
178 desc: &RelationDesc,
179 item_capacity: usize,
180 data_capacity: usize,
181 ) -> Result<Self, anyhow::Error> {
182 let schema = desc_to_schema(desc)?;
183 let mut columns = vec![];
184 for field in schema.fields() {
185 columns.push(ArrowColumn::new(
186 field.name().clone(),
187 field.is_nullable(),
188 field.data_type().clone(),
189 typename_from_field(field)?,
190 item_capacity,
191 data_capacity,
192 )?);
193 }
194 Ok(Self {
195 columns,
196 row_size_bytes: 0,
197 original_schema: None,
198 })
199 }
200
201 pub fn new_with_schema(
208 schema: Arc<Schema>,
209 item_capacity: usize,
210 data_capacity: usize,
211 ) -> Result<Self, anyhow::Error> {
212 let mut columns = vec![];
213 for field in schema.fields() {
214 columns.push(ArrowColumn::new(
215 field.name().clone(),
216 field.is_nullable(),
217 field.data_type().clone(),
218 typename_from_field(field)?,
219 item_capacity,
220 data_capacity,
221 )?);
222 }
223 Ok(Self {
224 columns,
225 row_size_bytes: 0,
226 original_schema: Some(schema),
227 })
228 }
229
230 pub fn schema(&self) -> Schema {
232 Schema::new(
233 self.columns
234 .iter()
235 .map(Into::<Field>::into)
236 .collect::<Vec<_>>(),
237 )
238 }
239
240 pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
242 let mut arrays = vec![];
243 let mut fields: Vec<Field> = vec![];
244 for mut col in self.columns.into_iter() {
245 arrays.push(col.finish());
246 fields.push((&col).into());
247 }
248
249 let schema = if let Some(original_schema) = self.original_schema {
251 original_schema
252 } else {
253 Arc::new(Schema::new(fields))
254 };
255
256 RecordBatch::try_new(schema, arrays)
257 }
258
259 pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
262 for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
263 col.append_datum(datum)?;
264 }
265 self.row_size_bytes += row.byte_len();
266 Ok(())
267 }
268
269 pub fn row_size_bytes(&self) -> usize {
270 self.row_size_bytes
271 }
272}
273
274fn scalar_to_arrow_datatype(
278 scalar_type: &SqlScalarType,
279) -> Result<(DataType, String), anyhow::Error> {
280 scalar_to_arrow_datatype_impl(scalar_type, &|_| None)
281}
282
283fn parquet_incompatible_type(dt: &DataType) -> Option<&'static str> {
293 use arrow::datatypes::IntervalUnit;
294 match dt {
295 DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
296 parquet_incompatible_type(f.data_type())
297 }
298 DataType::Map(f, _) => parquet_incompatible_type(f.data_type()),
299 DataType::Struct(fields) => fields
300 .iter()
301 .find_map(|f| parquet_incompatible_type(f.data_type())),
302
303 DataType::Null
305 | DataType::Boolean
306 | DataType::Int8
307 | DataType::Int16
308 | DataType::Int32
309 | DataType::Int64
310 | DataType::UInt8
311 | DataType::UInt16
312 | DataType::UInt32
313 | DataType::UInt64
314 | DataType::Float16
315 | DataType::Float32
316 | DataType::Float64
317 | DataType::Date32
318 | DataType::Date64
319 | DataType::Time32(_)
320 | DataType::Time64(_)
321 | DataType::Timestamp(_, _)
322 | DataType::Duration(_)
323 | DataType::Interval(IntervalUnit::YearMonth | IntervalUnit::DayTime)
324 | DataType::Utf8
325 | DataType::LargeUtf8
326 | DataType::Utf8View
327 | DataType::Binary
328 | DataType::LargeBinary
329 | DataType::BinaryView
330 | DataType::FixedSizeBinary(_)
331 | DataType::Decimal32(_, _)
332 | DataType::Decimal64(_, _)
333 | DataType::Decimal128(_, _)
334 | DataType::Decimal256(_, _) => None,
335
336 _ => Some("unsupported arrow datatype"),
338 }
339}
340
341fn scalar_to_arrow_datatype_with_overrides<F>(
346 scalar_type: &SqlScalarType,
347 overrides: &F,
348) -> Result<(DataType, String), anyhow::Error>
349where
350 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
351{
352 scalar_to_arrow_datatype_impl(scalar_type, overrides)
353}
354
355fn scalar_to_arrow_datatype_impl<F>(
357 scalar_type: &SqlScalarType,
358 overrides: &F,
359) -> Result<(DataType, String), anyhow::Error>
360where
361 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
362{
363 if let Some(result) = overrides(scalar_type) {
365 return Ok(result);
366 }
367 let (data_type, extension_name) = match scalar_type {
368 SqlScalarType::Bool => (DataType::Boolean, "boolean"),
369 SqlScalarType::Int16 => (DataType::Int16, "smallint"),
370 SqlScalarType::Int32 => (DataType::Int32, "integer"),
371 SqlScalarType::Int64 => (DataType::Int64, "bigint"),
372 SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
373 SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
374 SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
375 SqlScalarType::Oid => (DataType::UInt32, "oid"),
376 SqlScalarType::Float32 => (DataType::Float32, "real"),
377 SqlScalarType::Float64 => (DataType::Float64, "double"),
378 SqlScalarType::Date => (DataType::Date32, "date"),
379 SqlScalarType::Time => (
384 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
385 "time",
386 ),
387 SqlScalarType::Timestamp { .. } => (
388 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
389 "timestamp",
390 ),
391 SqlScalarType::TimestampTz { .. } => (
392 DataType::Timestamp(
393 arrow::datatypes::TimeUnit::Microsecond,
394 Some("+00:00".into()),
397 ),
398 "timestamptz",
399 ),
400 SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
401 SqlScalarType::Char { length } => {
402 if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
403 (DataType::Utf8, "text")
404 } else {
405 (DataType::LargeUtf8, "text")
406 }
407 }
408 SqlScalarType::VarChar { max_length } => {
409 if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
410 (DataType::Utf8, "text")
411 } else {
412 (DataType::LargeUtf8, "text")
413 }
414 }
415 SqlScalarType::String => (DataType::LargeUtf8, "text"),
416 SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
419 SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
422 SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
423 SqlScalarType::Numeric { max_scale } => {
424 match max_scale {
438 Some(scale) => {
439 let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
440 if scale <= DECIMAL128_MAX_SCALE {
441 (
442 DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
443 "numeric",
444 )
445 } else {
446 anyhow::bail!("Numeric max scale {} out of range", scale)
447 }
448 }
449 None => (
450 DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
451 "numeric",
452 ),
453 }
454 }
455 SqlScalarType::Interval => (
456 DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano),
457 "interval",
458 ),
459 SqlScalarType::Array(inner) => {
460 let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(inner, overrides)?;
466 let inner_field = field_with_typename("item", inner_type, true, &inner_name);
468 let list_field = Arc::new(field_with_typename(
469 "items",
470 DataType::List(inner_field.into()),
471 false,
472 "array_items",
473 ));
474 let dims_field = Arc::new(field_with_typename(
475 "dimensions",
476 DataType::UInt8,
477 false,
478 "array_dimensions",
479 ));
480 (DataType::Struct([list_field, dims_field].into()), "array")
481 }
482 SqlScalarType::List {
483 element_type,
484 custom_id: _,
485 } => {
486 let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
487 let field = field_with_typename("item", inner_type, true, &inner_name);
489 (DataType::List(field.into()), "list")
490 }
491 SqlScalarType::Map {
492 value_type,
493 custom_id: _,
494 } => {
495 let (value_type, value_name) = scalar_to_arrow_datatype_impl(value_type, overrides)?;
496 let field_names = MapFieldNames::default();
498 let struct_type = DataType::Struct(
499 vec![
500 Field::new(&field_names.key, DataType::Utf8, false),
501 field_with_typename(&field_names.value, value_type, true, &value_name),
502 ]
503 .into(),
504 );
505 (
506 DataType::Map(
507 Field::new(&field_names.entry, struct_type, false).into(),
508 false,
509 ),
510 "map",
511 )
512 }
513 SqlScalarType::Record {
514 fields,
515 custom_id: _,
516 } => {
517 let mut arrow_fields = Vec::with_capacity(fields.len());
520 for (field_name, field_type) in fields.iter() {
521 let (inner_type, inner_extension_name) =
522 scalar_to_arrow_datatype_impl(&field_type.scalar_type, overrides)?;
523 let field = field_with_typename(
524 field_name.as_str(),
525 inner_type,
526 field_type.nullable,
527 &inner_extension_name,
528 );
529 arrow_fields.push(Arc::new(field));
530 }
531 (DataType::Struct(arrow_fields.into()), "record")
532 }
533 SqlScalarType::Range { element_type } => {
534 let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
541 let lower_field = Arc::new(field_with_typename(
542 "lower",
543 inner_type.clone(),
544 true,
545 &inner_name,
546 ));
547 let upper_field = Arc::new(field_with_typename("upper", inner_type, true, &inner_name));
548 let lower_inclusive_field = Arc::new(field_with_typename(
549 "lower_inclusive",
550 DataType::Boolean,
551 false,
552 "boolean",
553 ));
554 let upper_inclusive_field = Arc::new(field_with_typename(
555 "upper_inclusive",
556 DataType::Boolean,
557 false,
558 "boolean",
559 ));
560 let empty_field = Arc::new(field_with_typename(
561 "empty",
562 DataType::Boolean,
563 false,
564 "boolean",
565 ));
566 (
567 DataType::Struct(
568 [
569 lower_field,
570 upper_field,
571 lower_inclusive_field,
572 upper_inclusive_field,
573 empty_field,
574 ]
575 .into(),
576 ),
577 "range",
578 )
579 }
580 _ => anyhow::bail!("{:?} unimplemented", scalar_type),
581 };
582 Ok((data_type, extension_name.to_lowercase()))
583}
584
585fn builder_for_datatype(
586 data_type: &DataType,
587 item_capacity: usize,
588 data_capacity: usize,
589) -> Result<ColBuilder, anyhow::Error> {
590 let builder = match &data_type {
591 DataType::Boolean => {
592 ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
593 }
594 DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
595 DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
596 DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
597 DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
598 DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
599 DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
600 DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
601 DataType::Float32 => {
602 ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
603 }
604 DataType::Float64 => {
605 ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
606 }
607 DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
608 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
609 ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
610 item_capacity,
611 ))
612 }
613 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
614 ColBuilder::TimestampMicrosecondBuilder(
615 TimestampMicrosecondBuilder::with_capacity(item_capacity)
616 .with_timezone_opt(timezone.clone()),
617 )
618 }
619 DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
620 item_capacity,
621 data_capacity,
622 )),
623 DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
624 FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
625 ),
626 DataType::Utf8 => {
627 ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
628 }
629 DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
630 item_capacity,
631 data_capacity,
632 )),
633 DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
634 Decimal128Builder::with_capacity(item_capacity)
635 .with_precision_and_scale(*precision, *scale)?,
636 ),
637 DataType::List(field) => {
638 let inner_col_builder = ArrowColumn::new(
639 field.name().clone(),
640 field.is_nullable(),
641 field.data_type().clone(),
642 typename_from_field(field)?,
643 item_capacity,
644 data_capacity,
645 )?;
646 ColBuilder::ListBuilder(Box::new(
647 ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
648 ))
649 }
650 DataType::Struct(fields) => {
651 let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
652 for field in fields {
653 let inner_col_builder = ArrowColumn::new(
654 field.name().clone(),
655 field.is_nullable(),
656 field.data_type().clone(),
657 typename_from_field(field)?,
658 item_capacity,
659 data_capacity,
660 )?;
661 field_builders.push(Box::new(inner_col_builder));
662 }
663 ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
664 }
665 DataType::Map(entries_field, _sorted) => {
666 let entries_field = entries_field.as_ref();
667 if let DataType::Struct(fields) = entries_field.data_type() {
668 if fields.len() != 2 {
669 anyhow::bail!(
670 "Expected map entries to have 2 fields, found {}",
671 fields.len()
672 )
673 }
674 let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
675 let value_field = &fields[1];
676 let value_builder = ArrowColumn::new(
677 value_field.name().clone(),
678 value_field.is_nullable(),
679 value_field.data_type().clone(),
680 typename_from_field(value_field)?,
681 item_capacity,
682 data_capacity,
683 )?;
684 ColBuilder::MapBuilder(Box::new(
685 MapBuilder::with_capacity(
686 Some(MapFieldNames::default()),
687 key_builder,
688 value_builder,
689 item_capacity,
690 )
691 .with_values_field(Arc::clone(value_field)),
692 ))
693 } else {
694 anyhow::bail!("Expected map entries to be a struct")
695 }
696 }
697 DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano) => {
698 ColBuilder::IntervalMonthDayNanoBuilder(IntervalMonthDayNanoBuilder::with_capacity(
699 item_capacity,
700 ))
701 }
702 _ => anyhow::bail!("{:?} unimplemented", data_type),
703 };
704 Ok(builder)
705}
706
/// A single output column: the Arrow field description plus the builder that
/// accumulates its values.
#[derive(Debug)]
struct ArrowColumn {
    // Field name emitted in the schema (possibly de-duplicated by the schema builder).
    field_name: String,
    nullable: bool,
    // Declared Arrow type; `append_datum` reads it, e.g. for the Decimal128 precision and scale.
    data_type: DataType,
    // Materialize extension type name without `EXTENSION_PREFIX` (e.g. "jsonb"), which
    // `append_datum` also uses to choose JSON serialization for string builders.
    extension_type_name: String,
    // Concrete builder selected by `builder_for_datatype` from `data_type`.
    inner: ColBuilder,
}
715
716impl From<&ArrowColumn> for Field {
717 fn from(col: &ArrowColumn) -> Self {
718 field_with_typename(
719 &col.field_name,
720 col.data_type.clone(),
721 col.nullable,
722 &col.extension_type_name,
723 )
724 }
725}
726
727fn field_with_typename(
729 name: &str,
730 data_type: DataType,
731 nullable: bool,
732 extension_type_name: &str,
733) -> Field {
734 Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
735 ARROW_EXTENSION_NAME_KEY.to_string(),
736 format!("{}{}", EXTENSION_PREFIX, extension_type_name),
737 )]))
738}
739
740fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
743 let metadata = field.metadata();
744 let extension_name = metadata
745 .get(ARROW_EXTENSION_NAME_KEY)
746 .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
747 extension_name
748 .strip_prefix(EXTENSION_PREFIX)
749 .map(|s| s.to_string())
750 .ok_or_else(|| {
751 anyhow::anyhow!(
752 "Field '{}' extension name '{}' missing expected prefix '{}'",
753 field.name(),
754 extension_name,
755 EXTENSION_PREFIX
756 )
757 })
758}
759
760impl ArrowColumn {
761 fn new(
762 field_name: String,
763 nullable: bool,
764 data_type: DataType,
765 extension_type_name: String,
766 item_capacity: usize,
767 data_capacity: usize,
768 ) -> Result<Self, anyhow::Error> {
769 Ok(Self {
770 inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
771 field_name,
772 nullable,
773 data_type,
774 extension_type_name,
775 })
776 }
777}
778
/// Generates the `ColBuilder` enum and `ArrayBuilder for ArrowColumn` from a list of
/// arrow builder type names. Each name becomes a variant of the same name wrapping that
/// builder; the nested List/Map/Struct variants are always added.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        // One variant per simple builder from the invocation, plus the nested builders
        // whose children are themselves `ArrowColumn`s.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            // Appends a null slot to whichever builder this is.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // `append(false)` records a null map slot; the result is unwrapped
                    // because this method returns `()`.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Each child also receives a null so every child's length stays
                        // in step with the struct's own length.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        // Lets `ArrowColumn` act as a child builder inside arrow's List, Map and Struct
        // builders by forwarding every call to the wrapped `ColBuilder` variant.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            // The `Any` accessors return the `ArrowColumn` itself, which is what
            // `StructBuilder::field_builder::<ArrowColumn>` downcasts to.
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
867
// Instantiates `ColBuilder`. Each name must be an arrow builder type in scope (via
// `arrow::array::builder::*`) that provides `append_null`, `len`, `finish` and
// `finish_cloned`, since the macro body calls all four.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder,
    IntervalMonthDayNanoBuilder
);
889
890impl ArrowColumn {
891 fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
892 match (&mut self.inner, datum) {
893 (s, Datum::Null) => s.append_null(),
894 (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
895 (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
896 (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
897 (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
898 (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
899 (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
900 (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
901 (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
902 (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
903 (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
904 (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
905 (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
906 builder.append_value(d.unix_epoch_days())
907 }
908 (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
909 let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
910 * 1_000_000
911 + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
912 builder.append_value(micros_since_midnight)
913 }
914 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
915 builder.append_value(ts.and_utc().timestamp_micros())
916 }
917 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
918 builder.append_value(ts.timestamp_micros())
919 }
920 (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
921 (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
922 builder.append_value(val.as_bytes())?
923 }
924 (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
925 (ColBuilder::StringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
926 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
927 }
928 (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
929 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
930 }
931 (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
932 (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
933 builder.append_value(ts.into())
934 }
935 (ColBuilder::Int32Builder(builder), Datum::UInt16(i)) => {
938 builder.append_value(i32::from(i))
939 }
940 (ColBuilder::Int64Builder(builder), Datum::UInt32(i)) => {
941 builder.append_value(i64::from(i))
942 }
943 (ColBuilder::Decimal128Builder(builder), Datum::UInt64(i)) => {
944 builder.append_value(i128::from(i))
945 }
946 (ColBuilder::Decimal128Builder(builder), Datum::MzTimestamp(ts)) => {
947 builder.append_value(i128::from(u64::from(ts)))
948 }
949 (ColBuilder::StringBuilder(builder), Datum::Interval(iv)) => {
952 builder.append_value(iv.to_string())
953 }
954 (ColBuilder::LargeStringBuilder(builder), Datum::Interval(iv)) => {
955 builder.append_value(iv.to_string())
956 }
957 (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
958 if dec.0.is_special() {
959 anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
960 }
961 if let DataType::Decimal128(precision, scale) = self.data_type {
962 if dec.0.digits() > precision.into() {
963 anyhow::bail!(
964 "Decimal value {} out of range for column with precision {}",
965 dec,
966 precision
967 )
968 }
969
970 let coefficient: i128 = dec.0.coefficient()?;
973 let exponent = dec.0.exponent();
974
975 let scale_diff = i32::from(scale) + exponent;
979 let scale_diff = u32::try_from(scale_diff).map_err(|_| {
982 anyhow::anyhow!(
983 "cannot represent decimal value {} in column with scale {}",
984 dec,
985 scale
986 )
987 })?;
988
989 let value = coefficient
990 .checked_mul(10_i128.pow(scale_diff))
991 .ok_or_else(|| {
992 anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
993 })?;
994
995 builder.append_value(value)
996 } else {
997 anyhow::bail!("Expected Decimal128 data type")
998 }
999 }
1000 (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
1001 let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
1004 if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
1005 let inner_builder = list_builder.values();
1006 for datum in arr.elements().into_iter() {
1007 inner_builder.append_datum(datum)?;
1008 }
1009 list_builder.append(true);
1010 } else {
1011 anyhow::bail!(
1012 "Expected ListBuilder for StructBuilder with Array datum: {:?}",
1013 struct_builder
1014 )
1015 }
1016 let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
1017 if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
1018 dims_builder.append_value(arr.dims().ndims());
1019 } else {
1020 anyhow::bail!(
1021 "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
1022 struct_builder
1023 )
1024 }
1025 struct_builder.append(true)
1026 }
1027 (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
1028 let inner_builder = list_builder.values();
1029 for datum in list.into_iter() {
1030 inner_builder.append_datum(datum)?;
1031 }
1032 list_builder.append(true)
1033 }
1034 (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
1035 for (key, value) in map.iter() {
1036 builder.keys().append_value(key);
1037 builder.values().append_datum(value)?;
1038 }
1039 builder.append(true).unwrap()
1040 }
1041 (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
1044 let field_count = struct_builder.num_fields();
1045 for (i, datum) in list.into_iter().enumerate() {
1046 if i >= field_count {
1047 anyhow::bail!(
1048 "Record has more elements ({}) than struct fields ({})",
1049 i + 1,
1050 field_count
1051 );
1052 }
1053 let field_builder: &mut ArrowColumn = struct_builder
1054 .field_builder(i)
1055 .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
1056 field_builder.append_datum(datum)?;
1057 }
1058 struct_builder.append(true);
1059 }
1060 (ColBuilder::StructBuilder(struct_builder), Datum::Range(range)) => {
1061 match range.inner {
1068 None => {
1069 struct_builder
1071 .field_builder::<ArrowColumn>(0)
1072 .unwrap()
1073 .append_datum(Datum::Null)?;
1074 struct_builder
1075 .field_builder::<ArrowColumn>(1)
1076 .unwrap()
1077 .append_datum(Datum::Null)?;
1078 struct_builder
1079 .field_builder::<ArrowColumn>(2)
1080 .unwrap()
1081 .append_datum(Datum::False)?;
1082 struct_builder
1083 .field_builder::<ArrowColumn>(3)
1084 .unwrap()
1085 .append_datum(Datum::False)?;
1086 struct_builder
1087 .field_builder::<ArrowColumn>(4)
1088 .unwrap()
1089 .append_datum(Datum::True)?;
1090 }
1091 Some(inner) => {
1092 struct_builder
1093 .field_builder::<ArrowColumn>(0)
1094 .unwrap()
1095 .append_datum(
1096 inner.lower.bound.map(|n| n.datum()).unwrap_or(Datum::Null),
1097 )?;
1098 struct_builder
1099 .field_builder::<ArrowColumn>(1)
1100 .unwrap()
1101 .append_datum(
1102 inner.upper.bound.map(|n| n.datum()).unwrap_or(Datum::Null),
1103 )?;
1104 struct_builder
1105 .field_builder::<ArrowColumn>(2)
1106 .unwrap()
1107 .append_datum(if inner.lower.inclusive {
1108 Datum::True
1109 } else {
1110 Datum::False
1111 })?;
1112 struct_builder
1113 .field_builder::<ArrowColumn>(3)
1114 .unwrap()
1115 .append_datum(if inner.upper.inclusive {
1116 Datum::True
1117 } else {
1118 Datum::False
1119 })?;
1120 struct_builder
1121 .field_builder::<ArrowColumn>(4)
1122 .unwrap()
1123 .append_datum(Datum::False)?;
1124 }
1125 }
1126 struct_builder.append(true);
1130 }
1131 (ColBuilder::IntervalMonthDayNanoBuilder(builder), Datum::Interval(iv)) => {
1132 let nanos = iv.micros.checked_mul(1_000).ok_or_else(|| {
1133 anyhow::anyhow!(
1134 "interval microseconds {} overflow i64 nanoseconds",
1135 iv.micros
1136 )
1137 })?;
1138 builder.append_value(arrow::datatypes::IntervalMonthDayNano::new(
1139 iv.months, iv.days, nanos,
1140 ));
1141 }
1142 (builder, datum) => {
1143 anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
1144 }
1145 }
1146 Ok(())
1147 }
1148}