1#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20 DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
/// Arrow field-metadata key under which each field's Materialize extension type name is
/// stored.
pub const ARROW_EXTENSION_NAME_KEY: &str = "ARROW:extension:name";
/// Namespace prefix prepended to every extension type name written into field metadata
/// (and stripped again when reading it back).
const EXTENSION_PREFIX: &str = "materialize.v1.";
32
/// Accumulates Materialize rows into per-column Arrow builders and emits them as a
/// [`RecordBatch`].
pub struct ArrowBuilder {
    /// One builder per output column, in schema-field order.
    columns: Vec<ArrowColumn>,
    /// Running total of `Row::byte_len()` over every row passed to `add_row`.
    row_size_bytes: usize,
    /// Schema supplied to `new_with_schema`, if any. When set, `to_record_batch` uses it
    /// verbatim instead of a schema rebuilt from `columns`.
    original_schema: Option<Arc<Schema>>,
}
41
42pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
44 desc_to_schema_with_overrides(desc, |_| None)
45}
46
47pub fn desc_to_schema_with_overrides<F>(
57 desc: &RelationDesc,
58 overrides: F,
59) -> Result<Schema, anyhow::Error>
60where
61 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
62{
63 let mut fields = vec![];
64 let mut errs = vec![];
65 let mut seen_names = BTreeSet::new();
66 let mut all_names = BTreeSet::new();
67 for col_name in desc.iter_names() {
68 all_names.insert(col_name.to_string());
69 }
70 for (col_name, col_type) in desc.iter() {
71 let mut col_name = col_name.to_string();
72 if seen_names.contains(&col_name) {
79 let mut new_col_name = col_name.clone();
80 let mut e = 2;
81 while all_names.contains(&new_col_name) {
82 new_col_name = format!("{}{}", col_name, e);
83 e += 1;
84 }
85 col_name = new_col_name;
86 }
87
88 seen_names.insert(col_name.clone());
89 all_names.insert(col_name.clone());
90
91 let result = scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides);
93
94 match result {
95 Ok((data_type, extension_type_name)) => {
96 fields.push(field_with_typename(
97 &col_name,
98 data_type,
99 col_type.nullable,
100 &extension_type_name,
101 ));
102 }
103 Err(err) => errs.push(err.to_string()),
104 }
105 }
106 if !errs.is_empty() {
107 anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
108 }
109 Ok(Schema::new(fields))
110}
111
112impl ArrowBuilder {
113 pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
115 let mut errs = vec![];
116 for (col_name, col_type) in desc.iter() {
117 match scalar_to_arrow_datatype(&col_type.scalar_type) {
118 Ok(_) => {}
119 Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
120 }
121 }
122 if !errs.is_empty() {
123 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
124 }
125 Ok(())
126 }
127
128 pub fn validate_desc_for_parquet<F>(
143 desc: &RelationDesc,
144 overrides: F,
145 ) -> Result<(), anyhow::Error>
146 where
147 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
148 {
149 let mut errs = vec![];
150 for (col_name, col_type) in desc.iter() {
151 let dt =
152 match scalar_to_arrow_datatype_with_overrides(&col_type.scalar_type, &overrides) {
153 Ok((dt, _)) => dt,
154 Err(_) => {
155 errs.push(format!("{}: {:?}", col_name, col_type.scalar_type));
156 continue;
157 }
158 };
159 if let Some(reason) = parquet_incompatible_type(&dt) {
160 errs.push(format!(
161 "{}: {:?} ({})",
162 col_name, col_type.scalar_type, reason
163 ));
164 }
165 }
166 if !errs.is_empty() {
167 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
168 }
169 Ok(())
170 }
171
172 pub fn new(
178 desc: &RelationDesc,
179 item_capacity: usize,
180 data_capacity: usize,
181 ) -> Result<Self, anyhow::Error> {
182 let schema = desc_to_schema(desc)?;
183 let mut columns = vec![];
184 for field in schema.fields() {
185 columns.push(ArrowColumn::new(
186 field.name().clone(),
187 field.is_nullable(),
188 field.data_type().clone(),
189 typename_from_field(field)?,
190 item_capacity,
191 data_capacity,
192 )?);
193 }
194 Ok(Self {
195 columns,
196 row_size_bytes: 0,
197 original_schema: None,
198 })
199 }
200
201 pub fn new_with_schema(
208 schema: Arc<Schema>,
209 item_capacity: usize,
210 data_capacity: usize,
211 ) -> Result<Self, anyhow::Error> {
212 let mut columns = vec![];
213 for field in schema.fields() {
214 columns.push(ArrowColumn::new(
215 field.name().clone(),
216 field.is_nullable(),
217 field.data_type().clone(),
218 typename_from_field(field)?,
219 item_capacity,
220 data_capacity,
221 )?);
222 }
223 Ok(Self {
224 columns,
225 row_size_bytes: 0,
226 original_schema: Some(schema),
227 })
228 }
229
230 pub fn schema(&self) -> Schema {
232 Schema::new(
233 self.columns
234 .iter()
235 .map(Into::<Field>::into)
236 .collect::<Vec<_>>(),
237 )
238 }
239
240 pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
242 let mut arrays = vec![];
243 let mut fields: Vec<Field> = vec![];
244 for mut col in self.columns.into_iter() {
245 arrays.push(col.finish());
246 fields.push((&col).into());
247 }
248
249 let schema = if let Some(original_schema) = self.original_schema {
251 original_schema
252 } else {
253 Arc::new(Schema::new(fields))
254 };
255
256 RecordBatch::try_new(schema, arrays)
257 }
258
259 pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
262 for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
263 col.append_datum(datum)?;
264 }
265 self.row_size_bytes += row.byte_len();
266 Ok(())
267 }
268
269 pub fn row_size_bytes(&self) -> usize {
270 self.row_size_bytes
271 }
272}
273
274fn scalar_to_arrow_datatype(
278 scalar_type: &SqlScalarType,
279) -> Result<(DataType, String), anyhow::Error> {
280 scalar_to_arrow_datatype_impl(scalar_type, &|_| None)
281}
282
283fn parquet_incompatible_type(dt: &DataType) -> Option<&'static str> {
293 use arrow::datatypes::IntervalUnit;
294 match dt {
295 DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
296 parquet_incompatible_type(f.data_type())
297 }
298 DataType::Map(f, _) => parquet_incompatible_type(f.data_type()),
299 DataType::Struct(fields) => fields
300 .iter()
301 .find_map(|f| parquet_incompatible_type(f.data_type())),
302
303 DataType::Null
305 | DataType::Boolean
306 | DataType::Int8
307 | DataType::Int16
308 | DataType::Int32
309 | DataType::Int64
310 | DataType::UInt8
311 | DataType::UInt16
312 | DataType::UInt32
313 | DataType::UInt64
314 | DataType::Float16
315 | DataType::Float32
316 | DataType::Float64
317 | DataType::Date32
318 | DataType::Date64
319 | DataType::Time32(_)
320 | DataType::Time64(_)
321 | DataType::Timestamp(_, _)
322 | DataType::Duration(_)
323 | DataType::Interval(IntervalUnit::YearMonth | IntervalUnit::DayTime)
324 | DataType::Utf8
325 | DataType::LargeUtf8
326 | DataType::Utf8View
327 | DataType::Binary
328 | DataType::LargeBinary
329 | DataType::BinaryView
330 | DataType::FixedSizeBinary(_)
331 | DataType::Decimal32(_, _)
332 | DataType::Decimal64(_, _)
333 | DataType::Decimal128(_, _)
334 | DataType::Decimal256(_, _) => None,
335
336 _ => Some("unsupported arrow datatype"),
338 }
339}
340
341fn scalar_to_arrow_datatype_with_overrides<F>(
346 scalar_type: &SqlScalarType,
347 overrides: &F,
348) -> Result<(DataType, String), anyhow::Error>
349where
350 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
351{
352 scalar_to_arrow_datatype_impl(scalar_type, overrides)
353}
354
355fn scalar_to_arrow_datatype_impl<F>(
357 scalar_type: &SqlScalarType,
358 overrides: &F,
359) -> Result<(DataType, String), anyhow::Error>
360where
361 F: Fn(&SqlScalarType) -> Option<(DataType, String)>,
362{
363 if let Some(result) = overrides(scalar_type) {
365 return Ok(result);
366 }
367 let (data_type, extension_name) = match scalar_type {
368 SqlScalarType::Bool => (DataType::Boolean, "boolean"),
369 SqlScalarType::Int16 => (DataType::Int16, "smallint"),
370 SqlScalarType::Int32 => (DataType::Int32, "integer"),
371 SqlScalarType::Int64 => (DataType::Int64, "bigint"),
372 SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
373 SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
374 SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
375 SqlScalarType::Oid => (DataType::UInt32, "oid"),
376 SqlScalarType::Float32 => (DataType::Float32, "real"),
377 SqlScalarType::Float64 => (DataType::Float64, "double"),
378 SqlScalarType::Date => (DataType::Date32, "date"),
379 SqlScalarType::Time => (
384 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
385 "time",
386 ),
387 SqlScalarType::Timestamp { .. } => (
388 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
389 "timestamp",
390 ),
391 SqlScalarType::TimestampTz { .. } => (
392 DataType::Timestamp(
393 arrow::datatypes::TimeUnit::Microsecond,
394 Some("+00:00".into()),
397 ),
398 "timestamptz",
399 ),
400 SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
401 SqlScalarType::Char { length } => {
402 if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
403 (DataType::Utf8, "text")
404 } else {
405 (DataType::LargeUtf8, "text")
406 }
407 }
408 SqlScalarType::VarChar { max_length } => {
409 if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
410 (DataType::Utf8, "text")
411 } else {
412 (DataType::LargeUtf8, "text")
413 }
414 }
415 SqlScalarType::String => (DataType::LargeUtf8, "text"),
416 SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
419 SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
422 SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
423 SqlScalarType::Numeric { max_scale } => {
424 match max_scale {
438 Some(scale) => {
439 let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
440 if scale <= DECIMAL128_MAX_SCALE {
441 (
442 DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
443 "numeric",
444 )
445 } else {
446 anyhow::bail!("Numeric max scale {} out of range", scale)
447 }
448 }
449 None => (
450 DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
451 "numeric",
452 ),
453 }
454 }
455 SqlScalarType::Interval => (
456 DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano),
457 "interval",
458 ),
459 SqlScalarType::Array(inner) => {
460 let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(inner, overrides)?;
466 let inner_field = field_with_typename("item", inner_type, true, &inner_name);
468 let list_field = Arc::new(field_with_typename(
469 "items",
470 DataType::List(inner_field.into()),
471 false,
472 "array_items",
473 ));
474 let dims_field = Arc::new(field_with_typename(
475 "dimensions",
476 DataType::UInt8,
477 false,
478 "array_dimensions",
479 ));
480 (DataType::Struct([list_field, dims_field].into()), "array")
481 }
482 SqlScalarType::List {
483 element_type,
484 custom_id: _,
485 } => {
486 let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
487 let field = field_with_typename("item", inner_type, true, &inner_name);
489 (DataType::List(field.into()), "list")
490 }
491 SqlScalarType::Map {
492 value_type,
493 custom_id: _,
494 } => {
495 let (value_type, value_name) = scalar_to_arrow_datatype_impl(value_type, overrides)?;
496 let field_names = MapFieldNames::default();
498 let struct_type = DataType::Struct(
499 vec![
500 Field::new(&field_names.key, DataType::Utf8, false),
501 field_with_typename(&field_names.value, value_type, true, &value_name),
502 ]
503 .into(),
504 );
505 (
506 DataType::Map(
507 Field::new(&field_names.entry, struct_type, false).into(),
508 false,
509 ),
510 "map",
511 )
512 }
513 SqlScalarType::Record {
514 fields,
515 custom_id: _,
516 } => {
517 let mut arrow_fields = Vec::with_capacity(fields.len());
520 for (field_name, field_type) in fields.iter() {
521 let (inner_type, inner_extension_name) =
522 scalar_to_arrow_datatype_impl(&field_type.scalar_type, overrides)?;
523 let field = field_with_typename(
524 field_name.as_str(),
525 inner_type,
526 field_type.nullable,
527 &inner_extension_name,
528 );
529 arrow_fields.push(Arc::new(field));
530 }
531 (DataType::Struct(arrow_fields.into()), "record")
532 }
533 SqlScalarType::Range { element_type } => {
534 let (inner_type, inner_name) = scalar_to_arrow_datatype_impl(element_type, overrides)?;
541 let lower_field = Arc::new(field_with_typename(
542 "lower",
543 inner_type.clone(),
544 true,
545 &inner_name,
546 ));
547 let upper_field = Arc::new(field_with_typename("upper", inner_type, true, &inner_name));
548 let lower_inclusive_field = Arc::new(field_with_typename(
549 "lower_inclusive",
550 DataType::Boolean,
551 false,
552 "boolean",
553 ));
554 let upper_inclusive_field = Arc::new(field_with_typename(
555 "upper_inclusive",
556 DataType::Boolean,
557 false,
558 "boolean",
559 ));
560 let empty_field = Arc::new(field_with_typename(
561 "empty",
562 DataType::Boolean,
563 false,
564 "boolean",
565 ));
566 (
567 DataType::Struct(
568 [
569 lower_field,
570 upper_field,
571 lower_inclusive_field,
572 upper_inclusive_field,
573 empty_field,
574 ]
575 .into(),
576 ),
577 "range",
578 )
579 }
580 _ => anyhow::bail!("{:?} unimplemented", scalar_type),
581 };
582 Ok((data_type, extension_name.to_lowercase()))
583}
584
585fn builder_for_datatype(
586 data_type: &DataType,
587 item_capacity: usize,
588 data_capacity: usize,
589) -> Result<ColBuilder, anyhow::Error> {
590 let builder = match &data_type {
591 DataType::Boolean => {
592 ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
593 }
594 DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
595 DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
596 DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
597 DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
598 DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
599 DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
600 DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
601 DataType::Float32 => {
602 ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
603 }
604 DataType::Float64 => {
605 ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
606 }
607 DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
608 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
609 ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
610 item_capacity,
611 ))
612 }
613 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
614 ColBuilder::TimestampMicrosecondBuilder(
615 TimestampMicrosecondBuilder::with_capacity(item_capacity)
616 .with_timezone_opt(timezone.clone()),
617 )
618 }
619 DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
620 item_capacity,
621 data_capacity,
622 )),
623 DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
624 FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
625 ),
626 DataType::Utf8 => {
627 ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
628 }
629 DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
630 item_capacity,
631 data_capacity,
632 )),
633 DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
634 Decimal128Builder::with_capacity(item_capacity)
635 .with_precision_and_scale(*precision, *scale)?,
636 ),
637 DataType::List(field) => {
638 let inner_col_builder = ArrowColumn::new(
639 field.name().clone(),
640 field.is_nullable(),
641 field.data_type().clone(),
642 typename_from_field(field)?,
643 item_capacity,
644 data_capacity,
645 )?;
646 ColBuilder::ListBuilder(Box::new(
647 ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
648 ))
649 }
650 DataType::Struct(fields) => {
651 let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
652 for field in fields {
653 let inner_col_builder = ArrowColumn::new(
654 field.name().clone(),
655 field.is_nullable(),
656 field.data_type().clone(),
657 typename_from_field(field)?,
658 item_capacity,
659 data_capacity,
660 )?;
661 field_builders.push(Box::new(inner_col_builder));
662 }
663 ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
664 }
665 DataType::Map(entries_field, _sorted) => {
666 let entries_field = entries_field.as_ref();
667 if let DataType::Struct(fields) = entries_field.data_type() {
668 if fields.len() != 2 {
669 anyhow::bail!(
670 "Expected map entries to have 2 fields, found {}",
671 fields.len()
672 )
673 }
674 let key_field = &fields[0];
675 let value_field = &fields[1];
676 let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
677 let value_builder = ArrowColumn::new(
678 value_field.name().clone(),
679 value_field.is_nullable(),
680 value_field.data_type().clone(),
681 typename_from_field(value_field)?,
682 item_capacity,
683 data_capacity,
684 )?;
685 let field_names = MapFieldNames {
690 entry: entries_field.name().clone(),
691 key: key_field.name().clone(),
692 value: value_field.name().clone(),
693 };
694 ColBuilder::MapBuilder(Box::new(
699 MapBuilder::with_capacity(
700 Some(field_names),
701 key_builder,
702 value_builder,
703 item_capacity,
704 )
705 .with_keys_field(Arc::clone(key_field))
706 .with_values_field(Arc::clone(value_field)),
707 ))
708 } else {
709 anyhow::bail!("Expected map entries to be a struct")
710 }
711 }
712 DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano) => {
713 ColBuilder::IntervalMonthDayNanoBuilder(IntervalMonthDayNanoBuilder::with_capacity(
714 item_capacity,
715 ))
716 }
717 _ => anyhow::bail!("{:?} unimplemented", data_type),
718 };
719 Ok(builder)
720}
721
/// A single Arrow column under construction. It holds the field metadata needed to rebuild
/// its [`Field`] plus the type-specific builder that accumulates values.
#[derive(Debug)]
struct ArrowColumn {
    /// Column (or nested child field) name.
    field_name: String,
    /// Whether the field is declared nullable.
    nullable: bool,
    /// Arrow data type the inner builder was created for.
    data_type: DataType,
    /// Extension type name without the `materialize.v1.` prefix (e.g. `"jsonb"`).
    /// `append_datum` consults it to choose an encoding.
    extension_type_name: String,
    /// Type-specific Arrow builder holding the appended values.
    inner: ColBuilder,
}
730
731impl From<&ArrowColumn> for Field {
732 fn from(col: &ArrowColumn) -> Self {
733 field_with_typename(
734 &col.field_name,
735 col.data_type.clone(),
736 col.nullable,
737 &col.extension_type_name,
738 )
739 }
740}
741
742fn field_with_typename(
744 name: &str,
745 data_type: DataType,
746 nullable: bool,
747 extension_type_name: &str,
748) -> Field {
749 Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
750 ARROW_EXTENSION_NAME_KEY.to_string(),
751 format!("{}{}", EXTENSION_PREFIX, extension_type_name),
752 )]))
753}
754
755fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
758 let metadata = field.metadata();
759 let extension_name = metadata
760 .get(ARROW_EXTENSION_NAME_KEY)
761 .ok_or_else(|| anyhow::anyhow!("Field '{}' missing extension metadata", field.name()))?;
762 extension_name
763 .strip_prefix(EXTENSION_PREFIX)
764 .map(|s| s.to_string())
765 .ok_or_else(|| {
766 anyhow::anyhow!(
767 "Field '{}' extension name '{}' missing expected prefix '{}'",
768 field.name(),
769 extension_name,
770 EXTENSION_PREFIX
771 )
772 })
773}
774
775impl ArrowColumn {
776 fn new(
777 field_name: String,
778 nullable: bool,
779 data_type: DataType,
780 extension_type_name: String,
781 item_capacity: usize,
782 data_capacity: usize,
783 ) -> Result<Self, anyhow::Error> {
784 Ok(Self {
785 inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
786 field_name,
787 nullable,
788 data_type,
789 extension_type_name,
790 })
791 }
792}
793
/// Generates three pieces:
/// - the `ColBuilder` enum, with one variant per listed Arrow builder type plus the nested
///   List/Map/Struct variants;
/// - `ColBuilder::append_null`;
/// - the `ArrayBuilder` impl for `ArrowColumn`, which dispatches to the wrapped builder.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// The concrete Arrow builder wrapped by an `ArrowColumn`. Each listed leaf builder
        /// type becomes a variant of the same name.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            // Nested builders hold `ArrowColumn`s as children, so nested values are appended
            // through `ArrowColumn::append_datum` as well.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            /// Appends a null slot to this builder.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(builder) => builder.append_null(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.append_null(),
                    // `append(false)` closes the current map entry as null.
                    // NOTE(review): the unwrap assumes the key and value builders are in sync
                    // at this point — confirm.
                    ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
                    ColBuilder::StructBuilder(builder) => {
                        // Every child needs one slot per struct row, so a null struct still
                        // appends a null to each child before marking the struct slot null.
                        for i in 0..builder.num_fields() {
                            let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
                            field_builder.inner.append_null();
                        }
                        builder.append_null();
                    }
                }
            }
        }

        // Lets an `ArrowColumn` be used wherever Arrow expects a child builder
        // (ListBuilder/MapBuilder/StructBuilder) by forwarding to the wrapped builder.
        impl ArrayBuilder for ArrowColumn {
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => builder.len(),
                    )*
                    ColBuilder::ListBuilder(builder) => builder.len(),
                    ColBuilder::MapBuilder(builder) => builder.len(),
                    ColBuilder::StructBuilder(builder) => builder.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
                    ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
                }
            }
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
        }
    };
}
882
// Every leaf Arrow builder type an `ArrowColumn` can wrap. `builder_for_datatype` constructs
// these variants and `ArrowColumn::append_datum` writes to them.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder,
    IntervalMonthDayNanoBuilder
);
904
905impl ArrowColumn {
906 fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
907 match (&mut self.inner, datum) {
908 (s, Datum::Null) => s.append_null(),
909 (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
910 (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
911 (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
912 (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
913 (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
914 (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
915 (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
916 (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
917 (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
918 (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
919 (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
920 (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
921 builder.append_value(d.unix_epoch_days())
922 }
923 (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
924 let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
925 * 1_000_000
926 + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
927 builder.append_value(micros_since_midnight)
928 }
929 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
930 builder.append_value(ts.and_utc().timestamp_micros())
931 }
932 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
933 builder.append_value(ts.timestamp_micros())
934 }
935 (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
936 (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
937 builder.append_value(val.as_bytes())?
938 }
939 (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
940 (ColBuilder::StringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
941 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
942 }
943 (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
944 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
945 }
946 (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
947 (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
948 builder.append_value(ts.into())
949 }
950 (ColBuilder::Int32Builder(builder), Datum::UInt16(i)) => {
953 builder.append_value(i32::from(i))
954 }
955 (ColBuilder::Int32Builder(builder), Datum::Int16(i)) => {
958 builder.append_value(i32::from(i))
959 }
960 (ColBuilder::Int64Builder(builder), Datum::UInt32(i)) => {
961 builder.append_value(i64::from(i))
962 }
963 (ColBuilder::Decimal128Builder(builder), Datum::UInt64(i)) => {
964 builder.append_value(i128::from(i))
965 }
966 (ColBuilder::Decimal128Builder(builder), Datum::MzTimestamp(ts)) => {
967 builder.append_value(i128::from(u64::from(ts)))
968 }
969 (ColBuilder::StringBuilder(builder), Datum::Interval(iv)) => {
972 builder.append_value(iv.to_string())
973 }
974 (ColBuilder::LargeStringBuilder(builder), Datum::Interval(iv)) => {
975 builder.append_value(iv.to_string())
976 }
977 (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
978 if dec.0.is_special() {
979 anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
980 }
981 if let DataType::Decimal128(precision, scale) = self.data_type {
982 if dec.0.digits() > precision.into() {
983 anyhow::bail!(
984 "Decimal value {} out of range for column with precision {}",
985 dec,
986 precision
987 )
988 }
989
990 let coefficient: i128 = dec.0.coefficient()?;
993 let exponent = dec.0.exponent();
994
995 let scale_diff = i32::from(scale) + exponent;
999 let scale_diff = u32::try_from(scale_diff).map_err(|_| {
1002 anyhow::anyhow!(
1003 "cannot represent decimal value {} in column with scale {}",
1004 dec,
1005 scale
1006 )
1007 })?;
1008
1009 let value = coefficient
1010 .checked_mul(10_i128.pow(scale_diff))
1011 .ok_or_else(|| {
1012 anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
1013 })?;
1014
1015 builder.append_value(value)
1016 } else {
1017 anyhow::bail!("Expected Decimal128 data type")
1018 }
1019 }
1020 (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
1021 let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
1024 if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
1025 let inner_builder = list_builder.values();
1026 for datum in arr.elements().into_iter() {
1027 inner_builder.append_datum(datum)?;
1028 }
1029 list_builder.append(true);
1030 } else {
1031 anyhow::bail!(
1032 "Expected ListBuilder for StructBuilder with Array datum: {:?}",
1033 struct_builder
1034 )
1035 }
1036 let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
1037 if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
1038 dims_builder.append_value(arr.dims().ndims());
1039 } else {
1040 anyhow::bail!(
1041 "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
1042 struct_builder
1043 )
1044 }
1045 struct_builder.append(true)
1046 }
1047 (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
1048 let inner_builder = list_builder.values();
1049 for datum in list.into_iter() {
1050 inner_builder.append_datum(datum)?;
1051 }
1052 list_builder.append(true)
1053 }
1054 (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
1055 for (key, value) in map.iter() {
1056 builder.keys().append_value(key);
1057 builder.values().append_datum(value)?;
1058 }
1059 builder.append(true).unwrap()
1060 }
1061 (ColBuilder::StructBuilder(struct_builder), Datum::List(list)) => {
1064 let field_count = struct_builder.num_fields();
1065 for (i, datum) in list.into_iter().enumerate() {
1066 if i >= field_count {
1067 anyhow::bail!(
1068 "Record has more elements ({}) than struct fields ({})",
1069 i + 1,
1070 field_count
1071 );
1072 }
1073 let field_builder: &mut ArrowColumn = struct_builder
1074 .field_builder(i)
1075 .ok_or_else(|| anyhow::anyhow!("Missing field builder at index {}", i))?;
1076 field_builder.append_datum(datum)?;
1077 }
1078 struct_builder.append(true);
1079 }
1080 (ColBuilder::StructBuilder(struct_builder), Datum::Range(range)) => {
1081 match range.inner {
1088 None => {
1089 struct_builder
1091 .field_builder::<ArrowColumn>(0)
1092 .unwrap()
1093 .append_datum(Datum::Null)?;
1094 struct_builder
1095 .field_builder::<ArrowColumn>(1)
1096 .unwrap()
1097 .append_datum(Datum::Null)?;
1098 struct_builder
1099 .field_builder::<ArrowColumn>(2)
1100 .unwrap()
1101 .append_datum(Datum::False)?;
1102 struct_builder
1103 .field_builder::<ArrowColumn>(3)
1104 .unwrap()
1105 .append_datum(Datum::False)?;
1106 struct_builder
1107 .field_builder::<ArrowColumn>(4)
1108 .unwrap()
1109 .append_datum(Datum::True)?;
1110 }
1111 Some(inner) => {
1112 struct_builder
1113 .field_builder::<ArrowColumn>(0)
1114 .unwrap()
1115 .append_datum(
1116 inner.lower.bound.map(|n| n.datum()).unwrap_or(Datum::Null),
1117 )?;
1118 struct_builder
1119 .field_builder::<ArrowColumn>(1)
1120 .unwrap()
1121 .append_datum(
1122 inner.upper.bound.map(|n| n.datum()).unwrap_or(Datum::Null),
1123 )?;
1124 struct_builder
1125 .field_builder::<ArrowColumn>(2)
1126 .unwrap()
1127 .append_datum(if inner.lower.inclusive {
1128 Datum::True
1129 } else {
1130 Datum::False
1131 })?;
1132 struct_builder
1133 .field_builder::<ArrowColumn>(3)
1134 .unwrap()
1135 .append_datum(if inner.upper.inclusive {
1136 Datum::True
1137 } else {
1138 Datum::False
1139 })?;
1140 struct_builder
1141 .field_builder::<ArrowColumn>(4)
1142 .unwrap()
1143 .append_datum(Datum::False)?;
1144 }
1145 }
1146 struct_builder.append(true);
1150 }
1151 (ColBuilder::IntervalMonthDayNanoBuilder(builder), Datum::Interval(iv)) => {
1152 let nanos = iv.micros.checked_mul(1_000).ok_or_else(|| {
1153 anyhow::anyhow!(
1154 "interval microseconds {} overflow i64 nanoseconds",
1155 iv.micros
1156 )
1157 })?;
1158 builder.append_value(arrow::datatypes::IntervalMonthDayNano::new(
1159 iv.months, iv.days, nanos,
1160 ));
1161 }
1162 (builder, datum) => {
1163 anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
1164 }
1165 }
1166 Ok(())
1167 }
1168}