Skip to main content

mz_arrow_util/
reader.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reader for [`arrow`] data that outputs [`Row`]s.
11
12use std::sync::Arc;
13
14use anyhow::Context;
15use arrow::array::{
16    Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
17    Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
18    Int16Array, Int32Array, Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
19    IntervalYearMonthArray, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
20    MapArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray, Time32SecondArray,
21    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
22    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
26use chrono::{DateTime, NaiveTime};
27use dec::OrderedDecimal;
28use itertools::Itertools;
29use mz_ore::cast::CastFrom;
30use mz_repr::adt::date::Date;
31use mz_repr::adt::interval::Interval;
32use mz_repr::adt::jsonb::JsonbPacker;
33use mz_repr::adt::numeric::Numeric;
34use mz_repr::adt::timestamp::CheckedTimestamp;
35use mz_repr::{Datum, RelationDesc, Row, RowPacker, SharedRow, SqlScalarType};
36use ordered_float::OrderedFloat;
37use uuid::Uuid;
38
39use crate::mask_nulls;
40
/// Type that can read out of an [`arrow::array::StructArray`] and into a [`Row`], given a
/// [`RelationDesc`].
///
/// The inverse of a [`crate::builder::ArrowBuilder`].
///
/// Note: When creating an [`ArrowReader`] we perform a "one-time downcast" of the children Arrays
/// from the [`StructArray`], into `enum ColReader`s. This is a much more verbose approach than the
/// alternative of downcasting from a `dyn arrow::array::Array` every time we read a [`Row`], but
/// it is _much_ more performant.
pub struct ArrowReader {
    /// Number of rows in the source [`StructArray`].
    len: usize,
    /// One pre-downcasted reader per column, in [`RelationDesc`] column order.
    readers: Vec<ColReader>,
}
54
55impl ArrowReader {
56    /// Create an [`ArrowReader`] validating that the provided [`RelationDesc`] and [`StructArray`]
57    /// have a matching schema.
58    ///
59    /// The [`RelationDesc`] and [`StructArray`] need to uphold the following to be a valid pair:
60    ///
61    /// * Same number of columns.
62    /// * Columns of all the same name.
63    /// * Columns of compatible types.
64    ///
65    /// TODO(cf2): Relax some of these restrictions by allowing users to map column names, omit
66    /// columns, perform some lightweight casting, and matching not on column name but column
67    /// position.
68    /// TODO(cf2): Allow specifying an optional `arrow::Schema` for extra metadata.
69    pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
70        let inner_columns = array.columns();
71        let desc_columns = desc.typ().columns();
72
73        if inner_columns.len() != desc_columns.len() {
74            return Err(anyhow::anyhow!(
75                "wrong number of columns {} vs {}",
76                inner_columns.len(),
77                desc_columns.len()
78            ));
79        }
80
81        let mut readers = Vec::with_capacity(desc_columns.len());
82        for (col_name, col_type) in desc.iter() {
83            let column = array
84                .column_by_name(col_name)
85                .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
86            let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
87                .context(col_name.clone())?;
88
89            readers.push(reader);
90        }
91
92        Ok(ArrowReader {
93            len: array.len(),
94            readers,
95        })
96    }
97
98    /// Read the value at `idx` into the provided `Row`.
99    pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
100        let mut packer = row.packer();
101        for reader in &self.readers {
102            reader.read(idx, &mut packer).context(idx)?;
103        }
104        Ok(())
105    }
106
107    /// Read all of the values in this [`ArrowReader`] into `rows`.
108    pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
109        for idx in 0..self.len {
110            let mut row = Row::default();
111            self.read(idx, &mut row).context(idx)?;
112            rows.push(row);
113        }
114        Ok(self.len)
115    }
116}
117
/// Convert a [`SqlScalarType`] and the [`Arc<dyn Array>`] that backs it into a [`ColReader`],
/// validating that the Arrow [`DataType`] is compatible with the requested SQL type.
///
/// This is where the "one-time downcast" described on [`ArrowReader`] happens: the returned
/// [`ColReader`] holds a concretely-typed array so per-row reads avoid repeated dynamic casts.
/// Nested types (lists, records, maps) recurse into this function for their element readers.
///
/// Returns an error for any (SQL type, Arrow type) pairing that is not supported.
fn scalar_type_and_array_to_reader(
    scalar_type: &SqlScalarType,
    array: Arc<dyn Array>,
) -> Result<ColReader, anyhow::Error> {
    // Downcast to a concrete array type. The `expect` is safe because every call
    // site first matches on the corresponding `DataType` variant.
    fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
        array
            .as_any()
            .downcast_ref::<T>()
            .expect("checked DataType")
            .clone()
    }

    match (scalar_type, array.data_type()) {
        (SqlScalarType::Bool, DataType::Boolean) => {
            Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
        }
        // Arrow Int8 has no direct SQL equivalent, so losslessly widen each value
        // into whichever signed integer type the relation asked for.
        (SqlScalarType::Int16 | SqlScalarType::Int32 | SqlScalarType::Int64, DataType::Int8) => {
            let array = downcast_array::<Int8Array>(array);
            let cast: fn(i8) -> Datum<'static> = match scalar_type {
                SqlScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
                SqlScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
                SqlScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Int8 { array, cast })
        }
        (SqlScalarType::Int16, DataType::Int16) => {
            Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
        }
        (SqlScalarType::Int32, DataType::Int32) => {
            Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
        }
        (SqlScalarType::Int64, DataType::Int64) => {
            Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
        }
        // Same widening treatment for Arrow UInt8, which also has no SQL equivalent.
        (
            SqlScalarType::UInt16 | SqlScalarType::UInt32 | SqlScalarType::UInt64,
            DataType::UInt8,
        ) => {
            let array = downcast_array::<UInt8Array>(array);
            let cast: fn(u8) -> Datum<'static> = match scalar_type {
                SqlScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
                SqlScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
                SqlScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::UInt8 { array, cast })
        }
        (SqlScalarType::UInt16, DataType::UInt16) => {
            Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
        }
        (SqlScalarType::UInt32, DataType::UInt32) => {
            Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
        }
        (SqlScalarType::UInt64, DataType::UInt64) => {
            Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
        }
        // Arrow half-precision floats are widened to f32 or f64 on read.
        (SqlScalarType::Float32 | SqlScalarType::Float64, DataType::Float16) => {
            let array = downcast_array::<Float16Array>(array);
            let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
                SqlScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
                SqlScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Float16 { array, cast })
        }
        (SqlScalarType::Float32, DataType::Float32) => {
            Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
        }
        (SqlScalarType::Float64, DataType::Float64) => {
            Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
        }
        // TODO(cf3): Consider the max_scale for numeric.
        (SqlScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
            use num_traits::Pow;

            // Pre-compute 10^scale once; each read divides the raw integer value
            // by this factor to recover the decimal.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal128Array>(array);

            Ok(ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            })
        }
        // TODO(cf3): Consider the max_scale for numeric.
        (SqlScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
            use num_traits::Pow;

            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal256Array>(array);

            Ok(ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Bytes, DataType::Binary) => {
            Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
        }
        (SqlScalarType::Bytes, DataType::LargeBinary) => {
            let array = downcast_array::<LargeBinaryArray>(array);
            Ok(ColReader::LargeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array);
            Ok(ColReader::FixedSizeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::BinaryView) => {
            let array = downcast_array::<BinaryViewArray>(array);
            Ok(ColReader::BinaryView(array))
        }
        // UUIDs arrive as raw bytes: reuse the `Bytes` reader and parse the bytes
        // into a `Uuid` at read time.
        (
            SqlScalarType::Uuid,
            DataType::Binary
            | DataType::BinaryView
            | DataType::LargeBinary
            | DataType::FixedSizeBinary(_),
        ) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::Bytes, array)
                .context("uuid reader")?;
            Ok(ColReader::Uuid(Box::new(reader)))
        }
        (SqlScalarType::String, DataType::Utf8) => {
            Ok(ColReader::String(downcast_array::<StringArray>(array)))
        }
        (SqlScalarType::String, DataType::LargeUtf8) => {
            let array = downcast_array::<LargeStringArray>(array);
            Ok(ColReader::LargeString(array))
        }
        (SqlScalarType::String, DataType::Utf8View) => {
            let array = downcast_array::<StringViewArray>(array);
            Ok(ColReader::StringView(array))
        }
        // JSON arrives as a string: reuse the `String` reader and parse the text
        // as JSON at read time.
        (SqlScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::String, array)
                .context("json reader")?;
            Ok(ColReader::Jsonb(Box::new(reader)))
        }
        // Only timezone-less (`None`) Arrow timestamps are supported here.
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
            let array = downcast_array::<TimestampSecondArray>(array);
            Ok(ColReader::TimestampSecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
            let array = downcast_array::<TimestampMillisecondArray>(array);
            Ok(ColReader::TimestampMillisecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
            let array = downcast_array::<TimestampMicrosecondArray>(array);
            Ok(ColReader::TimestampMicrosecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
            let array = downcast_array::<TimestampNanosecondArray>(array);
            Ok(ColReader::TimestampNanosecond(array))
        }
        (SqlScalarType::Date, DataType::Date32) => {
            let array = downcast_array::<Date32Array>(array);
            Ok(ColReader::Date32(array))
        }
        (SqlScalarType::Date, DataType::Date64) => {
            let array = downcast_array::<Date64Array>(array);
            Ok(ColReader::Date64(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
            let array = downcast_array::<Time32SecondArray>(array);
            Ok(ColReader::Time32Seconds(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
            let array = downcast_array::<Time32MillisecondArray>(array);
            Ok(ColReader::Time32Milliseconds(array))
        }
        // Lists recurse for their element reader; the offsets/null buffers are
        // kept so `read` can slice out each list's elements.
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::List(_),
        ) => {
            let array = downcast_array::<ListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("list")?;
            Ok(ColReader::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::LargeList(_),
        ) => {
            let array = downcast_array::<LargeListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("large list")?;
            Ok(ColReader::LargeList {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        // Records recurse per-field, matching fields by name. Child arrays are
        // masked with the struct's null buffer so null records read as null fields.
        (
            SqlScalarType::Record {
                fields,
                custom_id: _,
            },
            DataType::Struct(_),
        ) => {
            let record_array = downcast_array::<StructArray>(array);
            let null_mask = record_array.nulls();

            let mut decoders = Vec::with_capacity(fields.len());
            for (name, typ) in fields.iter() {
                let inner_array = record_array
                    .column_by_name(name)
                    .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
                    .context(name.clone())?;

                decoders.push(Box::new(inner_decoder));
            }

            Ok(ColReader::Record {
                fields: decoders,
                nulls: null_mask.cloned(),
            })
        }
        // Map keys must be Utf8 strings; values recurse for their reader.
        (
            SqlScalarType::Map {
                value_type,
                custom_id: _,
            },
            DataType::Map(_, _),
        ) => {
            let map_array = downcast_array::<MapArray>(array);
            let keys = map_array
                .keys()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("map keys should be Utf8 strings")
                .clone();
            let values_reader =
                scalar_type_and_array_to_reader(value_type, Arc::clone(map_array.values()))
                    .context("map values")?;
            Ok(ColReader::Map {
                offsets: map_array.offsets().clone(),
                keys,
                values: Box::new(values_reader),
                nulls: map_array.nulls().cloned(),
            })
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::YearMonth)) => {
            Ok(ColReader::IntervalYearMonth(downcast_array::<
                IntervalYearMonthArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::DayTime)) => {
            Ok(ColReader::IntervalDayTime(downcast_array::<
                IntervalDayTimeArray,
            >(array)))
        }
        (SqlScalarType::Interval, DataType::Interval(IntervalUnit::MonthDayNano)) => {
            Ok(ColReader::IntervalMonthDayNano(downcast_array::<
                IntervalMonthDayNanoArray,
            >(array)))
        }
        // Any other (SQL type, Arrow type) pairing is unsupported.
        other => anyhow::bail!("unsupported: {other:?}"),
    }
}
413
/// A "downcasted" version of [`arrow::array::Array`] that supports reading [`Datum`]s.
///
/// Note: While this is fairly verbose, one-time "downcasting" to an enum is _much_ more performant
/// than downcasting every time we read a [`Datum`].
enum ColReader {
    Boolean(arrow::array::BooleanArray),

    /// Arrow `Int8` has no SQL equivalent; `cast` widens each value into the
    /// signed integer [`Datum`] the relation asked for.
    Int8 {
        array: arrow::array::Int8Array,
        cast: fn(i8) -> Datum<'static>,
    },
    Int16(arrow::array::Int16Array),
    Int32(arrow::array::Int32Array),
    Int64(arrow::array::Int64Array),

    /// Like `Int8`: `cast` widens into the requested unsigned integer [`Datum`].
    UInt8 {
        array: arrow::array::UInt8Array,
        cast: fn(u8) -> Datum<'static>,
    },
    UInt16(arrow::array::UInt16Array),
    UInt32(arrow::array::UInt32Array),
    UInt64(arrow::array::UInt64Array),

    /// Half-precision floats; `cast` widens into a 32- or 64-bit float [`Datum`].
    Float16 {
        array: arrow::array::Float16Array,
        cast: fn(half::f16) -> Datum<'static>,
    },
    Float32(arrow::array::Float32Array),
    Float64(arrow::array::Float64Array),

    /// `scale_factor` is 10^scale, pre-computed at construction time; raw values
    /// are divided by it on read. `precision` was validated at construction.
    Decimal128 {
        array: Decimal128Array,
        scale_factor: Numeric,
        precision: usize,
    },
    Decimal256 {
        array: Decimal256Array,
        scale_factor: Numeric,
        precision: usize,
    },

    Binary(arrow::array::BinaryArray),
    LargeBinary(arrow::array::LargeBinaryArray),
    FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
    BinaryView(arrow::array::BinaryViewArray),
    /// Wraps one of the binary readers above; the bytes are parsed as a UUID on read.
    Uuid(Box<ColReader>),

    String(arrow::array::StringArray),
    LargeString(arrow::array::LargeStringArray),
    StringView(arrow::array::StringViewArray),
    /// Wraps one of the string readers above; the text is parsed as JSON on read.
    Jsonb(Box<ColReader>),

    TimestampSecond(arrow::array::TimestampSecondArray),
    TimestampMillisecond(arrow::array::TimestampMillisecondArray),
    TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
    TimestampNanosecond(arrow::array::TimestampNanosecondArray),

    Date32(Date32Array),
    Date64(Date64Array),

    Time32Seconds(Time32SecondArray),
    Time32Milliseconds(arrow::array::Time32MillisecondArray),

    /// Variable-length lists: `offsets[idx]..offsets[idx + 1]` indexes into `values`.
    List {
        offsets: OffsetBuffer<i32>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },
    LargeList {
        offsets: OffsetBuffer<i64>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    /// Structs: one reader per field, in field order.
    Record {
        fields: Vec<Box<ColReader>>,
        nulls: Option<NullBuffer>,
    },

    /// Maps: keys are always Utf8 strings; `offsets` slices both `keys` and `values`.
    Map {
        offsets: OffsetBuffer<i32>,
        keys: StringArray,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    IntervalYearMonth(IntervalYearMonthArray),
    IntervalDayTime(IntervalDayTimeArray),
    IntervalMonthDayNano(IntervalMonthDayNanoArray),
}
504
505impl ColReader {
506    fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
507        let datum = match self {
508            ColReader::Boolean(array) => array
509                .is_valid(idx)
510                .then(|| array.value(idx))
511                .map(|x| if x { Datum::True } else { Datum::False }),
512            ColReader::Int8 { array, cast } => {
513                array.is_valid(idx).then(|| array.value(idx)).map(cast)
514            }
515            ColReader::Int16(array) => array
516                .is_valid(idx)
517                .then(|| array.value(idx))
518                .map(Datum::Int16),
519            ColReader::Int32(array) => array
520                .is_valid(idx)
521                .then(|| array.value(idx))
522                .map(Datum::Int32),
523            ColReader::Int64(array) => array
524                .is_valid(idx)
525                .then(|| array.value(idx))
526                .map(Datum::Int64),
527            ColReader::UInt8 { array, cast } => {
528                array.is_valid(idx).then(|| array.value(idx)).map(cast)
529            }
530            ColReader::UInt16(array) => array
531                .is_valid(idx)
532                .then(|| array.value(idx))
533                .map(Datum::UInt16),
534            ColReader::UInt32(array) => array
535                .is_valid(idx)
536                .then(|| array.value(idx))
537                .map(Datum::UInt32),
538            ColReader::UInt64(array) => array
539                .is_valid(idx)
540                .then(|| array.value(idx))
541                .map(Datum::UInt64),
542            ColReader::Float16 { array, cast } => {
543                array.is_valid(idx).then(|| array.value(idx)).map(cast)
544            }
545            ColReader::Float32(array) => array
546                .is_valid(idx)
547                .then(|| array.value(idx))
548                .map(|x| Datum::Float32(OrderedFloat(x))),
549            ColReader::Float64(array) => array
550                .is_valid(idx)
551                .then(|| array.value(idx))
552                .map(|x| Datum::Float64(OrderedFloat(x))),
553            ColReader::Decimal128 {
554                array,
555                scale_factor,
556                precision,
557            } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
558                // Create a Numeric from our i128 with precision.
559                let mut ctx = dec::Context::<Numeric>::default();
560                ctx.set_precision(*precision).expect("checked before");
561                let mut num = ctx.from_i128(x);
562
563                // Scale the number.
564                ctx.div(&mut num, scale_factor);
565
566                Datum::Numeric(OrderedDecimal(num))
567            }),
568            ColReader::Decimal256 {
569                array,
570                scale_factor,
571                precision,
572            } => array
573                .is_valid(idx)
574                .then(|| array.value(idx))
575                .map(|x| {
576                    let s = x.to_string();
577
578                    // Parse a i256 from it's String representation.
579                    //
580                    // TODO(cf3): See if we can add support for 256-bit numbers to the `dec` crate.
581                    let mut ctx = dec::Context::<Numeric>::default();
582                    ctx.set_precision(*precision).expect("checked before");
583                    let mut num = ctx
584                        .parse(s)
585                        .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
586
587                    // Scale the number.
588                    ctx.div(&mut num, scale_factor);
589
590                    Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
591                })
592                .transpose()?,
593            ColReader::Binary(array) => array
594                .is_valid(idx)
595                .then(|| array.value(idx))
596                .map(Datum::Bytes),
597            ColReader::LargeBinary(array) => array
598                .is_valid(idx)
599                .then(|| array.value(idx))
600                .map(Datum::Bytes),
601            ColReader::FixedSizeBinary(array) => array
602                .is_valid(idx)
603                .then(|| array.value(idx))
604                .map(Datum::Bytes),
605            ColReader::BinaryView(array) => array
606                .is_valid(idx)
607                .then(|| array.value(idx))
608                .map(Datum::Bytes),
609            ColReader::Uuid(reader) => {
610                // First read a binary value into a temp row, and later parse that as UUID into our
611                // actual Row Packer.
612                let mut temp_row = SharedRow::get();
613                reader.read(idx, &mut temp_row.packer()).context("uuid")?;
614                let slice = match temp_row.unpack_first() {
615                    Datum::Bytes(slice) => slice,
616                    Datum::Null => {
617                        packer.push(Datum::Null);
618                        return Ok(());
619                    }
620                    other => anyhow::bail!("expected String, found {other:?}"),
621                };
622
623                let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
624                Some(Datum::Uuid(uuid))
625            }
626            ColReader::String(array) => array
627                .is_valid(idx)
628                .then(|| array.value(idx))
629                .map(Datum::String),
630            ColReader::LargeString(array) => array
631                .is_valid(idx)
632                .then(|| array.value(idx))
633                .map(Datum::String),
634            ColReader::StringView(array) => array
635                .is_valid(idx)
636                .then(|| array.value(idx))
637                .map(Datum::String),
638            ColReader::Jsonb(reader) => {
639                // First read a string value into a temp row, and later parse that as JSON into our
640                // actual Row Packer.
641                let mut temp_row = SharedRow::get();
642                reader.read(idx, &mut temp_row.packer()).context("jsonb")?;
643                let value = match temp_row.unpack_first() {
644                    Datum::String(value) => value,
645                    Datum::Null => {
646                        packer.push(Datum::Null);
647                        return Ok(());
648                    }
649                    other => anyhow::bail!("expected String, found {other:?}"),
650                };
651
652                JsonbPacker::new(packer)
653                    .pack_str(value)
654                    .context("roundtrip json")?;
655
656                // Return early because we've already packed the necessasry Datums.
657                return Ok(());
658            }
659            ColReader::TimestampSecond(array) => array
660                .is_valid(idx)
661                .then(|| array.value(idx))
662                .map(|secs| {
663                    let dt = DateTime::from_timestamp(secs, 0)
664                        .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
665                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
666                        .context("TimestampSeconds")?;
667                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
668                })
669                .transpose()?,
670            ColReader::TimestampMillisecond(array) => array
671                .is_valid(idx)
672                .then(|| array.value(idx))
673                .map(|millis| {
674                    let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
675                        anyhow::anyhow!("invalid timestamp milliseconds {millis}")
676                    })?;
677                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
678                        .context("TimestampMillis")?;
679                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
680                })
681                .transpose()?,
682            ColReader::TimestampMicrosecond(array) => array
683                .is_valid(idx)
684                .then(|| array.value(idx))
685                .map(|micros| {
686                    let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
687                        anyhow::anyhow!("invalid timestamp microseconds {micros}")
688                    })?;
689                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
690                        .context("TimestampMicros")?;
691                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
692                })
693                .transpose()?,
694            ColReader::TimestampNanosecond(array) => array
695                .is_valid(idx)
696                .then(|| array.value(idx))
697                .map(|nanos| {
698                    let dt = DateTime::from_timestamp_nanos(nanos);
699                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
700                        .context("TimestampNanos")?;
701                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
702                })
703                .transpose()?,
704            ColReader::Date32(array) => array
705                .is_valid(idx)
706                .then(|| array.value(idx))
707                .map(|unix_days| {
708                    let date = Date::from_unix_epoch(unix_days).context("date32")?;
709                    Ok::<_, anyhow::Error>(Datum::Date(date))
710                })
711                .transpose()?,
712            ColReader::Date64(array) => array
713                .is_valid(idx)
714                .then(|| array.value(idx))
715                .map(|unix_millis| {
716                    let date = DateTime::from_timestamp_millis(unix_millis)
717                        .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
718                    let unix_epoch = DateTime::from_timestamp(0, 0)
719                        .expect("UNIX epoch")
720                        .date_naive();
721                    let delta = date.date_naive().signed_duration_since(unix_epoch);
722                    let days: i32 = delta.num_days().try_into().context("date64")?;
723                    let date = Date::from_unix_epoch(days).context("date64")?;
724                    Ok::<_, anyhow::Error>(Datum::Date(date))
725                })
726                .transpose()?,
727            ColReader::Time32Seconds(array) => array
728                .is_valid(idx)
729                .then(|| array.value(idx))
730                .map(|secs| {
731                    let usecs: u32 = secs.try_into().context("time32 seconds")?;
732                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
733                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
734                    Ok::<_, anyhow::Error>(Datum::Time(time))
735                })
736                .transpose()?,
737            ColReader::Time32Milliseconds(array) => array
738                .is_valid(idx)
739                .then(|| array.value(idx))
740                .map(|millis| {
741                    let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
742                    let usecs = umillis / 1000;
743                    let unanos = (umillis % 1000).saturating_mul(1_000_000);
744                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
745                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
746                    Ok::<_, anyhow::Error>(Datum::Time(time))
747                })
748                .transpose()?,
749            ColReader::List {
750                offsets,
751                values,
752                nulls,
753            } => {
754                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
755                if !is_valid {
756                    packer.push(Datum::Null);
757                    return Ok(());
758                }
759
760                let start: usize = offsets[idx].try_into().context("list start offset")?;
761                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
762
763                packer
764                    .push_list_with(|packer| {
765                        for idx in start..end {
766                            values.read(idx, packer)?;
767                        }
768                        Ok::<_, anyhow::Error>(())
769                    })
770                    .context("pack list")?;
771
                // Return early because we've already packed the necessary Datums.
773                return Ok(());
774            }
775            ColReader::LargeList {
776                offsets,
777                values,
778                nulls,
779            } => {
780                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
781                if !is_valid {
782                    packer.push(Datum::Null);
783                    return Ok(());
784                }
785
786                let start: usize = offsets[idx].try_into().context("list start offset")?;
787                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
788
789                packer
790                    .push_list_with(|packer| {
791                        for idx in start..end {
792                            values.read(idx, packer)?;
793                        }
794                        Ok::<_, anyhow::Error>(())
795                    })
796                    .context("pack list")?;
797
                // Return early because we've already packed the necessary Datums.
799                return Ok(());
800            }
801            ColReader::Record { fields, nulls } => {
802                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
803                if !is_valid {
804                    packer.push(Datum::Null);
805                    return Ok(());
806                }
807
808                packer
809                    .push_list_with(|packer| {
810                        for field in fields {
811                            field.read(idx, packer)?;
812                        }
813                        Ok::<_, anyhow::Error>(())
814                    })
815                    .context("pack record")?;
816
                // Return early because we've already packed the necessary Datums.
818                return Ok(());
819            }
820            ColReader::Map {
821                offsets,
822                keys,
823                values,
824                nulls,
825            } => {
826                let is_non_null = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
827                if !is_non_null {
828                    packer.push(Datum::Null);
829                    return Ok(());
830                }
831
832                let start: usize = offsets[idx].try_into().context("map start offset")?;
833                let end: usize = offsets[idx + 1].try_into().context("map end offset")?;
834
835                // Arrow's MapArray doesn't guarantee that keys are in sorted order, but Materialize's
836                // Datum::Map does, so we need to sort the keys here before packing them, or else
837                // many assumptions will break.
838                let mut kv_sorted = (start..end)
839                    .map(|i| (keys.value(i), i))
840                    .sorted_by_key(|(k, _)| *k)
841                    .peekable();
842
843                packer
844                    .push_dict_with(|packer| {
845                        while let Some((key, i)) = kv_sorted.next() {
846                            // Parquet docs state that if there are duplicate keys, the last value
847                            // should be used, so skip duplicates here.
848                            //
849                            // sorted_by_key is a stable sort, so entries with duplicate keys will
850                            // maintain their original order, and we can pick the last one here.
851                            if let Some((next_key, _)) = kv_sorted.peek() {
852                                if key == *next_key {
853                                    continue;
854                                }
855                            }
856                            packer.push(Datum::String(key));
857                            values.read(i, packer)?;
858                        }
859                        Ok::<_, anyhow::Error>(())
860                    })
861                    .context("pack map")?;
862
863                // Return early because we've already packed the necessary Datums.
864                return Ok(());
865            }
866            ColReader::IntervalYearMonth(array) => array
867                .is_valid(idx)
868                .then(|| array.value(idx))
869                .map(|months| Datum::Interval(Interval::new(months, 0, 0))),
870            ColReader::IntervalDayTime(array) => {
871                array.is_valid(idx).then(|| array.value(idx)).map(|v| {
872                    let micros = i64::from(v.milliseconds) * 1_000;
873                    Datum::Interval(Interval::new(0, v.days, micros))
874                })
875            }
876            ColReader::IntervalMonthDayNano(array) => {
877                array.is_valid(idx).then(|| array.value(idx)).map(|v| {
878                    let micros = v.nanoseconds / 1_000;
879                    Datum::Interval(Interval::new(v.months, v.days, micros))
880                })
881            }
882        };
883
884        match datum {
885            Some(d) => packer.push(d),
886            None => packer.push(Datum::Null),
887        }
888
889        Ok(())
890    }
891}
892
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_reader() {
        // A relation covering a representative spread of scalar types plus a
        // list column, every column nullable so the all-null row below is valid.
        let desc = RelationDesc::builder()
            .with_column("bool", SqlScalarType::Bool.nullable(true))
            .with_column("int4", SqlScalarType::Int32.nullable(true))
            .with_column("uint8", SqlScalarType::UInt64.nullable(true))
            .with_column("float32", SqlScalarType::Float32.nullable(true))
            .with_column("string", SqlScalarType::String.nullable(true))
            .with_column("bytes", SqlScalarType::Bytes.nullable(true))
            .with_column("uuid", SqlScalarType::Uuid.nullable(true))
            .with_column("json", SqlScalarType::Jsonb.nullable(true))
            .with_column(
                "list",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        // Build the row we expect to survive the round trip.
        let mut expected = Row::default();
        let mut packer = expected.packer();
        packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        let all_nulls = Row::pack(vec![Datum::Null; 9]);

        // Encode both rows with our ArrowBuilder...
        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&expected).unwrap();
        builder.add_row(&all_nulls).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        // ...and decode them back out with the ArrowReader.
        let reader =
            ArrowReader::new(&desc, arrow::array::StructArray::from(record_batch)).unwrap();

        let mut decoded = Row::default();
        reader.read(0, &mut decoded).unwrap();
        assert_eq!(expected, decoded);

        let mut decoded = Row::default();
        reader.read(1, &mut decoded).unwrap();
        assert_eq!(all_nulls, decoded);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal128() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        // Hand-build a Decimal128 column with precision 12 and scale 3:
        // [1.234, NULL, 100000000.009].
        let mut builder = arrow::array::Decimal128Builder::new()
            .with_precision_and_scale(12, 3)
            .unwrap();
        builder.append_value(1234);
        builder.append_null();
        builder.append_value(100000000009);
        let array = builder.finish();

        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", array.data_type().clone(), true)),
            Arc::new(array) as arrow::array::ArrayRef,
        )]);

        // Decode our data!
        let reader = ArrowReader::new(&desc, batch).unwrap();
        // Reads a single index out of the decoded column into a fresh Row.
        let read_at = |idx: usize| {
            let mut row = Row::default();
            reader.read(idx, &mut row).unwrap();
            row
        };

        assert_eq!(
            read_at(0).into_element().unwrap_numeric().0,
            Numeric::from(1.234f64)
        );
        assert_eq!(read_at(1).into_element(), Datum::Null);
        assert_eq!(
            read_at(2).into_element().unwrap_numeric().0,
            Numeric::from(100000000.009f64)
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal256() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish();

        // Hand-build a Decimal256 column with precision 12 and scale 3:
        // [1.234, NULL, 100000000.009].
        let mut builder = arrow::array::Decimal256Builder::new()
            .with_precision_and_scale(12, 3)
            .unwrap();
        builder.append_value(arrow::datatypes::i256::from(1234));
        builder.append_null();
        builder.append_value(arrow::datatypes::i256::from(100000000009i64));
        let array = builder.finish();

        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", array.data_type().clone(), true)),
            Arc::new(array) as arrow::array::ArrayRef,
        )]);

        // Decode our data!
        let reader = ArrowReader::new(&desc, batch).unwrap();
        // Reads a single index out of the decoded column into a fresh Row.
        let read_at = |idx: usize| {
            let mut row = Row::default();
            reader.read(idx, &mut row).unwrap();
            row
        };

        assert_eq!(
            read_at(0).into_element().unwrap_numeric().0,
            Numeric::from(1.234f64)
        );
        assert_eq!(read_at(1).into_element(), Datum::Null);
        assert_eq!(
            read_at(2).into_element().unwrap_numeric().0,
            Numeric::from(100000000.009f64)
        );
    }
}