1use std::sync::Arc;
13
14use anyhow::Context;
15use arrow::array::{
16 Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
17 Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
18 Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray,
19 ListArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray,
20 Time32SecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
21 TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
22 UInt64Array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::{DataType, TimeUnit};
26use chrono::{DateTime, NaiveTime};
27use dec::OrderedDecimal;
28use mz_ore::cast::CastFrom;
29use mz_repr::adt::date::Date;
30use mz_repr::adt::jsonb::JsonbPacker;
31use mz_repr::adt::numeric::Numeric;
32use mz_repr::adt::timestamp::CheckedTimestamp;
33use mz_repr::{Datum, RelationDesc, Row, RowPacker, SharedRow, SqlScalarType};
34use ordered_float::OrderedFloat;
35use uuid::Uuid;
36
37use crate::mask_nulls;
38
/// Reads [`Row`]s back out of an Arrow [`StructArray`], using one [`ColReader`]
/// per column of the originating [`RelationDesc`].
pub struct ArrowReader {
    // Number of rows in the source array; valid indexes for `read` are `0..len`.
    len: usize,
    // One decoder per column, in `RelationDesc` column order.
    readers: Vec<ColReader>,
}
52
53impl ArrowReader {
54 pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
68 let inner_columns = array.columns();
69 let desc_columns = desc.typ().columns();
70
71 if inner_columns.len() != desc_columns.len() {
72 return Err(anyhow::anyhow!(
73 "wrong number of columns {} vs {}",
74 inner_columns.len(),
75 desc_columns.len()
76 ));
77 }
78
79 let mut readers = Vec::with_capacity(desc_columns.len());
80 for (col_name, col_type) in desc.iter() {
81 let column = array
82 .column_by_name(col_name)
83 .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
84 let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
85 .context(col_name.clone())?;
86
87 readers.push(reader);
88 }
89
90 Ok(ArrowReader {
91 len: array.len(),
92 readers,
93 })
94 }
95
96 pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
98 let mut packer = row.packer();
99 for reader in &self.readers {
100 reader.read(idx, &mut packer).context(idx)?;
101 }
102 Ok(())
103 }
104
105 pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
107 for idx in 0..self.len {
108 let mut row = Row::default();
109 self.read(idx, &mut row).context(idx)?;
110 rows.push(row);
111 }
112 Ok(self.len)
113 }
114}
115
/// Maps a SQL scalar type and the Arrow array that physically stores it to a
/// [`ColReader`] that can decode individual values into [`Datum`]s.
///
/// Several SQL types accept multiple Arrow encodings (e.g. `Bytes` from any of
/// the binary array flavors); any pairing not listed here is rejected with an
/// "unsupported" error.
fn scalar_type_and_array_to_reader(
    scalar_type: &SqlScalarType,
    array: Arc<dyn Array>,
) -> Result<ColReader, anyhow::Error> {
    // Downcast to a concrete Arrow array type. Only called after matching on
    // `array.data_type()`, which is why the `expect` is safe.
    fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
        array
            .as_any()
            .downcast_ref::<T>()
            .expect("checked DataType")
            .clone()
    }

    match (scalar_type, array.data_type()) {
        (SqlScalarType::Bool, DataType::Boolean) => {
            Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
        }
        // Widening: Arrow Int8 can back any of the signed SQL integer types,
        // so the cast to the declared SQL type is picked here, once.
        (SqlScalarType::Int16 | SqlScalarType::Int32 | SqlScalarType::Int64, DataType::Int8) => {
            let array = downcast_array::<Int8Array>(array);
            let cast: fn(i8) -> Datum<'static> = match scalar_type {
                SqlScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
                SqlScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
                SqlScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Int8 { array, cast })
        }
        (SqlScalarType::Int16, DataType::Int16) => {
            Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
        }
        (SqlScalarType::Int32, DataType::Int32) => {
            Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
        }
        (SqlScalarType::Int64, DataType::Int64) => {
            Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
        }
        // Widening, unsigned flavor: Arrow UInt8 backing any unsigned SQL int.
        (
            SqlScalarType::UInt16 | SqlScalarType::UInt32 | SqlScalarType::UInt64,
            DataType::UInt8,
        ) => {
            let array = downcast_array::<UInt8Array>(array);
            let cast: fn(u8) -> Datum<'static> = match scalar_type {
                SqlScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
                SqlScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
                SqlScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::UInt8 { array, cast })
        }
        (SqlScalarType::UInt16, DataType::UInt16) => {
            Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
        }
        (SqlScalarType::UInt32, DataType::UInt32) => {
            Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
        }
        (SqlScalarType::UInt64, DataType::UInt64) => {
            Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
        }
        // Widening: half-precision floats promote to the declared SQL float type.
        (SqlScalarType::Float32 | SqlScalarType::Float64, DataType::Float16) => {
            let array = downcast_array::<Float16Array>(array);
            let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
                SqlScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
                SqlScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Float16 { array, cast })
        }
        (SqlScalarType::Float32, DataType::Float32) => {
            Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
        }
        (SqlScalarType::Float64, DataType::Float64) => {
            Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
        }
        (SqlScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
            use num_traits::Pow;

            // Decimal128 stores unscaled integers; dividing by 10^scale at
            // read time recovers the decimal value.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // This context exists only to validate the precision up front;
            // `ColReader::read` rebuilds an identical context and relies on
            // this check ("checked before").
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal128Array>(array);

            Ok(ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
            use num_traits::Pow;

            // Same unscaled-integer representation as Decimal128 above.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Validation-only context; see the Decimal128 arm above.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal256Array>(array);

            Ok(ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Bytes, DataType::Binary) => {
            Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
        }
        (SqlScalarType::Bytes, DataType::LargeBinary) => {
            let array = downcast_array::<LargeBinaryArray>(array);
            Ok(ColReader::LargeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array);
            Ok(ColReader::FixedSizeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::BinaryView) => {
            let array = downcast_array::<BinaryViewArray>(array);
            Ok(ColReader::BinaryView(array))
        }
        // UUIDs arrive as raw bytes in any binary encoding: reuse the Bytes
        // reader and wrap it so `read` can parse the 16-byte slice.
        (
            SqlScalarType::Uuid,
            DataType::Binary
            | DataType::BinaryView
            | DataType::LargeBinary
            | DataType::FixedSizeBinary(_),
        ) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::Bytes, array)
                .context("uuid reader")?;
            Ok(ColReader::Uuid(Box::new(reader)))
        }
        (SqlScalarType::String, DataType::Utf8) => {
            Ok(ColReader::String(downcast_array::<StringArray>(array)))
        }
        (SqlScalarType::String, DataType::LargeUtf8) => {
            let array = downcast_array::<LargeStringArray>(array);
            Ok(ColReader::LargeString(array))
        }
        (SqlScalarType::String, DataType::Utf8View) => {
            let array = downcast_array::<StringViewArray>(array);
            Ok(ColReader::StringView(array))
        }
        // JSON arrives serialized as text in any UTF-8 encoding: reuse the
        // String reader and wrap it so `read` can re-pack the JSON.
        (SqlScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::String, array)
                .context("json reader")?;
            Ok(ColReader::Jsonb(Box::new(reader)))
        }
        // Only timezone-less Arrow timestamps map to SQL `Timestamp`.
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
            let array = downcast_array::<TimestampSecondArray>(array);
            Ok(ColReader::TimestampSecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
            let array = downcast_array::<TimestampMillisecondArray>(array);
            Ok(ColReader::TimestampMillisecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
            let array = downcast_array::<TimestampMicrosecondArray>(array);
            Ok(ColReader::TimestampMicrosecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
            let array = downcast_array::<TimestampNanosecondArray>(array);
            Ok(ColReader::TimestampNanosecond(array))
        }
        (SqlScalarType::Date, DataType::Date32) => {
            let array = downcast_array::<Date32Array>(array);
            Ok(ColReader::Date32(array))
        }
        (SqlScalarType::Date, DataType::Date64) => {
            let array = downcast_array::<Date64Array>(array);
            Ok(ColReader::Date64(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
            let array = downcast_array::<Time32SecondArray>(array);
            Ok(ColReader::Time32Seconds(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
            let array = downcast_array::<Time32MillisecondArray>(array);
            Ok(ColReader::Time32Milliseconds(array))
        }
        // Lists recurse: a reader is built for the flattened child values and
        // the offsets/null buffer describe each row's slice of it.
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::List(_),
        ) => {
            let array = downcast_array::<ListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("list")?;
            Ok(ColReader::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::LargeList(_),
        ) => {
            let array = downcast_array::<LargeListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("large list")?;
            Ok(ColReader::LargeList {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        // Records recurse per-field, pushing the struct's own null mask down
        // into each child via `mask_nulls`.
        (
            SqlScalarType::Record {
                fields,
                custom_id: _,
            },
            DataType::Struct(_),
        ) => {
            let record_array = downcast_array::<StructArray>(array);
            let null_mask = record_array.nulls();

            let mut decoders = Vec::with_capacity(fields.len());
            for (name, typ) in fields.iter() {
                let inner_array = record_array
                    .column_by_name(name)
                    .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
                    .context(name.clone())?;

                decoders.push(Box::new(inner_decoder));
            }

            Ok(ColReader::Record {
                fields: decoders,
                nulls: null_mask.cloned(),
            })
        }
        other => anyhow::bail!("unsupported: {other:?}"),
    }
}
372
/// Decoder for a single column: wraps the concrete Arrow array (or, for
/// nested types, its constituent buffers and child readers) and knows how to
/// turn one element into a [`Datum`].
enum ColReader {
    Boolean(arrow::array::BooleanArray),

    // Narrow integers carry a `cast` chosen at construction time that widens
    // the value into the SQL type the column was declared with.
    Int8 {
        array: arrow::array::Int8Array,
        cast: fn(i8) -> Datum<'static>,
    },
    Int16(arrow::array::Int16Array),
    Int32(arrow::array::Int32Array),
    Int64(arrow::array::Int64Array),

    UInt8 {
        array: arrow::array::UInt8Array,
        cast: fn(u8) -> Datum<'static>,
    },
    UInt16(arrow::array::UInt16Array),
    UInt32(arrow::array::UInt32Array),
    UInt64(arrow::array::UInt64Array),

    Float16 {
        array: arrow::array::Float16Array,
        cast: fn(half::f16) -> Datum<'static>,
    },
    Float32(arrow::array::Float32Array),
    Float64(arrow::array::Float64Array),

    // Decimals store unscaled integers; `scale_factor` (10^scale) divides the
    // raw value and `precision` was validated when the reader was built.
    Decimal128 {
        array: Decimal128Array,
        scale_factor: Numeric,
        precision: usize,
    },
    Decimal256 {
        array: Decimal256Array,
        scale_factor: Numeric,
        precision: usize,
    },

    Binary(arrow::array::BinaryArray),
    LargeBinary(arrow::array::LargeBinaryArray),
    FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
    BinaryView(arrow::array::BinaryViewArray),
    // Wraps a Bytes-producing reader; the bytes are parsed as a UUID.
    Uuid(Box<ColReader>),

    String(arrow::array::StringArray),
    LargeString(arrow::array::LargeStringArray),
    StringView(arrow::array::StringViewArray),
    // Wraps a String-producing reader; the text is re-packed as JSON.
    Jsonb(Box<ColReader>),

    TimestampSecond(arrow::array::TimestampSecondArray),
    TimestampMillisecond(arrow::array::TimestampMillisecondArray),
    TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
    TimestampNanosecond(arrow::array::TimestampNanosecondArray),

    Date32(Date32Array),
    Date64(Date64Array),

    Time32Seconds(Time32SecondArray),
    Time32Milliseconds(arrow::array::Time32MillisecondArray),

    // Nested types keep the flattened child reader plus the offsets/null
    // buffer that slice it per row.
    List {
        offsets: OffsetBuffer<i32>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },
    LargeList {
        offsets: OffsetBuffer<i64>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    Record {
        fields: Vec<Box<ColReader>>,
        nulls: Option<NullBuffer>,
    },
}
452
453impl ColReader {
454 fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
455 let datum = match self {
456 ColReader::Boolean(array) => array
457 .is_valid(idx)
458 .then(|| array.value(idx))
459 .map(|x| if x { Datum::True } else { Datum::False }),
460 ColReader::Int8 { array, cast } => {
461 array.is_valid(idx).then(|| array.value(idx)).map(cast)
462 }
463 ColReader::Int16(array) => array
464 .is_valid(idx)
465 .then(|| array.value(idx))
466 .map(Datum::Int16),
467 ColReader::Int32(array) => array
468 .is_valid(idx)
469 .then(|| array.value(idx))
470 .map(Datum::Int32),
471 ColReader::Int64(array) => array
472 .is_valid(idx)
473 .then(|| array.value(idx))
474 .map(Datum::Int64),
475 ColReader::UInt8 { array, cast } => {
476 array.is_valid(idx).then(|| array.value(idx)).map(cast)
477 }
478 ColReader::UInt16(array) => array
479 .is_valid(idx)
480 .then(|| array.value(idx))
481 .map(Datum::UInt16),
482 ColReader::UInt32(array) => array
483 .is_valid(idx)
484 .then(|| array.value(idx))
485 .map(Datum::UInt32),
486 ColReader::UInt64(array) => array
487 .is_valid(idx)
488 .then(|| array.value(idx))
489 .map(Datum::UInt64),
490 ColReader::Float16 { array, cast } => {
491 array.is_valid(idx).then(|| array.value(idx)).map(cast)
492 }
493 ColReader::Float32(array) => array
494 .is_valid(idx)
495 .then(|| array.value(idx))
496 .map(|x| Datum::Float32(OrderedFloat(x))),
497 ColReader::Float64(array) => array
498 .is_valid(idx)
499 .then(|| array.value(idx))
500 .map(|x| Datum::Float64(OrderedFloat(x))),
501 ColReader::Decimal128 {
502 array,
503 scale_factor,
504 precision,
505 } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
506 let mut ctx = dec::Context::<Numeric>::default();
508 ctx.set_precision(*precision).expect("checked before");
509 let mut num = ctx.from_i128(x);
510
511 ctx.div(&mut num, scale_factor);
513
514 Datum::Numeric(OrderedDecimal(num))
515 }),
516 ColReader::Decimal256 {
517 array,
518 scale_factor,
519 precision,
520 } => array
521 .is_valid(idx)
522 .then(|| array.value(idx))
523 .map(|x| {
524 let s = x.to_string();
525
526 let mut ctx = dec::Context::<Numeric>::default();
530 ctx.set_precision(*precision).expect("checked before");
531 let mut num = ctx
532 .parse(s)
533 .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
534
535 ctx.div(&mut num, scale_factor);
537
538 Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
539 })
540 .transpose()?,
541 ColReader::Binary(array) => array
542 .is_valid(idx)
543 .then(|| array.value(idx))
544 .map(Datum::Bytes),
545 ColReader::LargeBinary(array) => array
546 .is_valid(idx)
547 .then(|| array.value(idx))
548 .map(Datum::Bytes),
549 ColReader::FixedSizeBinary(array) => array
550 .is_valid(idx)
551 .then(|| array.value(idx))
552 .map(Datum::Bytes),
553 ColReader::BinaryView(array) => array
554 .is_valid(idx)
555 .then(|| array.value(idx))
556 .map(Datum::Bytes),
557 ColReader::Uuid(reader) => {
558 let mut temp_row = SharedRow::get();
561 reader.read(idx, &mut temp_row.packer()).context("uuid")?;
562 let slice = match temp_row.unpack_first() {
563 Datum::Bytes(slice) => slice,
564 Datum::Null => {
565 packer.push(Datum::Null);
566 return Ok(());
567 }
568 other => anyhow::bail!("expected String, found {other:?}"),
569 };
570
571 let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
572 Some(Datum::Uuid(uuid))
573 }
574 ColReader::String(array) => array
575 .is_valid(idx)
576 .then(|| array.value(idx))
577 .map(Datum::String),
578 ColReader::LargeString(array) => array
579 .is_valid(idx)
580 .then(|| array.value(idx))
581 .map(Datum::String),
582 ColReader::StringView(array) => array
583 .is_valid(idx)
584 .then(|| array.value(idx))
585 .map(Datum::String),
586 ColReader::Jsonb(reader) => {
587 let mut temp_row = SharedRow::get();
590 reader.read(idx, &mut temp_row.packer()).context("jsonb")?;
591 let value = match temp_row.unpack_first() {
592 Datum::String(value) => value,
593 Datum::Null => {
594 packer.push(Datum::Null);
595 return Ok(());
596 }
597 other => anyhow::bail!("expected String, found {other:?}"),
598 };
599
600 JsonbPacker::new(packer)
601 .pack_str(value)
602 .context("roundtrip json")?;
603
604 return Ok(());
606 }
607 ColReader::TimestampSecond(array) => array
608 .is_valid(idx)
609 .then(|| array.value(idx))
610 .map(|secs| {
611 let dt = DateTime::from_timestamp(secs, 0)
612 .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
613 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
614 .context("TimestampSeconds")?;
615 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
616 })
617 .transpose()?,
618 ColReader::TimestampMillisecond(array) => array
619 .is_valid(idx)
620 .then(|| array.value(idx))
621 .map(|millis| {
622 let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
623 anyhow::anyhow!("invalid timestamp milliseconds {millis}")
624 })?;
625 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
626 .context("TimestampMillis")?;
627 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
628 })
629 .transpose()?,
630 ColReader::TimestampMicrosecond(array) => array
631 .is_valid(idx)
632 .then(|| array.value(idx))
633 .map(|micros| {
634 let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
635 anyhow::anyhow!("invalid timestamp microseconds {micros}")
636 })?;
637 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
638 .context("TimestampMicros")?;
639 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
640 })
641 .transpose()?,
642 ColReader::TimestampNanosecond(array) => array
643 .is_valid(idx)
644 .then(|| array.value(idx))
645 .map(|nanos| {
646 let dt = DateTime::from_timestamp_nanos(nanos);
647 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
648 .context("TimestampNanos")?;
649 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
650 })
651 .transpose()?,
652 ColReader::Date32(array) => array
653 .is_valid(idx)
654 .then(|| array.value(idx))
655 .map(|unix_days| {
656 let date = Date::from_unix_epoch(unix_days).context("date32")?;
657 Ok::<_, anyhow::Error>(Datum::Date(date))
658 })
659 .transpose()?,
660 ColReader::Date64(array) => array
661 .is_valid(idx)
662 .then(|| array.value(idx))
663 .map(|unix_millis| {
664 let date = DateTime::from_timestamp_millis(unix_millis)
665 .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
666 let unix_epoch = DateTime::from_timestamp(0, 0)
667 .expect("UNIX epoch")
668 .date_naive();
669 let delta = date.date_naive().signed_duration_since(unix_epoch);
670 let days: i32 = delta.num_days().try_into().context("date64")?;
671 let date = Date::from_unix_epoch(days).context("date64")?;
672 Ok::<_, anyhow::Error>(Datum::Date(date))
673 })
674 .transpose()?,
675 ColReader::Time32Seconds(array) => array
676 .is_valid(idx)
677 .then(|| array.value(idx))
678 .map(|secs| {
679 let usecs: u32 = secs.try_into().context("time32 seconds")?;
680 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
681 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
682 Ok::<_, anyhow::Error>(Datum::Time(time))
683 })
684 .transpose()?,
685 ColReader::Time32Milliseconds(array) => array
686 .is_valid(idx)
687 .then(|| array.value(idx))
688 .map(|millis| {
689 let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
690 let usecs = umillis / 1000;
691 let unanos = (umillis % 1000).saturating_mul(1_000_000);
692 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
693 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
694 Ok::<_, anyhow::Error>(Datum::Time(time))
695 })
696 .transpose()?,
697 ColReader::List {
698 offsets,
699 values,
700 nulls,
701 } => {
702 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
703 if !is_valid {
704 packer.push(Datum::Null);
705 return Ok(());
706 }
707
708 let start: usize = offsets[idx].try_into().context("list start offset")?;
709 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
710
711 packer
712 .push_list_with(|packer| {
713 for idx in start..end {
714 values.read(idx, packer)?;
715 }
716 Ok::<_, anyhow::Error>(())
717 })
718 .context("pack list")?;
719
720 return Ok(());
722 }
723 ColReader::LargeList {
724 offsets,
725 values,
726 nulls,
727 } => {
728 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
729 if !is_valid {
730 packer.push(Datum::Null);
731 return Ok(());
732 }
733
734 let start: usize = offsets[idx].try_into().context("list start offset")?;
735 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
736
737 packer
738 .push_list_with(|packer| {
739 for idx in start..end {
740 values.read(idx, packer)?;
741 }
742 Ok::<_, anyhow::Error>(())
743 })
744 .context("pack list")?;
745
746 return Ok(());
748 }
749 ColReader::Record { fields, nulls } => {
750 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
751 if !is_valid {
752 packer.push(Datum::Null);
753 return Ok(());
754 }
755
756 packer
757 .push_list_with(|packer| {
758 for field in fields {
759 field.read(idx, packer)?;
760 }
761 Ok::<_, anyhow::Error>(())
762 })
763 .context("pack record")?;
764
765 return Ok(());
767 }
768 };
769
770 match datum {
771 Some(d) => packer.push(d),
772 None => packer.push(Datum::Null),
773 }
774
775 Ok(())
776 }
777}
778
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    // Round-trips one fully-populated row and one all-null row through the
    // project's ArrowBuilder and back through ArrowReader, covering most
    // scalar types plus JSON and lists.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn smoketest_reader() {
        let desc = RelationDesc::builder()
            .with_column("bool", SqlScalarType::Bool.nullable(true))
            .with_column("int4", SqlScalarType::Int32.nullable(true))
            .with_column("uint8", SqlScalarType::UInt64.nullable(true))
            .with_column("float32", SqlScalarType::Float32.nullable(true))
            .with_column("string", SqlScalarType::String.nullable(true))
            .with_column("bytes", SqlScalarType::Bytes.nullable(true))
            .with_column("uuid", SqlScalarType::Uuid.nullable(true))
            .with_column("json", SqlScalarType::Jsonb.nullable(true))
            .with_column(
                "list",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let mut og_row = Row::default();
        let mut packer = og_row.packer();

        packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        // One Null per column, to exercise the null path of every reader.
        let null_row = Row::pack(vec![Datum::Null; 9]);

        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&og_row).unwrap();
        builder.add_row(&null_row).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        let reader =
            ArrowReader::new(&desc, arrow::array::StructArray::from(record_batch)).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        assert_eq!(&og_row, &rnd_row);

        // Clear the row (packer() resets it) before decoding the next one.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        assert_eq!(&null_row, &rnd_row);
    }

    // Decodes a hand-built Decimal128 column, checking scaling (scale = 3),
    // nulls, and a value wider than the scale.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn smoketest_decimal128() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec128 = arrow::array::Decimal128Builder::new();
        dec128 = dec128.with_precision_and_scale(12, 3).unwrap();

        // Raw (unscaled) values; with scale 3 these decode to 1.234,
        // null, and 100000000.009.
        dec128.append_value(1234);
        dec128.append_null();
        dec128.append_value(100000000009);

        let dec128 = dec128.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec128.data_type().clone(), true)),
            Arc::new(dec128) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Reset the row between reads.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }

    // Same shape as smoketest_decimal128, but for the Decimal256 path
    // (which decodes via an intermediate string).
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn smoketest_decimal256() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec256 = arrow::array::Decimal256Builder::new();
        dec256 = dec256.with_precision_and_scale(12, 3).unwrap();

        // Raw (unscaled) values; with scale 3 these decode to 1.234,
        // null, and 100000000.009.
        dec256.append_value(arrow::datatypes::i256::from(1234));
        dec256.append_null();
        dec256.append_value(arrow::datatypes::i256::from(100000000009i64));

        let dec256 = dec256.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec256.data_type().clone(), true)),
            Arc::new(dec256) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }
}
947}