Skip to main content

mz_arrow_util/
reader.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reader for [`arrow`] data that outputs [`Row`]s.
11
12use std::sync::Arc;
13
14use anyhow::Context;
15use arrow::array::{
16    Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
17    Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
18    Int16Array, Int32Array, Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
19    IntervalYearMonthArray, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
20    MapArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray, Time32SecondArray,
21    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
22    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
26use chrono::{DateTime, NaiveTime};
27use dec::OrderedDecimal;
28use mz_ore::cast::CastFrom;
29use mz_repr::adt::date::Date;
30use mz_repr::adt::interval::Interval;
31use mz_repr::adt::jsonb::JsonbPacker;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::adt::timestamp::CheckedTimestamp;
34use mz_repr::{Datum, RelationDesc, Row, RowPacker, SharedRow, SqlScalarType};
35use ordered_float::OrderedFloat;
36use uuid::Uuid;
37
38use crate::mask_nulls;
39
/// Type that can read out of an [`arrow::array::StructArray`] and into a [`Row`], given a
/// [`RelationDesc`].
///
/// The inverse of a [`crate::builder::ArrowBuilder`].
///
/// Note: When creating an [`ArrowReader`] we perform a "one-time downcast" of the children Arrays
/// from the [`StructArray`], into `enum ColReader`s. This is a much more verbose approach than the
/// alternative of downcasting from a `dyn arrow::array::Array` every time we read a [`Row`], but
/// it is _much_ more performant.
pub struct ArrowReader {
    /// Number of rows in the source [`StructArray`]; valid read indexes are `0..len`.
    len: usize,
    /// One pre-downcasted column reader per column, in the order returned by
    /// [`RelationDesc::iter`] at construction time.
    readers: Vec<ColReader>,
}
53
54impl ArrowReader {
55    /// Create an [`ArrowReader`] validating that the provided [`RelationDesc`] and [`StructArray`]
56    /// have a matching schema.
57    ///
58    /// The [`RelationDesc`] and [`StructArray`] need to uphold the following to be a valid pair:
59    ///
60    /// * Same number of columns.
61    /// * Columns of all the same name.
62    /// * Columns of compatible types.
63    ///
64    /// TODO(cf2): Relax some of these restrictions by allowing users to map column names, omit
65    /// columns, perform some lightweight casting, and matching not on column name but column
66    /// position.
67    /// TODO(cf2): Allow specifying an optional `arrow::Schema` for extra metadata.
68    pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
69        let inner_columns = array.columns();
70        let desc_columns = desc.typ().columns();
71
72        if inner_columns.len() != desc_columns.len() {
73            return Err(anyhow::anyhow!(
74                "wrong number of columns {} vs {}",
75                inner_columns.len(),
76                desc_columns.len()
77            ));
78        }
79
80        let mut readers = Vec::with_capacity(desc_columns.len());
81        for (col_name, col_type) in desc.iter() {
82            let column = array
83                .column_by_name(col_name)
84                .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
85            let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
86                .context(col_name.clone())?;
87
88            readers.push(reader);
89        }
90
91        Ok(ArrowReader {
92            len: array.len(),
93            readers,
94        })
95    }
96
97    /// Read the value at `idx` into the provided `Row`.
98    pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
99        let mut packer = row.packer();
100        for reader in &self.readers {
101            reader.read(idx, &mut packer).context(idx)?;
102        }
103        Ok(())
104    }
105
106    /// Read all of the values in this [`ArrowReader`] into `rows`.
107    pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
108        for idx in 0..self.len {
109            let mut row = Row::default();
110            self.read(idx, &mut row).context(idx)?;
111            rows.push(row);
112        }
113        Ok(self.len)
114    }
115}
116
/// Build the [`ColReader`] that decodes `array` as the SQL type `scalar_type`.
///
/// This is where the "one-time downcast" from `dyn Array` to a concrete array type happens,
/// so that per-row reads pay no dynamic-dispatch cost. Returns an error when the combination
/// of [`SqlScalarType`] and Arrow [`DataType`] is unsupported.
fn scalar_type_and_array_to_reader(
    scalar_type: &SqlScalarType,
    array: Arc<dyn Array>,
) -> Result<ColReader, anyhow::Error> {
    // Downcast to the concrete array type `T`. Callers must have already matched on
    // `array.data_type()`, so a failed downcast here would be a programming error.
    fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
        array
            .as_any()
            .downcast_ref::<T>()
            .expect("checked DataType")
            .clone()
    }

    match (scalar_type, array.data_type()) {
        (SqlScalarType::Bool, DataType::Boolean) => {
            Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
        }
        // Arrow Int8 has no direct SQL equivalent here, so widen each value into the
        // requested signed-integer Datum via a statically chosen `cast` fn.
        (SqlScalarType::Int16 | SqlScalarType::Int32 | SqlScalarType::Int64, DataType::Int8) => {
            let array = downcast_array::<Int8Array>(array);
            let cast: fn(i8) -> Datum<'static> = match scalar_type {
                SqlScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
                SqlScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
                SqlScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Int8 { array, cast })
        }
        (SqlScalarType::Int16, DataType::Int16) => {
            Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
        }
        (SqlScalarType::Int32, DataType::Int32) => {
            Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
        }
        (SqlScalarType::Int64, DataType::Int64) => {
            Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
        }
        // Same widening scheme as Int8 above, but for unsigned integers.
        (
            SqlScalarType::UInt16 | SqlScalarType::UInt32 | SqlScalarType::UInt64,
            DataType::UInt8,
        ) => {
            let array = downcast_array::<UInt8Array>(array);
            let cast: fn(u8) -> Datum<'static> = match scalar_type {
                SqlScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
                SqlScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
                SqlScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::UInt8 { array, cast })
        }
        (SqlScalarType::UInt16, DataType::UInt16) => {
            Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
        }
        (SqlScalarType::UInt32, DataType::UInt32) => {
            Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
        }
        (SqlScalarType::UInt64, DataType::UInt64) => {
            Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
        }
        // Arrow half-precision floats are widened into f32/f64 Datums.
        (SqlScalarType::Float32 | SqlScalarType::Float64, DataType::Float16) => {
            let array = downcast_array::<Float16Array>(array);
            let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
                SqlScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
                SqlScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Float16 { array, cast })
        }
        (SqlScalarType::Float32, DataType::Float32) => {
            Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
        }
        (SqlScalarType::Float64, DataType::Float64) => {
            Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
        }
        // TODO(cf3): Consider the max_scale for numeric.
        (SqlScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
            use num_traits::Pow;

            // Decimal128 stores the raw unscaled value; dividing by 10^scale at read time
            // recovers the logical number.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal128Array>(array);

            Ok(ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            })
        }
        // TODO(cf3): Consider the max_scale for numeric.
        // Mirrors the Decimal128 arm above, for 256-bit decimals.
        (SqlScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
            use num_traits::Pow;

            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal256Array>(array);

            Ok(ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Bytes, DataType::Binary) => {
            Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
        }
        (SqlScalarType::Bytes, DataType::LargeBinary) => {
            let array = downcast_array::<LargeBinaryArray>(array);
            Ok(ColReader::LargeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array);
            Ok(ColReader::FixedSizeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::BinaryView) => {
            let array = downcast_array::<BinaryViewArray>(array);
            Ok(ColReader::BinaryView(array))
        }
        // UUIDs are decoded by delegating to the matching bytes reader and parsing the
        // resulting byte slice at read time.
        (
            SqlScalarType::Uuid,
            DataType::Binary
            | DataType::BinaryView
            | DataType::LargeBinary
            | DataType::FixedSizeBinary(_),
        ) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::Bytes, array)
                .context("uuid reader")?;
            Ok(ColReader::Uuid(Box::new(reader)))
        }
        (SqlScalarType::String, DataType::Utf8) => {
            Ok(ColReader::String(downcast_array::<StringArray>(array)))
        }
        (SqlScalarType::String, DataType::LargeUtf8) => {
            let array = downcast_array::<LargeStringArray>(array);
            Ok(ColReader::LargeString(array))
        }
        (SqlScalarType::String, DataType::Utf8View) => {
            let array = downcast_array::<StringViewArray>(array);
            Ok(ColReader::StringView(array))
        }
        // JSON is decoded by delegating to the matching string reader and parsing the
        // resulting string at read time.
        (SqlScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::String, array)
                .context("json reader")?;
            Ok(ColReader::Jsonb(Box::new(reader)))
        }
        // Note: only timezone-less (`None`) Arrow timestamps are supported; a timestamp
        // with a timezone falls through to the "unsupported" arm below.
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
            let array = downcast_array::<TimestampSecondArray>(array);
            Ok(ColReader::TimestampSecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
            let array = downcast_array::<TimestampMillisecondArray>(array);
            Ok(ColReader::TimestampMillisecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
            let array = downcast_array::<TimestampMicrosecondArray>(array);
            Ok(ColReader::TimestampMicrosecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
            let array = downcast_array::<TimestampNanosecondArray>(array);
            Ok(ColReader::TimestampNanosecond(array))
        }
        (SqlScalarType::Date, DataType::Date32) => {
            let array = downcast_array::<Date32Array>(array);
            Ok(ColReader::Date32(array))
        }
        (SqlScalarType::Date, DataType::Date64) => {
            let array = downcast_array::<Date64Array>(array);
            Ok(ColReader::Date64(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
            let array = downcast_array::<Time32SecondArray>(array);
            Ok(ColReader::Time32Seconds(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
            let array = downcast_array::<Time32MillisecondArray>(array);
            Ok(ColReader::Time32Milliseconds(array))
        }
        // Lists recurse on the element type; we keep the offsets/null buffers and a boxed
        // reader over the flattened child values.
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::List(_),
        ) => {
            let array = downcast_array::<ListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("list")?;
            Ok(ColReader::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        // Same as the List arm above, but with 64-bit offsets.
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::LargeList(_),
        ) => {
            let array = downcast_array::<LargeListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("large list")?;
            Ok(ColReader::LargeList {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        // Records recurse on each named field, matching by name like the top-level reader.
        (
            SqlScalarType::Record {
                fields,
                custom_id: _,
            },
            DataType::Struct(_),
        ) => {
            let record_array = downcast_array::<StructArray>(array);
            let null_mask = record_array.nulls();

            let mut decoders = Vec::with_capacity(fields.len());
            for (name, typ) in fields.iter() {
                let inner_array = record_array
                    .column_by_name(name)
                    .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
                // Propagate the struct-level null mask onto each child so a null record
                // yields null children; see `crate::mask_nulls`.
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
                    .context(name.clone())?;

                decoders.push(Box::new(inner_decoder));
            }

            Ok(ColReader::Record {
                fields: decoders,
                nulls: null_mask.cloned(),
            })
        }
        // Maps recurse on the value type; keys are required to be Utf8 strings.
        (
            SqlScalarType::Map {
                value_type,
                custom_id: _,
            },
            DataType::Map(_, _),
        ) => {
            let map_array = downcast_array::<MapArray>(array);
            let keys = map_array
                .keys()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("map keys should be Utf8 strings")
                .clone();
            let values_reader =
                scalar_type_and_array_to_reader(value_type, Arc::clone(map_array.values()))
                    .context("map values")?;
            Ok(ColReader::Map {
                offsets: map_array.offsets().clone(),
                keys,
                values: Box::new(values_reader),
                nulls: map_array.nulls().cloned(),
            })
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::YearMonth)) => {
            Ok(ColReader::IntervalYearMonth(downcast_array::<
                IntervalYearMonthArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::DayTime)) => {
            Ok(ColReader::IntervalDayTime(downcast_array::<
                IntervalDayTimeArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::MonthDayNano)) => {
            Ok(ColReader::IntervalMonthDayNano(downcast_array::<
                IntervalMonthDayNanoArray,
            >(array)))
        }
        other => anyhow::bail!("unsupported: {other:?}"),
    }
}
412
/// A "downcasted" version of [`arrow::array::Array`] that supports reading [`Datum`]s.
///
/// Note: While this is fairly verbose, one-time "downcasting" to an enum is _much_ more performant
/// than downcasting every time we read a [`Datum`].
///
/// Variants are constructed by `scalar_type_and_array_to_reader`, which validates the
/// ([`SqlScalarType`], [`arrow::datatypes::DataType`]) pairing before downcasting.
enum ColReader {
    Boolean(arrow::array::BooleanArray),

    /// Arrow `Int8` values, widened into a signed-integer [`Datum`] by `cast`.
    Int8 {
        array: arrow::array::Int8Array,
        cast: fn(i8) -> Datum<'static>,
    },
    Int16(arrow::array::Int16Array),
    Int32(arrow::array::Int32Array),
    Int64(arrow::array::Int64Array),

    /// Arrow `UInt8` values, widened into an unsigned-integer [`Datum`] by `cast`.
    UInt8 {
        array: arrow::array::UInt8Array,
        cast: fn(u8) -> Datum<'static>,
    },
    UInt16(arrow::array::UInt16Array),
    UInt32(arrow::array::UInt32Array),
    UInt64(arrow::array::UInt64Array),

    /// Arrow half-precision floats, widened into an f32/f64 [`Datum`] by `cast`.
    Float16 {
        array: arrow::array::Float16Array,
        cast: fn(half::f16) -> Datum<'static>,
    },
    Float32(arrow::array::Float32Array),
    Float64(arrow::array::Float64Array),

    /// Raw unscaled 128-bit decimals; at read time the value is divided by
    /// `scale_factor` (10^scale) under a context of `precision` digits.
    Decimal128 {
        array: Decimal128Array,
        scale_factor: Numeric,
        precision: usize,
    },
    /// Like `Decimal128`, but values roundtrip through their string representation
    /// because `dec` has no native 256-bit support.
    Decimal256 {
        array: Decimal256Array,
        scale_factor: Numeric,
        precision: usize,
    },

    Binary(arrow::array::BinaryArray),
    LargeBinary(arrow::array::LargeBinaryArray),
    FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
    BinaryView(arrow::array::BinaryViewArray),
    /// A bytes reader (one of the binary variants above) whose output is parsed into a
    /// [`Uuid`] at read time.
    Uuid(Box<ColReader>),

    String(arrow::array::StringArray),
    LargeString(arrow::array::LargeStringArray),
    StringView(arrow::array::StringViewArray),
    /// A string reader (one of the string variants above) whose output is parsed as JSON
    /// at read time.
    Jsonb(Box<ColReader>),

    TimestampSecond(arrow::array::TimestampSecondArray),
    TimestampMillisecond(arrow::array::TimestampMillisecondArray),
    TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
    TimestampNanosecond(arrow::array::TimestampNanosecondArray),

    Date32(Date32Array),
    Date64(Date64Array),

    Time32Seconds(Time32SecondArray),
    Time32Milliseconds(arrow::array::Time32MillisecondArray),

    /// List column: `offsets[idx]..offsets[idx + 1]` delimits element indexes into the
    /// flattened `values` reader; `nulls` marks null lists (not null elements).
    List {
        offsets: OffsetBuffer<i32>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },
    /// Same layout as `List`, with 64-bit offsets.
    LargeList {
        offsets: OffsetBuffer<i64>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    /// Struct column: one reader per field (already masked with the struct-level null
    /// buffer at construction), plus the struct-level `nulls`.
    Record {
        fields: Vec<Box<ColReader>>,
        nulls: Option<NullBuffer>,
    },

    /// Map column: offsets delimit key/value ranges; keys are always Utf8 strings.
    Map {
        offsets: OffsetBuffer<i32>,
        keys: StringArray,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    IntervalYearMonth(IntervalYearMonthArray),
    IntervalDayTime(IntervalDayTimeArray),
    IntervalMonthDayNano(IntervalMonthDayNanoArray),
}
503
504impl ColReader {
505    fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
506        let datum = match self {
507            ColReader::Boolean(array) => array
508                .is_valid(idx)
509                .then(|| array.value(idx))
510                .map(|x| if x { Datum::True } else { Datum::False }),
511            ColReader::Int8 { array, cast } => {
512                array.is_valid(idx).then(|| array.value(idx)).map(cast)
513            }
514            ColReader::Int16(array) => array
515                .is_valid(idx)
516                .then(|| array.value(idx))
517                .map(Datum::Int16),
518            ColReader::Int32(array) => array
519                .is_valid(idx)
520                .then(|| array.value(idx))
521                .map(Datum::Int32),
522            ColReader::Int64(array) => array
523                .is_valid(idx)
524                .then(|| array.value(idx))
525                .map(Datum::Int64),
526            ColReader::UInt8 { array, cast } => {
527                array.is_valid(idx).then(|| array.value(idx)).map(cast)
528            }
529            ColReader::UInt16(array) => array
530                .is_valid(idx)
531                .then(|| array.value(idx))
532                .map(Datum::UInt16),
533            ColReader::UInt32(array) => array
534                .is_valid(idx)
535                .then(|| array.value(idx))
536                .map(Datum::UInt32),
537            ColReader::UInt64(array) => array
538                .is_valid(idx)
539                .then(|| array.value(idx))
540                .map(Datum::UInt64),
541            ColReader::Float16 { array, cast } => {
542                array.is_valid(idx).then(|| array.value(idx)).map(cast)
543            }
544            ColReader::Float32(array) => array
545                .is_valid(idx)
546                .then(|| array.value(idx))
547                .map(|x| Datum::Float32(OrderedFloat(x))),
548            ColReader::Float64(array) => array
549                .is_valid(idx)
550                .then(|| array.value(idx))
551                .map(|x| Datum::Float64(OrderedFloat(x))),
552            ColReader::Decimal128 {
553                array,
554                scale_factor,
555                precision,
556            } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
557                // Create a Numeric from our i128 with precision.
558                let mut ctx = dec::Context::<Numeric>::default();
559                ctx.set_precision(*precision).expect("checked before");
560                let mut num = ctx.from_i128(x);
561
562                // Scale the number.
563                ctx.div(&mut num, scale_factor);
564
565                Datum::Numeric(OrderedDecimal(num))
566            }),
567            ColReader::Decimal256 {
568                array,
569                scale_factor,
570                precision,
571            } => array
572                .is_valid(idx)
573                .then(|| array.value(idx))
574                .map(|x| {
575                    let s = x.to_string();
576
577                    // Parse a i256 from it's String representation.
578                    //
579                    // TODO(cf3): See if we can add support for 256-bit numbers to the `dec` crate.
580                    let mut ctx = dec::Context::<Numeric>::default();
581                    ctx.set_precision(*precision).expect("checked before");
582                    let mut num = ctx
583                        .parse(s)
584                        .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
585
586                    // Scale the number.
587                    ctx.div(&mut num, scale_factor);
588
589                    Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
590                })
591                .transpose()?,
592            ColReader::Binary(array) => array
593                .is_valid(idx)
594                .then(|| array.value(idx))
595                .map(Datum::Bytes),
596            ColReader::LargeBinary(array) => array
597                .is_valid(idx)
598                .then(|| array.value(idx))
599                .map(Datum::Bytes),
600            ColReader::FixedSizeBinary(array) => array
601                .is_valid(idx)
602                .then(|| array.value(idx))
603                .map(Datum::Bytes),
604            ColReader::BinaryView(array) => array
605                .is_valid(idx)
606                .then(|| array.value(idx))
607                .map(Datum::Bytes),
608            ColReader::Uuid(reader) => {
609                // First read a binary value into a temp row, and later parse that as UUID into our
610                // actual Row Packer.
611                let mut temp_row = SharedRow::get();
612                reader.read(idx, &mut temp_row.packer()).context("uuid")?;
613                let slice = match temp_row.unpack_first() {
614                    Datum::Bytes(slice) => slice,
615                    Datum::Null => {
616                        packer.push(Datum::Null);
617                        return Ok(());
618                    }
619                    other => anyhow::bail!("expected String, found {other:?}"),
620                };
621
622                let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
623                Some(Datum::Uuid(uuid))
624            }
625            ColReader::String(array) => array
626                .is_valid(idx)
627                .then(|| array.value(idx))
628                .map(Datum::String),
629            ColReader::LargeString(array) => array
630                .is_valid(idx)
631                .then(|| array.value(idx))
632                .map(Datum::String),
633            ColReader::StringView(array) => array
634                .is_valid(idx)
635                .then(|| array.value(idx))
636                .map(Datum::String),
637            ColReader::Jsonb(reader) => {
638                // First read a string value into a temp row, and later parse that as JSON into our
639                // actual Row Packer.
640                let mut temp_row = SharedRow::get();
641                reader.read(idx, &mut temp_row.packer()).context("jsonb")?;
642                let value = match temp_row.unpack_first() {
643                    Datum::String(value) => value,
644                    Datum::Null => {
645                        packer.push(Datum::Null);
646                        return Ok(());
647                    }
648                    other => anyhow::bail!("expected String, found {other:?}"),
649                };
650
651                JsonbPacker::new(packer)
652                    .pack_str(value)
653                    .context("roundtrip json")?;
654
655                // Return early because we've already packed the necessasry Datums.
656                return Ok(());
657            }
658            ColReader::TimestampSecond(array) => array
659                .is_valid(idx)
660                .then(|| array.value(idx))
661                .map(|secs| {
662                    let dt = DateTime::from_timestamp(secs, 0)
663                        .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
664                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
665                        .context("TimestampSeconds")?;
666                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
667                })
668                .transpose()?,
669            ColReader::TimestampMillisecond(array) => array
670                .is_valid(idx)
671                .then(|| array.value(idx))
672                .map(|millis| {
673                    let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
674                        anyhow::anyhow!("invalid timestamp milliseconds {millis}")
675                    })?;
676                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
677                        .context("TimestampMillis")?;
678                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
679                })
680                .transpose()?,
681            ColReader::TimestampMicrosecond(array) => array
682                .is_valid(idx)
683                .then(|| array.value(idx))
684                .map(|micros| {
685                    let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
686                        anyhow::anyhow!("invalid timestamp microseconds {micros}")
687                    })?;
688                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
689                        .context("TimestampMicros")?;
690                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
691                })
692                .transpose()?,
693            ColReader::TimestampNanosecond(array) => array
694                .is_valid(idx)
695                .then(|| array.value(idx))
696                .map(|nanos| {
697                    let dt = DateTime::from_timestamp_nanos(nanos);
698                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
699                        .context("TimestampNanos")?;
700                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
701                })
702                .transpose()?,
703            ColReader::Date32(array) => array
704                .is_valid(idx)
705                .then(|| array.value(idx))
706                .map(|unix_days| {
707                    let date = Date::from_unix_epoch(unix_days).context("date32")?;
708                    Ok::<_, anyhow::Error>(Datum::Date(date))
709                })
710                .transpose()?,
711            ColReader::Date64(array) => array
712                .is_valid(idx)
713                .then(|| array.value(idx))
714                .map(|unix_millis| {
715                    let date = DateTime::from_timestamp_millis(unix_millis)
716                        .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
717                    let unix_epoch = DateTime::from_timestamp(0, 0)
718                        .expect("UNIX epoch")
719                        .date_naive();
720                    let delta = date.date_naive().signed_duration_since(unix_epoch);
721                    let days: i32 = delta.num_days().try_into().context("date64")?;
722                    let date = Date::from_unix_epoch(days).context("date64")?;
723                    Ok::<_, anyhow::Error>(Datum::Date(date))
724                })
725                .transpose()?,
726            ColReader::Time32Seconds(array) => array
727                .is_valid(idx)
728                .then(|| array.value(idx))
729                .map(|secs| {
730                    let usecs: u32 = secs.try_into().context("time32 seconds")?;
731                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
732                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
733                    Ok::<_, anyhow::Error>(Datum::Time(time))
734                })
735                .transpose()?,
736            ColReader::Time32Milliseconds(array) => array
737                .is_valid(idx)
738                .then(|| array.value(idx))
739                .map(|millis| {
740                    let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
741                    let usecs = umillis / 1000;
742                    let unanos = (umillis % 1000).saturating_mul(1_000_000);
743                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
744                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
745                    Ok::<_, anyhow::Error>(Datum::Time(time))
746                })
747                .transpose()?,
748            ColReader::List {
749                offsets,
750                values,
751                nulls,
752            } => {
753                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
754                if !is_valid {
755                    packer.push(Datum::Null);
756                    return Ok(());
757                }
758
759                let start: usize = offsets[idx].try_into().context("list start offset")?;
760                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
761
762                packer
763                    .push_list_with(|packer| {
764                        for idx in start..end {
765                            values.read(idx, packer)?;
766                        }
767                        Ok::<_, anyhow::Error>(())
768                    })
769                    .context("pack list")?;
770
                // Return early because we've already packed the necessary Datums.
772                return Ok(());
773            }
774            ColReader::LargeList {
775                offsets,
776                values,
777                nulls,
778            } => {
779                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
780                if !is_valid {
781                    packer.push(Datum::Null);
782                    return Ok(());
783                }
784
785                let start: usize = offsets[idx].try_into().context("list start offset")?;
786                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
787
788                packer
789                    .push_list_with(|packer| {
790                        for idx in start..end {
791                            values.read(idx, packer)?;
792                        }
793                        Ok::<_, anyhow::Error>(())
794                    })
795                    .context("pack list")?;
796
                // Return early because we've already packed the necessary Datums.
798                return Ok(());
799            }
800            ColReader::Record { fields, nulls } => {
801                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
802                if !is_valid {
803                    packer.push(Datum::Null);
804                    return Ok(());
805                }
806
807                packer
808                    .push_list_with(|packer| {
809                        for field in fields {
810                            field.read(idx, packer)?;
811                        }
812                        Ok::<_, anyhow::Error>(())
813                    })
814                    .context("pack record")?;
815
                // Return early because we've already packed the necessary Datums.
817                return Ok(());
818            }
819            ColReader::Map {
820                offsets,
821                keys,
822                values,
823                nulls,
824            } => {
825                let is_non_null = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
826                if !is_non_null {
827                    packer.push(Datum::Null);
828                    return Ok(());
829                }
830
831                let start: usize = offsets[idx].try_into().context("map start offset")?;
832                let end: usize = offsets[idx + 1].try_into().context("map end offset")?;
833
834                packer
835                    .push_dict_with(|packer| {
836                        for i in start..end {
837                            packer.push(Datum::String(keys.value(i)));
838                            values.read(i, packer)?;
839                        }
840                        Ok::<_, anyhow::Error>(())
841                    })
842                    .context("pack map")?;
843
844                // Return early because we've already packed the necessary Datums.
845                return Ok(());
846            }
847            ColReader::IntervalYearMonth(array) => array
848                .is_valid(idx)
849                .then(|| array.value(idx))
850                .map(|months| Datum::Interval(Interval::new(months, 0, 0))),
851            ColReader::IntervalDayTime(array) => {
852                array.is_valid(idx).then(|| array.value(idx)).map(|v| {
853                    let micros = i64::from(v.milliseconds) * 1_000;
854                    Datum::Interval(Interval::new(0, v.days, micros))
855                })
856            }
857            ColReader::IntervalMonthDayNano(array) => {
858                array.is_valid(idx).then(|| array.value(idx)).map(|v| {
859                    let micros = v.nanoseconds / 1_000;
860                    Datum::Interval(Interval::new(v.months, v.days, micros))
861                })
862            }
863        };
864
865        match datum {
866            Some(d) => packer.push(d),
867            None => packer.push(Datum::Null),
868        }
869
870        Ok(())
871    }
872}
873
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    /// Decodes a single-column `Numeric` `batch` with an [`ArrowReader`] and asserts the
    /// round-trip values shared by the `Decimal128` and `Decimal256` smoketests:
    /// row 0 is `1.234`, row 1 is NULL, and row 2 is `100000000.009` (all encoded at
    /// precision 12, scale 3).
    fn assert_decimal_roundtrip(batch: StructArray) {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        // Decode our data!
        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        assert_eq!(rnd_row.into_element(), Datum::Null);

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }

    /// Round-trips one fully populated row and one all-NULL row through the
    /// `ArrowBuilder` → [`ArrowReader`] pipeline, covering a variety of scalar types
    /// plus JSON and a list column.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_reader() {
        let desc = RelationDesc::builder()
            .with_column("bool", SqlScalarType::Bool.nullable(true))
            .with_column("int4", SqlScalarType::Int32.nullable(true))
            .with_column("uint8", SqlScalarType::UInt64.nullable(true))
            .with_column("float32", SqlScalarType::Float32.nullable(true))
            .with_column("string", SqlScalarType::String.nullable(true))
            .with_column("bytes", SqlScalarType::Bytes.nullable(true))
            .with_column("uuid", SqlScalarType::Uuid.nullable(true))
            .with_column("json", SqlScalarType::Jsonb.nullable(true))
            .with_column(
                "list",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let mut og_row = Row::default();
        let mut packer = og_row.packer();

        packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        // One NULL Datum for each of the nine columns above.
        let null_row = Row::pack(vec![Datum::Null; 9]);

        // Encode our data with our ArrowBuilder.
        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&og_row).unwrap();
        builder.add_row(&null_row).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        // Decode our data!
        let reader =
            ArrowReader::new(&desc, arrow::array::StructArray::from(record_batch)).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        assert_eq!(&og_row, &rnd_row);

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        assert_eq!(&null_row, &rnd_row);
    }

    /// Reads `Numeric` values out of an Arrow `Decimal128` array.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal128() {
        let mut dec128 = arrow::array::Decimal128Builder::new();
        dec128 = dec128.with_precision_and_scale(12, 3).unwrap();

        // 1.234
        dec128.append_value(1234);
        dec128.append_null();
        // 100000000.009
        dec128.append_value(100000000009);

        let dec128 = dec128.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec128.data_type().clone(), true)),
            Arc::new(dec128) as arrow::array::ArrayRef,
        )]);

        assert_decimal_roundtrip(batch);
    }

    /// Reads `Numeric` values out of an Arrow `Decimal256` array.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal256() {
        let mut dec256 = arrow::array::Decimal256Builder::new();
        dec256 = dec256.with_precision_and_scale(12, 3).unwrap();

        // 1.234
        dec256.append_value(arrow::datatypes::i256::from(1234));
        dec256.append_null();
        // 100000000.009
        dec256.append_value(arrow::datatypes::i256::from(100000000009i64));

        let dec256 = dec256.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec256.data_type().clone(), true)),
            Arc::new(dec256) as arrow::array::ArrayRef,
        )]);

        assert_decimal_roundtrip(batch);
    }
}