1use std::sync::Arc;
13
14use anyhow::Context;
15use arrow::array::{
16 Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
17 Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
18 Int16Array, Int32Array, Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
19 IntervalYearMonthArray, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
20 MapArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray, Time32SecondArray,
21 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
22 TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
26use chrono::{DateTime, NaiveTime};
27use dec::OrderedDecimal;
28use mz_ore::cast::CastFrom;
29use mz_repr::adt::date::Date;
30use mz_repr::adt::interval::Interval;
31use mz_repr::adt::jsonb::JsonbPacker;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::adt::timestamp::CheckedTimestamp;
34use mz_repr::{Datum, RelationDesc, Row, RowPacker, SharedRow, SqlScalarType};
35use ordered_float::OrderedFloat;
36use uuid::Uuid;
37
38use crate::mask_nulls;
39
/// Reads [`Row`]s out of an Arrow [`StructArray`], using one [`ColReader`]
/// per column of the associated [`RelationDesc`].
pub struct ArrowReader {
    // Number of rows available in the source array.
    len: usize,
    // One decoder per column, in `RelationDesc` column order.
    readers: Vec<ColReader>,
}
53
54impl ArrowReader {
55 pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
69 let inner_columns = array.columns();
70 let desc_columns = desc.typ().columns();
71
72 if inner_columns.len() != desc_columns.len() {
73 return Err(anyhow::anyhow!(
74 "wrong number of columns {} vs {}",
75 inner_columns.len(),
76 desc_columns.len()
77 ));
78 }
79
80 let mut readers = Vec::with_capacity(desc_columns.len());
81 for (col_name, col_type) in desc.iter() {
82 let column = array
83 .column_by_name(col_name)
84 .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
85 let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
86 .context(col_name.clone())?;
87
88 readers.push(reader);
89 }
90
91 Ok(ArrowReader {
92 len: array.len(),
93 readers,
94 })
95 }
96
97 pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
99 let mut packer = row.packer();
100 for reader in &self.readers {
101 reader.read(idx, &mut packer).context(idx)?;
102 }
103 Ok(())
104 }
105
106 pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
108 for idx in 0..self.len {
109 let mut row = Row::default();
110 self.read(idx, &mut row).context(idx)?;
111 rows.push(row);
112 }
113 Ok(self.len)
114 }
115}
116
/// Pairs a SQL [`SqlScalarType`] with an Arrow [`Array`] of a compatible
/// [`DataType`] and returns a [`ColReader`] that can decode individual values.
///
/// Returns an error if the (SQL type, Arrow type) combination is unsupported.
fn scalar_type_and_array_to_reader(
    scalar_type: &SqlScalarType,
    array: Arc<dyn Array>,
) -> Result<ColReader, anyhow::Error> {
    // Helper to downcast the dynamically-typed array to a concrete array
    // type; safe to `expect` because we only call it after matching on the
    // corresponding `DataType`.
    fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
        array
            .as_any()
            .downcast_ref::<T>()
            .expect("checked DataType")
            .clone()
    }

    match (scalar_type, array.data_type()) {
        (SqlScalarType::Bool, DataType::Boolean) => {
            Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
        }
        // Narrow Arrow integers are widened to whichever signed SQL integer
        // type the relation asked for.
        (SqlScalarType::Int16 | SqlScalarType::Int32 | SqlScalarType::Int64, DataType::Int8) => {
            let array = downcast_array::<Int8Array>(array);
            let cast: fn(i8) -> Datum<'static> = match scalar_type {
                SqlScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
                SqlScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
                SqlScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Int8 { array, cast })
        }
        (SqlScalarType::Int16, DataType::Int16) => {
            Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
        }
        (SqlScalarType::Int32, DataType::Int32) => {
            Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
        }
        (SqlScalarType::Int64, DataType::Int64) => {
            Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
        }
        // Same widening treatment for unsigned integers.
        (
            SqlScalarType::UInt16 | SqlScalarType::UInt32 | SqlScalarType::UInt64,
            DataType::UInt8,
        ) => {
            let array = downcast_array::<UInt8Array>(array);
            let cast: fn(u8) -> Datum<'static> = match scalar_type {
                SqlScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
                SqlScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
                SqlScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::UInt8 { array, cast })
        }
        (SqlScalarType::UInt16, DataType::UInt16) => {
            Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
        }
        (SqlScalarType::UInt32, DataType::UInt32) => {
            Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
        }
        (SqlScalarType::UInt64, DataType::UInt64) => {
            Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
        }
        // Half-precision floats are widened to f32 or f64 at read time.
        (SqlScalarType::Float32 | SqlScalarType::Float64, DataType::Float16) => {
            let array = downcast_array::<Float16Array>(array);
            let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
                SqlScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
                SqlScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Float16 { array, cast })
        }
        (SqlScalarType::Float32, DataType::Float32) => {
            Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
        }
        (SqlScalarType::Float64, DataType::Float64) => {
            Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
        }
        // Arrow decimals store unscaled integers; we precompute the
        // 10^scale divisor applied when reading each value.
        (SqlScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
            use num_traits::Pow;

            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Validate the precision up front so `read` can `expect` it.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal128Array>(array);

            Ok(ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
            use num_traits::Pow;

            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Validate the precision up front so `read` can `expect` it.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal256Array>(array);

            Ok(ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Bytes, DataType::Binary) => {
            Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
        }
        (SqlScalarType::Bytes, DataType::LargeBinary) => {
            let array = downcast_array::<LargeBinaryArray>(array);
            Ok(ColReader::LargeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array);
            Ok(ColReader::FixedSizeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::BinaryView) => {
            let array = downcast_array::<BinaryViewArray>(array);
            Ok(ColReader::BinaryView(array))
        }
        // UUIDs are stored as raw bytes; wrap a bytes reader and parse the
        // 16-byte value at read time.
        (
            SqlScalarType::Uuid,
            DataType::Binary
            | DataType::BinaryView
            | DataType::LargeBinary
            | DataType::FixedSizeBinary(_),
        ) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::Bytes, array)
                .context("uuid reader")?;
            Ok(ColReader::Uuid(Box::new(reader)))
        }
        (SqlScalarType::String, DataType::Utf8) => {
            Ok(ColReader::String(downcast_array::<StringArray>(array)))
        }
        (SqlScalarType::String, DataType::LargeUtf8) => {
            let array = downcast_array::<LargeStringArray>(array);
            Ok(ColReader::LargeString(array))
        }
        (SqlScalarType::String, DataType::Utf8View) => {
            let array = downcast_array::<StringViewArray>(array);
            Ok(ColReader::StringView(array))
        }
        // JSON is stored as its string rendering; wrap a string reader and
        // re-pack the JSON at read time.
        (SqlScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::String, array)
                .context("json reader")?;
            Ok(ColReader::Jsonb(Box::new(reader)))
        }
        // Only timezone-less Arrow timestamps are supported (`None` tz).
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
            let array = downcast_array::<TimestampSecondArray>(array);
            Ok(ColReader::TimestampSecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
            let array = downcast_array::<TimestampMillisecondArray>(array);
            Ok(ColReader::TimestampMillisecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
            let array = downcast_array::<TimestampMicrosecondArray>(array);
            Ok(ColReader::TimestampMicrosecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
            let array = downcast_array::<TimestampNanosecondArray>(array);
            Ok(ColReader::TimestampNanosecond(array))
        }
        (SqlScalarType::Date, DataType::Date32) => {
            let array = downcast_array::<Date32Array>(array);
            Ok(ColReader::Date32(array))
        }
        (SqlScalarType::Date, DataType::Date64) => {
            let array = downcast_array::<Date64Array>(array);
            Ok(ColReader::Date64(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
            let array = downcast_array::<Time32SecondArray>(array);
            Ok(ColReader::Time32Seconds(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
            let array = downcast_array::<Time32MillisecondArray>(array);
            Ok(ColReader::Time32Milliseconds(array))
        }
        // Lists keep the raw offsets/null buffers and recurse on the child
        // element type.
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::List(_),
        ) => {
            let array = downcast_array::<ListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("list")?;
            Ok(ColReader::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::LargeList(_),
        ) => {
            let array = downcast_array::<LargeListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("large list")?;
            Ok(ColReader::LargeList {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        // Records recurse on each named field; child arrays are masked with
        // the struct's own null buffer so struct-level nulls propagate.
        (
            SqlScalarType::Record {
                fields,
                custom_id: _,
            },
            DataType::Struct(_),
        ) => {
            let record_array = downcast_array::<StructArray>(array);
            let null_mask = record_array.nulls();

            let mut decoders = Vec::with_capacity(fields.len());
            for (name, typ) in fields.iter() {
                let inner_array = record_array
                    .column_by_name(name)
                    .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
                    .context(name.clone())?;

                decoders.push(Box::new(inner_decoder));
            }

            Ok(ColReader::Record {
                fields: decoders,
                nulls: null_mask.cloned(),
            })
        }
        // Map keys must be Utf8 strings; values recurse on the value type.
        (
            SqlScalarType::Map {
                value_type,
                custom_id: _,
            },
            DataType::Map(_, _),
        ) => {
            let map_array = downcast_array::<MapArray>(array);
            let keys = map_array
                .keys()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("map keys should be Utf8 strings")
                .clone();
            let values_reader =
                scalar_type_and_array_to_reader(value_type, Arc::clone(map_array.values()))
                    .context("map values")?;
            Ok(ColReader::Map {
                offsets: map_array.offsets().clone(),
                keys,
                values: Box::new(values_reader),
                nulls: map_array.nulls().cloned(),
            })
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::YearMonth)) => {
            Ok(ColReader::IntervalYearMonth(downcast_array::<
                IntervalYearMonthArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::DayTime)) => {
            Ok(ColReader::IntervalDayTime(downcast_array::<
                IntervalDayTimeArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::MonthDayNano)) => {
            Ok(ColReader::IntervalMonthDayNano(downcast_array::<
                IntervalMonthDayNanoArray,
            >(array)))
        }
        other => anyhow::bail!("unsupported: {other:?}"),
    }
}
412
/// A decoder for a single column: wraps the concrete Arrow array (plus any
/// precomputed state) needed to turn a value at a row index into a [`Datum`].
enum ColReader {
    Boolean(arrow::array::BooleanArray),

    // Narrow signed integers carry a widening cast chosen at construction
    // time from the target SQL type.
    Int8 {
        array: arrow::array::Int8Array,
        cast: fn(i8) -> Datum<'static>,
    },
    Int16(arrow::array::Int16Array),
    Int32(arrow::array::Int32Array),
    Int64(arrow::array::Int64Array),

    // Same widening treatment for unsigned integers.
    UInt8 {
        array: arrow::array::UInt8Array,
        cast: fn(u8) -> Datum<'static>,
    },
    UInt16(arrow::array::UInt16Array),
    UInt32(arrow::array::UInt32Array),
    UInt64(arrow::array::UInt64Array),

    // Half-precision floats are widened to f32/f64 via `cast`.
    Float16 {
        array: arrow::array::Float16Array,
        cast: fn(half::f16) -> Datum<'static>,
    },
    Float32(arrow::array::Float32Array),
    Float64(arrow::array::Float64Array),

    // Decimals store unscaled integers; `scale_factor` (10^scale) is the
    // divisor applied at read time, and `precision` was validated when the
    // reader was built.
    Decimal128 {
        array: Decimal128Array,
        scale_factor: Numeric,
        precision: usize,
    },
    Decimal256 {
        array: Decimal256Array,
        scale_factor: Numeric,
        precision: usize,
    },

    Binary(arrow::array::BinaryArray),
    LargeBinary(arrow::array::LargeBinaryArray),
    FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
    BinaryView(arrow::array::BinaryViewArray),
    // Wraps one of the binary readers above; parses 16-byte UUIDs at read time.
    Uuid(Box<ColReader>),

    String(arrow::array::StringArray),
    LargeString(arrow::array::LargeStringArray),
    StringView(arrow::array::StringViewArray),
    // Wraps one of the string readers above; re-packs JSON at read time.
    Jsonb(Box<ColReader>),

    TimestampSecond(arrow::array::TimestampSecondArray),
    TimestampMillisecond(arrow::array::TimestampMillisecondArray),
    TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
    TimestampNanosecond(arrow::array::TimestampNanosecondArray),

    Date32(Date32Array),
    Date64(Date64Array),

    Time32Seconds(Time32SecondArray),
    Time32Milliseconds(arrow::array::Time32MillisecondArray),

    // Variable-length containers keep the raw offset/null buffers and
    // recurse into a child reader for the element values.
    List {
        offsets: OffsetBuffer<i32>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },
    LargeList {
        offsets: OffsetBuffer<i64>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    Record {
        fields: Vec<Box<ColReader>>,
        nulls: Option<NullBuffer>,
    },

    Map {
        offsets: OffsetBuffer<i32>,
        keys: StringArray,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    IntervalYearMonth(IntervalYearMonthArray),
    IntervalDayTime(IntervalDayTimeArray),
    IntervalMonthDayNano(IntervalMonthDayNanoArray),
}
503
504impl ColReader {
505 fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
506 let datum = match self {
507 ColReader::Boolean(array) => array
508 .is_valid(idx)
509 .then(|| array.value(idx))
510 .map(|x| if x { Datum::True } else { Datum::False }),
511 ColReader::Int8 { array, cast } => {
512 array.is_valid(idx).then(|| array.value(idx)).map(cast)
513 }
514 ColReader::Int16(array) => array
515 .is_valid(idx)
516 .then(|| array.value(idx))
517 .map(Datum::Int16),
518 ColReader::Int32(array) => array
519 .is_valid(idx)
520 .then(|| array.value(idx))
521 .map(Datum::Int32),
522 ColReader::Int64(array) => array
523 .is_valid(idx)
524 .then(|| array.value(idx))
525 .map(Datum::Int64),
526 ColReader::UInt8 { array, cast } => {
527 array.is_valid(idx).then(|| array.value(idx)).map(cast)
528 }
529 ColReader::UInt16(array) => array
530 .is_valid(idx)
531 .then(|| array.value(idx))
532 .map(Datum::UInt16),
533 ColReader::UInt32(array) => array
534 .is_valid(idx)
535 .then(|| array.value(idx))
536 .map(Datum::UInt32),
537 ColReader::UInt64(array) => array
538 .is_valid(idx)
539 .then(|| array.value(idx))
540 .map(Datum::UInt64),
541 ColReader::Float16 { array, cast } => {
542 array.is_valid(idx).then(|| array.value(idx)).map(cast)
543 }
544 ColReader::Float32(array) => array
545 .is_valid(idx)
546 .then(|| array.value(idx))
547 .map(|x| Datum::Float32(OrderedFloat(x))),
548 ColReader::Float64(array) => array
549 .is_valid(idx)
550 .then(|| array.value(idx))
551 .map(|x| Datum::Float64(OrderedFloat(x))),
552 ColReader::Decimal128 {
553 array,
554 scale_factor,
555 precision,
556 } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
557 let mut ctx = dec::Context::<Numeric>::default();
559 ctx.set_precision(*precision).expect("checked before");
560 let mut num = ctx.from_i128(x);
561
562 ctx.div(&mut num, scale_factor);
564
565 Datum::Numeric(OrderedDecimal(num))
566 }),
567 ColReader::Decimal256 {
568 array,
569 scale_factor,
570 precision,
571 } => array
572 .is_valid(idx)
573 .then(|| array.value(idx))
574 .map(|x| {
575 let s = x.to_string();
576
577 let mut ctx = dec::Context::<Numeric>::default();
581 ctx.set_precision(*precision).expect("checked before");
582 let mut num = ctx
583 .parse(s)
584 .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
585
586 ctx.div(&mut num, scale_factor);
588
589 Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
590 })
591 .transpose()?,
592 ColReader::Binary(array) => array
593 .is_valid(idx)
594 .then(|| array.value(idx))
595 .map(Datum::Bytes),
596 ColReader::LargeBinary(array) => array
597 .is_valid(idx)
598 .then(|| array.value(idx))
599 .map(Datum::Bytes),
600 ColReader::FixedSizeBinary(array) => array
601 .is_valid(idx)
602 .then(|| array.value(idx))
603 .map(Datum::Bytes),
604 ColReader::BinaryView(array) => array
605 .is_valid(idx)
606 .then(|| array.value(idx))
607 .map(Datum::Bytes),
608 ColReader::Uuid(reader) => {
609 let mut temp_row = SharedRow::get();
612 reader.read(idx, &mut temp_row.packer()).context("uuid")?;
613 let slice = match temp_row.unpack_first() {
614 Datum::Bytes(slice) => slice,
615 Datum::Null => {
616 packer.push(Datum::Null);
617 return Ok(());
618 }
619 other => anyhow::bail!("expected String, found {other:?}"),
620 };
621
622 let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
623 Some(Datum::Uuid(uuid))
624 }
625 ColReader::String(array) => array
626 .is_valid(idx)
627 .then(|| array.value(idx))
628 .map(Datum::String),
629 ColReader::LargeString(array) => array
630 .is_valid(idx)
631 .then(|| array.value(idx))
632 .map(Datum::String),
633 ColReader::StringView(array) => array
634 .is_valid(idx)
635 .then(|| array.value(idx))
636 .map(Datum::String),
637 ColReader::Jsonb(reader) => {
638 let mut temp_row = SharedRow::get();
641 reader.read(idx, &mut temp_row.packer()).context("jsonb")?;
642 let value = match temp_row.unpack_first() {
643 Datum::String(value) => value,
644 Datum::Null => {
645 packer.push(Datum::Null);
646 return Ok(());
647 }
648 other => anyhow::bail!("expected String, found {other:?}"),
649 };
650
651 JsonbPacker::new(packer)
652 .pack_str(value)
653 .context("roundtrip json")?;
654
655 return Ok(());
657 }
658 ColReader::TimestampSecond(array) => array
659 .is_valid(idx)
660 .then(|| array.value(idx))
661 .map(|secs| {
662 let dt = DateTime::from_timestamp(secs, 0)
663 .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
664 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
665 .context("TimestampSeconds")?;
666 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
667 })
668 .transpose()?,
669 ColReader::TimestampMillisecond(array) => array
670 .is_valid(idx)
671 .then(|| array.value(idx))
672 .map(|millis| {
673 let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
674 anyhow::anyhow!("invalid timestamp milliseconds {millis}")
675 })?;
676 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
677 .context("TimestampMillis")?;
678 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
679 })
680 .transpose()?,
681 ColReader::TimestampMicrosecond(array) => array
682 .is_valid(idx)
683 .then(|| array.value(idx))
684 .map(|micros| {
685 let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
686 anyhow::anyhow!("invalid timestamp microseconds {micros}")
687 })?;
688 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
689 .context("TimestampMicros")?;
690 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
691 })
692 .transpose()?,
693 ColReader::TimestampNanosecond(array) => array
694 .is_valid(idx)
695 .then(|| array.value(idx))
696 .map(|nanos| {
697 let dt = DateTime::from_timestamp_nanos(nanos);
698 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
699 .context("TimestampNanos")?;
700 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
701 })
702 .transpose()?,
703 ColReader::Date32(array) => array
704 .is_valid(idx)
705 .then(|| array.value(idx))
706 .map(|unix_days| {
707 let date = Date::from_unix_epoch(unix_days).context("date32")?;
708 Ok::<_, anyhow::Error>(Datum::Date(date))
709 })
710 .transpose()?,
711 ColReader::Date64(array) => array
712 .is_valid(idx)
713 .then(|| array.value(idx))
714 .map(|unix_millis| {
715 let date = DateTime::from_timestamp_millis(unix_millis)
716 .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
717 let unix_epoch = DateTime::from_timestamp(0, 0)
718 .expect("UNIX epoch")
719 .date_naive();
720 let delta = date.date_naive().signed_duration_since(unix_epoch);
721 let days: i32 = delta.num_days().try_into().context("date64")?;
722 let date = Date::from_unix_epoch(days).context("date64")?;
723 Ok::<_, anyhow::Error>(Datum::Date(date))
724 })
725 .transpose()?,
726 ColReader::Time32Seconds(array) => array
727 .is_valid(idx)
728 .then(|| array.value(idx))
729 .map(|secs| {
730 let usecs: u32 = secs.try_into().context("time32 seconds")?;
731 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
732 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
733 Ok::<_, anyhow::Error>(Datum::Time(time))
734 })
735 .transpose()?,
736 ColReader::Time32Milliseconds(array) => array
737 .is_valid(idx)
738 .then(|| array.value(idx))
739 .map(|millis| {
740 let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
741 let usecs = umillis / 1000;
742 let unanos = (umillis % 1000).saturating_mul(1_000_000);
743 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
744 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
745 Ok::<_, anyhow::Error>(Datum::Time(time))
746 })
747 .transpose()?,
748 ColReader::List {
749 offsets,
750 values,
751 nulls,
752 } => {
753 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
754 if !is_valid {
755 packer.push(Datum::Null);
756 return Ok(());
757 }
758
759 let start: usize = offsets[idx].try_into().context("list start offset")?;
760 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
761
762 packer
763 .push_list_with(|packer| {
764 for idx in start..end {
765 values.read(idx, packer)?;
766 }
767 Ok::<_, anyhow::Error>(())
768 })
769 .context("pack list")?;
770
771 return Ok(());
773 }
774 ColReader::LargeList {
775 offsets,
776 values,
777 nulls,
778 } => {
779 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
780 if !is_valid {
781 packer.push(Datum::Null);
782 return Ok(());
783 }
784
785 let start: usize = offsets[idx].try_into().context("list start offset")?;
786 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
787
788 packer
789 .push_list_with(|packer| {
790 for idx in start..end {
791 values.read(idx, packer)?;
792 }
793 Ok::<_, anyhow::Error>(())
794 })
795 .context("pack list")?;
796
797 return Ok(());
799 }
800 ColReader::Record { fields, nulls } => {
801 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
802 if !is_valid {
803 packer.push(Datum::Null);
804 return Ok(());
805 }
806
807 packer
808 .push_list_with(|packer| {
809 for field in fields {
810 field.read(idx, packer)?;
811 }
812 Ok::<_, anyhow::Error>(())
813 })
814 .context("pack record")?;
815
816 return Ok(());
818 }
819 ColReader::Map {
820 offsets,
821 keys,
822 values,
823 nulls,
824 } => {
825 let is_non_null = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
826 if !is_non_null {
827 packer.push(Datum::Null);
828 return Ok(());
829 }
830
831 let start: usize = offsets[idx].try_into().context("map start offset")?;
832 let end: usize = offsets[idx + 1].try_into().context("map end offset")?;
833
834 packer
835 .push_dict_with(|packer| {
836 for i in start..end {
837 packer.push(Datum::String(keys.value(i)));
838 values.read(i, packer)?;
839 }
840 Ok::<_, anyhow::Error>(())
841 })
842 .context("pack map")?;
843
844 return Ok(());
846 }
847 ColReader::IntervalYearMonth(array) => array
848 .is_valid(idx)
849 .then(|| array.value(idx))
850 .map(|months| Datum::Interval(Interval::new(months, 0, 0))),
851 ColReader::IntervalDayTime(array) => {
852 array.is_valid(idx).then(|| array.value(idx)).map(|v| {
853 let micros = i64::from(v.milliseconds) * 1_000;
854 Datum::Interval(Interval::new(0, v.days, micros))
855 })
856 }
857 ColReader::IntervalMonthDayNano(array) => {
858 array.is_valid(idx).then(|| array.value(idx)).map(|v| {
859 let micros = v.nanoseconds / 1_000;
860 Datum::Interval(Interval::new(v.months, v.days, micros))
861 })
862 }
863 };
864
865 match datum {
866 Some(d) => packer.push(d),
867 None => packer.push(Datum::Null),
868 }
869
870 Ok(())
871 }
872}
873
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    // Round-trips a row covering a spread of scalar types (plus an all-null
    // row) through the Arrow builder and back via `ArrowReader`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn smoketest_reader() {
        let desc = RelationDesc::builder()
            .with_column("bool", SqlScalarType::Bool.nullable(true))
            .with_column("int4", SqlScalarType::Int32.nullable(true))
            .with_column("uint8", SqlScalarType::UInt64.nullable(true))
            .with_column("float32", SqlScalarType::Float32.nullable(true))
            .with_column("string", SqlScalarType::String.nullable(true))
            .with_column("bytes", SqlScalarType::Bytes.nullable(true))
            .with_column("uuid", SqlScalarType::Uuid.nullable(true))
            .with_column("json", SqlScalarType::Jsonb.nullable(true))
            .with_column(
                "list",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        // Pack one value per column, in the same order as the desc above.
        let mut og_row = Row::default();
        let mut packer = og_row.packer();

        packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        // A second row with every column null exercises the null paths.
        let null_row = Row::pack(vec![Datum::Null; 9]);

        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&og_row).unwrap();
        builder.add_row(&null_row).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        let reader =
            ArrowReader::new(&desc, arrow::array::StructArray::from(record_batch)).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        assert_eq!(&og_row, &rnd_row);

        // Clear the row before reading the next index.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        assert_eq!(&null_row, &rnd_row);
    }

    // Decodes a Decimal128 column (precision 12, scale 3) and checks the
    // fixed-point values are rescaled correctly, including a null slot.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn smoketest_decimal128() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec128 = arrow::array::Decimal128Builder::new();
        dec128 = dec128.with_precision_and_scale(12, 3).unwrap();

        // Unscaled values: 1234 -> 1.234, 100000000009 -> 100000000.009.
        dec128.append_value(1234);
        dec128.append_null();
        dec128.append_value(100000000009);

        let dec128 = dec128.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec128.data_type().clone(), true)),
            Arc::new(dec128) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Clear the row before reading the next index.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }

    // Same as above but for Decimal256, which decodes via a string round-trip.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn smoketest_decimal256() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec256 = arrow::array::Decimal256Builder::new();
        dec256 = dec256.with_precision_and_scale(12, 3).unwrap();

        // Unscaled values: 1234 -> 1.234, 100000000009 -> 100000000.009.
        dec256.append_value(arrow::datatypes::i256::from(1234));
        dec256.append_null();
        dec256.append_value(arrow::datatypes::i256::from(100000000009i64));

        let dec256 = dec256.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec256.data_type().clone(), true)),
            Arc::new(dec256) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Clear the row before reading the next index.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }
}
1042}