// mz_arrow_util/reader.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reader for [`arrow`] data that outputs [`Row`]s.

use std::sync::Arc;

use anyhow::Context;
use arrow::array::{
    Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
    Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
    Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray,
    ListArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray,
    Time32SecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
    UInt64Array,
};
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{DataType, TimeUnit};
use chrono::{DateTime, NaiveTime};
use dec::OrderedDecimal;
use mz_ore::cast::CastFrom;
use mz_repr::adt::date::Date;
use mz_repr::adt::jsonb::JsonbPacker;
use mz_repr::adt::numeric::Numeric;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, RelationDesc, Row, RowPacker, SharedRow, SqlScalarType};
use ordered_float::OrderedFloat;
use uuid::Uuid;

use crate::mask_nulls;
38
/// Type that can read out of an [`arrow::array::StructArray`] and into a [`Row`], given a
/// [`RelationDesc`].
///
/// The inverse of a [`crate::builder::ArrowBuilder`].
///
/// Note: When creating an [`ArrowReader`] we perform a "one-time downcast" of the children Arrays
/// from the [`StructArray`], into `enum ColReader`s. This is a much more verbose approach than the
/// alternative of downcasting from a `dyn arrow::array::Array` every time we read a [`Row`], but
/// it is _much_ more performant.
pub struct ArrowReader {
    // Number of rows in the source `StructArray`; every `ColReader` can be read at `0..len`.
    len: usize,
    // One pre-downcasted reader per column, in the order of the `RelationDesc` used at creation.
    readers: Vec<ColReader>,
}
52
53impl ArrowReader {
54    /// Create an [`ArrowReader`] validating that the provided [`RelationDesc`] and [`StructArray`]
55    /// have a matching schema.
56    ///
57    /// The [`RelationDesc`] and [`StructArray`] need to uphold the following to be a valid pair:
58    ///
59    /// * Same number of columns.
60    /// * Columns of all the same name.
61    /// * Columns of compatible types.
62    ///
63    /// TODO(cf2): Relax some of these restrictions by allowing users to map column names, omit
64    /// columns, perform some lightweight casting, and matching not on column name but column
65    /// position.
66    /// TODO(cf2): Allow specifying an optional `arrow::Schema` for extra metadata.
67    pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
68        let inner_columns = array.columns();
69        let desc_columns = desc.typ().columns();
70
71        if inner_columns.len() != desc_columns.len() {
72            return Err(anyhow::anyhow!(
73                "wrong number of columns {} vs {}",
74                inner_columns.len(),
75                desc_columns.len()
76            ));
77        }
78
79        let mut readers = Vec::with_capacity(desc_columns.len());
80        for (col_name, col_type) in desc.iter() {
81            let column = array
82                .column_by_name(col_name)
83                .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
84            let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
85                .context(col_name.clone())?;
86
87            readers.push(reader);
88        }
89
90        Ok(ArrowReader {
91            len: array.len(),
92            readers,
93        })
94    }
95
96    /// Read the value at `idx` into the provided `Row`.
97    pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
98        let mut packer = row.packer();
99        for reader in &self.readers {
100            reader.read(idx, &mut packer).context(idx)?;
101        }
102        Ok(())
103    }
104
105    /// Read all of the values in this [`ArrowReader`] into `rows`.
106    pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
107        for idx in 0..self.len {
108            let mut row = Row::default();
109            self.read(idx, &mut row).context(idx)?;
110            rows.push(row);
111        }
112        Ok(self.len)
113    }
114}
115
/// Build a [`ColReader`] for reading values of `scalar_type` out of `array`, performing the
/// one-time downcast from `dyn Array` to the concrete arrow array type.
///
/// Returns an error if the `(SqlScalarType, DataType)` pair is not a supported combination.
fn scalar_type_and_array_to_reader(
    scalar_type: &SqlScalarType,
    array: Arc<dyn Array>,
) -> Result<ColReader, anyhow::Error> {
    // Downcast `array` to the concrete type `T`. Callers must only invoke this after
    // matching on `array.data_type()`, hence the `expect`.
    fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
        array
            .as_any()
            .downcast_ref::<T>()
            .expect("checked DataType")
            .clone()
    }

    match (scalar_type, array.data_type()) {
        (SqlScalarType::Bool, DataType::Boolean) => {
            Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
        }
        // Arrow Int8 has no Materialize equivalent; widen into whichever signed int type the
        // desc asked for via a per-column `cast` fn.
        (SqlScalarType::Int16 | SqlScalarType::Int32 | SqlScalarType::Int64, DataType::Int8) => {
            let array = downcast_array::<Int8Array>(array);
            let cast: fn(i8) -> Datum<'static> = match scalar_type {
                SqlScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
                SqlScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
                SqlScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Int8 { array, cast })
        }
        (SqlScalarType::Int16, DataType::Int16) => {
            Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
        }
        (SqlScalarType::Int32, DataType::Int32) => {
            Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
        }
        (SqlScalarType::Int64, DataType::Int64) => {
            Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
        }
        // Same widening treatment for Arrow UInt8.
        (
            SqlScalarType::UInt16 | SqlScalarType::UInt32 | SqlScalarType::UInt64,
            DataType::UInt8,
        ) => {
            let array = downcast_array::<UInt8Array>(array);
            let cast: fn(u8) -> Datum<'static> = match scalar_type {
                SqlScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
                SqlScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
                SqlScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::UInt8 { array, cast })
        }
        (SqlScalarType::UInt16, DataType::UInt16) => {
            Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
        }
        (SqlScalarType::UInt32, DataType::UInt32) => {
            Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
        }
        (SqlScalarType::UInt64, DataType::UInt64) => {
            Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
        }
        // Arrow Float16 has no Materialize equivalent; widen into f32 or f64.
        (SqlScalarType::Float32 | SqlScalarType::Float64, DataType::Float16) => {
            let array = downcast_array::<Float16Array>(array);
            let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
                SqlScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
                SqlScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Float16 { array, cast })
        }
        (SqlScalarType::Float32, DataType::Float32) => {
            Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
        }
        (SqlScalarType::Float64, DataType::Float64) => {
            Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
        }
        // TODO(cf3): Consider the max_scale for numeric.
        (SqlScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
            use num_traits::Pow;

            // Decimal128 stores unscaled integers; pre-compute 10^scale so reads can divide
            // the raw value back into a decimal.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal128Array>(array);

            Ok(ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            })
        }
        // TODO(cf3): Consider the max_scale for numeric.
        (SqlScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
            use num_traits::Pow;

            // Same unscaled-integer representation as Decimal128 above.
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal256Array>(array);

            Ok(ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            })
        }
        (SqlScalarType::Bytes, DataType::Binary) => {
            Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
        }
        (SqlScalarType::Bytes, DataType::LargeBinary) => {
            let array = downcast_array::<LargeBinaryArray>(array);
            Ok(ColReader::LargeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array);
            Ok(ColReader::FixedSizeBinary(array))
        }
        (SqlScalarType::Bytes, DataType::BinaryView) => {
            let array = downcast_array::<BinaryViewArray>(array);
            Ok(ColReader::BinaryView(array))
        }
        // UUIDs arrive as raw bytes; wrap a bytes reader and parse at read time.
        (
            SqlScalarType::Uuid,
            DataType::Binary
            | DataType::BinaryView
            | DataType::LargeBinary
            | DataType::FixedSizeBinary(_),
        ) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::Bytes, array)
                .context("uuid reader")?;
            Ok(ColReader::Uuid(Box::new(reader)))
        }
        (SqlScalarType::String, DataType::Utf8) => {
            Ok(ColReader::String(downcast_array::<StringArray>(array)))
        }
        (SqlScalarType::String, DataType::LargeUtf8) => {
            let array = downcast_array::<LargeStringArray>(array);
            Ok(ColReader::LargeString(array))
        }
        (SqlScalarType::String, DataType::Utf8View) => {
            let array = downcast_array::<StringViewArray>(array);
            Ok(ColReader::StringView(array))
        }
        // JSON arrives as strings; wrap a string reader and parse at read time.
        (SqlScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
            let reader = scalar_type_and_array_to_reader(&SqlScalarType::String, array)
                .context("json reader")?;
            Ok(ColReader::Jsonb(Box::new(reader)))
        }
        // Note: only timezone-less (`None`) arrow timestamps are supported.
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
            let array = downcast_array::<TimestampSecondArray>(array);
            Ok(ColReader::TimestampSecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
            let array = downcast_array::<TimestampMillisecondArray>(array);
            Ok(ColReader::TimestampMillisecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
            let array = downcast_array::<TimestampMicrosecondArray>(array);
            Ok(ColReader::TimestampMicrosecond(array))
        }
        (SqlScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
            let array = downcast_array::<TimestampNanosecondArray>(array);
            Ok(ColReader::TimestampNanosecond(array))
        }
        (SqlScalarType::Date, DataType::Date32) => {
            let array = downcast_array::<Date32Array>(array);
            Ok(ColReader::Date32(array))
        }
        (SqlScalarType::Date, DataType::Date64) => {
            let array = downcast_array::<Date64Array>(array);
            Ok(ColReader::Date64(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
            let array = downcast_array::<Time32SecondArray>(array);
            Ok(ColReader::Time32Seconds(array))
        }
        (SqlScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
            let array = downcast_array::<Time32MillisecondArray>(array);
            Ok(ColReader::Time32Milliseconds(array))
        }
        // Lists recurse on the element type; we keep the offsets/null buffers and a boxed
        // reader over the child values array.
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::List(_),
        ) => {
            let array = downcast_array::<ListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("list")?;
            Ok(ColReader::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            SqlScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::LargeList(_),
        ) => {
            let array = downcast_array::<LargeListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("large list")?;
            Ok(ColReader::LargeList {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        // Records recurse on each named field, matching struct children by name.
        (
            SqlScalarType::Record {
                fields,
                custom_id: _,
            },
            DataType::Struct(_),
        ) => {
            let record_array = downcast_array::<StructArray>(array);
            let null_mask = record_array.nulls();

            let mut decoders = Vec::with_capacity(fields.len());
            for (name, typ) in fields.iter() {
                let inner_array = record_array
                    .column_by_name(name)
                    .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
                // Propagate the struct-level null mask into the child so a null record
                // reads as null even when the child has a non-null value at that slot.
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
                    .context(name.clone())?;

                decoders.push(Box::new(inner_decoder));
            }

            Ok(ColReader::Record {
                fields: decoders,
                nulls: null_mask.cloned(),
            })
        }
        other => anyhow::bail!("unsupported: {other:?}"),
    }
}
372
/// A "downcasted" version of [`arrow::array::Array`] that supports reading [`Datum`]s.
///
/// Note: While this is fairly verbose, one-time "downcasting" to an enum is _much_ more performant
/// than downcasting every time we read a [`Datum`].
enum ColReader {
    Boolean(arrow::array::BooleanArray),

    // Arrow Int8 has no Materialize equivalent; `cast` widens into the requested int type.
    Int8 {
        array: arrow::array::Int8Array,
        cast: fn(i8) -> Datum<'static>,
    },
    Int16(arrow::array::Int16Array),
    Int32(arrow::array::Int32Array),
    Int64(arrow::array::Int64Array),

    // Arrow UInt8 has no Materialize equivalent; `cast` widens into the requested uint type.
    UInt8 {
        array: arrow::array::UInt8Array,
        cast: fn(u8) -> Datum<'static>,
    },
    UInt16(arrow::array::UInt16Array),
    UInt32(arrow::array::UInt32Array),
    UInt64(arrow::array::UInt64Array),

    // Arrow Float16 has no Materialize equivalent; `cast` widens into f32 or f64.
    Float16 {
        array: arrow::array::Float16Array,
        cast: fn(half::f16) -> Datum<'static>,
    },
    Float32(arrow::array::Float32Array),
    Float64(arrow::array::Float64Array),

    // Decimal values are stored unscaled; reads divide by `scale_factor` (10^scale) using a
    // context of the given `precision`.
    Decimal128 {
        array: Decimal128Array,
        scale_factor: Numeric,
        precision: usize,
    },
    Decimal256 {
        array: Decimal256Array,
        scale_factor: Numeric,
        precision: usize,
    },

    Binary(arrow::array::BinaryArray),
    LargeBinary(arrow::array::LargeBinaryArray),
    FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
    BinaryView(arrow::array::BinaryViewArray),
    // Wraps one of the binary readers above; bytes are parsed as a UUID at read time.
    Uuid(Box<ColReader>),

    String(arrow::array::StringArray),
    LargeString(arrow::array::LargeStringArray),
    StringView(arrow::array::StringViewArray),
    // Wraps one of the string readers above; the string is parsed as JSON at read time.
    Jsonb(Box<ColReader>),

    TimestampSecond(arrow::array::TimestampSecondArray),
    TimestampMillisecond(arrow::array::TimestampMillisecondArray),
    TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
    TimestampNanosecond(arrow::array::TimestampNanosecondArray),

    Date32(Date32Array),
    Date64(Date64Array),

    Time32Seconds(Time32SecondArray),
    Time32Milliseconds(arrow::array::Time32MillisecondArray),

    // Variable-length lists: `offsets[idx]..offsets[idx + 1]` indexes into `values`.
    List {
        offsets: OffsetBuffer<i32>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },
    LargeList {
        offsets: OffsetBuffer<i64>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    // Structs/records: one reader per field, all read at the same `idx`.
    Record {
        fields: Vec<Box<ColReader>>,
        nulls: Option<NullBuffer>,
    },
}
452
453impl ColReader {
454    fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
455        let datum = match self {
456            ColReader::Boolean(array) => array
457                .is_valid(idx)
458                .then(|| array.value(idx))
459                .map(|x| if x { Datum::True } else { Datum::False }),
460            ColReader::Int8 { array, cast } => {
461                array.is_valid(idx).then(|| array.value(idx)).map(cast)
462            }
463            ColReader::Int16(array) => array
464                .is_valid(idx)
465                .then(|| array.value(idx))
466                .map(Datum::Int16),
467            ColReader::Int32(array) => array
468                .is_valid(idx)
469                .then(|| array.value(idx))
470                .map(Datum::Int32),
471            ColReader::Int64(array) => array
472                .is_valid(idx)
473                .then(|| array.value(idx))
474                .map(Datum::Int64),
475            ColReader::UInt8 { array, cast } => {
476                array.is_valid(idx).then(|| array.value(idx)).map(cast)
477            }
478            ColReader::UInt16(array) => array
479                .is_valid(idx)
480                .then(|| array.value(idx))
481                .map(Datum::UInt16),
482            ColReader::UInt32(array) => array
483                .is_valid(idx)
484                .then(|| array.value(idx))
485                .map(Datum::UInt32),
486            ColReader::UInt64(array) => array
487                .is_valid(idx)
488                .then(|| array.value(idx))
489                .map(Datum::UInt64),
490            ColReader::Float16 { array, cast } => {
491                array.is_valid(idx).then(|| array.value(idx)).map(cast)
492            }
493            ColReader::Float32(array) => array
494                .is_valid(idx)
495                .then(|| array.value(idx))
496                .map(|x| Datum::Float32(OrderedFloat(x))),
497            ColReader::Float64(array) => array
498                .is_valid(idx)
499                .then(|| array.value(idx))
500                .map(|x| Datum::Float64(OrderedFloat(x))),
501            ColReader::Decimal128 {
502                array,
503                scale_factor,
504                precision,
505            } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
506                // Create a Numeric from our i128 with precision.
507                let mut ctx = dec::Context::<Numeric>::default();
508                ctx.set_precision(*precision).expect("checked before");
509                let mut num = ctx.from_i128(x);
510
511                // Scale the number.
512                ctx.div(&mut num, scale_factor);
513
514                Datum::Numeric(OrderedDecimal(num))
515            }),
516            ColReader::Decimal256 {
517                array,
518                scale_factor,
519                precision,
520            } => array
521                .is_valid(idx)
522                .then(|| array.value(idx))
523                .map(|x| {
524                    let s = x.to_string();
525
526                    // Parse a i256 from it's String representation.
527                    //
528                    // TODO(cf3): See if we can add support for 256-bit numbers to the `dec` crate.
529                    let mut ctx = dec::Context::<Numeric>::default();
530                    ctx.set_precision(*precision).expect("checked before");
531                    let mut num = ctx
532                        .parse(s)
533                        .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;
534
535                    // Scale the number.
536                    ctx.div(&mut num, scale_factor);
537
538                    Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
539                })
540                .transpose()?,
541            ColReader::Binary(array) => array
542                .is_valid(idx)
543                .then(|| array.value(idx))
544                .map(Datum::Bytes),
545            ColReader::LargeBinary(array) => array
546                .is_valid(idx)
547                .then(|| array.value(idx))
548                .map(Datum::Bytes),
549            ColReader::FixedSizeBinary(array) => array
550                .is_valid(idx)
551                .then(|| array.value(idx))
552                .map(Datum::Bytes),
553            ColReader::BinaryView(array) => array
554                .is_valid(idx)
555                .then(|| array.value(idx))
556                .map(Datum::Bytes),
557            ColReader::Uuid(reader) => {
558                // First read a binary value into a temp row, and later parse that as UUID into our
559                // actual Row Packer.
560                let mut temp_row = SharedRow::get();
561                reader.read(idx, &mut temp_row.packer()).context("uuid")?;
562                let slice = match temp_row.unpack_first() {
563                    Datum::Bytes(slice) => slice,
564                    Datum::Null => {
565                        packer.push(Datum::Null);
566                        return Ok(());
567                    }
568                    other => anyhow::bail!("expected String, found {other:?}"),
569                };
570
571                let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
572                Some(Datum::Uuid(uuid))
573            }
574            ColReader::String(array) => array
575                .is_valid(idx)
576                .then(|| array.value(idx))
577                .map(Datum::String),
578            ColReader::LargeString(array) => array
579                .is_valid(idx)
580                .then(|| array.value(idx))
581                .map(Datum::String),
582            ColReader::StringView(array) => array
583                .is_valid(idx)
584                .then(|| array.value(idx))
585                .map(Datum::String),
586            ColReader::Jsonb(reader) => {
587                // First read a string value into a temp row, and later parse that as JSON into our
588                // actual Row Packer.
589                let mut temp_row = SharedRow::get();
590                reader.read(idx, &mut temp_row.packer()).context("jsonb")?;
591                let value = match temp_row.unpack_first() {
592                    Datum::String(value) => value,
593                    Datum::Null => {
594                        packer.push(Datum::Null);
595                        return Ok(());
596                    }
597                    other => anyhow::bail!("expected String, found {other:?}"),
598                };
599
600                JsonbPacker::new(packer)
601                    .pack_str(value)
602                    .context("roundtrip json")?;
603
604                // Return early because we've already packed the necessasry Datums.
605                return Ok(());
606            }
607            ColReader::TimestampSecond(array) => array
608                .is_valid(idx)
609                .then(|| array.value(idx))
610                .map(|secs| {
611                    let dt = DateTime::from_timestamp(secs, 0)
612                        .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
613                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
614                        .context("TimestampSeconds")?;
615                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
616                })
617                .transpose()?,
618            ColReader::TimestampMillisecond(array) => array
619                .is_valid(idx)
620                .then(|| array.value(idx))
621                .map(|millis| {
622                    let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
623                        anyhow::anyhow!("invalid timestamp milliseconds {millis}")
624                    })?;
625                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
626                        .context("TimestampMillis")?;
627                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
628                })
629                .transpose()?,
630            ColReader::TimestampMicrosecond(array) => array
631                .is_valid(idx)
632                .then(|| array.value(idx))
633                .map(|micros| {
634                    let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
635                        anyhow::anyhow!("invalid timestamp microseconds {micros}")
636                    })?;
637                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
638                        .context("TimestampMicros")?;
639                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
640                })
641                .transpose()?,
642            ColReader::TimestampNanosecond(array) => array
643                .is_valid(idx)
644                .then(|| array.value(idx))
645                .map(|nanos| {
646                    let dt = DateTime::from_timestamp_nanos(nanos);
647                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
648                        .context("TimestampNanos")?;
649                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
650                })
651                .transpose()?,
652            ColReader::Date32(array) => array
653                .is_valid(idx)
654                .then(|| array.value(idx))
655                .map(|unix_days| {
656                    let date = Date::from_unix_epoch(unix_days).context("date32")?;
657                    Ok::<_, anyhow::Error>(Datum::Date(date))
658                })
659                .transpose()?,
660            ColReader::Date64(array) => array
661                .is_valid(idx)
662                .then(|| array.value(idx))
663                .map(|unix_millis| {
664                    let date = DateTime::from_timestamp_millis(unix_millis)
665                        .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
666                    let unix_epoch = DateTime::from_timestamp(0, 0)
667                        .expect("UNIX epoch")
668                        .date_naive();
669                    let delta = date.date_naive().signed_duration_since(unix_epoch);
670                    let days: i32 = delta.num_days().try_into().context("date64")?;
671                    let date = Date::from_unix_epoch(days).context("date64")?;
672                    Ok::<_, anyhow::Error>(Datum::Date(date))
673                })
674                .transpose()?,
675            ColReader::Time32Seconds(array) => array
676                .is_valid(idx)
677                .then(|| array.value(idx))
678                .map(|secs| {
679                    let usecs: u32 = secs.try_into().context("time32 seconds")?;
680                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
681                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
682                    Ok::<_, anyhow::Error>(Datum::Time(time))
683                })
684                .transpose()?,
685            ColReader::Time32Milliseconds(array) => array
686                .is_valid(idx)
687                .then(|| array.value(idx))
688                .map(|millis| {
689                    let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
690                    let usecs = umillis / 1000;
691                    let unanos = (umillis % 1000).saturating_mul(1_000_000);
692                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
693                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
694                    Ok::<_, anyhow::Error>(Datum::Time(time))
695                })
696                .transpose()?,
697            ColReader::List {
698                offsets,
699                values,
700                nulls,
701            } => {
702                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
703                if !is_valid {
704                    packer.push(Datum::Null);
705                    return Ok(());
706                }
707
708                let start: usize = offsets[idx].try_into().context("list start offset")?;
709                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
710
711                packer
712                    .push_list_with(|packer| {
713                        for idx in start..end {
714                            values.read(idx, packer)?;
715                        }
716                        Ok::<_, anyhow::Error>(())
717                    })
718                    .context("pack list")?;
719
720                // Return early because we've already packed the necessasry Datums.
721                return Ok(());
722            }
723            ColReader::LargeList {
724                offsets,
725                values,
726                nulls,
727            } => {
728                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
729                if !is_valid {
730                    packer.push(Datum::Null);
731                    return Ok(());
732                }
733
734                let start: usize = offsets[idx].try_into().context("list start offset")?;
735                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;
736
737                packer
738                    .push_list_with(|packer| {
739                        for idx in start..end {
740                            values.read(idx, packer)?;
741                        }
742                        Ok::<_, anyhow::Error>(())
743                    })
744                    .context("pack list")?;
745
                // Return early because we've already packed the necessary Datums.
747                return Ok(());
748            }
749            ColReader::Record { fields, nulls } => {
750                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
751                if !is_valid {
752                    packer.push(Datum::Null);
753                    return Ok(());
754                }
755
756                packer
757                    .push_list_with(|packer| {
758                        for field in fields {
759                            field.read(idx, packer)?;
760                        }
761                        Ok::<_, anyhow::Error>(())
762                    })
763                    .context("pack record")?;
764
                // Return early because we've already packed the necessary Datums.
766                return Ok(());
767            }
768        };
769
770        match datum {
771            Some(d) => packer.push(d),
772            None => packer.push(Datum::Null),
773        }
774
775        Ok(())
776    }
777}
778
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    /// Returns the single-column [`RelationDesc`] (one nullable, unconstrained numeric
    /// column named "a") shared by the decimal round-trip tests below.
    fn numeric_desc() -> RelationDesc {
        RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Numeric { max_scale: None }.nullable(true),
            )
            .finish()
    }

    /// Reads rows `0..expected.len()` out of `reader`, asserting that each decodes to the
    /// given numeric value, or to [`Datum::Null`] for `None` entries.
    fn assert_numeric_rows(reader: &ArrowReader, expected: &[Option<f64>]) {
        let mut rnd_row = Row::default();
        for (idx, expected) in expected.iter().enumerate() {
            // Create a packer to clear the row alloc before re-reading into it.
            rnd_row.packer();
            reader.read(idx, &mut rnd_row).unwrap();
            match expected {
                Some(val) => {
                    let num = rnd_row.into_element().unwrap_numeric();
                    assert_eq!(num.0, Numeric::from(*val));
                }
                None => assert_eq!(rnd_row.into_element(), Datum::Null),
            }
        }
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_reader() {
        // One nullable column per scalar type we want to round-trip; a second all-NULL row
        // exercises the null path of every reader.
        let desc = RelationDesc::builder()
            .with_column("bool", SqlScalarType::Bool.nullable(true))
            .with_column("int4", SqlScalarType::Int32.nullable(true))
            .with_column("uint8", SqlScalarType::UInt64.nullable(true))
            .with_column("float32", SqlScalarType::Float32.nullable(true))
            .with_column("string", SqlScalarType::String.nullable(true))
            .with_column("bytes", SqlScalarType::Bytes.nullable(true))
            .with_column("uuid", SqlScalarType::Uuid.nullable(true))
            .with_column("json", SqlScalarType::Jsonb.nullable(true))
            .with_column(
                "list",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let mut og_row = Row::default();
        let mut packer = og_row.packer();

        packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        let null_row = Row::pack(vec![Datum::Null; 9]);

        // Encode our data with our ArrowBuilder.
        // NOTE(review): the `2` and `46` arguments look like row/byte capacity hints —
        // confirm against `ArrowBuilder::new`'s signature.
        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&og_row).unwrap();
        builder.add_row(&null_row).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        // Decode our data and check both rows round-trip unchanged.
        let reader =
            ArrowReader::new(&desc, arrow::array::StructArray::from(record_batch)).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        assert_eq!(&og_row, &rnd_row);

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        assert_eq!(&null_row, &rnd_row);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal128() {
        let desc = numeric_desc();

        // Appended values are unscaled integers interpreted with a scale of 3.
        let mut dec128 = arrow::array::Decimal128Builder::new();
        dec128 = dec128.with_precision_and_scale(12, 3).unwrap();

        // 1.234
        dec128.append_value(1234);
        dec128.append_null();
        // 100000000.009
        dec128.append_value(100000000009);

        let dec128 = dec128.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec128.data_type().clone(), true)),
            Arc::new(dec128) as arrow::array::ArrayRef,
        )]);

        // Decode our data!
        let reader = ArrowReader::new(&desc, batch).unwrap();
        assert_numeric_rows(&reader, &[Some(1.234f64), None, Some(100000000.009f64)]);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal256() {
        let desc = numeric_desc();

        // Appended values are unscaled integers interpreted with a scale of 3.
        let mut dec256 = arrow::array::Decimal256Builder::new();
        dec256 = dec256.with_precision_and_scale(12, 3).unwrap();

        // 1.234
        dec256.append_value(arrow::datatypes::i256::from(1234));
        dec256.append_null();
        // 100000000.009
        dec256.append_value(arrow::datatypes::i256::from(100000000009i64));

        let dec256 = dec256.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec256.data_type().clone(), true)),
            Arc::new(dec256) as arrow::array::ArrayRef,
        )]);

        // Decode our data!
        let reader = ArrowReader::new(&desc, batch).unwrap();
        assert_numeric_rows(&reader, &[Some(1.234f64), None, Some(100000000.009f64)]);
    }
}