1#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20 DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use mz_ore::cast::CastFrom;
26use mz_repr::adt::jsonb::JsonbRef;
27use mz_repr::{Datum, RelationDesc, Row, ScalarType};
28
29pub struct ArrowBuilder {
30 columns: Vec<ArrowColumn>,
31 row_size_bytes: usize,
34}
35
36impl ArrowBuilder {
37 pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
39 let mut errs = vec![];
40 for (col_name, col_type) in desc.iter() {
41 match scalar_to_arrow_datatype(&col_type.scalar_type) {
42 Ok(_) => {}
43 Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
44 }
45 }
46 if !errs.is_empty() {
47 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
48 }
49 Ok(())
50 }
51
52 pub fn new(
58 desc: &RelationDesc,
59 item_capacity: usize,
60 data_capacity: usize,
61 ) -> Result<Self, anyhow::Error> {
62 let mut columns = vec![];
63 let mut errs = vec![];
64 let mut seen_names = BTreeMap::new();
65 for (col_name, col_type) in desc.iter() {
66 let mut col_name = col_name.to_string();
67 seen_names
73 .entry(col_name.clone())
74 .and_modify(|e: &mut u32| {
75 *e += 1;
76 col_name += &e.to_string();
77 })
78 .or_insert(1);
79 match scalar_to_arrow_datatype(&col_type.scalar_type) {
80 Ok((data_type, extension_type_name)) => {
81 columns.push(ArrowColumn::new(
82 col_name,
83 col_type.nullable,
84 data_type,
85 extension_type_name,
86 item_capacity,
87 data_capacity,
88 )?);
89 }
90 Err(err) => errs.push(err.to_string()),
91 }
92 }
93 if !errs.is_empty() {
94 anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
95 }
96 Ok(Self {
97 columns,
98 row_size_bytes: 0,
99 })
100 }
101
102 pub fn schema(&self) -> Schema {
104 Schema::new(
105 self.columns
106 .iter()
107 .map(Into::<Field>::into)
108 .collect::<Vec<_>>(),
109 )
110 }
111
112 pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
114 let mut arrays = vec![];
115 let mut fields: Vec<Field> = vec![];
116 for mut col in self.columns.into_iter() {
117 arrays.push(col.finish());
118 fields.push((&col).into());
119 }
120 RecordBatch::try_new(Schema::new(fields).into(), arrays)
121 }
122
123 pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
126 for (col, datum) in self.columns.iter_mut().zip(row.iter()) {
127 col.append_datum(datum)?;
128 }
129 self.row_size_bytes += row.byte_len();
130 Ok(())
131 }
132
133 pub fn row_size_bytes(&self) -> usize {
134 self.row_size_bytes
135 }
136}
137
138fn scalar_to_arrow_datatype(scalar_type: &ScalarType) -> Result<(DataType, String), anyhow::Error> {
142 let (data_type, extension_name) = match scalar_type {
143 ScalarType::Bool => (DataType::Boolean, "boolean"),
144 ScalarType::Int16 => (DataType::Int16, "smallint"),
145 ScalarType::Int32 => (DataType::Int32, "integer"),
146 ScalarType::Int64 => (DataType::Int64, "bigint"),
147 ScalarType::UInt16 => (DataType::UInt16, "uint2"),
148 ScalarType::UInt32 => (DataType::UInt32, "uint4"),
149 ScalarType::UInt64 => (DataType::UInt64, "uint8"),
150 ScalarType::Float32 => (DataType::Float32, "real"),
151 ScalarType::Float64 => (DataType::Float64, "double"),
152 ScalarType::Date => (DataType::Date32, "date"),
153 ScalarType::Time => (
158 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
159 "time",
160 ),
161 ScalarType::Timestamp { .. } => (
162 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
163 "timestamp",
164 ),
165 ScalarType::TimestampTz { .. } => (
166 DataType::Timestamp(
167 arrow::datatypes::TimeUnit::Microsecond,
168 Some("+00:00".into()),
171 ),
172 "timestamptz",
173 ),
174 ScalarType::Bytes => (DataType::LargeBinary, "bytea"),
175 ScalarType::Char { length } => {
176 if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
177 (DataType::Utf8, "text")
178 } else {
179 (DataType::LargeUtf8, "text")
180 }
181 }
182 ScalarType::VarChar { max_length } => {
183 if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
184 (DataType::Utf8, "text")
185 } else {
186 (DataType::LargeUtf8, "text")
187 }
188 }
189 ScalarType::String => (DataType::LargeUtf8, "text"),
190 ScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
193 ScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
196 ScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
197 ScalarType::Numeric { max_scale } => {
198 match max_scale {
212 Some(scale) => {
213 let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
214 if scale <= DECIMAL128_MAX_SCALE {
215 (
216 DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
217 "numeric",
218 )
219 } else {
220 anyhow::bail!("Numeric max scale {} out of range", scale)
221 }
222 }
223 None => (
224 DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
225 "numeric",
226 ),
227 }
228 }
229 ScalarType::Array(inner) => {
233 let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
239 let inner_field = field_with_typename("item", inner_type, true, &inner_name);
241 let list_field = Arc::new(field_with_typename(
242 "items",
243 DataType::List(inner_field.into()),
244 false,
245 "array_items",
246 ));
247 let dims_field = Arc::new(field_with_typename(
248 "dimensions",
249 DataType::UInt8,
250 false,
251 "array_dimensions",
252 ));
253 (DataType::Struct([list_field, dims_field].into()), "array")
254 }
255 ScalarType::List {
256 element_type,
257 custom_id: _,
258 } => {
259 let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
260 let field = field_with_typename("item", inner_type, true, &inner_name);
262 (DataType::List(field.into()), "list")
263 }
264 ScalarType::Map {
265 value_type,
266 custom_id: _,
267 } => {
268 let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
269 let field_names = MapFieldNames::default();
271 let struct_type = DataType::Struct(
272 vec![
273 Field::new(&field_names.key, DataType::Utf8, false),
274 field_with_typename(&field_names.value, value_type, true, &value_name),
275 ]
276 .into(),
277 );
278 (
279 DataType::Map(
280 Field::new(&field_names.entry, struct_type, false).into(),
281 false,
282 ),
283 "map",
284 )
285 }
286 _ => anyhow::bail!("{:?} unimplemented", scalar_type),
287 };
288 Ok((data_type, extension_name.to_lowercase()))
289}
290
291fn builder_for_datatype(
292 data_type: &DataType,
293 item_capacity: usize,
294 data_capacity: usize,
295) -> Result<ColBuilder, anyhow::Error> {
296 let builder = match &data_type {
297 DataType::Boolean => {
298 ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
299 }
300 DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
301 DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
302 DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
303 DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
304 DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
305 DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
306 DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
307 DataType::Float32 => {
308 ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
309 }
310 DataType::Float64 => {
311 ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
312 }
313 DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
314 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
315 ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
316 item_capacity,
317 ))
318 }
319 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
320 ColBuilder::TimestampMicrosecondBuilder(
321 TimestampMicrosecondBuilder::with_capacity(item_capacity)
322 .with_timezone_opt(timezone.clone()),
323 )
324 }
325 DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
326 item_capacity,
327 data_capacity,
328 )),
329 DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
330 FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
331 ),
332 DataType::Utf8 => {
333 ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
334 }
335 DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
336 item_capacity,
337 data_capacity,
338 )),
339 DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
340 Decimal128Builder::with_capacity(item_capacity)
341 .with_precision_and_scale(*precision, *scale)?,
342 ),
343 DataType::List(field) => {
344 let inner_col_builder = ArrowColumn::new(
345 field.name().clone(),
346 field.is_nullable(),
347 field.data_type().clone(),
348 typename_from_field(field)?,
349 item_capacity,
350 data_capacity,
351 )?;
352 ColBuilder::ListBuilder(Box::new(
353 ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
354 ))
355 }
356 DataType::Struct(fields) => {
357 let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
358 for field in fields {
359 let inner_col_builder = ArrowColumn::new(
360 field.name().clone(),
361 field.is_nullable(),
362 field.data_type().clone(),
363 typename_from_field(field)?,
364 item_capacity,
365 data_capacity,
366 )?;
367 field_builders.push(Box::new(inner_col_builder));
368 }
369 ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
370 }
371 DataType::Map(entries_field, _sorted) => {
372 let entries_field = entries_field.as_ref();
373 if let DataType::Struct(fields) = entries_field.data_type() {
374 if fields.len() != 2 {
375 anyhow::bail!(
376 "Expected map entries to have 2 fields, found {}",
377 fields.len()
378 )
379 }
380 let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
381 let value_field = &fields[1];
382 let value_builder = ArrowColumn::new(
383 value_field.name().clone(),
384 value_field.is_nullable(),
385 value_field.data_type().clone(),
386 typename_from_field(value_field)?,
387 item_capacity,
388 data_capacity,
389 )?;
390 ColBuilder::MapBuilder(Box::new(
391 MapBuilder::with_capacity(
392 Some(MapFieldNames::default()),
393 key_builder,
394 value_builder,
395 item_capacity,
396 )
397 .with_values_field(Arc::clone(value_field)),
398 ))
399 } else {
400 anyhow::bail!("Expected map entries to be a struct")
401 }
402 }
403 _ => anyhow::bail!("{:?} unimplemented", data_type),
404 };
405 Ok(builder)
406}
407
/// A single column under construction: the Arrow field description together
/// with the builder that collects its values.
#[derive(Debug)]
struct ArrowColumn {
    /// Name given to the Arrow `Field` built from this column.
    field_name: String,
    /// Nullability flag given to the Arrow `Field` built from this column.
    nullable: bool,
    /// Arrow data type of the column. `inner` is created from it.
    data_type: DataType,
    /// Type name written into the field metadata (after the
    /// `materialize.v1.` prefix) when this column is converted to a `Field`.
    extension_type_name: String,
    /// The builder that values are appended to.
    inner: ColBuilder,
}
416
417impl From<&ArrowColumn> for Field {
418 fn from(col: &ArrowColumn) -> Self {
419 field_with_typename(
420 &col.field_name,
421 col.data_type.clone(),
422 col.nullable,
423 &col.extension_type_name,
424 )
425 }
426}
427
428fn field_with_typename(
430 name: &str,
431 data_type: DataType,
432 nullable: bool,
433 extension_type_name: &str,
434) -> Field {
435 Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
436 "ARROW:extension:name".to_string(),
437 format!("materialize.v1.{}", extension_type_name),
438 )]))
439}
440
441fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
443 let metadata = field.metadata();
444 let extension_name = metadata
445 .get("ARROW:extension:name")
446 .ok_or_else(|| anyhow::anyhow!("Missing extension name in metadata"))?;
447 if let Some(name) = extension_name.strip_prefix("materialize.v1") {
448 Ok(name.to_string())
449 } else {
450 anyhow::bail!("Extension name {} does not match expected", extension_name,)
451 }
452}
453
454impl ArrowColumn {
455 fn new(
456 field_name: String,
457 nullable: bool,
458 data_type: DataType,
459 extension_type_name: String,
460 item_capacity: usize,
461 data_capacity: usize,
462 ) -> Result<Self, anyhow::Error> {
463 Ok(Self {
464 inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
465 field_name,
466 nullable,
467 data_type,
468 extension_type_name,
469 })
470 }
471}
472
/// Generates the `ColBuilder` enum and the trait plumbing around it.
///
/// Each identifier passed in must name an Arrow builder type. It becomes an
/// enum variant of the same name wrapping that builder. The list, map and
/// struct variants are added by hand because their payload types differ from
/// the variant name.
macro_rules! make_col_builder {
    ($($x:ident), *) => {
        /// Closed set of the Arrow builders a column can be backed by.
        #[derive(Debug)]
        enum ColBuilder {
            $(
                $x($x),
            )*
            /// List whose elements are themselves an `ArrowColumn`.
            ListBuilder(Box<ListBuilder<ArrowColumn>>),
            /// Map with string keys and `ArrowColumn` values.
            MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
            /// Struct whose field builders are `ArrowColumn`s.
            StructBuilder(StructBuilder),
        }

        impl ColBuilder {
            /// Appends a null entry to whichever builder this wraps.
            fn append_null(&mut self) {
                match self {
                    $(
                        ColBuilder::$x(inner) => inner.append_null(),
                    )*
                    ColBuilder::ListBuilder(list) => list.append_null(),
                    ColBuilder::MapBuilder(map) => map.append(false).unwrap(),
                    ColBuilder::StructBuilder(record) => {
                        // Each child gets a null slot before the struct does.
                        let field_count = record.num_fields();
                        for idx in 0..field_count {
                            let child: &mut ArrowColumn = record.field_builder(idx).unwrap();
                            child.inner.append_null();
                        }
                        record.append_null();
                    }
                }
            }
        }

        /// Lets an `ArrowColumn` act as the child builder of the Arrow list,
        /// map and struct builders by delegating to its wrapped builder.
        impl ArrayBuilder for ArrowColumn {
            fn as_any(&self) -> &(dyn Any + 'static) {
                self
            }
            fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
                self
            }
            fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
                self
            }
            fn len(&self) -> usize {
                match &self.inner {
                    $(
                        ColBuilder::$x(inner) => inner.len(),
                    )*
                    ColBuilder::ListBuilder(list) => list.len(),
                    ColBuilder::MapBuilder(map) => map.len(),
                    ColBuilder::StructBuilder(record) => record.len(),
                }
            }
            fn finish(&mut self) -> ArrayRef {
                match &mut self.inner {
                    $(
                        ColBuilder::$x(inner) => Arc::new(inner.finish()),
                    )*
                    ColBuilder::ListBuilder(list) => Arc::new(list.finish()),
                    ColBuilder::MapBuilder(map) => Arc::new(map.finish()),
                    ColBuilder::StructBuilder(record) => Arc::new(record.finish()),
                }
            }
            fn finish_cloned(&self) -> ArrayRef {
                match &self.inner {
                    $(
                        ColBuilder::$x(inner) => Arc::new(inner.finish_cloned()),
                    )*
                    ColBuilder::ListBuilder(list) => Arc::new(list.finish_cloned()),
                    ColBuilder::MapBuilder(map) => Arc::new(map.finish_cloned()),
                    ColBuilder::StructBuilder(record) => Arc::new(record.finish_cloned()),
                }
            }
        }
    };
}
561
// Instantiates `ColBuilder` with one variant per primitive Arrow builder
// listed here. The list, map and struct variants are added inside the macro
// itself. Every builder that `builder_for_datatype` constructs directly must
// appear in this list.
make_col_builder!(
    BooleanBuilder,
    Int16Builder,
    Int32Builder,
    Int64Builder,
    UInt8Builder,
    UInt16Builder,
    UInt32Builder,
    UInt64Builder,
    Float32Builder,
    Float64Builder,
    Date32Builder,
    Time64MicrosecondBuilder,
    TimestampMicrosecondBuilder,
    LargeBinaryBuilder,
    FixedSizeBinaryBuilder,
    StringBuilder,
    LargeStringBuilder,
    Decimal128Builder
);
582
583impl ArrowColumn {
584 fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
585 match (&mut self.inner, datum) {
586 (s, Datum::Null) => s.append_null(),
587 (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
588 (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
589 (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
590 (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
591 (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
592 (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
593 (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
594 (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
595 (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
596 (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
597 (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
598 (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
599 builder.append_value(d.unix_epoch_days())
600 }
601 (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
602 let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
603 * 1_000_000
604 + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
605 builder.append_value(micros_since_midnight)
606 }
607 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
608 builder.append_value(ts.and_utc().timestamp_micros())
609 }
610 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
611 builder.append_value(ts.timestamp_micros())
612 }
613 (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
614 (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
615 builder.append_value(val.as_bytes())?
616 }
617 (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
618 (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
619 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
620 }
621 (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
622 (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
623 builder.append_value(ts.into())
624 }
625 (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
626 if dec.0.is_special() {
627 anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
628 }
629 if let DataType::Decimal128(precision, scale) = self.data_type {
630 if dec.0.digits() > precision.into() {
631 anyhow::bail!(
632 "Decimal value {} out of range for column with precision {}",
633 dec,
634 precision
635 )
636 }
637
638 let coefficient: i128 = dec.0.coefficient()?;
641 let exponent = dec.0.exponent();
642
643 let scale_diff = i32::from(scale) + exponent;
647 let scale_diff = u32::try_from(scale_diff).map_err(|_| {
650 anyhow::anyhow!(
651 "cannot represent decimal value {} in column with scale {}",
652 dec,
653 scale
654 )
655 })?;
656
657 let value = coefficient
658 .checked_mul(10_i128.pow(scale_diff))
659 .ok_or_else(|| {
660 anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
661 })?;
662
663 builder.append_value(value)
664 } else {
665 anyhow::bail!("Expected Decimal128 data type")
666 }
667 }
668 (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
669 let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
672 if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
673 let inner_builder = list_builder.values();
674 for datum in arr.elements().into_iter() {
675 inner_builder.append_datum(datum)?;
676 }
677 list_builder.append(true);
678 } else {
679 anyhow::bail!(
680 "Expected ListBuilder for StructBuilder with Array datum: {:?}",
681 struct_builder
682 )
683 }
684 let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
685 if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
686 dims_builder.append_value(arr.dims().ndims());
687 } else {
688 anyhow::bail!(
689 "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
690 struct_builder
691 )
692 }
693 struct_builder.append(true)
694 }
695 (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
696 let inner_builder = list_builder.values();
697 for datum in list.into_iter() {
698 inner_builder.append_datum(datum)?;
699 }
700 list_builder.append(true)
701 }
702 (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
703 for (key, value) in map.iter() {
704 builder.keys().append_value(key);
705 builder.values().append_datum(value)?;
706 }
707 builder.append(true).unwrap()
708 }
709 (_builder, datum) => {
710 anyhow::bail!("Datum {:?} does not match builder", datum)
711 }
712 }
713 Ok(())
714 }
715}