1use std::sync::Arc;
13
14use anyhow::Context;
15use arrow::array::{
16 Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
17 Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
18 Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray,
19 ListArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray,
20 Time32SecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
21 TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
22 UInt64Array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::{DataType, TimeUnit};
26use chrono::{DateTime, NaiveTime};
27use dec::OrderedDecimal;
28use mz_ore::cast::CastFrom;
29use mz_repr::adt::date::Date;
30use mz_repr::adt::jsonb::JsonbPacker;
31use mz_repr::adt::numeric::Numeric;
32use mz_repr::adt::timestamp::CheckedTimestamp;
33use mz_repr::{Datum, RelationDesc, Row, RowPacker, ScalarType, SharedRow};
34use ordered_float::OrderedFloat;
35use uuid::Uuid;
36
37use crate::mask_nulls;
38
39pub struct ArrowReader {
49 len: usize,
50 readers: Vec<ColReader>,
51}
52
53impl ArrowReader {
54 pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
68 let inner_columns = array.columns();
69 let desc_columns = desc.typ().columns();
70
71 if inner_columns.len() != desc_columns.len() {
72 return Err(anyhow::anyhow!(
73 "wrong number of columns {} vs {}",
74 inner_columns.len(),
75 desc_columns.len()
76 ));
77 }
78
79 let mut readers = Vec::with_capacity(desc_columns.len());
80 for (col_name, col_type) in desc.iter() {
81 let column = array
82 .column_by_name(col_name)
83 .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
84 let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
85 .context(col_name.clone())?;
86
87 readers.push(reader);
88 }
89
90 Ok(ArrowReader {
91 len: array.len(),
92 readers,
93 })
94 }
95
96 pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
98 let mut packer = row.packer();
99 for reader in &self.readers {
100 reader.read(idx, &mut packer).context(idx)?;
101 }
102 Ok(())
103 }
104
105 pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
107 for idx in 0..self.len {
108 let mut row = Row::default();
109 self.read(idx, &mut row).context(idx)?;
110 rows.push(row);
111 }
112 Ok(self.len)
113 }
114}
115
116fn scalar_type_and_array_to_reader(
117 scalar_type: &ScalarType,
118 array: Arc<dyn Array>,
119) -> Result<ColReader, anyhow::Error> {
120 fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
121 array
122 .as_any()
123 .downcast_ref::<T>()
124 .expect("checked DataType")
125 .clone()
126 }
127
128 match (scalar_type, array.data_type()) {
129 (ScalarType::Bool, DataType::Boolean) => {
130 Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
131 }
132 (ScalarType::Int16 | ScalarType::Int32 | ScalarType::Int64, DataType::Int8) => {
133 let array = downcast_array::<Int8Array>(array);
134 let cast: fn(i8) -> Datum<'static> = match scalar_type {
135 ScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
136 ScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
137 ScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
138 _ => unreachable!("checked above"),
139 };
140 Ok(ColReader::Int8 { array, cast })
141 }
142 (ScalarType::Int16, DataType::Int16) => {
143 Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
144 }
145 (ScalarType::Int32, DataType::Int32) => {
146 Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
147 }
148 (ScalarType::Int64, DataType::Int64) => {
149 Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
150 }
151 (ScalarType::UInt16 | ScalarType::UInt32 | ScalarType::UInt64, DataType::UInt8) => {
152 let array = downcast_array::<UInt8Array>(array);
153 let cast: fn(u8) -> Datum<'static> = match scalar_type {
154 ScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
155 ScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
156 ScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
157 _ => unreachable!("checked above"),
158 };
159 Ok(ColReader::UInt8 { array, cast })
160 }
161 (ScalarType::UInt16, DataType::UInt16) => {
162 Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
163 }
164 (ScalarType::UInt32, DataType::UInt32) => {
165 Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
166 }
167 (ScalarType::UInt64, DataType::UInt64) => {
168 Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
169 }
170 (ScalarType::Float32 | ScalarType::Float64, DataType::Float16) => {
171 let array = downcast_array::<Float16Array>(array);
172 let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
173 ScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
174 ScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
175 _ => unreachable!("checked above"),
176 };
177 Ok(ColReader::Float16 { array, cast })
178 }
179 (ScalarType::Float32, DataType::Float32) => {
180 Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
181 }
182 (ScalarType::Float64, DataType::Float64) => {
183 Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
184 }
185 (ScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
187 use num_traits::Pow;
188
189 let base = Numeric::from(10);
190 let scale = Numeric::from(*scale);
191 let scale_factor = base.pow(scale);
192
193 let precision = usize::cast_from(*precision);
194 let mut ctx = dec::Context::<Numeric>::default();
196 ctx.set_precision(precision).map_err(|e| {
197 anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
198 })?;
199
200 let array = downcast_array::<Decimal128Array>(array);
201
202 Ok(ColReader::Decimal128 {
203 array,
204 scale_factor,
205 precision,
206 })
207 }
208 (ScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
210 use num_traits::Pow;
211
212 let base = Numeric::from(10);
213 let scale = Numeric::from(*scale);
214 let scale_factor = base.pow(scale);
215
216 let precision = usize::cast_from(*precision);
217 let mut ctx = dec::Context::<Numeric>::default();
219 ctx.set_precision(precision).map_err(|e| {
220 anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
221 })?;
222
223 let array = downcast_array::<Decimal256Array>(array);
224
225 Ok(ColReader::Decimal256 {
226 array,
227 scale_factor,
228 precision,
229 })
230 }
231 (ScalarType::Bytes, DataType::Binary) => {
232 Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
233 }
234 (ScalarType::Bytes, DataType::LargeBinary) => {
235 let array = downcast_array::<LargeBinaryArray>(array);
236 Ok(ColReader::LargeBinary(array))
237 }
238 (ScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
239 let array = downcast_array::<FixedSizeBinaryArray>(array);
240 Ok(ColReader::FixedSizeBinary(array))
241 }
242 (ScalarType::Bytes, DataType::BinaryView) => {
243 let array = downcast_array::<BinaryViewArray>(array);
244 Ok(ColReader::BinaryView(array))
245 }
246 (
247 ScalarType::Uuid,
248 DataType::Binary
249 | DataType::BinaryView
250 | DataType::LargeBinary
251 | DataType::FixedSizeBinary(_),
252 ) => {
253 let reader = scalar_type_and_array_to_reader(&ScalarType::Bytes, array)
254 .context("uuid reader")?;
255 Ok(ColReader::Uuid(Box::new(reader)))
256 }
257 (ScalarType::String, DataType::Utf8) => {
258 Ok(ColReader::String(downcast_array::<StringArray>(array)))
259 }
260 (ScalarType::String, DataType::LargeUtf8) => {
261 let array = downcast_array::<LargeStringArray>(array);
262 Ok(ColReader::LargeString(array))
263 }
264 (ScalarType::String, DataType::Utf8View) => {
265 let array = downcast_array::<StringViewArray>(array);
266 Ok(ColReader::StringView(array))
267 }
268 (ScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
269 let reader = scalar_type_and_array_to_reader(&ScalarType::String, array)
270 .context("json reader")?;
271 Ok(ColReader::Jsonb(Box::new(reader)))
272 }
273 (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
274 let array = downcast_array::<TimestampSecondArray>(array);
275 Ok(ColReader::TimestampSecond(array))
276 }
277 (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
278 let array = downcast_array::<TimestampMillisecondArray>(array);
279 Ok(ColReader::TimestampMillisecond(array))
280 }
281 (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
282 let array = downcast_array::<TimestampMicrosecondArray>(array);
283 Ok(ColReader::TimestampMicrosecond(array))
284 }
285 (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
286 let array = downcast_array::<TimestampNanosecondArray>(array);
287 Ok(ColReader::TimestampNanosecond(array))
288 }
289 (ScalarType::Date, DataType::Date32) => {
290 let array = downcast_array::<Date32Array>(array);
291 Ok(ColReader::Date32(array))
292 }
293 (ScalarType::Date, DataType::Date64) => {
294 let array = downcast_array::<Date64Array>(array);
295 Ok(ColReader::Date64(array))
296 }
297 (ScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
298 let array = downcast_array::<Time32SecondArray>(array);
299 Ok(ColReader::Time32Seconds(array))
300 }
301 (ScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
302 let array = downcast_array::<Time32MillisecondArray>(array);
303 Ok(ColReader::Time32Milliseconds(array))
304 }
305 (
306 ScalarType::List {
307 element_type,
308 custom_id: _,
309 },
310 DataType::List(_),
311 ) => {
312 let array = downcast_array::<ListArray>(array);
313 let inner_decoder =
314 scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
315 .context("list")?;
316 Ok(ColReader::List {
317 offsets: array.offsets().clone(),
318 values: Box::new(inner_decoder),
319 nulls: array.nulls().cloned(),
320 })
321 }
322 (
323 ScalarType::List {
324 element_type,
325 custom_id: _,
326 },
327 DataType::LargeList(_),
328 ) => {
329 let array = downcast_array::<LargeListArray>(array);
330 let inner_decoder =
331 scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
332 .context("large list")?;
333 Ok(ColReader::LargeList {
334 offsets: array.offsets().clone(),
335 values: Box::new(inner_decoder),
336 nulls: array.nulls().cloned(),
337 })
338 }
339 (
340 ScalarType::Record {
341 fields,
342 custom_id: _,
343 },
344 DataType::Struct(_),
345 ) => {
346 let record_array = downcast_array::<StructArray>(array);
347 let null_mask = record_array.nulls();
348
349 let mut decoders = Vec::with_capacity(fields.len());
350 for (name, typ) in fields.iter() {
351 let inner_array = record_array
352 .column_by_name(name)
353 .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
354 let inner_array = mask_nulls(inner_array, null_mask);
355 let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
356 .context(name.clone())?;
357
358 decoders.push(Box::new(inner_decoder));
359 }
360
361 Ok(ColReader::Record {
362 fields: decoders,
363 nulls: null_mask.cloned(),
364 })
365 }
366 other => anyhow::bail!("unsupported: {other:?}"),
367 }
368}
369
370enum ColReader {
375 Boolean(arrow::array::BooleanArray),
376
377 Int8 {
378 array: arrow::array::Int8Array,
379 cast: fn(i8) -> Datum<'static>,
380 },
381 Int16(arrow::array::Int16Array),
382 Int32(arrow::array::Int32Array),
383 Int64(arrow::array::Int64Array),
384
385 UInt8 {
386 array: arrow::array::UInt8Array,
387 cast: fn(u8) -> Datum<'static>,
388 },
389 UInt16(arrow::array::UInt16Array),
390 UInt32(arrow::array::UInt32Array),
391 UInt64(arrow::array::UInt64Array),
392
393 Float16 {
394 array: arrow::array::Float16Array,
395 cast: fn(half::f16) -> Datum<'static>,
396 },
397 Float32(arrow::array::Float32Array),
398 Float64(arrow::array::Float64Array),
399
400 Decimal128 {
401 array: Decimal128Array,
402 scale_factor: Numeric,
403 precision: usize,
404 },
405 Decimal256 {
406 array: Decimal256Array,
407 scale_factor: Numeric,
408 precision: usize,
409 },
410
411 Binary(arrow::array::BinaryArray),
412 LargeBinary(arrow::array::LargeBinaryArray),
413 FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
414 BinaryView(arrow::array::BinaryViewArray),
415 Uuid(Box<ColReader>),
416
417 String(arrow::array::StringArray),
418 LargeString(arrow::array::LargeStringArray),
419 StringView(arrow::array::StringViewArray),
420 Jsonb(Box<ColReader>),
421
422 TimestampSecond(arrow::array::TimestampSecondArray),
423 TimestampMillisecond(arrow::array::TimestampMillisecondArray),
424 TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
425 TimestampNanosecond(arrow::array::TimestampNanosecondArray),
426
427 Date32(Date32Array),
428 Date64(Date64Array),
429
430 Time32Seconds(Time32SecondArray),
431 Time32Milliseconds(arrow::array::Time32MillisecondArray),
432
433 List {
434 offsets: OffsetBuffer<i32>,
435 values: Box<ColReader>,
436 nulls: Option<NullBuffer>,
437 },
438 LargeList {
439 offsets: OffsetBuffer<i64>,
440 values: Box<ColReader>,
441 nulls: Option<NullBuffer>,
442 },
443
444 Record {
445 fields: Vec<Box<ColReader>>,
446 nulls: Option<NullBuffer>,
447 },
448}
449
450impl ColReader {
451 fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
452 let datum = match self {
453 ColReader::Boolean(array) => array
454 .is_valid(idx)
455 .then(|| array.value(idx))
456 .map(|x| if x { Datum::True } else { Datum::False }),
457 ColReader::Int8 { array, cast } => {
458 array.is_valid(idx).then(|| array.value(idx)).map(cast)
459 }
460 ColReader::Int16(array) => array
461 .is_valid(idx)
462 .then(|| array.value(idx))
463 .map(Datum::Int16),
464 ColReader::Int32(array) => array
465 .is_valid(idx)
466 .then(|| array.value(idx))
467 .map(Datum::Int32),
468 ColReader::Int64(array) => array
469 .is_valid(idx)
470 .then(|| array.value(idx))
471 .map(Datum::Int64),
472 ColReader::UInt8 { array, cast } => {
473 array.is_valid(idx).then(|| array.value(idx)).map(cast)
474 }
475 ColReader::UInt16(array) => array
476 .is_valid(idx)
477 .then(|| array.value(idx))
478 .map(Datum::UInt16),
479 ColReader::UInt32(array) => array
480 .is_valid(idx)
481 .then(|| array.value(idx))
482 .map(Datum::UInt32),
483 ColReader::UInt64(array) => array
484 .is_valid(idx)
485 .then(|| array.value(idx))
486 .map(Datum::UInt64),
487 ColReader::Float16 { array, cast } => {
488 array.is_valid(idx).then(|| array.value(idx)).map(cast)
489 }
490 ColReader::Float32(array) => array
491 .is_valid(idx)
492 .then(|| array.value(idx))
493 .map(|x| Datum::Float32(OrderedFloat(x))),
494 ColReader::Float64(array) => array
495 .is_valid(idx)
496 .then(|| array.value(idx))
497 .map(|x| Datum::Float64(OrderedFloat(x))),
498 ColReader::Decimal128 {
499 array,
500 scale_factor,
501 precision,
502 } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
503 let mut ctx = dec::Context::<Numeric>::default();
505 ctx.set_precision(*precision).expect("checked before");
506 let mut num = ctx.from_i128(x);
507
508 ctx.div(&mut num, scale_factor);
510
511 Datum::Numeric(OrderedDecimal(num))
512 }),
513 ColReader::Decimal256 {
514 array,
515 scale_factor,
516 precision,
517 } => array
518 .is_valid(idx)
519 .then(|| array.value(idx))
520 .map(|x| {
521 let s = x.to_string();
522
523 let mut ctx = dec::Context::<Numeric>::default();
527 ctx.set_precision(*precision).expect("checked before");
528 let mut num = ctx
529 .parse(s)
530 .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
531
532 ctx.div(&mut num, scale_factor);
534
535 Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
536 })
537 .transpose()?,
538 ColReader::Binary(array) => array
539 .is_valid(idx)
540 .then(|| array.value(idx))
541 .map(Datum::Bytes),
542 ColReader::LargeBinary(array) => array
543 .is_valid(idx)
544 .then(|| array.value(idx))
545 .map(Datum::Bytes),
546 ColReader::FixedSizeBinary(array) => array
547 .is_valid(idx)
548 .then(|| array.value(idx))
549 .map(Datum::Bytes),
550 ColReader::BinaryView(array) => array
551 .is_valid(idx)
552 .then(|| array.value(idx))
553 .map(Datum::Bytes),
554 ColReader::Uuid(reader) => {
555 let mut temp_row = SharedRow::get();
558 reader.read(idx, &mut temp_row.packer()).context("uuid")?;
559 let slice = match temp_row.unpack_first() {
560 Datum::Bytes(slice) => slice,
561 Datum::Null => {
562 packer.push(Datum::Null);
563 return Ok(());
564 }
565 other => anyhow::bail!("expected String, found {other:?}"),
566 };
567
568 let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
569 Some(Datum::Uuid(uuid))
570 }
571 ColReader::String(array) => array
572 .is_valid(idx)
573 .then(|| array.value(idx))
574 .map(Datum::String),
575 ColReader::LargeString(array) => array
576 .is_valid(idx)
577 .then(|| array.value(idx))
578 .map(Datum::String),
579 ColReader::StringView(array) => array
580 .is_valid(idx)
581 .then(|| array.value(idx))
582 .map(Datum::String),
583 ColReader::Jsonb(reader) => {
584 let mut temp_row = SharedRow::get();
587 reader.read(idx, &mut temp_row.packer()).context("jsonb")?;
588 let value = match temp_row.unpack_first() {
589 Datum::String(value) => value,
590 Datum::Null => {
591 packer.push(Datum::Null);
592 return Ok(());
593 }
594 other => anyhow::bail!("expected String, found {other:?}"),
595 };
596
597 JsonbPacker::new(packer)
598 .pack_str(value)
599 .context("roundtrip json")?;
600
601 return Ok(());
603 }
604 ColReader::TimestampSecond(array) => array
605 .is_valid(idx)
606 .then(|| array.value(idx))
607 .map(|secs| {
608 let dt = DateTime::from_timestamp(secs, 0)
609 .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
610 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
611 .context("TimestampSeconds")?;
612 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
613 })
614 .transpose()?,
615 ColReader::TimestampMillisecond(array) => array
616 .is_valid(idx)
617 .then(|| array.value(idx))
618 .map(|millis| {
619 let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
620 anyhow::anyhow!("invalid timestamp milliseconds {millis}")
621 })?;
622 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
623 .context("TimestampMillis")?;
624 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
625 })
626 .transpose()?,
627 ColReader::TimestampMicrosecond(array) => array
628 .is_valid(idx)
629 .then(|| array.value(idx))
630 .map(|micros| {
631 let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
632 anyhow::anyhow!("invalid timestamp microseconds {micros}")
633 })?;
634 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
635 .context("TimestampMicros")?;
636 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
637 })
638 .transpose()?,
639 ColReader::TimestampNanosecond(array) => array
640 .is_valid(idx)
641 .then(|| array.value(idx))
642 .map(|nanos| {
643 let dt = DateTime::from_timestamp_nanos(nanos);
644 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
645 .context("TimestampNanos")?;
646 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
647 })
648 .transpose()?,
649 ColReader::Date32(array) => array
650 .is_valid(idx)
651 .then(|| array.value(idx))
652 .map(|unix_days| {
653 let date = Date::from_unix_epoch(unix_days).context("date32")?;
654 Ok::<_, anyhow::Error>(Datum::Date(date))
655 })
656 .transpose()?,
657 ColReader::Date64(array) => array
658 .is_valid(idx)
659 .then(|| array.value(idx))
660 .map(|unix_millis| {
661 let date = DateTime::from_timestamp_millis(unix_millis)
662 .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
663 let unix_epoch = DateTime::from_timestamp(0, 0)
664 .expect("UNIX epoch")
665 .date_naive();
666 let delta = date.date_naive().signed_duration_since(unix_epoch);
667 let days: i32 = delta.num_days().try_into().context("date64")?;
668 let date = Date::from_unix_epoch(days).context("date64")?;
669 Ok::<_, anyhow::Error>(Datum::Date(date))
670 })
671 .transpose()?,
672 ColReader::Time32Seconds(array) => array
673 .is_valid(idx)
674 .then(|| array.value(idx))
675 .map(|secs| {
676 let usecs: u32 = secs.try_into().context("time32 seconds")?;
677 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
678 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
679 Ok::<_, anyhow::Error>(Datum::Time(time))
680 })
681 .transpose()?,
682 ColReader::Time32Milliseconds(array) => array
683 .is_valid(idx)
684 .then(|| array.value(idx))
685 .map(|millis| {
686 let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
687 let usecs = umillis / 1000;
688 let unanos = (umillis % 1000).saturating_mul(1_000_000);
689 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
690 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
691 Ok::<_, anyhow::Error>(Datum::Time(time))
692 })
693 .transpose()?,
694 ColReader::List {
695 offsets,
696 values,
697 nulls,
698 } => {
699 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
700 if !is_valid {
701 packer.push(Datum::Null);
702 return Ok(());
703 }
704
705 let start: usize = offsets[idx].try_into().context("list start offset")?;
706 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
707
708 packer
709 .push_list_with(|packer| {
710 for idx in start..end {
711 values.read(idx, packer)?;
712 }
713 Ok::<_, anyhow::Error>(())
714 })
715 .context("pack list")?;
716
717 return Ok(());
719 }
720 ColReader::LargeList {
721 offsets,
722 values,
723 nulls,
724 } => {
725 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
726 if !is_valid {
727 packer.push(Datum::Null);
728 return Ok(());
729 }
730
731 let start: usize = offsets[idx].try_into().context("list start offset")?;
732 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
733
734 packer
735 .push_list_with(|packer| {
736 for idx in start..end {
737 values.read(idx, packer)?;
738 }
739 Ok::<_, anyhow::Error>(())
740 })
741 .context("pack list")?;
742
743 return Ok(());
745 }
746 ColReader::Record { fields, nulls } => {
747 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
748 if !is_valid {
749 packer.push(Datum::Null);
750 return Ok(());
751 }
752
753 packer
754 .push_list_with(|packer| {
755 for field in fields {
756 field.read(idx, packer)?;
757 }
758 Ok::<_, anyhow::Error>(())
759 })
760 .context("pack record")?;
761
762 return Ok(());
764 }
765 };
766
767 match datum {
768 Some(d) => packer.push(d),
769 None => packer.push(Datum::Null),
770 }
771
772 Ok(())
773 }
774}
775
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    /// Checks that `reader` yields `1.234`, NULL and `100000000.009` for rows 0, 1 and 2.
    fn assert_decimal_rows(reader: &ArrowReader) {
        let mut row = Row::default();

        reader.read(0, &mut row).unwrap();
        let first = row.into_element().unwrap_numeric();
        assert_eq!(first.0, Numeric::from(1.234f64));

        // NOTE(review): presumably creating a packer resets the row between reads — confirm.
        row.packer();

        reader.read(1, &mut row).unwrap();
        let second = row.into_element();
        assert_eq!(second, Datum::Null);

        row.packer();

        reader.read(2, &mut row).unwrap();
        let third = row.into_element().unwrap_numeric();
        assert_eq!(third.0, Numeric::from(100000000.009f64));
    }

    /// Writes a populated row and an all-NULL row with `ArrowBuilder`, then reads both back.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_reader() {
        let list_type = ScalarType::List {
            element_type: Box::new(ScalarType::UInt32),
            custom_id: None,
        };
        let desc = RelationDesc::builder()
            .with_column("bool", ScalarType::Bool.nullable(true))
            .with_column("int4", ScalarType::Int32.nullable(true))
            .with_column("uint8", ScalarType::UInt64.nullable(true))
            .with_column("float32", ScalarType::Float32.nullable(true))
            .with_column("string", ScalarType::String.nullable(true))
            .with_column("bytes", ScalarType::Bytes.nullable(true))
            .with_column("uuid", ScalarType::Uuid.nullable(true))
            .with_column("json", ScalarType::Jsonb.nullable(true))
            .with_column("list", list_type.nullable(true))
            .finish();

        // One value per column, in the order the columns were declared above.
        let mut expected_row = Row::default();
        let mut row_packer = expected_row.packer();

        row_packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut row_packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        row_packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        // A second row with NULL in each of the nine columns.
        let all_nulls = Row::pack(vec![Datum::Null; 9]);

        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&expected_row).unwrap();
        builder.add_row(&all_nulls).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        let struct_array = arrow::array::StructArray::from(record_batch);
        let reader = ArrowReader::new(&desc, struct_array).unwrap();
        let mut roundtripped = Row::default();

        reader.read(0, &mut roundtripped).unwrap();
        assert_eq!(&expected_row, &roundtripped);

        // NOTE(review): presumably creating a packer resets the row between reads — confirm.
        roundtripped.packer();

        reader.read(1, &mut roundtripped).unwrap();
        assert_eq!(&all_nulls, &roundtripped);
    }

    /// Reads a `Decimal128` column (precision 12, scale 3) back as numerics.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_decimal128() {
        let desc = RelationDesc::builder()
            .with_column("a", ScalarType::Numeric { max_scale: None }.nullable(true))
            .finish();

        let mut values = arrow::array::Decimal128Builder::new();
        values = values.with_precision_and_scale(12, 3).unwrap();

        // Unscaled integers: with a scale of 3 these are 1.234 and 100000000.009.
        values.append_value(1234);
        values.append_null();
        values.append_value(100000000009);

        let values = values.finish();
        // The `as` cast is needed to unsize the concrete array into an `ArrayRef`.
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", values.data_type().clone(), true)),
            Arc::new(values) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        assert_decimal_rows(&reader);
    }

    /// Reads a `Decimal256` column (precision 12, scale 3) back as numerics.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_decimal256() {
        let desc = RelationDesc::builder()
            .with_column("a", ScalarType::Numeric { max_scale: None }.nullable(true))
            .finish();

        let mut values = arrow::array::Decimal256Builder::new();
        values = values.with_precision_and_scale(12, 3).unwrap();

        // Unscaled integers: with a scale of 3 these are 1.234 and 100000000.009.
        values.append_value(arrow::datatypes::i256::from(1234));
        values.append_null();
        values.append_value(arrow::datatypes::i256::from(100000000009i64));

        let values = values.finish();
        // The `as` cast is needed to unsize the concrete array into an `ArrayRef`.
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", values.data_type().clone(), true)),
            Arc::new(values) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        assert_decimal_rows(&reader);
    }
}