1use std::sync::Arc;
13
14use anyhow::Context;
15use arrow::array::{
16 Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
17 Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
18 Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray,
19 ListArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray,
20 Time32SecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
21 TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
22 UInt64Array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::{DataType, TimeUnit};
26use chrono::{DateTime, NaiveTime};
27use dec::OrderedDecimal;
28use mz_ore::cast::CastFrom;
29use mz_repr::adt::date::Date;
30use mz_repr::adt::jsonb::JsonbPacker;
31use mz_repr::adt::numeric::Numeric;
32use mz_repr::adt::timestamp::CheckedTimestamp;
33use mz_repr::{Datum, RelationDesc, Row, RowPacker, ScalarType, SharedRow};
34use ordered_float::OrderedFloat;
35use uuid::Uuid;
36
37use crate::mask_nulls;
38
39pub struct ArrowReader {
49 len: usize,
50 readers: Vec<ColReader>,
51}
52
53impl ArrowReader {
54 pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
68 let inner_columns = array.columns();
69 let desc_columns = desc.typ().columns();
70
71 if inner_columns.len() != desc_columns.len() {
72 return Err(anyhow::anyhow!(
73 "wrong number of columns {} vs {}",
74 inner_columns.len(),
75 desc_columns.len()
76 ));
77 }
78
79 let mut readers = Vec::with_capacity(desc_columns.len());
80 for (col_name, col_type) in desc.iter() {
81 let column = array
82 .column_by_name(col_name.as_str())
83 .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
84 let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
85 .context(col_name.clone())?;
86
87 readers.push(reader);
88 }
89
90 Ok(ArrowReader {
91 len: array.len(),
92 readers,
93 })
94 }
95
96 pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
98 let mut packer = row.packer();
99 for reader in &self.readers {
100 reader.read(idx, &mut packer).context(idx)?;
101 }
102 Ok(())
103 }
104
105 pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
107 for idx in 0..self.len {
108 let mut row = Row::default();
109 self.read(idx, &mut row).context(idx)?;
110 rows.push(row);
111 }
112 Ok(self.len)
113 }
114}
115
116fn scalar_type_and_array_to_reader(
117 scalar_type: &ScalarType,
118 array: Arc<dyn Array>,
119) -> Result<ColReader, anyhow::Error> {
120 fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
121 array
122 .as_any()
123 .downcast_ref::<T>()
124 .expect("checked DataType")
125 .clone()
126 }
127
128 match (scalar_type, array.data_type()) {
129 (ScalarType::Bool, DataType::Boolean) => {
130 Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
131 }
132 (ScalarType::Int16 | ScalarType::Int32 | ScalarType::Int64, DataType::Int8) => {
133 let array = downcast_array::<Int8Array>(array);
134 let cast: fn(i8) -> Datum<'static> = match scalar_type {
135 ScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
136 ScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
137 ScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
138 _ => unreachable!("checked above"),
139 };
140 Ok(ColReader::Int8 { array, cast })
141 }
142 (ScalarType::Int16, DataType::Int16) => {
143 Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
144 }
145 (ScalarType::Int32, DataType::Int32) => {
146 Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
147 }
148 (ScalarType::Int64, DataType::Int64) => {
149 Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
150 }
151 (ScalarType::UInt16 | ScalarType::UInt32 | ScalarType::UInt64, DataType::UInt8) => {
152 let array = downcast_array::<UInt8Array>(array);
153 let cast: fn(u8) -> Datum<'static> = match scalar_type {
154 ScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
155 ScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
156 ScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
157 _ => unreachable!("checked above"),
158 };
159 Ok(ColReader::UInt8 { array, cast })
160 }
161 (ScalarType::UInt16, DataType::UInt16) => {
162 Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
163 }
164 (ScalarType::UInt32, DataType::UInt32) => {
165 Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
166 }
167 (ScalarType::UInt64, DataType::UInt64) => {
168 Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
169 }
170 (ScalarType::Float32 | ScalarType::Float64, DataType::Float16) => {
171 let array = downcast_array::<Float16Array>(array);
172 let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
173 ScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
174 ScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
175 _ => unreachable!("checked above"),
176 };
177 Ok(ColReader::Float16 { array, cast })
178 }
179 (ScalarType::Float32, DataType::Float32) => {
180 Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
181 }
182 (ScalarType::Float64, DataType::Float64) => {
183 Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
184 }
185 (ScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
187 use num_traits::Pow;
188
189 let base = Numeric::from(10);
190 let scale = Numeric::from(*scale);
191 let scale_factor = base.pow(scale);
192
193 let precision = usize::cast_from(*precision);
194 let mut ctx = dec::Context::<Numeric>::default();
196 ctx.set_precision(precision).map_err(|e| {
197 anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
198 })?;
199
200 let array = downcast_array::<Decimal128Array>(array);
201
202 Ok(ColReader::Decimal128 {
203 array,
204 scale_factor,
205 precision,
206 })
207 }
208 (ScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
210 use num_traits::Pow;
211
212 let base = Numeric::from(10);
213 let scale = Numeric::from(*scale);
214 let scale_factor = base.pow(scale);
215
216 let precision = usize::cast_from(*precision);
217 let mut ctx = dec::Context::<Numeric>::default();
219 ctx.set_precision(precision).map_err(|e| {
220 anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
221 })?;
222
223 let array = downcast_array::<Decimal256Array>(array);
224
225 Ok(ColReader::Decimal256 {
226 array,
227 scale_factor,
228 precision,
229 })
230 }
231 (ScalarType::Bytes, DataType::Binary) => {
232 Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
233 }
234 (ScalarType::Bytes, DataType::LargeBinary) => {
235 let array = downcast_array::<LargeBinaryArray>(array);
236 Ok(ColReader::LargeBinary(array))
237 }
238 (ScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
239 let array = downcast_array::<FixedSizeBinaryArray>(array);
240 Ok(ColReader::FixedSizeBinary(array))
241 }
242 (ScalarType::Bytes, DataType::BinaryView) => {
243 let array = downcast_array::<BinaryViewArray>(array);
244 Ok(ColReader::BinaryView(array))
245 }
246 (
247 ScalarType::Uuid,
248 DataType::Binary
249 | DataType::BinaryView
250 | DataType::LargeBinary
251 | DataType::FixedSizeBinary(_),
252 ) => {
253 let reader = scalar_type_and_array_to_reader(&ScalarType::Bytes, array)
254 .context("uuid reader")?;
255 Ok(ColReader::Uuid(Box::new(reader)))
256 }
257 (ScalarType::String, DataType::Utf8) => {
258 Ok(ColReader::String(downcast_array::<StringArray>(array)))
259 }
260 (ScalarType::String, DataType::LargeUtf8) => {
261 let array = downcast_array::<LargeStringArray>(array);
262 Ok(ColReader::LargeString(array))
263 }
264 (ScalarType::String, DataType::Utf8View) => {
265 let array = downcast_array::<StringViewArray>(array);
266 Ok(ColReader::StringView(array))
267 }
268 (ScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
269 let reader = scalar_type_and_array_to_reader(&ScalarType::String, array)
270 .context("json reader")?;
271 Ok(ColReader::Jsonb(Box::new(reader)))
272 }
273 (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
274 let array = downcast_array::<TimestampSecondArray>(array);
275 Ok(ColReader::TimestampSecond(array))
276 }
277 (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
278 let array = downcast_array::<TimestampMillisecondArray>(array);
279 Ok(ColReader::TimestampMillisecond(array))
280 }
281 (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
282 let array = downcast_array::<TimestampMicrosecondArray>(array);
283 Ok(ColReader::TimestampMicrosecond(array))
284 }
285 (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
286 let array = downcast_array::<TimestampNanosecondArray>(array);
287 Ok(ColReader::TimestampNanosecond(array))
288 }
289 (ScalarType::Date, DataType::Date32) => {
290 let array = downcast_array::<Date32Array>(array);
291 Ok(ColReader::Date32(array))
292 }
293 (ScalarType::Date, DataType::Date64) => {
294 let array = downcast_array::<Date64Array>(array);
295 Ok(ColReader::Date64(array))
296 }
297 (ScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
298 let array = downcast_array::<Time32SecondArray>(array);
299 Ok(ColReader::Time32Seconds(array))
300 }
301 (ScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
302 let array = downcast_array::<Time32MillisecondArray>(array);
303 Ok(ColReader::Time32Milliseconds(array))
304 }
305 (
306 ScalarType::List {
307 element_type,
308 custom_id: _,
309 },
310 DataType::List(_),
311 ) => {
312 let array = downcast_array::<ListArray>(array);
313 let inner_decoder =
314 scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
315 .context("list")?;
316 Ok(ColReader::List {
317 offsets: array.offsets().clone(),
318 values: Box::new(inner_decoder),
319 nulls: array.nulls().cloned(),
320 })
321 }
322 (
323 ScalarType::List {
324 element_type,
325 custom_id: _,
326 },
327 DataType::LargeList(_),
328 ) => {
329 let array = downcast_array::<LargeListArray>(array);
330 let inner_decoder =
331 scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
332 .context("large list")?;
333 Ok(ColReader::LargeList {
334 offsets: array.offsets().clone(),
335 values: Box::new(inner_decoder),
336 nulls: array.nulls().cloned(),
337 })
338 }
339 (
340 ScalarType::Record {
341 fields,
342 custom_id: _,
343 },
344 DataType::Struct(_),
345 ) => {
346 let record_array = downcast_array::<StructArray>(array);
347 let null_mask = record_array.nulls();
348
349 let mut decoders = Vec::with_capacity(fields.len());
350 for (name, typ) in fields.iter() {
351 let inner_array = record_array
352 .column_by_name(name.as_str())
353 .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
354 let inner_array = mask_nulls(inner_array, null_mask);
355 let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
356 .context(name.clone())?;
357
358 decoders.push(Box::new(inner_decoder));
359 }
360
361 Ok(ColReader::Record {
362 fields: decoders,
363 nulls: null_mask.cloned(),
364 })
365 }
366 other => anyhow::bail!("unsupported: {other:?}"),
367 }
368}
369
370enum ColReader {
375 Boolean(arrow::array::BooleanArray),
376
377 Int8 {
378 array: arrow::array::Int8Array,
379 cast: fn(i8) -> Datum<'static>,
380 },
381 Int16(arrow::array::Int16Array),
382 Int32(arrow::array::Int32Array),
383 Int64(arrow::array::Int64Array),
384
385 UInt8 {
386 array: arrow::array::UInt8Array,
387 cast: fn(u8) -> Datum<'static>,
388 },
389 UInt16(arrow::array::UInt16Array),
390 UInt32(arrow::array::UInt32Array),
391 UInt64(arrow::array::UInt64Array),
392
393 Float16 {
394 array: arrow::array::Float16Array,
395 cast: fn(half::f16) -> Datum<'static>,
396 },
397 Float32(arrow::array::Float32Array),
398 Float64(arrow::array::Float64Array),
399
400 Decimal128 {
401 array: Decimal128Array,
402 scale_factor: Numeric,
403 precision: usize,
404 },
405 Decimal256 {
406 array: Decimal256Array,
407 scale_factor: Numeric,
408 precision: usize,
409 },
410
411 Binary(arrow::array::BinaryArray),
412 LargeBinary(arrow::array::LargeBinaryArray),
413 FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
414 BinaryView(arrow::array::BinaryViewArray),
415 Uuid(Box<ColReader>),
416
417 String(arrow::array::StringArray),
418 LargeString(arrow::array::LargeStringArray),
419 StringView(arrow::array::StringViewArray),
420 Jsonb(Box<ColReader>),
421
422 TimestampSecond(arrow::array::TimestampSecondArray),
423 TimestampMillisecond(arrow::array::TimestampMillisecondArray),
424 TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
425 TimestampNanosecond(arrow::array::TimestampNanosecondArray),
426
427 Date32(Date32Array),
428 Date64(Date64Array),
429
430 Time32Seconds(Time32SecondArray),
431 Time32Milliseconds(arrow::array::Time32MillisecondArray),
432
433 List {
434 offsets: OffsetBuffer<i32>,
435 values: Box<ColReader>,
436 nulls: Option<NullBuffer>,
437 },
438 LargeList {
439 offsets: OffsetBuffer<i64>,
440 values: Box<ColReader>,
441 nulls: Option<NullBuffer>,
442 },
443
444 Record {
445 fields: Vec<Box<ColReader>>,
446 nulls: Option<NullBuffer>,
447 },
448}
449
450impl ColReader {
451 fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
452 let datum = match self {
453 ColReader::Boolean(array) => array
454 .is_valid(idx)
455 .then(|| array.value(idx))
456 .map(|x| if x { Datum::True } else { Datum::False }),
457 ColReader::Int8 { array, cast } => {
458 array.is_valid(idx).then(|| array.value(idx)).map(cast)
459 }
460 ColReader::Int16(array) => array
461 .is_valid(idx)
462 .then(|| array.value(idx))
463 .map(Datum::Int16),
464 ColReader::Int32(array) => array
465 .is_valid(idx)
466 .then(|| array.value(idx))
467 .map(Datum::Int32),
468 ColReader::Int64(array) => array
469 .is_valid(idx)
470 .then(|| array.value(idx))
471 .map(Datum::Int64),
472 ColReader::UInt8 { array, cast } => {
473 array.is_valid(idx).then(|| array.value(idx)).map(cast)
474 }
475 ColReader::UInt16(array) => array
476 .is_valid(idx)
477 .then(|| array.value(idx))
478 .map(Datum::UInt16),
479 ColReader::UInt32(array) => array
480 .is_valid(idx)
481 .then(|| array.value(idx))
482 .map(Datum::UInt32),
483 ColReader::UInt64(array) => array
484 .is_valid(idx)
485 .then(|| array.value(idx))
486 .map(Datum::UInt64),
487 ColReader::Float16 { array, cast } => {
488 array.is_valid(idx).then(|| array.value(idx)).map(cast)
489 }
490 ColReader::Float32(array) => array
491 .is_valid(idx)
492 .then(|| array.value(idx))
493 .map(|x| Datum::Float32(OrderedFloat(x))),
494 ColReader::Float64(array) => array
495 .is_valid(idx)
496 .then(|| array.value(idx))
497 .map(|x| Datum::Float64(OrderedFloat(x))),
498 ColReader::Decimal128 {
499 array,
500 scale_factor,
501 precision,
502 } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
503 let mut ctx = dec::Context::<Numeric>::default();
505 ctx.set_precision(*precision).expect("checked before");
506 let mut num = ctx.from_i128(x);
507
508 ctx.div(&mut num, scale_factor);
510
511 Datum::Numeric(OrderedDecimal(num))
512 }),
513 ColReader::Decimal256 {
514 array,
515 scale_factor,
516 precision,
517 } => array
518 .is_valid(idx)
519 .then(|| array.value(idx))
520 .map(|x| {
521 let s = x.to_string();
522
523 let mut ctx = dec::Context::<Numeric>::default();
527 ctx.set_precision(*precision).expect("checked before");
528 let mut num = ctx
529 .parse(s)
530 .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
531
532 ctx.div(&mut num, scale_factor);
534
535 Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
536 })
537 .transpose()?,
538 ColReader::Binary(array) => array
539 .is_valid(idx)
540 .then(|| array.value(idx))
541 .map(Datum::Bytes),
542 ColReader::LargeBinary(array) => array
543 .is_valid(idx)
544 .then(|| array.value(idx))
545 .map(Datum::Bytes),
546 ColReader::FixedSizeBinary(array) => array
547 .is_valid(idx)
548 .then(|| array.value(idx))
549 .map(Datum::Bytes),
550 ColReader::BinaryView(array) => array
551 .is_valid(idx)
552 .then(|| array.value(idx))
553 .map(Datum::Bytes),
554 ColReader::Uuid(reader) => {
555 let mut temp_row = SharedRow::get();
558 temp_row
559 .pack_with(|temp_packer| reader.read(idx, temp_packer))
560 .context("uuid")?;
561 let temp_row = temp_row.borrow();
562 let slice = match temp_row.unpack_first() {
563 Datum::Bytes(slice) => slice,
564 Datum::Null => {
565 packer.push(Datum::Null);
566 return Ok(());
567 }
568 other => anyhow::bail!("expected String, found {other:?}"),
569 };
570
571 let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
572 Some(Datum::Uuid(uuid))
573 }
574 ColReader::String(array) => array
575 .is_valid(idx)
576 .then(|| array.value(idx))
577 .map(Datum::String),
578 ColReader::LargeString(array) => array
579 .is_valid(idx)
580 .then(|| array.value(idx))
581 .map(Datum::String),
582 ColReader::StringView(array) => array
583 .is_valid(idx)
584 .then(|| array.value(idx))
585 .map(Datum::String),
586 ColReader::Jsonb(reader) => {
587 let mut temp_row = SharedRow::get();
590 temp_row
591 .pack_with(|temp_packer| reader.read(idx, temp_packer))
592 .context("jsonb")?;
593 let temp_row = temp_row.borrow();
594 let value = match temp_row.unpack_first() {
595 Datum::String(value) => value,
596 Datum::Null => {
597 packer.push(Datum::Null);
598 return Ok(());
599 }
600 other => anyhow::bail!("expected String, found {other:?}"),
601 };
602
603 JsonbPacker::new(packer)
604 .pack_str(value)
605 .context("roundtrip json")?;
606
607 return Ok(());
609 }
610 ColReader::TimestampSecond(array) => array
611 .is_valid(idx)
612 .then(|| array.value(idx))
613 .map(|secs| {
614 let dt = DateTime::from_timestamp(secs, 0)
615 .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
616 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
617 .context("TimestampSeconds")?;
618 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
619 })
620 .transpose()?,
621 ColReader::TimestampMillisecond(array) => array
622 .is_valid(idx)
623 .then(|| array.value(idx))
624 .map(|millis| {
625 let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
626 anyhow::anyhow!("invalid timestamp milliseconds {millis}")
627 })?;
628 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
629 .context("TimestampMillis")?;
630 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
631 })
632 .transpose()?,
633 ColReader::TimestampMicrosecond(array) => array
634 .is_valid(idx)
635 .then(|| array.value(idx))
636 .map(|micros| {
637 let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
638 anyhow::anyhow!("invalid timestamp microseconds {micros}")
639 })?;
640 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
641 .context("TimestampMicros")?;
642 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
643 })
644 .transpose()?,
645 ColReader::TimestampNanosecond(array) => array
646 .is_valid(idx)
647 .then(|| array.value(idx))
648 .map(|nanos| {
649 let dt = DateTime::from_timestamp_nanos(nanos);
650 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
651 .context("TimestampNanos")?;
652 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
653 })
654 .transpose()?,
655 ColReader::Date32(array) => array
656 .is_valid(idx)
657 .then(|| array.value(idx))
658 .map(|unix_days| {
659 let date = Date::from_unix_epoch(unix_days).context("date32")?;
660 Ok::<_, anyhow::Error>(Datum::Date(date))
661 })
662 .transpose()?,
663 ColReader::Date64(array) => array
664 .is_valid(idx)
665 .then(|| array.value(idx))
666 .map(|unix_millis| {
667 let date = DateTime::from_timestamp_millis(unix_millis)
668 .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
669 let unix_epoch = DateTime::from_timestamp(0, 0)
670 .expect("UNIX epoch")
671 .date_naive();
672 let delta = date.date_naive().signed_duration_since(unix_epoch);
673 let days: i32 = delta.num_days().try_into().context("date64")?;
674 let date = Date::from_unix_epoch(days).context("date64")?;
675 Ok::<_, anyhow::Error>(Datum::Date(date))
676 })
677 .transpose()?,
678 ColReader::Time32Seconds(array) => array
679 .is_valid(idx)
680 .then(|| array.value(idx))
681 .map(|secs| {
682 let usecs: u32 = secs.try_into().context("time32 seconds")?;
683 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
684 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
685 Ok::<_, anyhow::Error>(Datum::Time(time))
686 })
687 .transpose()?,
688 ColReader::Time32Milliseconds(array) => array
689 .is_valid(idx)
690 .then(|| array.value(idx))
691 .map(|millis| {
692 let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
693 let usecs = umillis / 1000;
694 let unanos = (umillis % 1000).saturating_mul(1_000_000);
695 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
696 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
697 Ok::<_, anyhow::Error>(Datum::Time(time))
698 })
699 .transpose()?,
700 ColReader::List {
701 offsets,
702 values,
703 nulls,
704 } => {
705 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
706 if !is_valid {
707 packer.push(Datum::Null);
708 return Ok(());
709 }
710
711 let start: usize = offsets[idx].try_into().context("list start offset")?;
712 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
713
714 packer
715 .push_list_with(|packer| {
716 for idx in start..end {
717 values.read(idx, packer)?;
718 }
719 Ok::<_, anyhow::Error>(())
720 })
721 .context("pack list")?;
722
723 return Ok(());
725 }
726 ColReader::LargeList {
727 offsets,
728 values,
729 nulls,
730 } => {
731 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
732 if !is_valid {
733 packer.push(Datum::Null);
734 return Ok(());
735 }
736
737 let start: usize = offsets[idx].try_into().context("list start offset")?;
738 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
739
740 packer
741 .push_list_with(|packer| {
742 for idx in start..end {
743 values.read(idx, packer)?;
744 }
745 Ok::<_, anyhow::Error>(())
746 })
747 .context("pack list")?;
748
749 return Ok(());
751 }
752 ColReader::Record { fields, nulls } => {
753 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
754 if !is_valid {
755 packer.push(Datum::Null);
756 return Ok(());
757 }
758
759 packer
760 .push_list_with(|packer| {
761 for field in fields {
762 field.read(idx, packer)?;
763 }
764 Ok::<_, anyhow::Error>(())
765 })
766 .context("pack record")?;
767
768 return Ok(());
770 }
771 };
772
773 match datum {
774 Some(d) => packer.push(d),
775 None => packer.push(Datum::Null),
776 }
777
778 Ok(())
779 }
780}
781
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    /// Describes a relation with a single nullable numeric column named `a`.
    fn numeric_desc() -> RelationDesc {
        RelationDesc::builder()
            .with_column("a", ScalarType::Numeric { max_scale: None }.nullable(true))
            .finish()
    }

    /// Wraps `column` as the only (nullable) field, named `a`, of a struct array.
    fn single_column_batch(column: arrow::array::ArrayRef) -> StructArray {
        let field = Field::new("a", column.data_type().clone(), true);
        StructArray::from(vec![(Arc::new(field), column)])
    }

    /// Checks the three rows written by the decimal smoke tests:
    /// `1.234`, null, and `100000000.009`.
    fn assert_decimal_rows(reader: &ArrowReader) {
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let first = rnd_row.into_element().unwrap_numeric();
        assert_eq!(first.0, Numeric::from(1.234f64));

        // Obtaining a packer clears the row before the next read.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let second = rnd_row.into_element();
        assert_eq!(second, Datum::Null);

        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let third = rnd_row.into_element().unwrap_numeric();
        assert_eq!(third.0, Numeric::from(100000000.009f64));
    }

    /// Round-trips one fully populated row and one all-null row through the
    /// Arrow builder and back through the reader.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_reader() {
        let list_type = ScalarType::List {
            element_type: Box::new(ScalarType::UInt32),
            custom_id: None,
        };
        let desc = RelationDesc::builder()
            .with_column("bool", ScalarType::Bool.nullable(true))
            .with_column("int4", ScalarType::Int32.nullable(true))
            .with_column("uint8", ScalarType::UInt64.nullable(true))
            .with_column("float32", ScalarType::Float32.nullable(true))
            .with_column("string", ScalarType::String.nullable(true))
            .with_column("bytes", ScalarType::Bytes.nullable(true))
            .with_column("uuid", ScalarType::Uuid.nullable(true))
            .with_column("json", ScalarType::Jsonb.nullable(true))
            .with_column("list", list_type.nullable(true))
            .finish();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.extend([
                Datum::True,
                Datum::Int32(42),
                Datum::UInt64(10000),
                Datum::Float32(OrderedFloat::from(-1.1f32)),
                Datum::String("hello world"),
                Datum::Bytes(b"1010101"),
                Datum::Uuid(uuid::Uuid::new_v4()),
            ]);
            JsonbPacker::new(&mut packer)
                .pack_serde_json(
                    serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
                )
                .expect("failed to pack JSON");
            packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);
        }

        let null_row = Row::pack(vec![Datum::Null; 9]);

        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        for row in [&og_row, &null_row] {
            builder.add_row(row).unwrap();
        }
        let record_batch = builder.to_record_batch().unwrap();

        let batch = arrow::array::StructArray::from(record_batch);
        let reader = ArrowReader::new(&desc, batch).unwrap();

        let mut rnd_row = Row::default();
        for (idx, expected) in [(0, &og_row), (1, &null_row)] {
            // Obtaining a packer clears the row before each read.
            rnd_row.packer();

            reader.read(idx, &mut rnd_row).unwrap();
            assert_eq!(expected, &rnd_row);
        }
    }

    /// Reads values and a null out of a `Decimal128` column with precision 12 and scale 3.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_decimal128() {
        let mut dec128 = arrow::array::Decimal128Builder::new()
            .with_precision_and_scale(12, 3)
            .unwrap();

        dec128.append_value(1234);
        dec128.append_null();
        dec128.append_value(100000000009);

        let batch = single_column_batch(Arc::new(dec128.finish()));
        let reader = ArrowReader::new(&numeric_desc(), batch).unwrap();

        assert_decimal_rows(&reader);
    }

    /// Reads values and a null out of a `Decimal256` column with precision 12 and scale 3.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_decimal256() {
        let mut dec256 = arrow::array::Decimal256Builder::new()
            .with_precision_and_scale(12, 3)
            .unwrap();

        dec256.append_value(arrow::datatypes::i256::from(1234));
        dec256.append_null();
        dec256.append_value(arrow::datatypes::i256::from(100000000009i64));

        let batch = single_column_batch(Arc::new(dec256.finish()));
        let reader = ArrowReader::new(&numeric_desc(), batch).unwrap();

        assert_decimal_rows(&reader);
    }
}