1use std::sync::Arc;
13
14use anyhow::Context;
15use arrow::array::{
16 Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
17 Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
18 Int16Array, Int32Array, Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
19 IntervalYearMonthArray, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
20 MapArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray, Time32SecondArray,
21 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
22 TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
26use chrono::{DateTime, NaiveTime};
27use dec::OrderedDecimal;
28use itertools::Itertools;
29use mz_ore::cast::CastFrom;
30use mz_repr::adt::date::Date;
31use mz_repr::adt::interval::Interval;
32use mz_repr::adt::jsonb::JsonbPacker;
33use mz_repr::adt::numeric::Numeric;
34use mz_repr::adt::range::{Range, RangeLowerBound, RangeUpperBound};
35use mz_repr::adt::timestamp::CheckedTimestamp;
36use mz_repr::{Datum, RelationDesc, Row, RowPacker, SharedRow, SqlScalarType};
37use ordered_float::OrderedFloat;
38use uuid::Uuid;
39
40use crate::mask_nulls;
41
/// Reads [`Row`]s out of an Arrow [`StructArray`], using one [`ColReader`]
/// per column of the source data.
pub struct ArrowReader {
    /// Number of logical rows in the source array.
    len: usize,
    /// One reader per column, in [`RelationDesc`] column order.
    readers: Vec<ColReader>,
}
55
56impl ArrowReader {
57 pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
71 let inner_columns = array.columns();
72 let desc_columns = desc.typ().columns();
73
74 if inner_columns.len() != desc_columns.len() {
75 return Err(anyhow::anyhow!(
76 "wrong number of columns {} vs {}",
77 inner_columns.len(),
78 desc_columns.len()
79 ));
80 }
81
82 let mut readers = Vec::with_capacity(desc_columns.len());
83 for (col_name, col_type) in desc.iter() {
84 let column = array
85 .column_by_name(col_name)
86 .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
87 let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
88 .context(col_name.clone())?;
89
90 readers.push(reader);
91 }
92
93 Ok(ArrowReader {
94 len: array.len(),
95 readers,
96 })
97 }
98
99 pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
101 let mut packer = row.packer();
102 for reader in &self.readers {
103 reader.read(idx, &mut packer).context(idx)?;
104 }
105 Ok(())
106 }
107
108 pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
110 for idx in 0..self.len {
111 let mut row = Row::default();
112 self.read(idx, &mut row).context(idx)?;
113 rows.push(row);
114 }
115 Ok(self.len)
116 }
117}
118
/// Returns a [`ColReader`] that can decode [`Datum`]s of `scalar_type` out of
/// `array`.
///
/// # Errors
///
/// Returns an error if the Arrow [`DataType`] of `array` is not a supported
/// encoding for `scalar_type`, or if a decimal precision is invalid.
fn scalar_type_and_array_to_reader(
    scalar_type: &SqlScalarType,
    array: Arc<dyn Array>,
) -> Result<ColReader, anyhow::Error> {
    /// Downcasts `array` to the concrete array type `T`.
    ///
    /// Panics if `array` is not a `T`; callers must already have matched on
    /// the array's `DataType` in the surrounding `match`.
    fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
        array
            .as_any()
            .downcast_ref::<T>()
            .expect("checked DataType")
            .clone()
    }

    match (scalar_type, array.data_type()) {
        (SqlScalarType::Bool, DataType::Boolean) => {
            Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
        }
        // There is no 8-bit SQL integer type here, so Int8 arrays are widened
        // to the requested signed integer type at read time via `cast`.
        (SqlScalarType::Int16 | SqlScalarType::Int32 | SqlScalarType::Int64, DataType::Int8) => {
            let array = downcast_array::<Int8Array>(array);
            let cast: fn(i8) -> Datum<'static> = match scalar_type {
                SqlScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
                SqlScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
                SqlScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Int8 { array, cast })
        }
        (SqlScalarType::Int16, DataType::Int16) => {
            Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
        }
        (SqlScalarType::Int32, DataType::Int32) => {
            Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
        }
        (SqlScalarType::Int64, DataType::Int64) => {
            Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
        }
        // Same widening story as Int8, for unsigned integers.
        (
            SqlScalarType::UInt16 | SqlScalarType::UInt32 | SqlScalarType::UInt64,
            DataType::UInt8,
        ) => {
            let array = downcast_array::<UInt8Array>(array);
            let cast: fn(u8) -> Datum<'static> = match scalar_type {
                SqlScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
                SqlScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
                SqlScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::UInt8 { array, cast })
        }
        (SqlScalarType::UInt16, DataType::UInt16) => {
            Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
        }
        (SqlScalarType::UInt32, DataType::UInt32) => {
            Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
        }
        (SqlScalarType::UInt64, DataType::UInt64) => {
            Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
        }
        // Half-precision floats are widened to f32 or f64 at read time.
        (SqlScalarType::Float32 | SqlScalarType::Float64, DataType::Float16) => {
            let array = downcast_array::<Float16Array>(array);
            let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
                SqlScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
                SqlScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Float16 { array, cast })
        }
        (SqlScalarType::Float32, DataType::Float32) => {
            Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
        }
        (SqlScalarType::Float64, DataType::Float64) => {
            Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
        }
        // A Decimal128(precision, scale) stores value * 10^scale as an i128;
        // `ColReader::read` divides by `scale_factor` (10^scale) to recover
        // the logical value.
        (SqlScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
            use num_traits::Pow;

            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Validate the precision up front so `ColReader::read` can
            // `expect("checked before")` when it rebuilds the context.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal128Array>(array);

            Ok(ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            })
        }
        // Same as Decimal128, but the 256-bit value is parsed via its string
        // form at read time (see `ColReader::read`).
        (SqlScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
            use num_traits::Pow;

            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Validate the precision up front so `ColReader::read` can
            // `expect("checked before")` when it rebuilds the context.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal256Array>(array);

            Ok(ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Bytes, DataType::Binary) => {
            Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
        }
        (SqlScalarType::Bytes, DataType::LargeBinary) => {
            let array = downcast_array::<LargeBinaryArray>(array);
            Ok(ColReader::LargeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array);
            Ok(ColReader::FixedSizeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::BinaryView) => {
            let array = downcast_array::<BinaryViewArray>(array);
            Ok(ColReader::BinaryView(array))
        }
        // UUIDs are stored as raw bytes: reuse the Bytes reader and parse the
        // slice into a `Uuid` at read time.
        (
            SqlScalarType::Uuid,
            DataType::Binary
            | DataType::BinaryView
            | DataType::LargeBinary
            | DataType::FixedSizeBinary(_),
        ) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::Bytes, array)
                .context("uuid reader")?;
            Ok(ColReader::Uuid(Box::new(reader)))
        }
        (SqlScalarType::String, DataType::Utf8) => {
            Ok(ColReader::String(downcast_array::<StringArray>(array)))
        }
        (SqlScalarType::String, DataType::LargeUtf8) => {
            let array = downcast_array::<LargeStringArray>(array);
            Ok(ColReader::LargeString(array))
        }
        (SqlScalarType::String, DataType::Utf8View) => {
            let array = downcast_array::<StringViewArray>(array);
            Ok(ColReader::StringView(array))
        }
        // JSON is stored as a string: reuse the String reader and re-parse
        // the text through `JsonbPacker` at read time.
        (SqlScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::String, array)
                .context("json reader")?;
            Ok(ColReader::Jsonb(Box::new(reader)))
        }
        // Only timezone-less timestamps (the `None` in the pattern) are
        // supported, at all four Arrow time units.
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
            let array = downcast_array::<TimestampSecondArray>(array);
            Ok(ColReader::TimestampSecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
            let array = downcast_array::<TimestampMillisecondArray>(array);
            Ok(ColReader::TimestampMillisecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
            let array = downcast_array::<TimestampMicrosecondArray>(array);
            Ok(ColReader::TimestampMicrosecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
            let array = downcast_array::<TimestampNanosecondArray>(array);
            Ok(ColReader::TimestampNanosecond(array))
        }
        (SqlScalarType::Date, DataType::Date32) => {
            let array = downcast_array::<Date32Array>(array);
            Ok(ColReader::Date32(array))
        }
        (SqlScalarType::Date, DataType::Date64) => {
            let array = downcast_array::<Date64Array>(array);
            Ok(ColReader::Date64(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
            let array = downcast_array::<Time32SecondArray>(array);
            Ok(ColReader::Time32Seconds(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
            let array = downcast_array::<Time32MillisecondArray>(array);
            Ok(ColReader::Time32Milliseconds(array))
        }
        // Lists keep the (offsets, values, nulls) triple so `read` can slice
        // the flat child array per row.
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::List(_),
        ) => {
            let array = downcast_array::<ListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("list")?;
            Ok(ColReader::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::LargeList(_),
        ) => {
            let array = downcast_array::<LargeListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("large list")?;
            Ok(ColReader::LargeList {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::Record {
                fields,
                custom_id: _,
            },
            DataType::Struct(_),
        ) => {
            let record_array = downcast_array::<StructArray>(array);
            let null_mask = record_array.nulls();

            let mut decoders = Vec::with_capacity(fields.len());
            for (name, typ) in fields.iter() {
                let inner_array = record_array
                    .column_by_name(name)
                    .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
                // Push the struct-level null mask down into each child so a
                // null record nulls out all of its fields.
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
                    .context(name.clone())?;

                decoders.push(Box::new(inner_decoder));
            }

            Ok(ColReader::Record {
                fields: decoders,
                nulls: null_mask.cloned(),
            })
        }
        (
            SqlScalarType::Map {
                value_type,
                custom_id: _,
            },
            DataType::Map(_, _),
        ) => {
            let map_array = downcast_array::<MapArray>(array);
            // Map keys are required to be Utf8 strings; anything else is a
            // programming error rather than a data error.
            let keys = map_array
                .keys()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("map keys should be Utf8 strings")
                .clone();
            let values_reader =
                scalar_type_and_array_to_reader(value_type, Arc::clone(map_array.values()))
                    .context("map values")?;
            Ok(ColReader::Map {
                offsets: map_array.offsets().clone(),
                keys,
                values: Box::new(values_reader),
                nulls: map_array.nulls().cloned(),
            })
        }
        // Ranges are encoded as a struct with five well-known fields; a null
        // bound encodes an infinite bound (see `ColReader::read`).
        (SqlScalarType::Range { element_type }, DataType::Struct(_)) => {
            let struct_array = downcast_array::<StructArray>(array);
            let lower_array = struct_array
                .column_by_name("lower")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'lower' field"))?;
            let upper_array = struct_array
                .column_by_name("upper")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'upper' field"))?;
            let empty_array = struct_array
                .column_by_name("empty")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'empty' field"))?;
            let lower_inclusive_array = struct_array
                .column_by_name("lower_inclusive")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'lower_inclusive' field"))?;
            let upper_inclusive_array = struct_array
                .column_by_name("upper_inclusive")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'upper_inclusive' field"))?;

            let lower_reader =
                scalar_type_and_array_to_reader(element_type, Arc::clone(lower_array))
                    .context("range lower")?;
            let upper_reader =
                scalar_type_and_array_to_reader(element_type, Arc::clone(upper_array))
                    .context("range upper")?;

            let empty = downcast_array::<BooleanArray>(Arc::clone(empty_array));
            let lower_inclusive = downcast_array::<BooleanArray>(Arc::clone(lower_inclusive_array));
            let upper_inclusive = downcast_array::<BooleanArray>(Arc::clone(upper_inclusive_array));

            let lower_nulls = lower_array.nulls().cloned();
            let upper_nulls = upper_array.nulls().cloned();

            Ok(ColReader::Range {
                lower: Box::new(lower_reader),
                lower_nulls,
                upper: Box::new(upper_reader),
                upper_nulls,
                lower_inclusive,
                upper_inclusive,
                empty,
                nulls: struct_array.nulls().cloned(),
            })
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::YearMonth)) => {
            Ok(ColReader::IntervalYearMonth(downcast_array::<
                IntervalYearMonthArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::DayTime)) => {
            Ok(ColReader::IntervalDayTime(downcast_array::<
                IntervalDayTimeArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::MonthDayNano)) => {
            Ok(ColReader::IntervalMonthDayNano(downcast_array::<
                IntervalMonthDayNanoArray,
            >(array)))
        }
        other => anyhow::bail!("unsupported: {other:?}"),
    }
}
457
/// A decoder for one column: pairs a concrete Arrow array with enough
/// auxiliary state to turn a single element into a [`Datum`].
///
/// Constructed by [`scalar_type_and_array_to_reader`]; consumed by
/// [`ColReader::read`].
enum ColReader {
    Boolean(arrow::array::BooleanArray),

    /// Int8 widened to a larger signed integer via `cast`.
    Int8 {
        array: arrow::array::Int8Array,
        cast: fn(i8) -> Datum<'static>,
    },
    Int16(arrow::array::Int16Array),
    Int32(arrow::array::Int32Array),
    Int64(arrow::array::Int64Array),

    /// UInt8 widened to a larger unsigned integer via `cast`.
    UInt8 {
        array: arrow::array::UInt8Array,
        cast: fn(u8) -> Datum<'static>,
    },
    UInt16(arrow::array::UInt16Array),
    UInt32(arrow::array::UInt32Array),
    UInt64(arrow::array::UInt64Array),

    /// Float16 widened to f32/f64 via `cast`.
    Float16 {
        array: arrow::array::Float16Array,
        cast: fn(half::f16) -> Datum<'static>,
    },
    Float32(arrow::array::Float32Array),
    Float64(arrow::array::Float64Array),

    /// Fixed-point decimal: stored integer is divided by `scale_factor`
    /// (10^scale) at read time; `precision` was validated at construction.
    Decimal128 {
        array: Decimal128Array,
        scale_factor: Numeric,
        precision: usize,
    },
    Decimal256 {
        array: Decimal256Array,
        scale_factor: Numeric,
        precision: usize,
    },

    Binary(arrow::array::BinaryArray),
    LargeBinary(arrow::array::LargeBinaryArray),
    FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
    BinaryView(arrow::array::BinaryViewArray),
    /// UUID parsed from the bytes produced by the inner (Bytes) reader.
    Uuid(Box<ColReader>),

    String(arrow::array::StringArray),
    LargeString(arrow::array::LargeStringArray),
    StringView(arrow::array::StringViewArray),
    /// JSON re-parsed from the string produced by the inner (String) reader.
    Jsonb(Box<ColReader>),

    TimestampSecond(arrow::array::TimestampSecondArray),
    TimestampMillisecond(arrow::array::TimestampMillisecondArray),
    TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
    TimestampNanosecond(arrow::array::TimestampNanosecondArray),

    Date32(Date32Array),
    Date64(Date64Array),

    Time32Seconds(Time32SecondArray),
    Time32Milliseconds(arrow::array::Time32MillisecondArray),

    /// Variable-length list: `offsets[idx]..offsets[idx + 1]` indexes into
    /// the flat `values` child reader.
    List {
        offsets: OffsetBuffer<i32>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },
    LargeList {
        offsets: OffsetBuffer<i64>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    /// Struct/record: one child reader per field, in field order.
    Record {
        fields: Vec<Box<ColReader>>,
        nulls: Option<NullBuffer>,
    },

    /// Map with Utf8 keys; entries for row `idx` live at
    /// `offsets[idx]..offsets[idx + 1]`.
    Map {
        offsets: OffsetBuffer<i32>,
        keys: StringArray,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    /// Range encoded as five parallel columns; a null in `lower_nulls` /
    /// `upper_nulls` marks an infinite bound.
    Range {
        lower: Box<ColReader>,
        lower_nulls: Option<NullBuffer>,
        upper: Box<ColReader>,
        upper_nulls: Option<NullBuffer>,
        lower_inclusive: BooleanArray,
        upper_inclusive: BooleanArray,
        empty: BooleanArray,
        nulls: Option<NullBuffer>,
    },

    IntervalYearMonth(IntervalYearMonthArray),
    IntervalDayTime(IntervalDayTimeArray),
    IntervalMonthDayNano(IntervalMonthDayNanoArray),
}
559
560impl ColReader {
561 fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
562 let datum = match self {
563 ColReader::Boolean(array) => array
564 .is_valid(idx)
565 .then(|| array.value(idx))
566 .map(|x| if x { Datum::True } else { Datum::False }),
567 ColReader::Int8 { array, cast } => {
568 array.is_valid(idx).then(|| array.value(idx)).map(cast)
569 }
570 ColReader::Int16(array) => array
571 .is_valid(idx)
572 .then(|| array.value(idx))
573 .map(Datum::Int16),
574 ColReader::Int32(array) => array
575 .is_valid(idx)
576 .then(|| array.value(idx))
577 .map(Datum::Int32),
578 ColReader::Int64(array) => array
579 .is_valid(idx)
580 .then(|| array.value(idx))
581 .map(Datum::Int64),
582 ColReader::UInt8 { array, cast } => {
583 array.is_valid(idx).then(|| array.value(idx)).map(cast)
584 }
585 ColReader::UInt16(array) => array
586 .is_valid(idx)
587 .then(|| array.value(idx))
588 .map(Datum::UInt16),
589 ColReader::UInt32(array) => array
590 .is_valid(idx)
591 .then(|| array.value(idx))
592 .map(Datum::UInt32),
593 ColReader::UInt64(array) => array
594 .is_valid(idx)
595 .then(|| array.value(idx))
596 .map(Datum::UInt64),
597 ColReader::Float16 { array, cast } => {
598 array.is_valid(idx).then(|| array.value(idx)).map(cast)
599 }
600 ColReader::Float32(array) => array
601 .is_valid(idx)
602 .then(|| array.value(idx))
603 .map(|x| Datum::Float32(OrderedFloat(x))),
604 ColReader::Float64(array) => array
605 .is_valid(idx)
606 .then(|| array.value(idx))
607 .map(|x| Datum::Float64(OrderedFloat(x))),
608 ColReader::Decimal128 {
609 array,
610 scale_factor,
611 precision,
612 } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
613 let mut ctx = dec::Context::<Numeric>::default();
615 ctx.set_precision(*precision).expect("checked before");
616 let mut num = ctx.from_i128(x);
617
618 ctx.div(&mut num, scale_factor);
620
621 Datum::Numeric(OrderedDecimal(num))
622 }),
623 ColReader::Decimal256 {
624 array,
625 scale_factor,
626 precision,
627 } => array
628 .is_valid(idx)
629 .then(|| array.value(idx))
630 .map(|x| {
631 let s = x.to_string();
632
633 let mut ctx = dec::Context::<Numeric>::default();
637 ctx.set_precision(*precision).expect("checked before");
638 let mut num = ctx
639 .parse(s)
640 .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
641
642 ctx.div(&mut num, scale_factor);
644
645 Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
646 })
647 .transpose()?,
648 ColReader::Binary(array) => array
649 .is_valid(idx)
650 .then(|| array.value(idx))
651 .map(Datum::Bytes),
652 ColReader::LargeBinary(array) => array
653 .is_valid(idx)
654 .then(|| array.value(idx))
655 .map(Datum::Bytes),
656 ColReader::FixedSizeBinary(array) => array
657 .is_valid(idx)
658 .then(|| array.value(idx))
659 .map(Datum::Bytes),
660 ColReader::BinaryView(array) => array
661 .is_valid(idx)
662 .then(|| array.value(idx))
663 .map(Datum::Bytes),
664 ColReader::Uuid(reader) => {
665 let mut temp_row = SharedRow::get();
668 reader.read(idx, &mut temp_row.packer()).context("uuid")?;
669 let slice = match temp_row.unpack_first() {
670 Datum::Bytes(slice) => slice,
671 Datum::Null => {
672 packer.push(Datum::Null);
673 return Ok(());
674 }
675 other => anyhow::bail!("expected String, found {other:?}"),
676 };
677
678 let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
679 Some(Datum::Uuid(uuid))
680 }
681 ColReader::String(array) => array
682 .is_valid(idx)
683 .then(|| array.value(idx))
684 .map(Datum::String),
685 ColReader::LargeString(array) => array
686 .is_valid(idx)
687 .then(|| array.value(idx))
688 .map(Datum::String),
689 ColReader::StringView(array) => array
690 .is_valid(idx)
691 .then(|| array.value(idx))
692 .map(Datum::String),
693 ColReader::Jsonb(reader) => {
694 let mut temp_row = SharedRow::get();
697 reader.read(idx, &mut temp_row.packer()).context("jsonb")?;
698 let value = match temp_row.unpack_first() {
699 Datum::String(value) => value,
700 Datum::Null => {
701 packer.push(Datum::Null);
702 return Ok(());
703 }
704 other => anyhow::bail!("expected String, found {other:?}"),
705 };
706
707 JsonbPacker::new(packer)
708 .pack_str(value)
709 .context("roundtrip json")?;
710
711 return Ok(());
713 }
714 ColReader::TimestampSecond(array) => array
715 .is_valid(idx)
716 .then(|| array.value(idx))
717 .map(|secs| {
718 let dt = DateTime::from_timestamp(secs, 0)
719 .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
720 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
721 .context("TimestampSeconds")?;
722 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
723 })
724 .transpose()?,
725 ColReader::TimestampMillisecond(array) => array
726 .is_valid(idx)
727 .then(|| array.value(idx))
728 .map(|millis| {
729 let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
730 anyhow::anyhow!("invalid timestamp milliseconds {millis}")
731 })?;
732 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
733 .context("TimestampMillis")?;
734 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
735 })
736 .transpose()?,
737 ColReader::TimestampMicrosecond(array) => array
738 .is_valid(idx)
739 .then(|| array.value(idx))
740 .map(|micros| {
741 let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
742 anyhow::anyhow!("invalid timestamp microseconds {micros}")
743 })?;
744 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
745 .context("TimestampMicros")?;
746 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
747 })
748 .transpose()?,
749 ColReader::TimestampNanosecond(array) => array
750 .is_valid(idx)
751 .then(|| array.value(idx))
752 .map(|nanos| {
753 let dt = DateTime::from_timestamp_nanos(nanos);
754 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
755 .context("TimestampNanos")?;
756 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
757 })
758 .transpose()?,
759 ColReader::Date32(array) => array
760 .is_valid(idx)
761 .then(|| array.value(idx))
762 .map(|unix_days| {
763 let date = Date::from_unix_epoch(unix_days).context("date32")?;
764 Ok::<_, anyhow::Error>(Datum::Date(date))
765 })
766 .transpose()?,
767 ColReader::Date64(array) => array
768 .is_valid(idx)
769 .then(|| array.value(idx))
770 .map(|unix_millis| {
771 let date = DateTime::from_timestamp_millis(unix_millis)
772 .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
773 let unix_epoch = DateTime::from_timestamp(0, 0)
774 .expect("UNIX epoch")
775 .date_naive();
776 let delta = date.date_naive().signed_duration_since(unix_epoch);
777 let days: i32 = delta.num_days().try_into().context("date64")?;
778 let date = Date::from_unix_epoch(days).context("date64")?;
779 Ok::<_, anyhow::Error>(Datum::Date(date))
780 })
781 .transpose()?,
782 ColReader::Time32Seconds(array) => array
783 .is_valid(idx)
784 .then(|| array.value(idx))
785 .map(|secs| {
786 let usecs: u32 = secs.try_into().context("time32 seconds")?;
787 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
788 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
789 Ok::<_, anyhow::Error>(Datum::Time(time))
790 })
791 .transpose()?,
792 ColReader::Time32Milliseconds(array) => array
793 .is_valid(idx)
794 .then(|| array.value(idx))
795 .map(|millis| {
796 let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
797 let usecs = umillis / 1000;
798 let unanos = (umillis % 1000).saturating_mul(1_000_000);
799 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
800 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
801 Ok::<_, anyhow::Error>(Datum::Time(time))
802 })
803 .transpose()?,
804 ColReader::List {
805 offsets,
806 values,
807 nulls,
808 } => {
809 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
810 if !is_valid {
811 packer.push(Datum::Null);
812 return Ok(());
813 }
814
815 let start: usize = offsets[idx].try_into().context("list start offset")?;
816 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
817
818 packer
819 .push_list_with(|packer| {
820 for idx in start..end {
821 values.read(idx, packer)?;
822 }
823 Ok::<_, anyhow::Error>(())
824 })
825 .context("pack list")?;
826
827 return Ok(());
829 }
830 ColReader::LargeList {
831 offsets,
832 values,
833 nulls,
834 } => {
835 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
836 if !is_valid {
837 packer.push(Datum::Null);
838 return Ok(());
839 }
840
841 let start: usize = offsets[idx].try_into().context("list start offset")?;
842 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
843
844 packer
845 .push_list_with(|packer| {
846 for idx in start..end {
847 values.read(idx, packer)?;
848 }
849 Ok::<_, anyhow::Error>(())
850 })
851 .context("pack list")?;
852
853 return Ok(());
855 }
856 ColReader::Record { fields, nulls } => {
857 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
858 if !is_valid {
859 packer.push(Datum::Null);
860 return Ok(());
861 }
862
863 packer
864 .push_list_with(|packer| {
865 for field in fields {
866 field.read(idx, packer)?;
867 }
868 Ok::<_, anyhow::Error>(())
869 })
870 .context("pack record")?;
871
872 return Ok(());
874 }
875 ColReader::Map {
876 offsets,
877 keys,
878 values,
879 nulls,
880 } => {
881 let is_non_null = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
882 if !is_non_null {
883 packer.push(Datum::Null);
884 return Ok(());
885 }
886
887 let start: usize = offsets[idx].try_into().context("map start offset")?;
888 let end: usize = offsets[idx + 1].try_into().context("map end offset")?;
889
890 let mut kv_sorted = (start..end)
894 .map(|i| (keys.value(i), i))
895 .sorted_by_key(|(k, _)| *k)
896 .peekable();
897
898 packer
899 .push_dict_with(|packer| {
900 while let Some((key, i)) = kv_sorted.next() {
901 if let Some((next_key, _)) = kv_sorted.peek() {
907 if key == *next_key {
908 continue;
909 }
910 }
911 packer.push(Datum::String(key));
912 values.read(i, packer)?;
913 }
914 Ok::<_, anyhow::Error>(())
915 })
916 .context("pack map")?;
917
918 return Ok(());
920 }
921 ColReader::Range {
922 lower,
923 lower_nulls,
924 upper,
925 upper_nulls,
926 lower_inclusive,
927 upper_inclusive,
928 empty,
929 nulls,
930 } => {
931 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
932 if !is_valid {
933 packer.push(Datum::Null);
934 return Ok(());
935 }
936
937 if empty.value(idx) {
938 packer.push(Datum::Range(Range { inner: None }));
939 return Ok(());
940 }
941
942 let lower_is_infinite = lower_nulls
943 .as_ref()
944 .map(|n| !n.is_valid(idx))
945 .unwrap_or(false);
946 let upper_is_infinite = upper_nulls
947 .as_ref()
948 .map(|n| !n.is_valid(idx))
949 .unwrap_or(false);
950
951 let lower_bound = if lower_is_infinite {
954 None
955 } else {
956 let mut temp = SharedRow::get();
957 lower.read(idx, &mut temp.packer())?;
958 let row = temp.clone();
959 Some(move |packer: &mut RowPacker| -> Result<(), anyhow::Error> {
960 packer.push(row.unpack_first());
961 Ok(())
962 })
963 };
964
965 let upper_bound = if upper_is_infinite {
966 None
967 } else {
968 let mut temp = SharedRow::get();
969 upper.read(idx, &mut temp.packer())?;
970 let row = temp.clone();
971 Some(move |packer: &mut RowPacker| -> Result<(), anyhow::Error> {
972 packer.push(row.unpack_first());
973 Ok(())
974 })
975 };
976
977 packer
978 .push_range_with(
979 RangeLowerBound {
980 inclusive: lower_inclusive.value(idx),
981 bound: lower_bound,
982 },
983 RangeUpperBound {
984 inclusive: upper_inclusive.value(idx),
985 bound: upper_bound,
986 },
987 )
988 .context("pack range")?;
989
990 return Ok(());
991 }
992 ColReader::IntervalYearMonth(array) => array
993 .is_valid(idx)
994 .then(|| array.value(idx))
995 .map(|months| Datum::Interval(Interval::new(months, 0, 0))),
996 ColReader::IntervalDayTime(array) => {
997 array.is_valid(idx).then(|| array.value(idx)).map(|v| {
998 let micros = i64::from(v.milliseconds) * 1_000;
999 Datum::Interval(Interval::new(0, v.days, micros))
1000 })
1001 }
1002 ColReader::IntervalMonthDayNano(array) => {
1003 array.is_valid(idx).then(|| array.value(idx)).map(|v| {
1004 let micros = v.nanoseconds / 1_000;
1005 Datum::Interval(Interval::new(v.months, v.days, micros))
1006 })
1007 }
1008 };
1009
1010 match datum {
1011 Some(d) => packer.push(d),
1012 None => packer.push(Datum::Null),
1013 }
1014
1015 Ok(())
1016 }
1017}
1018
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    /// Round-trips a row through `ArrowBuilder` and `ArrowReader` covering a
    /// broad mix of scalar types plus JSON and list columns, and checks that
    /// an all-null row survives as well.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_reader() {
        let desc = RelationDesc::builder()
            .with_column("bool", SqlScalarType::Bool.nullable(true))
            .with_column("int4", SqlScalarType::Int32.nullable(true))
            .with_column("uint8", SqlScalarType::UInt64.nullable(true))
            .with_column("float32", SqlScalarType::Float32.nullable(true))
            .with_column("string", SqlScalarType::String.nullable(true))
            .with_column("bytes", SqlScalarType::Bytes.nullable(true))
            .with_column("uuid", SqlScalarType::Uuid.nullable(true))
            .with_column("json", SqlScalarType::Jsonb.nullable(true))
            .with_column(
                "list",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let mut og_row = Row::default();
        let mut packer = og_row.packer();

        packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        // A second row that is Null in every one of the 9 columns.
        let null_row = Row::pack(vec![Datum::Null; 9]);

        // NOTE(review): the `2, 46` arguments look like size hints for
        // ArrowBuilder::new — confirm against its signature.
        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&og_row).unwrap();
        builder.add_row(&null_row).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        let reader =
            ArrowReader::new(&desc, arrow::array::StructArray::from(record_batch)).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        assert_eq!(&og_row, &rnd_row);

        // Clear the row before reusing it for the next read.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        assert_eq!(&null_row, &rnd_row);
    }

    /// Reads a Decimal128 column (precision 12, scale 3) and checks the
    /// fixed-point scaling, including a null element.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_decimal128() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec128 = arrow::array::Decimal128Builder::new();
        dec128 = dec128.with_precision_and_scale(12, 3).unwrap();

        // Raw stored values are scaled by 10^3: 1234 => 1.234.
        dec128.append_value(1234);
        dec128.append_null();
        dec128.append_value(100000000009);

        let dec128 = dec128.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec128.data_type().clone(), true)),
            Arc::new(dec128) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Clear the row before reusing it for the next read.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }

    /// Same as `smoketest_decimal128`, but exercising the Decimal256 path
    /// (which round-trips through a string representation).
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_decimal256() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec256 = arrow::array::Decimal256Builder::new();
        dec256 = dec256.with_precision_and_scale(12, 3).unwrap();

        // Raw stored values are scaled by 10^3: 1234 => 1.234.
        dec256.append_value(arrow::datatypes::i256::from(1234));
        dec256.append_null();
        dec256.append_value(arrow::datatypes::i256::from(100000000009i64));

        let dec256 = dec256.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec256.data_type().clone(), true)),
            Arc::new(dec256) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Clear the row before reusing it for the next read.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }
}