1#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20 DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, ScalarType};
29
30pub struct ArrowBuilder {
31 columns: Vec<ArrowColumn>,
32 row_size_bytes: usize,
35}
36
37impl ArrowBuilder {
38 pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
40 let mut errs = vec![];
41 for (col_name, col_type) in desc.iter() {
42 match scalar_to_arrow_datatype(&col_type.scalar_type) {
43 Ok(_) => {}
44 Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
45 }
46 }
47 if !errs.is_empty() {
48 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
49 }
50 Ok(())
51 }
52
53 pub fn new(
59 desc: &RelationDesc,
60 item_capacity: usize,
61 data_capacity: usize,
62 ) -> Result<Self, anyhow::Error> {
63 let mut columns = vec![];
64 let mut errs = vec![];
65 let mut seen_names = BTreeMap::new();
66 for (col_name, col_type) in desc.iter() {
67 let mut col_name = col_name.to_string();
68 seen_names
74 .entry(col_name.clone())
75 .and_modify(|e: &mut u32| {
76 *e += 1;
77 col_name += &e.to_string();
78 })
79 .or_insert(1);
80 match scalar_to_arrow_datatype(&col_type.scalar_type) {
81 Ok((data_type, extension_type_name)) => {
82 columns.push(ArrowColumn::new(
83 col_name,
84 col_type.nullable,
85 data_type,
86 extension_type_name,
87 item_capacity,
88 data_capacity,
89 )?);
90 }
91 Err(err) => errs.push(err.to_string()),
92 }
93 }
94 if !errs.is_empty() {
95 anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
96 }
97 Ok(Self {
98 columns,
99 row_size_bytes: 0,
100 })
101 }
102
103 pub fn schema(&self) -> Schema {
105 Schema::new(
106 self.columns
107 .iter()
108 .map(Into::<Field>::into)
109 .collect::<Vec<_>>(),
110 )
111 }
112
113 pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
115 let mut arrays = vec![];
116 let mut fields: Vec<Field> = vec![];
117 for mut col in self.columns.into_iter() {
118 arrays.push(col.finish());
119 fields.push((&col).into());
120 }
121 RecordBatch::try_new(Schema::new(fields).into(), arrays)
122 }
123
124 pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
127 for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
128 col.append_datum(datum)?;
129 }
130 self.row_size_bytes += row.byte_len();
131 Ok(())
132 }
133
134 pub fn row_size_bytes(&self) -> usize {
135 self.row_size_bytes
136 }
137}
138
139fn scalar_to_arrow_datatype(scalar_type: &ScalarType) -> Result<(DataType, String), anyhow::Error> {
143 let (data_type, extension_name) = match scalar_type {
144 ScalarType::Bool => (DataType::Boolean, "boolean"),
145 ScalarType::Int16 => (DataType::Int16, "smallint"),
146 ScalarType::Int32 => (DataType::Int32, "integer"),
147 ScalarType::Int64 => (DataType::Int64, "bigint"),
148 ScalarType::UInt16 => (DataType::UInt16, "uint2"),
149 ScalarType::UInt32 => (DataType::UInt32, "uint4"),
150 ScalarType::UInt64 => (DataType::UInt64, "uint8"),
151 ScalarType::Float32 => (DataType::Float32, "real"),
152 ScalarType::Float64 => (DataType::Float64, "double"),
153 ScalarType::Date => (DataType::Date32, "date"),
154 ScalarType::Time => (
159 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
160 "time",
161 ),
162 ScalarType::Timestamp { .. } => (
163 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
164 "timestamp",
165 ),
166 ScalarType::TimestampTz { .. } => (
167 DataType::Timestamp(
168 arrow::datatypes::TimeUnit::Microsecond,
169 Some("+00:00".into()),
172 ),
173 "timestamptz",
174 ),
175 ScalarType::Bytes => (DataType::LargeBinary, "bytea"),
176 ScalarType::Char { length } => {
177 if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
178 (DataType::Utf8, "text")
179 } else {
180 (DataType::LargeUtf8, "text")
181 }
182 }
183 ScalarType::VarChar { max_length } => {
184 if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
185 (DataType::Utf8, "text")
186 } else {
187 (DataType::LargeUtf8, "text")
188 }
189 }
190 ScalarType::String => (DataType::LargeUtf8, "text"),
191 ScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
194 ScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
197 ScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
198 ScalarType::Numeric { max_scale } => {
199 match max_scale {
213 Some(scale) => {
214 let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
215 if scale <= DECIMAL128_MAX_SCALE {
216 (
217 DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
218 "numeric",
219 )
220 } else {
221 anyhow::bail!("Numeric max scale {} out of range", scale)
222 }
223 }
224 None => (
225 DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
226 "numeric",
227 ),
228 }
229 }
230 ScalarType::Array(inner) => {
234 let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
240 let inner_field = field_with_typename("item", inner_type, true, &inner_name);
242 let list_field = Arc::new(field_with_typename(
243 "items",
244 DataType::List(inner_field.into()),
245 false,
246 "array_items",
247 ));
248 let dims_field = Arc::new(field_with_typename(
249 "dimensions",
250 DataType::UInt8,
251 false,
252 "array_dimensions",
253 ));
254 (DataType::Struct([list_field, dims_field].into()), "array")
255 }
256 ScalarType::List {
257 element_type,
258 custom_id: _,
259 } => {
260 let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
261 let field = field_with_typename("item", inner_type, true, &inner_name);
263 (DataType::List(field.into()), "list")
264 }
265 ScalarType::Map {
266 value_type,
267 custom_id: _,
268 } => {
269 let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
270 let field_names = MapFieldNames::default();
272 let struct_type = DataType::Struct(
273 vec![
274 Field::new(&field_names.key, DataType::Utf8, false),
275 field_with_typename(&field_names.value, value_type, true, &value_name),
276 ]
277 .into(),
278 );
279 (
280 DataType::Map(
281 Field::new(&field_names.entry, struct_type, false).into(),
282 false,
283 ),
284 "map",
285 )
286 }
287 _ => anyhow::bail!("{:?} unimplemented", scalar_type),
288 };
289 Ok((data_type, extension_name.to_lowercase()))
290}
291
292fn builder_for_datatype(
293 data_type: &DataType,
294 item_capacity: usize,
295 data_capacity: usize,
296) -> Result<ColBuilder, anyhow::Error> {
297 let builder = match &data_type {
298 DataType::Boolean => {
299 ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
300 }
301 DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
302 DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
303 DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
304 DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
305 DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
306 DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
307 DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
308 DataType::Float32 => {
309 ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
310 }
311 DataType::Float64 => {
312 ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
313 }
314 DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
315 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
316 ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
317 item_capacity,
318 ))
319 }
320 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
321 ColBuilder::TimestampMicrosecondBuilder(
322 TimestampMicrosecondBuilder::with_capacity(item_capacity)
323 .with_timezone_opt(timezone.clone()),
324 )
325 }
326 DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
327 item_capacity,
328 data_capacity,
329 )),
330 DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
331 FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
332 ),
333 DataType::Utf8 => {
334 ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
335 }
336 DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
337 item_capacity,
338 data_capacity,
339 )),
340 DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
341 Decimal128Builder::with_capacity(item_capacity)
342 .with_precision_and_scale(*precision, *scale)?,
343 ),
344 DataType::List(field) => {
345 let inner_col_builder = ArrowColumn::new(
346 field.name().clone(),
347 field.is_nullable(),
348 field.data_type().clone(),
349 typename_from_field(field)?,
350 item_capacity,
351 data_capacity,
352 )?;
353 ColBuilder::ListBuilder(Box::new(
354 ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
355 ))
356 }
357 DataType::Struct(fields) => {
358 let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
359 for field in fields {
360 let inner_col_builder = ArrowColumn::new(
361 field.name().clone(),
362 field.is_nullable(),
363 field.data_type().clone(),
364 typename_from_field(field)?,
365 item_capacity,
366 data_capacity,
367 )?;
368 field_builders.push(Box::new(inner_col_builder));
369 }
370 ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
371 }
372 DataType::Map(entries_field, _sorted) => {
373 let entries_field = entries_field.as_ref();
374 if let DataType::Struct(fields) = entries_field.data_type() {
375 if fields.len() != 2 {
376 anyhow::bail!(
377 "Expected map entries to have 2 fields, found {}",
378 fields.len()
379 )
380 }
381 let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
382 let value_field = &fields[1];
383 let value_builder = ArrowColumn::new(
384 value_field.name().clone(),
385 value_field.is_nullable(),
386 value_field.data_type().clone(),
387 typename_from_field(value_field)?,
388 item_capacity,
389 data_capacity,
390 )?;
391 ColBuilder::MapBuilder(Box::new(
392 MapBuilder::with_capacity(
393 Some(MapFieldNames::default()),
394 key_builder,
395 value_builder,
396 item_capacity,
397 )
398 .with_values_field(Arc::clone(value_field)),
399 ))
400 } else {
401 anyhow::bail!("Expected map entries to be a struct")
402 }
403 }
404 _ => anyhow::bail!("{:?} unimplemented", data_type),
405 };
406 Ok(builder)
407}
408
409#[derive(Debug)]
410struct ArrowColumn {
411 field_name: String,
412 nullable: bool,
413 data_type: DataType,
414 extension_type_name: String,
415 inner: ColBuilder,
416}
417
418impl From<&ArrowColumn> for Field {
419 fn from(col: &ArrowColumn) -> Self {
420 field_with_typename(
421 &col.field_name,
422 col.data_type.clone(),
423 col.nullable,
424 &col.extension_type_name,
425 )
426 }
427}
428
429fn field_with_typename(
431 name: &str,
432 data_type: DataType,
433 nullable: bool,
434 extension_type_name: &str,
435) -> Field {
436 Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
437 "ARROW:extension:name".to_string(),
438 format!("materialize.v1.{}", extension_type_name),
439 )]))
440}
441
442fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
444 let metadata = field.metadata();
445 let extension_name = metadata
446 .get("ARROW:extension:name")
447 .ok_or_else(|| anyhow::anyhow!("Missing extension name in metadata"))?;
448 if let Some(name) = extension_name.strip_prefix("materialize.v1") {
449 Ok(name.to_string())
450 } else {
451 anyhow::bail!("Extension name {} does not match expected", extension_name,)
452 }
453}
454
455impl ArrowColumn {
456 fn new(
457 field_name: String,
458 nullable: bool,
459 data_type: DataType,
460 extension_type_name: String,
461 item_capacity: usize,
462 data_capacity: usize,
463 ) -> Result<Self, anyhow::Error> {
464 Ok(Self {
465 inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
466 field_name,
467 nullable,
468 data_type,
469 extension_type_name,
470 })
471 }
472}
473
474macro_rules! make_col_builder {
475 ($($x:ident), *) => {
476 #[derive(Debug)]
480 enum ColBuilder {
481 $(
482 $x($x),
483 )*
484 ListBuilder(Box<ListBuilder<ArrowColumn>>),
488 MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
489 StructBuilder(StructBuilder),
494 }
495
496 impl ColBuilder {
497 fn append_null(&mut self) {
498 match self {
499 $(
500 ColBuilder::$x(builder) => builder.append_null(),
501 )*
502 ColBuilder::ListBuilder(builder) => builder.append_null(),
503 ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
504 ColBuilder::StructBuilder(builder) => {
505 for i in 0..builder.num_fields() {
506 let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
507 field_builder.inner.append_null();
508 }
509 builder.append_null();
510 }
511 }
512 }
513 }
514
515 impl ArrayBuilder for ArrowColumn {
520 fn len(&self) -> usize {
521 match &self.inner {
522 $(
523 ColBuilder::$x(builder) => builder.len(),
524 )*
525 ColBuilder::ListBuilder(builder) => builder.len(),
526 ColBuilder::MapBuilder(builder) => builder.len(),
527 ColBuilder::StructBuilder(builder) => builder.len(),
528 }
529 }
530 fn finish(&mut self) -> ArrayRef {
531 match &mut self.inner {
532 $(
533 ColBuilder::$x(builder) => Arc::new(builder.finish()),
534 )*
535 ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
536 ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
537 ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
538 }
539 }
540 fn finish_cloned(&self) -> ArrayRef {
541 match &self.inner {
542 $(
543 ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
544 )*
545 ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
546 ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
547 ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
548 }
549 }
550 fn as_any(&self) -> &(dyn Any + 'static) {
551 self
552 }
553 fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
554 self
555 }
556 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
557 self
558 }
559 }
560 };
561}
562
563make_col_builder!(
564 BooleanBuilder,
565 Int16Builder,
566 Int32Builder,
567 Int64Builder,
568 UInt8Builder,
569 UInt16Builder,
570 UInt32Builder,
571 UInt64Builder,
572 Float32Builder,
573 Float64Builder,
574 Date32Builder,
575 Time64MicrosecondBuilder,
576 TimestampMicrosecondBuilder,
577 LargeBinaryBuilder,
578 FixedSizeBinaryBuilder,
579 StringBuilder,
580 LargeStringBuilder,
581 Decimal128Builder
582);
583
584impl ArrowColumn {
585 fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
586 match (&mut self.inner, datum) {
587 (s, Datum::Null) => s.append_null(),
588 (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
589 (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
590 (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
591 (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
592 (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
593 (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
594 (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
595 (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
596 (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
597 (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
598 (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
599 (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
600 builder.append_value(d.unix_epoch_days())
601 }
602 (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
603 let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
604 * 1_000_000
605 + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
606 builder.append_value(micros_since_midnight)
607 }
608 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
609 builder.append_value(ts.and_utc().timestamp_micros())
610 }
611 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
612 builder.append_value(ts.timestamp_micros())
613 }
614 (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
615 (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
616 builder.append_value(val.as_bytes())?
617 }
618 (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
619 (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
620 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
621 }
622 (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
623 (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
624 builder.append_value(ts.into())
625 }
626 (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
627 if dec.0.is_special() {
628 anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
629 }
630 if let DataType::Decimal128(precision, scale) = self.data_type {
631 if dec.0.digits() > precision.into() {
632 anyhow::bail!(
633 "Decimal value {} out of range for column with precision {}",
634 dec,
635 precision
636 )
637 }
638
639 let coefficient: i128 = dec.0.coefficient()?;
642 let exponent = dec.0.exponent();
643
644 let scale_diff = i32::from(scale) + exponent;
648 let scale_diff = u32::try_from(scale_diff).map_err(|_| {
651 anyhow::anyhow!(
652 "cannot represent decimal value {} in column with scale {}",
653 dec,
654 scale
655 )
656 })?;
657
658 let value = coefficient
659 .checked_mul(10_i128.pow(scale_diff))
660 .ok_or_else(|| {
661 anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
662 })?;
663
664 builder.append_value(value)
665 } else {
666 anyhow::bail!("Expected Decimal128 data type")
667 }
668 }
669 (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
670 let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
673 if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
674 let inner_builder = list_builder.values();
675 for datum in arr.elements().into_iter() {
676 inner_builder.append_datum(datum)?;
677 }
678 list_builder.append(true);
679 } else {
680 anyhow::bail!(
681 "Expected ListBuilder for StructBuilder with Array datum: {:?}",
682 struct_builder
683 )
684 }
685 let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
686 if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
687 dims_builder.append_value(arr.dims().ndims());
688 } else {
689 anyhow::bail!(
690 "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
691 struct_builder
692 )
693 }
694 struct_builder.append(true)
695 }
696 (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
697 let inner_builder = list_builder.values();
698 for datum in list.into_iter() {
699 inner_builder.append_datum(datum)?;
700 }
701 list_builder.append(true)
702 }
703 (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
704 for (key, value) in map.iter() {
705 builder.keys().append_value(key);
706 builder.values().append_datum(value)?;
707 }
708 builder.append(true).unwrap()
709 }
710 (_builder, datum) => {
711 anyhow::bail!("Datum {:?} does not match builder", datum)
712 }
713 }
714 Ok(())
715 }
716}