Skip to main content

mz_arrow_util/
reader.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reader for [`arrow`] data that outputs [`Row`]s.
11
12use std::sync::Arc;
13
14use anyhow::Context;
15use arrow::array::{
16    Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
17    Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
18    Int16Array, Int32Array, Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
19    IntervalYearMonthArray, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
20    MapArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray, Time32SecondArray,
21    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
22    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
26use chrono::{DateTime, NaiveTime};
27use dec::OrderedDecimal;
28use itertools::Itertools;
29use mz_ore::cast::CastFrom;
30use mz_repr::adt::date::Date;
31use mz_repr::adt::interval::Interval;
32use mz_repr::adt::jsonb::JsonbPacker;
33use mz_repr::adt::numeric::Numeric;
34use mz_repr::adt::range::{Range, RangeLowerBound, RangeUpperBound};
35use mz_repr::adt::timestamp::CheckedTimestamp;
36use mz_repr::{Datum, RelationDesc, Row, RowPacker, SharedRow, SqlScalarType};
37use ordered_float::OrderedFloat;
38use uuid::Uuid;
39
40use crate::mask_nulls;
41
/// Type that can read out of an [`arrow::array::StructArray`] and into a [`Row`], given a
/// [`RelationDesc`].
///
/// The inverse of a [`crate::builder::ArrowBuilder`].
///
/// Note: When creating an [`ArrowReader`] we perform a "one-time downcast" of the children Arrays
/// from the [`StructArray`], into `enum ColReader`s. This is a much more verbose approach than the
/// alternative of downcasting from a `dyn arrow::array::Array` every time we read a [`Row`], but
/// it is _much_ more performant.
pub struct ArrowReader {
    // Number of rows in the source `StructArray`, i.e. the valid index range for `read`.
    len: usize,
    // One pre-downcasted `ColReader` per column, in `RelationDesc` column order.
    readers: Vec<ColReader>,
}
55
56impl ArrowReader {
57    /// Create an [`ArrowReader`] validating that the provided [`RelationDesc`] and [`StructArray`]
58    /// have a matching schema.
59    ///
60    /// The [`RelationDesc`] and [`StructArray`] need to uphold the following to be a valid pair:
61    ///
62    /// * Same number of columns.
63    /// * Columns of all the same name.
64    /// * Columns of compatible types.
65    ///
66    /// TODO(cf2): Relax some of these restrictions by allowing users to map column names, omit
67    /// columns, perform some lightweight casting, and matching not on column name but column
68    /// position.
69    /// TODO(cf2): Allow specifying an optional `arrow::Schema` for extra metadata.
70    pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
71        let inner_columns = array.columns();
72        let desc_columns = desc.typ().columns();
73
74        if inner_columns.len() != desc_columns.len() {
75            return Err(anyhow::anyhow!(
76                "wrong number of columns {} vs {}",
77                inner_columns.len(),
78                desc_columns.len()
79            ));
80        }
81
82        let mut readers = Vec::with_capacity(desc_columns.len());
83        for (col_name, col_type) in desc.iter() {
84            let column = array
85                .column_by_name(col_name)
86                .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
87            let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
88                .context(col_name.clone())?;
89
90            readers.push(reader);
91        }
92
93        Ok(ArrowReader {
94            len: array.len(),
95            readers,
96        })
97    }
98
99    /// Read the value at `idx` into the provided `Row`.
100    pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
101        let mut packer = row.packer();
102        for reader in &self.readers {
103            reader.read(idx, &mut packer).context(idx)?;
104        }
105        Ok(())
106    }
107
108    /// Read all of the values in this [`ArrowReader`] into `rows`.
109    pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
110        for idx in 0..self.len {
111            let mut row = Row::default();
112            self.read(idx, &mut row).context(idx)?;
113            rows.push(row);
114        }
115        Ok(self.len)
116    }
117}
118
/// Recursively build a [`ColReader`] that can decode `Datum`s of `scalar_type` out of `array`.
///
/// This is where the "one-time downcast" described on [`ArrowReader`] happens: we match on the
/// pair of (SQL type, arrow [`DataType`]) and downcast the `dyn Array` to its concrete array type
/// exactly once. Returns an error for any (scalar type, arrow type) combination we don't support.
fn scalar_type_and_array_to_reader(
    scalar_type: &SqlScalarType,
    array: Arc<dyn Array>,
) -> Result<ColReader, anyhow::Error> {
    // Helper to downcast to a concrete array type. Callers must have already matched on the
    // array's `DataType`, which is what makes the `expect` safe.
    fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
        array
            .as_any()
            .downcast_ref::<T>()
            .expect("checked DataType")
            .clone()
    }

    match (scalar_type, array.data_type()) {
        (SqlScalarType::Bool, DataType::Boolean) => {
            Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
        }
        // Arrow `Int8` has no direct SQL equivalent, so widen it into whichever signed integer
        // type the desc asked for.
        (SqlScalarType::Int16 | SqlScalarType::Int32 | SqlScalarType::Int64, DataType::Int8) => {
            let array = downcast_array::<Int8Array>(array);
            let cast: fn(i8) -> Datum<'static> = match scalar_type {
                SqlScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
                SqlScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
                SqlScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Int8 { array, cast })
        }
        (SqlScalarType::Int16, DataType::Int16) => {
            Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
        }
        (SqlScalarType::Int32, DataType::Int32) => {
            Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
        }
        (SqlScalarType::Int64, DataType::Int64) => {
            Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
        }
        // Like `Int8`, arrow `UInt8` gets widened into the requested unsigned integer type.
        (
            SqlScalarType::UInt16 | SqlScalarType::UInt32 | SqlScalarType::UInt64,
            DataType::UInt8,
        ) => {
            let array = downcast_array::<UInt8Array>(array);
            let cast: fn(u8) -> Datum<'static> = match scalar_type {
                SqlScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
                SqlScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
                SqlScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::UInt8 { array, cast })
        }
        (SqlScalarType::UInt16, DataType::UInt16) => {
            Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
        }
        (SqlScalarType::UInt32, DataType::UInt32) => {
            Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
        }
        (SqlScalarType::UInt64, DataType::UInt64) => {
            Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
        }
        // Arrow `Float16` has no SQL equivalent; widen to `Float32`/`Float64` as requested.
        (SqlScalarType::Float32 | SqlScalarType::Float64, DataType::Float16) => {
            let array = downcast_array::<Float16Array>(array);
            let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
                SqlScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
                SqlScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Float16 { array, cast })
        }
        (SqlScalarType::Float32, DataType::Float32) => {
            Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
        }
        (SqlScalarType::Float64, DataType::Float64) => {
            Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
        }
        // TODO(cf3): Consider the max_scale for numeric.
        (SqlScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
            use num_traits::Pow;

            // Decimal128 stores unscaled integers; at read time we divide by
            // `scale_factor = 10^scale` to recover the decimal value.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal128Array>(array);

            Ok(ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            })
        }
        // TODO(cf3): Consider the max_scale for numeric.
        (SqlScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
            use num_traits::Pow;

            // Same unscaled-integer representation as Decimal128 above, just 256 bits wide.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal256Array>(array);

            Ok(ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Bytes, DataType::Binary) => {
            Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
        }
        (SqlScalarType::Bytes, DataType::LargeBinary) => {
            let array = downcast_array::<LargeBinaryArray>(array);
            Ok(ColReader::LargeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array);
            Ok(ColReader::FixedSizeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::BinaryView) => {
            let array = downcast_array::<BinaryViewArray>(array);
            Ok(ColReader::BinaryView(array))
        }
        // UUIDs are stored as raw bytes: delegate to the `Bytes` readers above and parse the
        // resulting slice as a UUID at read time.
        (
            SqlScalarType::Uuid,
            DataType::Binary
            | DataType::BinaryView
            | DataType::LargeBinary
            | DataType::FixedSizeBinary(_),
        ) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::Bytes, array)
                .context("uuid reader")?;
            Ok(ColReader::Uuid(Box::new(reader)))
        }
        (SqlScalarType::String, DataType::Utf8) => {
            Ok(ColReader::String(downcast_array::<StringArray>(array)))
        }
        (SqlScalarType::String, DataType::LargeUtf8) => {
            let array = downcast_array::<LargeStringArray>(array);
            Ok(ColReader::LargeString(array))
        }
        (SqlScalarType::String, DataType::Utf8View) => {
            let array = downcast_array::<StringViewArray>(array);
            Ok(ColReader::StringView(array))
        }
        // JSON is stored as strings: delegate to the `String` readers above and parse the
        // resulting text as JSON at read time.
        (SqlScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::String, array)
                .context("json reader")?;
            Ok(ColReader::Jsonb(Box::new(reader)))
        }
        // Note: only timezone-less (`None`) arrow timestamps are supported here.
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
            let array = downcast_array::<TimestampSecondArray>(array);
            Ok(ColReader::TimestampSecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
            let array = downcast_array::<TimestampMillisecondArray>(array);
            Ok(ColReader::TimestampMillisecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
            let array = downcast_array::<TimestampMicrosecondArray>(array);
            Ok(ColReader::TimestampMicrosecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
            let array = downcast_array::<TimestampNanosecondArray>(array);
            Ok(ColReader::TimestampNanosecond(array))
        }
        (SqlScalarType::Date, DataType::Date32) => {
            let array = downcast_array::<Date32Array>(array);
            Ok(ColReader::Date32(array))
        }
        (SqlScalarType::Date, DataType::Date64) => {
            let array = downcast_array::<Date64Array>(array);
            Ok(ColReader::Date64(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
            let array = downcast_array::<Time32SecondArray>(array);
            Ok(ColReader::Time32Seconds(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
            let array = downcast_array::<Time32MillisecondArray>(array);
            Ok(ColReader::Time32Milliseconds(array))
        }
        // Lists: recurse on the element type, and keep the offsets/null buffers so we can slice
        // out each list's elements at read time.
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::List(_),
        ) => {
            let array = downcast_array::<ListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("list")?;
            Ok(ColReader::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::LargeList(_),
        ) => {
            let array = downcast_array::<LargeListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("large list")?;
            Ok(ColReader::LargeList {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::Record {
                fields,
                custom_id: _,
            },
            DataType::Struct(_),
        ) => {
            let record_array = downcast_array::<StructArray>(array);
            let null_mask = record_array.nulls();

            let mut decoders = Vec::with_capacity(fields.len());
            for (name, typ) in fields.iter() {
                let inner_array = record_array
                    .column_by_name(name)
                    .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
                // Propagate the struct-level null mask into each child so a null record also
                // reads as null in its fields.
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
                    .context(name.clone())?;

                decoders.push(Box::new(inner_decoder));
            }

            Ok(ColReader::Record {
                fields: decoders,
                nulls: null_mask.cloned(),
            })
        }
        (
            SqlScalarType::Map {
                value_type,
                custom_id: _,
            },
            DataType::Map(_, _),
        ) => {
            let map_array = downcast_array::<MapArray>(array);
            let keys = map_array
                .keys()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("map keys should be Utf8 strings")
                .clone();
            let values_reader =
                scalar_type_and_array_to_reader(value_type, Arc::clone(map_array.values()))
                    .context("map values")?;
            Ok(ColReader::Map {
                offsets: map_array.offsets().clone(),
                keys,
                values: Box::new(values_reader),
                nulls: map_array.nulls().cloned(),
            })
        }
        // Ranges are encoded as a struct with well-known field names; the bounds share the
        // range's element type.
        (SqlScalarType::Range { element_type }, DataType::Struct(_)) => {
            let struct_array = downcast_array::<StructArray>(array);
            let lower_array = struct_array
                .column_by_name("lower")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'lower' field"))?;
            let upper_array = struct_array
                .column_by_name("upper")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'upper' field"))?;
            let empty_array = struct_array
                .column_by_name("empty")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'empty' field"))?;
            let lower_inclusive_array = struct_array
                .column_by_name("lower_inclusive")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'lower_inclusive' field"))?;
            let upper_inclusive_array = struct_array
                .column_by_name("upper_inclusive")
                .ok_or_else(|| anyhow::anyhow!("range struct missing 'upper_inclusive' field"))?;

            let lower_reader =
                scalar_type_and_array_to_reader(element_type, Arc::clone(lower_array))
                    .context("range lower")?;
            let upper_reader =
                scalar_type_and_array_to_reader(element_type, Arc::clone(upper_array))
                    .context("range upper")?;

            let empty = downcast_array::<BooleanArray>(Arc::clone(empty_array));
            let lower_inclusive = downcast_array::<BooleanArray>(Arc::clone(lower_inclusive_array));
            let upper_inclusive = downcast_array::<BooleanArray>(Arc::clone(upper_inclusive_array));

            // NOTE(review): unlike the `Record` arm, the struct-level null mask is not pushed
            // down into the bound arrays here via `mask_nulls` — presumably handled at read
            // time via the separate `nulls` buffers; confirm.
            let lower_nulls = lower_array.nulls().cloned();
            let upper_nulls = upper_array.nulls().cloned();

            Ok(ColReader::Range {
                lower: Box::new(lower_reader),
                lower_nulls,
                upper: Box::new(upper_reader),
                upper_nulls,
                lower_inclusive,
                upper_inclusive,
                empty,
                nulls: struct_array.nulls().cloned(),
            })
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::YearMonth)) => {
            Ok(ColReader::IntervalYearMonth(downcast_array::<
                IntervalYearMonthArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::DayTime)) => {
            Ok(ColReader::IntervalDayTime(downcast_array::<
                IntervalDayTimeArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::MonthDayNano)) => {
            Ok(ColReader::IntervalMonthDayNano(downcast_array::<
                IntervalMonthDayNanoArray,
            >(array)))
        }
        other => anyhow::bail!("unsupported: {other:?}"),
    }
}
457
/// A "downcasted" version of [`arrow::array::Array`] that supports reading [`Datum`]s.
///
/// Note: While this is fairly verbose, one-time "downcasting" to an enum is _much_ more performant
/// than downcasting every time we read a [`Datum`].
enum ColReader {
    Boolean(arrow::array::BooleanArray),

    // Signed integers. `Int8` carries a cast because arrow `i8` has no direct SQL type and is
    // widened at construction time.
    Int8 {
        array: arrow::array::Int8Array,
        // Widens the raw `i8` into whichever integer `Datum` the desc requested.
        cast: fn(i8) -> Datum<'static>,
    },
    Int16(arrow::array::Int16Array),
    Int32(arrow::array::Int32Array),
    Int64(arrow::array::Int64Array),

    // Unsigned integers; `UInt8` mirrors the `Int8` widening story.
    UInt8 {
        array: arrow::array::UInt8Array,
        cast: fn(u8) -> Datum<'static>,
    },
    UInt16(arrow::array::UInt16Array),
    UInt32(arrow::array::UInt32Array),
    UInt64(arrow::array::UInt64Array),

    // Floats; `Float16` is widened to `Float32`/`Float64` via `cast`.
    Float16 {
        array: arrow::array::Float16Array,
        cast: fn(half::f16) -> Datum<'static>,
    },
    Float32(arrow::array::Float32Array),
    Float64(arrow::array::Float64Array),

    // Decimals store unscaled integers; at read time the value is divided by `scale_factor`
    // (10^scale) in a context configured with `precision`.
    Decimal128 {
        array: Decimal128Array,
        scale_factor: Numeric,
        precision: usize,
    },
    Decimal256 {
        array: Decimal256Array,
        scale_factor: Numeric,
        precision: usize,
    },

    // Byte payloads; `Uuid` wraps one of the binary readers and parses the bytes as a UUID.
    Binary(arrow::array::BinaryArray),
    LargeBinary(arrow::array::LargeBinaryArray),
    FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
    BinaryView(arrow::array::BinaryViewArray),
    Uuid(Box<ColReader>),

    // String payloads; `Jsonb` wraps one of the string readers and parses the text as JSON.
    String(arrow::array::StringArray),
    LargeString(arrow::array::LargeStringArray),
    StringView(arrow::array::StringViewArray),
    Jsonb(Box<ColReader>),

    // Timezone-less timestamps, one variant per arrow `TimeUnit`.
    TimestampSecond(arrow::array::TimestampSecondArray),
    TimestampMillisecond(arrow::array::TimestampMillisecondArray),
    TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
    TimestampNanosecond(arrow::array::TimestampNanosecondArray),

    // Dates: `Date32` is days since the UNIX epoch, `Date64` is milliseconds.
    Date32(Date32Array),
    Date64(Date64Array),

    Time32Seconds(Time32SecondArray),
    Time32Milliseconds(arrow::array::Time32MillisecondArray),

    // Variable-length lists: `offsets` delimits each list's element range in `values`.
    List {
        offsets: OffsetBuffer<i32>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },
    LargeList {
        offsets: OffsetBuffer<i64>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    // Records: one reader per field, in field order; `nulls` is the struct-level validity.
    Record {
        fields: Vec<Box<ColReader>>,
        nulls: Option<NullBuffer>,
    },

    // Maps: keys are always Utf8 strings; `offsets` delimits each map's entries.
    Map {
        offsets: OffsetBuffer<i32>,
        keys: StringArray,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    // Ranges: bound readers plus their per-bound validity (a null bound means unbounded —
    // TODO confirm against the read implementation), inclusivity flags, and the `empty` marker.
    Range {
        lower: Box<ColReader>,
        lower_nulls: Option<NullBuffer>,
        upper: Box<ColReader>,
        upper_nulls: Option<NullBuffer>,
        lower_inclusive: BooleanArray,
        upper_inclusive: BooleanArray,
        empty: BooleanArray,
        nulls: Option<NullBuffer>,
    },

    // Intervals, one variant per arrow `IntervalUnit`.
    IntervalYearMonth(IntervalYearMonthArray),
    IntervalDayTime(IntervalDayTimeArray),
    IntervalMonthDayNano(IntervalMonthDayNanoArray),
}
559
560impl ColReader {
561    fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
562        let datum = match self {
563            ColReader::Boolean(array) => array
564                .is_valid(idx)
565                .then(|| array.value(idx))
566                .map(|x| if x { Datum::True } else { Datum::False }),
567            ColReader::Int8 { array, cast } => {
568                array.is_valid(idx).then(|| array.value(idx)).map(cast)
569            }
570            ColReader::Int16(array) => array
571                .is_valid(idx)
572                .then(|| array.value(idx))
573                .map(Datum::Int16),
574            ColReader::Int32(array) => array
575                .is_valid(idx)
576                .then(|| array.value(idx))
577                .map(Datum::Int32),
578            ColReader::Int64(array) => array
579                .is_valid(idx)
580                .then(|| array.value(idx))
581                .map(Datum::Int64),
582            ColReader::UInt8 { array, cast } => {
583                array.is_valid(idx).then(|| array.value(idx)).map(cast)
584            }
585            ColReader::UInt16(array) => array
586                .is_valid(idx)
587                .then(|| array.value(idx))
588                .map(Datum::UInt16),
589            ColReader::UInt32(array) => array
590                .is_valid(idx)
591                .then(|| array.value(idx))
592                .map(Datum::UInt32),
593            ColReader::UInt64(array) => array
594                .is_valid(idx)
595                .then(|| array.value(idx))
596                .map(Datum::UInt64),
597            ColReader::Float16 { array, cast } => {
598                array.is_valid(idx).then(|| array.value(idx)).map(cast)
599            }
600            ColReader::Float32(array) => array
601                .is_valid(idx)
602                .then(|| array.value(idx))
603                .map(|x| Datum::Float32(OrderedFloat(x))),
604            ColReader::Float64(array) => array
605                .is_valid(idx)
606                .then(|| array.value(idx))
607                .map(|x| Datum::Float64(OrderedFloat(x))),
608            ColReader::Decimal128 {
609                array,
610                scale_factor,
611                precision,
612            } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
613                // Create a Numeric from our i128 with precision.
614                let mut ctx = dec::Context::<Numeric>::default();
615                ctx.set_precision(*precision).expect("checked before");
616                let mut num = ctx.from_i128(x);
617
618                // Scale the number.
619                ctx.div(&mut num, scale_factor);
620
621                Datum::Numeric(OrderedDecimal(num))
622            }),
623            ColReader::Decimal256 {
624                array,
625                scale_factor,
626                precision,
627            } => array
628                .is_valid(idx)
629                .then(|| array.value(idx))
630                .map(|x| {
631                    let s = x.to_string();
632
633                    // Parse a i256 from it's String representation.
634                    //
635                    // TODO(cf3): See if we can add support for 256-bit numbers to the `dec` crate.
636                    let mut ctx = dec::Context::<Numeric>::default();
637                    ctx.set_precision(*precision).expect("checked before");
638                    let mut num = ctx
639                        .parse(s)
640                        .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
641
642                    // Scale the number.
643                    ctx.div(&mut num, scale_factor);
644
645                    Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
646                })
647                .transpose()?,
648            ColReader::Binary(array) => array
649                .is_valid(idx)
650                .then(|| array.value(idx))
651                .map(Datum::Bytes),
652            ColReader::LargeBinary(array) => array
653                .is_valid(idx)
654                .then(|| array.value(idx))
655                .map(Datum::Bytes),
656            ColReader::FixedSizeBinary(array) => array
657                .is_valid(idx)
658                .then(|| array.value(idx))
659                .map(Datum::Bytes),
660            ColReader::BinaryView(array) => array
661                .is_valid(idx)
662                .then(|| array.value(idx))
663                .map(Datum::Bytes),
664            ColReader::Uuid(reader) => {
665                // First read a binary value into a temp row, and later parse that as UUID into our
666                // actual Row Packer.
667                let mut temp_row = SharedRow::get();
668                reader.read(idx, &mut temp_row.packer()).context("uuid")?;
669                let slice = match temp_row.unpack_first() {
670                    Datum::Bytes(slice) => slice,
671                    Datum::Null => {
672                        packer.push(Datum::Null);
673                        return Ok(());
674                    }
675                    other => anyhow::bail!("expected String, found {other:?}"),
676                };
677
678                let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
679                Some(Datum::Uuid(uuid))
680            }
681            ColReader::String(array) => array
682                .is_valid(idx)
683                .then(|| array.value(idx))
684                .map(Datum::String),
685            ColReader::LargeString(array) => array
686                .is_valid(idx)
687                .then(|| array.value(idx))
688                .map(Datum::String),
689            ColReader::StringView(array) => array
690                .is_valid(idx)
691                .then(|| array.value(idx))
692                .map(Datum::String),
693            ColReader::Jsonb(reader) => {
694                // First read a string value into a temp row, and later parse that as JSON into our
695                // actual Row Packer.
696                let mut temp_row = SharedRow::get();
697                reader.read(idx, &mut temp_row.packer()).context("jsonb")?;
698                let value = match temp_row.unpack_first() {
699                    Datum::String(value) => value,
700                    Datum::Null => {
701                        packer.push(Datum::Null);
702                        return Ok(());
703                    }
704                    other => anyhow::bail!("expected String, found {other:?}"),
705                };
706
707                JsonbPacker::new(packer)
708                    .pack_str(value)
709                    .context("roundtrip json")?;
710
711                // Return early because we've already packed the necessasry Datums.
712                return Ok(());
713            }
714            ColReader::TimestampSecond(array) => array
715                .is_valid(idx)
716                .then(|| array.value(idx))
717                .map(|secs| {
718                    let dt = DateTime::from_timestamp(secs, 0)
719                        .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
720                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
721                        .context("TimestampSeconds")?;
722                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
723                })
724                .transpose()?,
725            ColReader::TimestampMillisecond(array) => array
726                .is_valid(idx)
727                .then(|| array.value(idx))
728                .map(|millis| {
729                    let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
730                        anyhow::anyhow!("invalid timestamp milliseconds {millis}")
731                    })?;
732                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
733                        .context("TimestampMillis")?;
734                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
735                })
736                .transpose()?,
737            ColReader::TimestampMicrosecond(array) => array
738                .is_valid(idx)
739                .then(|| array.value(idx))
740                .map(|micros| {
741                    let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
742                        anyhow::anyhow!("invalid timestamp microseconds {micros}")
743                    })?;
744                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
745                        .context("TimestampMicros")?;
746                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
747                })
748                .transpose()?,
749            ColReader::TimestampNanosecond(array) => array
750                .is_valid(idx)
751                .then(|| array.value(idx))
752                .map(|nanos| {
753                    let dt = DateTime::from_timestamp_nanos(nanos);
754                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
755                        .context("TimestampNanos")?;
756                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
757                })
758                .transpose()?,
759            ColReader::Date32(array) => array
760                .is_valid(idx)
761                .then(|| array.value(idx))
762                .map(|unix_days| {
763                    let date = Date::from_unix_epoch(unix_days).context("date32")?;
764                    Ok::<_, anyhow::Error>(Datum::Date(date))
765                })
766                .transpose()?,
767            ColReader::Date64(array) => array
768                .is_valid(idx)
769                .then(|| array.value(idx))
770                .map(|unix_millis| {
771                    let date = DateTime::from_timestamp_millis(unix_millis)
772                        .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
773                    let unix_epoch = DateTime::from_timestamp(0, 0)
774                        .expect("UNIX epoch")
775                        .date_naive();
776                    let delta = date.date_naive().signed_duration_since(unix_epoch);
777                    let days: i32 = delta.num_days().try_into().context("date64")?;
778                    let date = Date::from_unix_epoch(days).context("date64")?;
779                    Ok::<_, anyhow::Error>(Datum::Date(date))
780                })
781                .transpose()?,
782            ColReader::Time32Seconds(array) => array
783                .is_valid(idx)
784                .then(|| array.value(idx))
785                .map(|secs| {
786                    let usecs: u32 = secs.try_into().context("time32 seconds")?;
787                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
788                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
789                    Ok::<_, anyhow::Error>(Datum::Time(time))
790                })
791                .transpose()?,
792            ColReader::Time32Milliseconds(array) => array
793                .is_valid(idx)
794                .then(|| array.value(idx))
795                .map(|millis| {
796                    let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
797                    let usecs = umillis / 1000;
798                    let unanos = (umillis % 1000).saturating_mul(1_000_000);
799                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
800                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
801                    Ok::<_, anyhow::Error>(Datum::Time(time))
802                })
803                .transpose()?,
804            ColReader::List {
805                offsets,
806                values,
807                nulls,
808            } => {
809                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
810                if !is_valid {
811                    packer.push(Datum::Null);
812                    return Ok(());
813                }
814
815                let start: usize = offsets[idx].try_into().context("list start offset")?;
816                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
817
818                packer
819                    .push_list_with(|packer| {
820                        for idx in start..end {
821                            values.read(idx, packer)?;
822                        }
823                        Ok::<_, anyhow::Error>(())
824                    })
825                    .context("pack list")?;
826
                // Return early because we've already packed the necessary Datums.
828                return Ok(());
829            }
830            ColReader::LargeList {
831                offsets,
832                values,
833                nulls,
834            } => {
835                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
836                if !is_valid {
837                    packer.push(Datum::Null);
838                    return Ok(());
839                }
840
841                let start: usize = offsets[idx].try_into().context("list start offset")?;
842                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
843
844                packer
845                    .push_list_with(|packer| {
846                        for idx in start..end {
847                            values.read(idx, packer)?;
848                        }
849                        Ok::<_, anyhow::Error>(())
850                    })
851                    .context("pack list")?;
852
                // Return early because we've already packed the necessary Datums.
854                return Ok(());
855            }
856            ColReader::Record { fields, nulls } => {
857                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
858                if !is_valid {
859                    packer.push(Datum::Null);
860                    return Ok(());
861                }
862
863                packer
864                    .push_list_with(|packer| {
865                        for field in fields {
866                            field.read(idx, packer)?;
867                        }
868                        Ok::<_, anyhow::Error>(())
869                    })
870                    .context("pack record")?;
871
                // Return early because we've already packed the necessary Datums.
873                return Ok(());
874            }
875            ColReader::Map {
876                offsets,
877                keys,
878                values,
879                nulls,
880            } => {
881                let is_non_null = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
882                if !is_non_null {
883                    packer.push(Datum::Null);
884                    return Ok(());
885                }
886
887                let start: usize = offsets[idx].try_into().context("map start offset")?;
888                let end: usize = offsets[idx + 1].try_into().context("map end offset")?;
889
890                // Arrow's MapArray doesn't guarantee that keys are in sorted order, but Materialize's
891                // Datum::Map does, so we need to sort the keys here before packing them, or else
892                // many assumptions will break.
893                let mut kv_sorted = (start..end)
894                    .map(|i| (keys.value(i), i))
895                    .sorted_by_key(|(k, _)| *k)
896                    .peekable();
897
898                packer
899                    .push_dict_with(|packer| {
900                        while let Some((key, i)) = kv_sorted.next() {
901                            // Parquet docs state that if there are duplicate keys, the last value
902                            // should be used, so skip duplicates here.
903                            //
904                            // sorted_by_key is a stable sort, so entries with duplicate keys will
905                            // maintain their original order, and we can pick the last one here.
906                            if let Some((next_key, _)) = kv_sorted.peek() {
907                                if key == *next_key {
908                                    continue;
909                                }
910                            }
911                            packer.push(Datum::String(key));
912                            values.read(i, packer)?;
913                        }
914                        Ok::<_, anyhow::Error>(())
915                    })
916                    .context("pack map")?;
917
918                // Return early because we've already packed the necessary Datums.
919                return Ok(());
920            }
921            ColReader::Range {
922                lower,
923                lower_nulls,
924                upper,
925                upper_nulls,
926                lower_inclusive,
927                upper_inclusive,
928                empty,
929                nulls,
930            } => {
931                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
932                if !is_valid {
933                    packer.push(Datum::Null);
934                    return Ok(());
935                }
936
937                if empty.value(idx) {
938                    packer.push(Datum::Range(Range { inner: None }));
939                    return Ok(());
940                }
941
942                let lower_is_infinite = lower_nulls
943                    .as_ref()
944                    .map(|n| !n.is_valid(idx))
945                    .unwrap_or(false);
946                let upper_is_infinite = upper_nulls
947                    .as_ref()
948                    .map(|n| !n.is_valid(idx))
949                    .unwrap_or(false);
950
951                // Read finite bounds into owned Rows so the packing closures
952                // handed to `push_range_with` can capture them by move.
953                let lower_bound = if lower_is_infinite {
954                    None
955                } else {
956                    let mut temp = SharedRow::get();
957                    lower.read(idx, &mut temp.packer())?;
958                    let row = temp.clone();
959                    Some(move |packer: &mut RowPacker| -> Result<(), anyhow::Error> {
960                        packer.push(row.unpack_first());
961                        Ok(())
962                    })
963                };
964
965                let upper_bound = if upper_is_infinite {
966                    None
967                } else {
968                    let mut temp = SharedRow::get();
969                    upper.read(idx, &mut temp.packer())?;
970                    let row = temp.clone();
971                    Some(move |packer: &mut RowPacker| -> Result<(), anyhow::Error> {
972                        packer.push(row.unpack_first());
973                        Ok(())
974                    })
975                };
976
977                packer
978                    .push_range_with(
979                        RangeLowerBound {
980                            inclusive: lower_inclusive.value(idx),
981                            bound: lower_bound,
982                        },
983                        RangeUpperBound {
984                            inclusive: upper_inclusive.value(idx),
985                            bound: upper_bound,
986                        },
987                    )
988                    .context("pack range")?;
989
990                return Ok(());
991            }
992            ColReader::IntervalYearMonth(array) => array
993                .is_valid(idx)
994                .then(|| array.value(idx))
995                .map(|months| Datum::Interval(Interval::new(months, 0, 0))),
996            ColReader::IntervalDayTime(array) => {
997                array.is_valid(idx).then(|| array.value(idx)).map(|v| {
998                    let micros = i64::from(v.milliseconds) * 1_000;
999                    Datum::Interval(Interval::new(0, v.days, micros))
1000                })
1001            }
1002            ColReader::IntervalMonthDayNano(array) => {
1003                array.is_valid(idx).then(|| array.value(idx)).map(|v| {
1004                    let micros = v.nanoseconds / 1_000;
1005                    Datum::Interval(Interval::new(v.months, v.days, micros))
1006                })
1007            }
1008        };
1009
1010        match datum {
1011            Some(d) => packer.push(d),
1012            None => packer.push(Datum::Null),
1013        }
1014
1015        Ok(())
1016    }
1017}
1018
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    /// Reads row `idx` out of `reader` — which must describe a single
    /// `Numeric` column — and asserts it decodes to `expected`, where `None`
    /// means `Datum::Null`.
    ///
    /// Using a fresh `Row` per read avoids having to remember to reset the
    /// row's allocation (via `packer()`) between reads.
    fn assert_numeric_at(reader: &ArrowReader, idx: usize, expected: Option<f64>) {
        let mut row = Row::default();
        reader.read(idx, &mut row).unwrap();
        match expected {
            Some(value) => {
                let num = row.into_element().unwrap_numeric();
                assert_eq!(num.0, Numeric::from(value));
            }
            None => assert_eq!(row.into_element(), Datum::Null),
        }
    }

    /// Round-trips a `Row` with a representative sample of scalar types
    /// through `ArrowBuilder` and back through `ArrowReader`, checking both a
    /// fully-populated row and an all-`Null` row.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_reader() {
        let desc = RelationDesc::builder()
            .with_column("bool", SqlScalarType::Bool.nullable(true))
            .with_column("int4", SqlScalarType::Int32.nullable(true))
            .with_column("uint8", SqlScalarType::UInt64.nullable(true))
            .with_column("float32", SqlScalarType::Float32.nullable(true))
            .with_column("string", SqlScalarType::String.nullable(true))
            .with_column("bytes", SqlScalarType::Bytes.nullable(true))
            .with_column("uuid", SqlScalarType::Uuid.nullable(true))
            .with_column("json", SqlScalarType::Jsonb.nullable(true))
            .with_column(
                "list",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let mut og_row = Row::default();
        let mut packer = og_row.packer();

        packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        // Derive the number of Nulls from the desc so this doesn't silently
        // desynchronize if columns are added above.
        let null_row = Row::pack(vec![Datum::Null; desc.arity()]);

        // Encode our data with our ArrowBuilder.
        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&og_row).unwrap();
        builder.add_row(&null_row).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        // Decode our data!
        let reader =
            ArrowReader::new(&desc, arrow::array::StructArray::from(record_batch)).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        assert_eq!(&og_row, &rnd_row);

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        assert_eq!(&null_row, &rnd_row);
    }

    /// Decodes an arrow `Decimal128Array` into `Numeric` Datums, including a
    /// null entry.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal128() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec128 = arrow::array::Decimal128Builder::new();
        dec128 = dec128.with_precision_and_scale(12, 3).unwrap();

        // 1.234
        dec128.append_value(1234);
        dec128.append_null();
        // 100000000.009
        dec128.append_value(100000000009);

        let dec128 = dec128.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec128.data_type().clone(), true)),
            Arc::new(dec128) as arrow::array::ArrayRef,
        )]);

        // Decode our data!
        let reader = ArrowReader::new(&desc, batch).unwrap();
        assert_numeric_at(&reader, 0, Some(1.234f64));
        assert_numeric_at(&reader, 1, None);
        assert_numeric_at(&reader, 2, Some(100000000.009f64));
    }

    /// Decodes an arrow `Decimal256Array` into `Numeric` Datums, including a
    /// null entry.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal256() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        let mut dec256 = arrow::array::Decimal256Builder::new();
        dec256 = dec256.with_precision_and_scale(12, 3).unwrap();

        // 1.234
        dec256.append_value(arrow::datatypes::i256::from(1234));
        dec256.append_null();
        // 100000000.009
        dec256.append_value(arrow::datatypes::i256::from(100000000009i64));

        let dec256 = dec256.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec256.data_type().clone(), true)),
            Arc::new(dec256) as arrow::array::ArrayRef,
        )]);

        // Decode our data!
        let reader = ArrowReader::new(&desc, batch).unwrap();
        assert_numeric_at(&reader, 0, Some(1.234f64));
        assert_numeric_at(&reader, 1, None);
        assert_numeric_at(&reader, 2, Some(100000000.009f64));
    }
}