1#![allow(clippy::disallowed_types)]
13
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, builder::*};
19use arrow::datatypes::{
20 DECIMAL_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DataType, Field, Schema,
21};
22use arrow::error::ArrowError;
23use arrow::record_batch::RecordBatch;
24use chrono::Timelike;
25use itertools::Itertools;
26use mz_ore::cast::CastFrom;
27use mz_repr::adt::jsonb::JsonbRef;
28use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
29
30pub struct ArrowBuilder {
31 columns: Vec<ArrowColumn>,
32 row_size_bytes: usize,
35}
36
37impl ArrowBuilder {
38 pub fn validate_desc(desc: &RelationDesc) -> Result<(), anyhow::Error> {
40 let mut errs = vec![];
41 for (col_name, col_type) in desc.iter() {
42 match scalar_to_arrow_datatype(&col_type.scalar_type) {
43 Ok(_) => {}
44 Err(_) => errs.push(format!("{}: {:?}", col_name, col_type.scalar_type)),
45 }
46 }
47 if !errs.is_empty() {
48 anyhow::bail!("Cannot encode the following columns/types: {:?}", errs);
49 }
50 Ok(())
51 }
52
53 pub fn new(
59 desc: &RelationDesc,
60 item_capacity: usize,
61 data_capacity: usize,
62 ) -> Result<Self, anyhow::Error> {
63 let mut columns = vec![];
64 let mut errs = vec![];
65 let mut seen_names = BTreeMap::new();
66 for (col_name, col_type) in desc.iter() {
67 let mut col_name = col_name.to_string();
68 seen_names
74 .entry(col_name.clone())
75 .and_modify(|e: &mut u32| {
76 *e += 1;
77 col_name += &e.to_string();
78 })
79 .or_insert(1);
80 match scalar_to_arrow_datatype(&col_type.scalar_type) {
81 Ok((data_type, extension_type_name)) => {
82 columns.push(ArrowColumn::new(
83 col_name,
84 col_type.nullable,
85 data_type,
86 extension_type_name,
87 item_capacity,
88 data_capacity,
89 )?);
90 }
91 Err(err) => errs.push(err.to_string()),
92 }
93 }
94 if !errs.is_empty() {
95 anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
96 }
97 Ok(Self {
98 columns,
99 row_size_bytes: 0,
100 })
101 }
102
103 pub fn schema(&self) -> Schema {
105 Schema::new(
106 self.columns
107 .iter()
108 .map(Into::<Field>::into)
109 .collect::<Vec<_>>(),
110 )
111 }
112
113 pub fn to_record_batch(self) -> Result<RecordBatch, ArrowError> {
115 let mut arrays = vec![];
116 let mut fields: Vec<Field> = vec![];
117 for mut col in self.columns.into_iter() {
118 arrays.push(col.finish());
119 fields.push((&col).into());
120 }
121 RecordBatch::try_new(Schema::new(fields).into(), arrays)
122 }
123
124 pub fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
127 for (col, datum) in self.columns.iter_mut().zip_eq(row.iter()) {
128 col.append_datum(datum)?;
129 }
130 self.row_size_bytes += row.byte_len();
131 Ok(())
132 }
133
134 pub fn row_size_bytes(&self) -> usize {
135 self.row_size_bytes
136 }
137}
138
139fn scalar_to_arrow_datatype(
143 scalar_type: &SqlScalarType,
144) -> Result<(DataType, String), anyhow::Error> {
145 let (data_type, extension_name) = match scalar_type {
146 SqlScalarType::Bool => (DataType::Boolean, "boolean"),
147 SqlScalarType::Int16 => (DataType::Int16, "smallint"),
148 SqlScalarType::Int32 => (DataType::Int32, "integer"),
149 SqlScalarType::Int64 => (DataType::Int64, "bigint"),
150 SqlScalarType::UInt16 => (DataType::UInt16, "uint2"),
151 SqlScalarType::UInt32 => (DataType::UInt32, "uint4"),
152 SqlScalarType::UInt64 => (DataType::UInt64, "uint8"),
153 SqlScalarType::Float32 => (DataType::Float32, "real"),
154 SqlScalarType::Float64 => (DataType::Float64, "double"),
155 SqlScalarType::Date => (DataType::Date32, "date"),
156 SqlScalarType::Time => (
161 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
162 "time",
163 ),
164 SqlScalarType::Timestamp { .. } => (
165 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
166 "timestamp",
167 ),
168 SqlScalarType::TimestampTz { .. } => (
169 DataType::Timestamp(
170 arrow::datatypes::TimeUnit::Microsecond,
171 Some("+00:00".into()),
174 ),
175 "timestamptz",
176 ),
177 SqlScalarType::Bytes => (DataType::LargeBinary, "bytea"),
178 SqlScalarType::Char { length } => {
179 if length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
180 (DataType::Utf8, "text")
181 } else {
182 (DataType::LargeUtf8, "text")
183 }
184 }
185 SqlScalarType::VarChar { max_length } => {
186 if max_length.map_or(false, |l| l.into_u32() < i32::MAX.unsigned_abs()) {
187 (DataType::Utf8, "text")
188 } else {
189 (DataType::LargeUtf8, "text")
190 }
191 }
192 SqlScalarType::String => (DataType::LargeUtf8, "text"),
193 SqlScalarType::Uuid => (DataType::FixedSizeBinary(16), "uuid"),
196 SqlScalarType::Jsonb => (DataType::LargeUtf8, "jsonb"),
199 SqlScalarType::MzTimestamp => (DataType::UInt64, "mz_timestamp"),
200 SqlScalarType::Numeric { max_scale } => {
201 match max_scale {
215 Some(scale) => {
216 let scale = i8::try_from(scale.into_u8()).expect("known <= 39");
217 if scale <= DECIMAL128_MAX_SCALE {
218 (
219 DataType::Decimal128(DECIMAL128_MAX_PRECISION, scale),
220 "numeric",
221 )
222 } else {
223 anyhow::bail!("Numeric max scale {} out of range", scale)
224 }
225 }
226 None => (
227 DataType::Decimal128(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
228 "numeric",
229 ),
230 }
231 }
232 SqlScalarType::Array(inner) => {
236 let (inner_type, inner_name) = scalar_to_arrow_datatype(inner)?;
242 let inner_field = field_with_typename("item", inner_type, true, &inner_name);
244 let list_field = Arc::new(field_with_typename(
245 "items",
246 DataType::List(inner_field.into()),
247 false,
248 "array_items",
249 ));
250 let dims_field = Arc::new(field_with_typename(
251 "dimensions",
252 DataType::UInt8,
253 false,
254 "array_dimensions",
255 ));
256 (DataType::Struct([list_field, dims_field].into()), "array")
257 }
258 SqlScalarType::List {
259 element_type,
260 custom_id: _,
261 } => {
262 let (inner_type, inner_name) = scalar_to_arrow_datatype(element_type)?;
263 let field = field_with_typename("item", inner_type, true, &inner_name);
265 (DataType::List(field.into()), "list")
266 }
267 SqlScalarType::Map {
268 value_type,
269 custom_id: _,
270 } => {
271 let (value_type, value_name) = scalar_to_arrow_datatype(value_type)?;
272 let field_names = MapFieldNames::default();
274 let struct_type = DataType::Struct(
275 vec![
276 Field::new(&field_names.key, DataType::Utf8, false),
277 field_with_typename(&field_names.value, value_type, true, &value_name),
278 ]
279 .into(),
280 );
281 (
282 DataType::Map(
283 Field::new(&field_names.entry, struct_type, false).into(),
284 false,
285 ),
286 "map",
287 )
288 }
289 _ => anyhow::bail!("{:?} unimplemented", scalar_type),
290 };
291 Ok((data_type, extension_name.to_lowercase()))
292}
293
294fn builder_for_datatype(
295 data_type: &DataType,
296 item_capacity: usize,
297 data_capacity: usize,
298) -> Result<ColBuilder, anyhow::Error> {
299 let builder = match &data_type {
300 DataType::Boolean => {
301 ColBuilder::BooleanBuilder(BooleanBuilder::with_capacity(item_capacity))
302 }
303 DataType::Int16 => ColBuilder::Int16Builder(Int16Builder::with_capacity(item_capacity)),
304 DataType::Int32 => ColBuilder::Int32Builder(Int32Builder::with_capacity(item_capacity)),
305 DataType::Int64 => ColBuilder::Int64Builder(Int64Builder::with_capacity(item_capacity)),
306 DataType::UInt8 => ColBuilder::UInt8Builder(UInt8Builder::with_capacity(item_capacity)),
307 DataType::UInt16 => ColBuilder::UInt16Builder(UInt16Builder::with_capacity(item_capacity)),
308 DataType::UInt32 => ColBuilder::UInt32Builder(UInt32Builder::with_capacity(item_capacity)),
309 DataType::UInt64 => ColBuilder::UInt64Builder(UInt64Builder::with_capacity(item_capacity)),
310 DataType::Float32 => {
311 ColBuilder::Float32Builder(Float32Builder::with_capacity(item_capacity))
312 }
313 DataType::Float64 => {
314 ColBuilder::Float64Builder(Float64Builder::with_capacity(item_capacity))
315 }
316 DataType::Date32 => ColBuilder::Date32Builder(Date32Builder::with_capacity(item_capacity)),
317 DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) => {
318 ColBuilder::Time64MicrosecondBuilder(Time64MicrosecondBuilder::with_capacity(
319 item_capacity,
320 ))
321 }
322 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, timezone) => {
323 ColBuilder::TimestampMicrosecondBuilder(
324 TimestampMicrosecondBuilder::with_capacity(item_capacity)
325 .with_timezone_opt(timezone.clone()),
326 )
327 }
328 DataType::LargeBinary => ColBuilder::LargeBinaryBuilder(LargeBinaryBuilder::with_capacity(
329 item_capacity,
330 data_capacity,
331 )),
332 DataType::FixedSizeBinary(byte_width) => ColBuilder::FixedSizeBinaryBuilder(
333 FixedSizeBinaryBuilder::with_capacity(item_capacity, *byte_width),
334 ),
335 DataType::Utf8 => {
336 ColBuilder::StringBuilder(StringBuilder::with_capacity(item_capacity, data_capacity))
337 }
338 DataType::LargeUtf8 => ColBuilder::LargeStringBuilder(LargeStringBuilder::with_capacity(
339 item_capacity,
340 data_capacity,
341 )),
342 DataType::Decimal128(precision, scale) => ColBuilder::Decimal128Builder(
343 Decimal128Builder::with_capacity(item_capacity)
344 .with_precision_and_scale(*precision, *scale)?,
345 ),
346 DataType::List(field) => {
347 let inner_col_builder = ArrowColumn::new(
348 field.name().clone(),
349 field.is_nullable(),
350 field.data_type().clone(),
351 typename_from_field(field)?,
352 item_capacity,
353 data_capacity,
354 )?;
355 ColBuilder::ListBuilder(Box::new(
356 ListBuilder::new(inner_col_builder).with_field(Arc::clone(field)),
357 ))
358 }
359 DataType::Struct(fields) => {
360 let mut field_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
361 for field in fields {
362 let inner_col_builder = ArrowColumn::new(
363 field.name().clone(),
364 field.is_nullable(),
365 field.data_type().clone(),
366 typename_from_field(field)?,
367 item_capacity,
368 data_capacity,
369 )?;
370 field_builders.push(Box::new(inner_col_builder));
371 }
372 ColBuilder::StructBuilder(StructBuilder::new(fields.clone(), field_builders))
373 }
374 DataType::Map(entries_field, _sorted) => {
375 let entries_field = entries_field.as_ref();
376 if let DataType::Struct(fields) = entries_field.data_type() {
377 if fields.len() != 2 {
378 anyhow::bail!(
379 "Expected map entries to have 2 fields, found {}",
380 fields.len()
381 )
382 }
383 let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
384 let value_field = &fields[1];
385 let value_builder = ArrowColumn::new(
386 value_field.name().clone(),
387 value_field.is_nullable(),
388 value_field.data_type().clone(),
389 typename_from_field(value_field)?,
390 item_capacity,
391 data_capacity,
392 )?;
393 ColBuilder::MapBuilder(Box::new(
394 MapBuilder::with_capacity(
395 Some(MapFieldNames::default()),
396 key_builder,
397 value_builder,
398 item_capacity,
399 )
400 .with_values_field(Arc::clone(value_field)),
401 ))
402 } else {
403 anyhow::bail!("Expected map entries to be a struct")
404 }
405 }
406 _ => anyhow::bail!("{:?} unimplemented", data_type),
407 };
408 Ok(builder)
409}
410
411#[derive(Debug)]
412struct ArrowColumn {
413 field_name: String,
414 nullable: bool,
415 data_type: DataType,
416 extension_type_name: String,
417 inner: ColBuilder,
418}
419
420impl From<&ArrowColumn> for Field {
421 fn from(col: &ArrowColumn) -> Self {
422 field_with_typename(
423 &col.field_name,
424 col.data_type.clone(),
425 col.nullable,
426 &col.extension_type_name,
427 )
428 }
429}
430
431fn field_with_typename(
433 name: &str,
434 data_type: DataType,
435 nullable: bool,
436 extension_type_name: &str,
437) -> Field {
438 Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
439 "ARROW:extension:name".to_string(),
440 format!("materialize.v1.{}", extension_type_name),
441 )]))
442}
443
444fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
446 let metadata = field.metadata();
447 let extension_name = metadata
448 .get("ARROW:extension:name")
449 .ok_or_else(|| anyhow::anyhow!("Missing extension name in metadata"))?;
450 if let Some(name) = extension_name.strip_prefix("materialize.v1") {
451 Ok(name.to_string())
452 } else {
453 anyhow::bail!("Extension name {} does not match expected", extension_name,)
454 }
455}
456
457impl ArrowColumn {
458 fn new(
459 field_name: String,
460 nullable: bool,
461 data_type: DataType,
462 extension_type_name: String,
463 item_capacity: usize,
464 data_capacity: usize,
465 ) -> Result<Self, anyhow::Error> {
466 Ok(Self {
467 inner: builder_for_datatype(&data_type, item_capacity, data_capacity)?,
468 field_name,
469 nullable,
470 data_type,
471 extension_type_name,
472 })
473 }
474}
475
476macro_rules! make_col_builder {
477 ($($x:ident), *) => {
478 #[derive(Debug)]
482 enum ColBuilder {
483 $(
484 $x($x),
485 )*
486 ListBuilder(Box<ListBuilder<ArrowColumn>>),
490 MapBuilder(Box<MapBuilder<StringBuilder, ArrowColumn>>),
491 StructBuilder(StructBuilder),
496 }
497
498 impl ColBuilder {
499 fn append_null(&mut self) {
500 match self {
501 $(
502 ColBuilder::$x(builder) => builder.append_null(),
503 )*
504 ColBuilder::ListBuilder(builder) => builder.append_null(),
505 ColBuilder::MapBuilder(builder) => builder.append(false).unwrap(),
506 ColBuilder::StructBuilder(builder) => {
507 for i in 0..builder.num_fields() {
508 let field_builder: &mut ArrowColumn = builder.field_builder(i).unwrap();
509 field_builder.inner.append_null();
510 }
511 builder.append_null();
512 }
513 }
514 }
515 }
516
517 impl ArrayBuilder for ArrowColumn {
522 fn len(&self) -> usize {
523 match &self.inner {
524 $(
525 ColBuilder::$x(builder) => builder.len(),
526 )*
527 ColBuilder::ListBuilder(builder) => builder.len(),
528 ColBuilder::MapBuilder(builder) => builder.len(),
529 ColBuilder::StructBuilder(builder) => builder.len(),
530 }
531 }
532 fn finish(&mut self) -> ArrayRef {
533 match &mut self.inner {
534 $(
535 ColBuilder::$x(builder) => Arc::new(builder.finish()),
536 )*
537 ColBuilder::ListBuilder(builder) => Arc::new(builder.finish()),
538 ColBuilder::MapBuilder(builder) => Arc::new(builder.finish()),
539 ColBuilder::StructBuilder(builder) => Arc::new(builder.finish()),
540 }
541 }
542 fn finish_cloned(&self) -> ArrayRef {
543 match &self.inner {
544 $(
545 ColBuilder::$x(builder) => Arc::new(builder.finish_cloned()),
546 )*
547 ColBuilder::ListBuilder(builder) => Arc::new(builder.finish_cloned()),
548 ColBuilder::MapBuilder(builder) => Arc::new(builder.finish_cloned()),
549 ColBuilder::StructBuilder(builder) => Arc::new(builder.finish_cloned()),
550 }
551 }
552 fn as_any(&self) -> &(dyn Any + 'static) {
553 self
554 }
555 fn as_any_mut(&mut self) -> &mut (dyn Any + 'static) {
556 self
557 }
558 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
559 self
560 }
561 }
562 };
563}
564
565make_col_builder!(
566 BooleanBuilder,
567 Int16Builder,
568 Int32Builder,
569 Int64Builder,
570 UInt8Builder,
571 UInt16Builder,
572 UInt32Builder,
573 UInt64Builder,
574 Float32Builder,
575 Float64Builder,
576 Date32Builder,
577 Time64MicrosecondBuilder,
578 TimestampMicrosecondBuilder,
579 LargeBinaryBuilder,
580 FixedSizeBinaryBuilder,
581 StringBuilder,
582 LargeStringBuilder,
583 Decimal128Builder
584);
585
586impl ArrowColumn {
587 fn append_datum(&mut self, datum: Datum) -> Result<(), anyhow::Error> {
588 match (&mut self.inner, datum) {
589 (s, Datum::Null) => s.append_null(),
590 (ColBuilder::BooleanBuilder(builder), Datum::False) => builder.append_value(false),
591 (ColBuilder::BooleanBuilder(builder), Datum::True) => builder.append_value(true),
592 (ColBuilder::Int16Builder(builder), Datum::Int16(i)) => builder.append_value(i),
593 (ColBuilder::Int32Builder(builder), Datum::Int32(i)) => builder.append_value(i),
594 (ColBuilder::Int64Builder(builder), Datum::Int64(i)) => builder.append_value(i),
595 (ColBuilder::UInt8Builder(builder), Datum::UInt8(i)) => builder.append_value(i),
596 (ColBuilder::UInt16Builder(builder), Datum::UInt16(i)) => builder.append_value(i),
597 (ColBuilder::UInt32Builder(builder), Datum::UInt32(i)) => builder.append_value(i),
598 (ColBuilder::UInt64Builder(builder), Datum::UInt64(i)) => builder.append_value(i),
599 (ColBuilder::Float32Builder(builder), Datum::Float32(f)) => builder.append_value(*f),
600 (ColBuilder::Float64Builder(builder), Datum::Float64(f)) => builder.append_value(*f),
601 (ColBuilder::Date32Builder(builder), Datum::Date(d)) => {
602 builder.append_value(d.unix_epoch_days())
603 }
604 (ColBuilder::Time64MicrosecondBuilder(builder), Datum::Time(t)) => {
605 let micros_since_midnight = i64::cast_from(t.num_seconds_from_midnight())
606 * 1_000_000
607 + i64::cast_from(t.nanosecond().checked_div(1000).unwrap());
608 builder.append_value(micros_since_midnight)
609 }
610 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::Timestamp(ts)) => {
611 builder.append_value(ts.and_utc().timestamp_micros())
612 }
613 (ColBuilder::TimestampMicrosecondBuilder(builder), Datum::TimestampTz(ts)) => {
614 builder.append_value(ts.timestamp_micros())
615 }
616 (ColBuilder::LargeBinaryBuilder(builder), Datum::Bytes(b)) => builder.append_value(b),
617 (ColBuilder::FixedSizeBinaryBuilder(builder), Datum::Uuid(val)) => {
618 builder.append_value(val.as_bytes())?
619 }
620 (ColBuilder::StringBuilder(builder), Datum::String(s)) => builder.append_value(s),
621 (ColBuilder::LargeStringBuilder(builder), _) if self.extension_type_name == "jsonb" => {
622 builder.append_value(JsonbRef::from_datum(datum).to_serde_json().to_string())
623 }
624 (ColBuilder::LargeStringBuilder(builder), Datum::String(s)) => builder.append_value(s),
625 (ColBuilder::UInt64Builder(builder), Datum::MzTimestamp(ts)) => {
626 builder.append_value(ts.into())
627 }
628 (ColBuilder::Decimal128Builder(builder), Datum::Numeric(mut dec)) => {
629 if dec.0.is_special() {
630 anyhow::bail!("Cannot represent special numeric value {} in parquet", dec)
631 }
632 if let DataType::Decimal128(precision, scale) = self.data_type {
633 if dec.0.digits() > precision.into() {
634 anyhow::bail!(
635 "Decimal value {} out of range for column with precision {}",
636 dec,
637 precision
638 )
639 }
640
641 let coefficient: i128 = dec.0.coefficient()?;
644 let exponent = dec.0.exponent();
645
646 let scale_diff = i32::from(scale) + exponent;
650 let scale_diff = u32::try_from(scale_diff).map_err(|_| {
653 anyhow::anyhow!(
654 "cannot represent decimal value {} in column with scale {}",
655 dec,
656 scale
657 )
658 })?;
659
660 let value = coefficient
661 .checked_mul(10_i128.pow(scale_diff))
662 .ok_or_else(|| {
663 anyhow::anyhow!("Decimal value {} out of range for parquet", dec)
664 })?;
665
666 builder.append_value(value)
667 } else {
668 anyhow::bail!("Expected Decimal128 data type")
669 }
670 }
671 (ColBuilder::StructBuilder(struct_builder), Datum::Array(arr)) => {
672 let list_builder: &mut ArrowColumn = struct_builder.field_builder(0).unwrap();
675 if let ColBuilder::ListBuilder(list_builder) = &mut list_builder.inner {
676 let inner_builder = list_builder.values();
677 for datum in arr.elements().into_iter() {
678 inner_builder.append_datum(datum)?;
679 }
680 list_builder.append(true);
681 } else {
682 anyhow::bail!(
683 "Expected ListBuilder for StructBuilder with Array datum: {:?}",
684 struct_builder
685 )
686 }
687 let dims_builder: &mut ArrowColumn = struct_builder.field_builder(1).unwrap();
688 if let ColBuilder::UInt8Builder(dims_builder) = &mut dims_builder.inner {
689 dims_builder.append_value(arr.dims().ndims());
690 } else {
691 anyhow::bail!(
692 "Expected UInt8Builder for StructBuilder with Array datum: {:?}",
693 struct_builder
694 )
695 }
696 struct_builder.append(true)
697 }
698 (ColBuilder::ListBuilder(list_builder), Datum::List(list)) => {
699 let inner_builder = list_builder.values();
700 for datum in list.into_iter() {
701 inner_builder.append_datum(datum)?;
702 }
703 list_builder.append(true)
704 }
705 (ColBuilder::MapBuilder(builder), Datum::Map(map)) => {
706 for (key, value) in map.iter() {
707 builder.keys().append_value(key);
708 builder.values().append_datum(value)?;
709 }
710 builder.append(true).unwrap()
711 }
712 (_builder, datum) => {
713 anyhow::bail!("Datum {:?} does not match builder", datum)
714 }
715 }
716 Ok(())
717 }
718}