1use std::sync::Arc;
13
14use anyhow::Context;
15use arrow::array::{
16 Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
17 Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
18 Int16Array, Int32Array, Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
19 IntervalYearMonthArray, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
20 MapArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray, Time32SecondArray,
21 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
22 TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
26use chrono::{DateTime, NaiveTime};
27use dec::OrderedDecimal;
28use itertools::Itertools;
29use mz_ore::cast::CastFrom;
30use mz_repr::adt::date::Date;
31use mz_repr::adt::interval::Interval;
32use mz_repr::adt::jsonb::JsonbPacker;
33use mz_repr::adt::numeric::Numeric;
34use mz_repr::adt::timestamp::CheckedTimestamp;
35use mz_repr::{Datum, RelationDesc, Row, RowPacker, SharedRow, SqlScalarType};
36use ordered_float::OrderedFloat;
37use uuid::Uuid;
38
39use crate::mask_nulls;
40
/// Reads [`Row`]s out of an Arrow [`StructArray`], using one [`ColReader`]
/// per column of the originating [`RelationDesc`].
pub struct ArrowReader {
    // Number of rows in the source array.
    len: usize,
    // One decoder per column, in `RelationDesc` column order.
    readers: Vec<ColReader>,
}
54
55impl ArrowReader {
56 pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
70 let inner_columns = array.columns();
71 let desc_columns = desc.typ().columns();
72
73 if inner_columns.len() != desc_columns.len() {
74 return Err(anyhow::anyhow!(
75 "wrong number of columns {} vs {}",
76 inner_columns.len(),
77 desc_columns.len()
78 ));
79 }
80
81 let mut readers = Vec::with_capacity(desc_columns.len());
82 for (col_name, col_type) in desc.iter() {
83 let column = array
84 .column_by_name(col_name)
85 .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
86 let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
87 .context(col_name.clone())?;
88
89 readers.push(reader);
90 }
91
92 Ok(ArrowReader {
93 len: array.len(),
94 readers,
95 })
96 }
97
98 pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
100 let mut packer = row.packer();
101 for reader in &self.readers {
102 reader.read(idx, &mut packer).context(idx)?;
103 }
104 Ok(())
105 }
106
107 pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
109 for idx in 0..self.len {
110 let mut row = Row::default();
111 self.read(idx, &mut row).context(idx)?;
112 rows.push(row);
113 }
114 Ok(self.len)
115 }
116}
117
/// Maps a SQL scalar type plus a matching Arrow array onto the [`ColReader`]
/// that can decode the array's values into [`Datum`]s.
///
/// Returns an error for unsupported `(SqlScalarType, DataType)` combinations
/// or for a decimal with an invalid precision. Note that timestamps are only
/// supported without a timezone (the `None` in the `DataType::Timestamp`
/// patterns below).
fn scalar_type_and_array_to_reader(
    scalar_type: &SqlScalarType,
    array: Arc<dyn Array>,
) -> Result<ColReader, anyhow::Error> {
    // Downcasts the dynamically typed array to the concrete array type `T`.
    // Callers must only invoke this after matching on `array.data_type()`,
    // hence the `expect`.
    fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
        array
            .as_any()
            .downcast_ref::<T>()
            .expect("checked DataType")
            .clone()
    }

    match (scalar_type, array.data_type()) {
        (SqlScalarType::Bool, DataType::Boolean) => {
            Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
        }
        // Arrow Int8 has no SQL scalar type equivalent, so widen each value
        // into the requested signed integer type.
        (SqlScalarType::Int16 | SqlScalarType::Int32 | SqlScalarType::Int64, DataType::Int8) => {
            let array = downcast_array::<Int8Array>(array);
            let cast: fn(i8) -> Datum<'static> = match scalar_type {
                SqlScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
                SqlScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
                SqlScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Int8 { array, cast })
        }
        (SqlScalarType::Int16, DataType::Int16) => {
            Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
        }
        (SqlScalarType::Int32, DataType::Int32) => {
            Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
        }
        (SqlScalarType::Int64, DataType::Int64) => {
            Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
        }
        // Likewise, Arrow UInt8 is widened into the requested unsigned type.
        (
            SqlScalarType::UInt16 | SqlScalarType::UInt32 | SqlScalarType::UInt64,
            DataType::UInt8,
        ) => {
            let array = downcast_array::<UInt8Array>(array);
            let cast: fn(u8) -> Datum<'static> = match scalar_type {
                SqlScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
                SqlScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
                SqlScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::UInt8 { array, cast })
        }
        (SqlScalarType::UInt16, DataType::UInt16) => {
            Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
        }
        (SqlScalarType::UInt32, DataType::UInt32) => {
            Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
        }
        (SqlScalarType::UInt64, DataType::UInt64) => {
            Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
        }
        // Arrow Float16 has no SQL equivalent; widen to f32 or f64.
        (SqlScalarType::Float32 | SqlScalarType::Float64, DataType::Float16) => {
            let array = downcast_array::<Float16Array>(array);
            let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
                SqlScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
                SqlScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Float16 { array, cast })
        }
        (SqlScalarType::Float32, DataType::Float32) => {
            Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
        }
        (SqlScalarType::Float64, DataType::Float64) => {
            Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
        }
        (SqlScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
            use num_traits::Pow;

            // Decimal128 stores values as raw integers; dividing by
            // 10^scale at read time places the decimal point.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            // This context is created only to validate the precision up
            // front; `ColReader::read` builds its own context per value.
            let precision = usize::cast_from(*precision);
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal128Array>(array);

            Ok(ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
            use num_traits::Pow;

            // Same raw-integer representation as Decimal128, just wider.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            // Precision validated here; re-checked contexts are built per
            // value in `ColReader::read`.
            let precision = usize::cast_from(*precision);
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal256Array>(array);

            Ok(ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Bytes, DataType::Binary) => {
            Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
        }
        (SqlScalarType::Bytes, DataType::LargeBinary) => {
            let array = downcast_array::<LargeBinaryArray>(array);
            Ok(ColReader::LargeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array);
            Ok(ColReader::FixedSizeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::BinaryView) => {
            let array = downcast_array::<BinaryViewArray>(array);
            Ok(ColReader::BinaryView(array))
        }
        // UUIDs are stored as raw bytes; delegate to the Bytes reader and
        // parse the slice into a `Uuid` at read time.
        (
            SqlScalarType::Uuid,
            DataType::Binary
            | DataType::BinaryView
            | DataType::LargeBinary
            | DataType::FixedSizeBinary(_),
        ) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::Bytes, array)
                .context("uuid reader")?;
            Ok(ColReader::Uuid(Box::new(reader)))
        }
        (SqlScalarType::String, DataType::Utf8) => {
            Ok(ColReader::String(downcast_array::<StringArray>(array)))
        }
        (SqlScalarType::String, DataType::LargeUtf8) => {
            let array = downcast_array::<LargeStringArray>(array);
            Ok(ColReader::LargeString(array))
        }
        (SqlScalarType::String, DataType::Utf8View) => {
            let array = downcast_array::<StringViewArray>(array);
            Ok(ColReader::StringView(array))
        }
        // JSON is stored as a string; delegate to the String reader and
        // re-pack via `JsonbPacker` at read time.
        (SqlScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::String, array)
                .context("json reader")?;
            Ok(ColReader::Jsonb(Box::new(reader)))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
            let array = downcast_array::<TimestampSecondArray>(array);
            Ok(ColReader::TimestampSecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
            let array = downcast_array::<TimestampMillisecondArray>(array);
            Ok(ColReader::TimestampMillisecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
            let array = downcast_array::<TimestampMicrosecondArray>(array);
            Ok(ColReader::TimestampMicrosecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
            let array = downcast_array::<TimestampNanosecondArray>(array);
            Ok(ColReader::TimestampNanosecond(array))
        }
        (SqlScalarType::Date, DataType::Date32) => {
            let array = downcast_array::<Date32Array>(array);
            Ok(ColReader::Date32(array))
        }
        (SqlScalarType::Date, DataType::Date64) => {
            let array = downcast_array::<Date64Array>(array);
            Ok(ColReader::Date64(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
            let array = downcast_array::<Time32SecondArray>(array);
            Ok(ColReader::Time32Seconds(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
            let array = downcast_array::<Time32MillisecondArray>(array);
            Ok(ColReader::Time32Milliseconds(array))
        }
        // Lists keep a reader for the flattened child values plus the
        // offsets/nulls needed to slice out each row's range.
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::List(_),
        ) => {
            let array = downcast_array::<ListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("list")?;
            Ok(ColReader::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::LargeList(_),
        ) => {
            let array = downcast_array::<LargeListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("large list")?;
            Ok(ColReader::LargeList {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::Record {
                fields,
                custom_id: _,
            },
            DataType::Struct(_),
        ) => {
            let record_array = downcast_array::<StructArray>(array);
            let null_mask = record_array.nulls();

            let mut decoders = Vec::with_capacity(fields.len());
            for (name, typ) in fields.iter() {
                let inner_array = record_array
                    .column_by_name(name)
                    .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
                // Propagate the struct-level null mask into each child so a
                // null record nulls out its fields.
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
                    .context(name.clone())?;

                decoders.push(Box::new(inner_decoder));
            }

            Ok(ColReader::Record {
                fields: decoders,
                nulls: null_mask.cloned(),
            })
        }
        (
            SqlScalarType::Map {
                value_type,
                custom_id: _,
            },
            DataType::Map(_, _),
        ) => {
            let map_array = downcast_array::<MapArray>(array);
            // Map keys are always decoded as Utf8 strings.
            let keys = map_array
                .keys()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("map keys should be Utf8 strings")
                .clone();
            let values_reader =
                scalar_type_and_array_to_reader(value_type, Arc::clone(map_array.values()))
                    .context("map values")?;
            Ok(ColReader::Map {
                offsets: map_array.offsets().clone(),
                keys,
                values: Box::new(values_reader),
                nulls: map_array.nulls().cloned(),
            })
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::YearMonth)) => {
            Ok(ColReader::IntervalYearMonth(downcast_array::<
                IntervalYearMonthArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::DayTime)) => {
            Ok(ColReader::IntervalDayTime(downcast_array::<
                IntervalDayTimeArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::MonthDayNano)) => {
            Ok(ColReader::IntervalMonthDayNano(downcast_array::<
                IntervalMonthDayNanoArray,
            >(array)))
        }
        other => anyhow::bail!("unsupported: {other:?}"),
    }
}
413
/// A decoder for a single column, holding the concretely-downcast Arrow array
/// plus whatever extra state is needed to turn each value into a [`Datum`].
enum ColReader {
    Boolean(arrow::array::BooleanArray),

    // Arrow Int8 has no SQL equivalent; `cast` widens into the target
    // signed integer Datum variant.
    Int8 {
        array: arrow::array::Int8Array,
        cast: fn(i8) -> Datum<'static>,
    },
    Int16(arrow::array::Int16Array),
    Int32(arrow::array::Int32Array),
    Int64(arrow::array::Int64Array),

    // Arrow UInt8 has no SQL equivalent; `cast` widens into the target
    // unsigned integer Datum variant.
    UInt8 {
        array: arrow::array::UInt8Array,
        cast: fn(u8) -> Datum<'static>,
    },
    UInt16(arrow::array::UInt16Array),
    UInt32(arrow::array::UInt32Array),
    UInt64(arrow::array::UInt64Array),

    // Arrow Float16 has no SQL equivalent; `cast` widens to f32/f64.
    Float16 {
        array: arrow::array::Float16Array,
        cast: fn(half::f16) -> Datum<'static>,
    },
    Float32(arrow::array::Float32Array),
    Float64(arrow::array::Float64Array),

    // Decimals are stored as raw integers; `scale_factor` (10^scale) places
    // the decimal point at read time and `precision` sizes the dec context.
    Decimal128 {
        array: Decimal128Array,
        scale_factor: Numeric,
        precision: usize,
    },
    Decimal256 {
        array: Decimal256Array,
        scale_factor: Numeric,
        precision: usize,
    },

    Binary(arrow::array::BinaryArray),
    LargeBinary(arrow::array::LargeBinaryArray),
    FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
    BinaryView(arrow::array::BinaryViewArray),
    // Wraps a bytes reader; the byte slice is parsed into a `Uuid`.
    Uuid(Box<ColReader>),

    String(arrow::array::StringArray),
    LargeString(arrow::array::LargeStringArray),
    StringView(arrow::array::StringViewArray),
    // Wraps a string reader; the string is re-packed through `JsonbPacker`.
    Jsonb(Box<ColReader>),

    // Timestamps without a timezone, at each Arrow time unit.
    TimestampSecond(arrow::array::TimestampSecondArray),
    TimestampMillisecond(arrow::array::TimestampMillisecondArray),
    TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
    TimestampNanosecond(arrow::array::TimestampNanosecondArray),

    Date32(Date32Array),
    Date64(Date64Array),

    Time32Seconds(Time32SecondArray),
    Time32Milliseconds(arrow::array::Time32MillisecondArray),

    // Variable-length containers: `offsets[idx]..offsets[idx + 1]` is the
    // range of this row's elements within the flattened `values` reader.
    List {
        offsets: OffsetBuffer<i32>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },
    LargeList {
        offsets: OffsetBuffer<i64>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    // Structs: one reader per field, packed as a list.
    Record {
        fields: Vec<Box<ColReader>>,
        nulls: Option<NullBuffer>,
    },

    // Maps: keys are always Utf8 strings; values share the offsets scheme
    // used by the list readers.
    Map {
        offsets: OffsetBuffer<i32>,
        keys: StringArray,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    IntervalYearMonth(IntervalYearMonthArray),
    IntervalDayTime(IntervalDayTimeArray),
    IntervalMonthDayNano(IntervalMonthDayNanoArray),
}
504
505impl ColReader {
506 fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
507 let datum = match self {
508 ColReader::Boolean(array) => array
509 .is_valid(idx)
510 .then(|| array.value(idx))
511 .map(|x| if x { Datum::True } else { Datum::False }),
512 ColReader::Int8 { array, cast } => {
513 array.is_valid(idx).then(|| array.value(idx)).map(cast)
514 }
515 ColReader::Int16(array) => array
516 .is_valid(idx)
517 .then(|| array.value(idx))
518 .map(Datum::Int16),
519 ColReader::Int32(array) => array
520 .is_valid(idx)
521 .then(|| array.value(idx))
522 .map(Datum::Int32),
523 ColReader::Int64(array) => array
524 .is_valid(idx)
525 .then(|| array.value(idx))
526 .map(Datum::Int64),
527 ColReader::UInt8 { array, cast } => {
528 array.is_valid(idx).then(|| array.value(idx)).map(cast)
529 }
530 ColReader::UInt16(array) => array
531 .is_valid(idx)
532 .then(|| array.value(idx))
533 .map(Datum::UInt16),
534 ColReader::UInt32(array) => array
535 .is_valid(idx)
536 .then(|| array.value(idx))
537 .map(Datum::UInt32),
538 ColReader::UInt64(array) => array
539 .is_valid(idx)
540 .then(|| array.value(idx))
541 .map(Datum::UInt64),
542 ColReader::Float16 { array, cast } => {
543 array.is_valid(idx).then(|| array.value(idx)).map(cast)
544 }
545 ColReader::Float32(array) => array
546 .is_valid(idx)
547 .then(|| array.value(idx))
548 .map(|x| Datum::Float32(OrderedFloat(x))),
549 ColReader::Float64(array) => array
550 .is_valid(idx)
551 .then(|| array.value(idx))
552 .map(|x| Datum::Float64(OrderedFloat(x))),
553 ColReader::Decimal128 {
554 array,
555 scale_factor,
556 precision,
557 } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
558 let mut ctx = dec::Context::<Numeric>::default();
560 ctx.set_precision(*precision).expect("checked before");
561 let mut num = ctx.from_i128(x);
562
563 ctx.div(&mut num, scale_factor);
565
566 Datum::Numeric(OrderedDecimal(num))
567 }),
568 ColReader::Decimal256 {
569 array,
570 scale_factor,
571 precision,
572 } => array
573 .is_valid(idx)
574 .then(|| array.value(idx))
575 .map(|x| {
576 let s = x.to_string();
577
578 let mut ctx = dec::Context::<Numeric>::default();
582 ctx.set_precision(*precision).expect("checked before");
583 let mut num = ctx
584 .parse(s)
585 .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
586
587 ctx.div(&mut num, scale_factor);
589
590 Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
591 })
592 .transpose()?,
593 ColReader::Binary(array) => array
594 .is_valid(idx)
595 .then(|| array.value(idx))
596 .map(Datum::Bytes),
597 ColReader::LargeBinary(array) => array
598 .is_valid(idx)
599 .then(|| array.value(idx))
600 .map(Datum::Bytes),
601 ColReader::FixedSizeBinary(array) => array
602 .is_valid(idx)
603 .then(|| array.value(idx))
604 .map(Datum::Bytes),
605 ColReader::BinaryView(array) => array
606 .is_valid(idx)
607 .then(|| array.value(idx))
608 .map(Datum::Bytes),
609 ColReader::Uuid(reader) => {
610 let mut temp_row = SharedRow::get();
613 reader.read(idx, &mut temp_row.packer()).context("uuid")?;
614 let slice = match temp_row.unpack_first() {
615 Datum::Bytes(slice) => slice,
616 Datum::Null => {
617 packer.push(Datum::Null);
618 return Ok(());
619 }
620 other => anyhow::bail!("expected String, found {other:?}"),
621 };
622
623 let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
624 Some(Datum::Uuid(uuid))
625 }
626 ColReader::String(array) => array
627 .is_valid(idx)
628 .then(|| array.value(idx))
629 .map(Datum::String),
630 ColReader::LargeString(array) => array
631 .is_valid(idx)
632 .then(|| array.value(idx))
633 .map(Datum::String),
634 ColReader::StringView(array) => array
635 .is_valid(idx)
636 .then(|| array.value(idx))
637 .map(Datum::String),
638 ColReader::Jsonb(reader) => {
639 let mut temp_row = SharedRow::get();
642 reader.read(idx, &mut temp_row.packer()).context("jsonb")?;
643 let value = match temp_row.unpack_first() {
644 Datum::String(value) => value,
645 Datum::Null => {
646 packer.push(Datum::Null);
647 return Ok(());
648 }
649 other => anyhow::bail!("expected String, found {other:?}"),
650 };
651
652 JsonbPacker::new(packer)
653 .pack_str(value)
654 .context("roundtrip json")?;
655
656 return Ok(());
658 }
659 ColReader::TimestampSecond(array) => array
660 .is_valid(idx)
661 .then(|| array.value(idx))
662 .map(|secs| {
663 let dt = DateTime::from_timestamp(secs, 0)
664 .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
665 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
666 .context("TimestampSeconds")?;
667 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
668 })
669 .transpose()?,
670 ColReader::TimestampMillisecond(array) => array
671 .is_valid(idx)
672 .then(|| array.value(idx))
673 .map(|millis| {
674 let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
675 anyhow::anyhow!("invalid timestamp milliseconds {millis}")
676 })?;
677 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
678 .context("TimestampMillis")?;
679 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
680 })
681 .transpose()?,
682 ColReader::TimestampMicrosecond(array) => array
683 .is_valid(idx)
684 .then(|| array.value(idx))
685 .map(|micros| {
686 let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
687 anyhow::anyhow!("invalid timestamp microseconds {micros}")
688 })?;
689 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
690 .context("TimestampMicros")?;
691 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
692 })
693 .transpose()?,
694 ColReader::TimestampNanosecond(array) => array
695 .is_valid(idx)
696 .then(|| array.value(idx))
697 .map(|nanos| {
698 let dt = DateTime::from_timestamp_nanos(nanos);
699 let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
700 .context("TimestampNanos")?;
701 Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
702 })
703 .transpose()?,
704 ColReader::Date32(array) => array
705 .is_valid(idx)
706 .then(|| array.value(idx))
707 .map(|unix_days| {
708 let date = Date::from_unix_epoch(unix_days).context("date32")?;
709 Ok::<_, anyhow::Error>(Datum::Date(date))
710 })
711 .transpose()?,
712 ColReader::Date64(array) => array
713 .is_valid(idx)
714 .then(|| array.value(idx))
715 .map(|unix_millis| {
716 let date = DateTime::from_timestamp_millis(unix_millis)
717 .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
718 let unix_epoch = DateTime::from_timestamp(0, 0)
719 .expect("UNIX epoch")
720 .date_naive();
721 let delta = date.date_naive().signed_duration_since(unix_epoch);
722 let days: i32 = delta.num_days().try_into().context("date64")?;
723 let date = Date::from_unix_epoch(days).context("date64")?;
724 Ok::<_, anyhow::Error>(Datum::Date(date))
725 })
726 .transpose()?,
727 ColReader::Time32Seconds(array) => array
728 .is_valid(idx)
729 .then(|| array.value(idx))
730 .map(|secs| {
731 let usecs: u32 = secs.try_into().context("time32 seconds")?;
732 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
733 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
734 Ok::<_, anyhow::Error>(Datum::Time(time))
735 })
736 .transpose()?,
737 ColReader::Time32Milliseconds(array) => array
738 .is_valid(idx)
739 .then(|| array.value(idx))
740 .map(|millis| {
741 let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
742 let usecs = umillis / 1000;
743 let unanos = (umillis % 1000).saturating_mul(1_000_000);
744 let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
745 .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
746 Ok::<_, anyhow::Error>(Datum::Time(time))
747 })
748 .transpose()?,
749 ColReader::List {
750 offsets,
751 values,
752 nulls,
753 } => {
754 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
755 if !is_valid {
756 packer.push(Datum::Null);
757 return Ok(());
758 }
759
760 let start: usize = offsets[idx].try_into().context("list start offset")?;
761 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
762
763 packer
764 .push_list_with(|packer| {
765 for idx in start..end {
766 values.read(idx, packer)?;
767 }
768 Ok::<_, anyhow::Error>(())
769 })
770 .context("pack list")?;
771
772 return Ok(());
774 }
775 ColReader::LargeList {
776 offsets,
777 values,
778 nulls,
779 } => {
780 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
781 if !is_valid {
782 packer.push(Datum::Null);
783 return Ok(());
784 }
785
786 let start: usize = offsets[idx].try_into().context("list start offset")?;
787 let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
788
789 packer
790 .push_list_with(|packer| {
791 for idx in start..end {
792 values.read(idx, packer)?;
793 }
794 Ok::<_, anyhow::Error>(())
795 })
796 .context("pack list")?;
797
798 return Ok(());
800 }
801 ColReader::Record { fields, nulls } => {
802 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
803 if !is_valid {
804 packer.push(Datum::Null);
805 return Ok(());
806 }
807
808 packer
809 .push_list_with(|packer| {
810 for field in fields {
811 field.read(idx, packer)?;
812 }
813 Ok::<_, anyhow::Error>(())
814 })
815 .context("pack record")?;
816
817 return Ok(());
819 }
820 ColReader::Map {
821 offsets,
822 keys,
823 values,
824 nulls,
825 } => {
826 let is_non_null = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
827 if !is_non_null {
828 packer.push(Datum::Null);
829 return Ok(());
830 }
831
832 let start: usize = offsets[idx].try_into().context("map start offset")?;
833 let end: usize = offsets[idx + 1].try_into().context("map end offset")?;
834
835 let mut kv_sorted = (start..end)
839 .map(|i| (keys.value(i), i))
840 .sorted_by_key(|(k, _)| *k)
841 .peekable();
842
843 packer
844 .push_dict_with(|packer| {
845 while let Some((key, i)) = kv_sorted.next() {
846 if let Some((next_key, _)) = kv_sorted.peek() {
852 if key == *next_key {
853 continue;
854 }
855 }
856 packer.push(Datum::String(key));
857 values.read(i, packer)?;
858 }
859 Ok::<_, anyhow::Error>(())
860 })
861 .context("pack map")?;
862
863 return Ok(());
865 }
866 ColReader::IntervalYearMonth(array) => array
867 .is_valid(idx)
868 .then(|| array.value(idx))
869 .map(|months| Datum::Interval(Interval::new(months, 0, 0))),
870 ColReader::IntervalDayTime(array) => {
871 array.is_valid(idx).then(|| array.value(idx)).map(|v| {
872 let micros = i64::from(v.milliseconds) * 1_000;
873 Datum::Interval(Interval::new(0, v.days, micros))
874 })
875 }
876 ColReader::IntervalMonthDayNano(array) => {
877 array.is_valid(idx).then(|| array.value(idx)).map(|v| {
878 let micros = v.nanoseconds / 1_000;
879 Datum::Interval(Interval::new(v.months, v.days, micros))
880 })
881 }
882 };
883
884 match datum {
885 Some(d) => packer.push(d),
886 None => packer.push(Datum::Null),
887 }
888
889 Ok(())
890 }
891}
892
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    /// Round-trips a row of common types through `ArrowBuilder` and back
    /// through `ArrowReader`, including an all-null row.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_reader() {
        let desc = RelationDesc::builder()
            .with_column("bool", SqlScalarType::Bool.nullable(true))
            .with_column("int4", SqlScalarType::Int32.nullable(true))
            .with_column("uint8", SqlScalarType::UInt64.nullable(true))
            .with_column("float32", SqlScalarType::Float32.nullable(true))
            .with_column("string", SqlScalarType::String.nullable(true))
            .with_column("bytes", SqlScalarType::Bytes.nullable(true))
            .with_column("uuid", SqlScalarType::Uuid.nullable(true))
            .with_column("json", SqlScalarType::Jsonb.nullable(true))
            .with_column(
                "list",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        // Build one row with a concrete value for every column.
        let mut og_row = Row::default();
        let mut packer = og_row.packer();

        packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        // A second row that is null in every column.
        let null_row = Row::pack(vec![Datum::Null; 9]);

        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&og_row).unwrap();
        builder.add_row(&null_row).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        let reader =
            ArrowReader::new(&desc, arrow::array::StructArray::from(record_batch)).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        assert_eq!(&og_row, &rnd_row);

        // Reset the row before decoding the next one.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        assert_eq!(&null_row, &rnd_row);
    }

    /// Reads Decimal128 values (precision 12, scale 3), checking scaling and
    /// null handling.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_decimal128() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec128 = arrow::array::Decimal128Builder::new();
        dec128 = dec128.with_precision_and_scale(12, 3).unwrap();

        // Raw integers; with scale 3 these decode as 1.234 and 100000000.009.
        dec128.append_value(1234);
        dec128.append_null();
        dec128.append_value(100000000009);

        let dec128 = dec128.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec128.data_type().clone(), true)),
            Arc::new(dec128) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Reset the row before decoding the next one.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }

    /// Reads Decimal256 values (precision 12, scale 3), mirroring the
    /// Decimal128 test via the string-parse decode path.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_decimal256() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec256 = arrow::array::Decimal256Builder::new();
        dec256 = dec256.with_precision_and_scale(12, 3).unwrap();

        // Raw integers; with scale 3 these decode as 1.234 and 100000000.009.
        dec256.append_value(arrow::datatypes::i256::from(1234));
        dec256.append_null();
        dec256.append_value(arrow::datatypes::i256::from(100000000009i64));

        let dec256 = dec256.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec256.data_type().clone(), true)),
            Arc::new(dec256) as arrow::array::ArrayRef,
        )]);

        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Reset the row before decoding the next one.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }
}
1061}