mz_arrow_util/reader.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reader for [`arrow`] data that outputs [`Row`]s.

use std::sync::Arc;

use anyhow::Context;
use arrow::array::{
    Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
    Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array,
    Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray,
    ListArray, StringArray, StringViewArray, StructArray, Time32MillisecondArray,
    Time32SecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
    UInt64Array,
};
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{DataType, TimeUnit};
use chrono::{DateTime, NaiveTime};
use dec::OrderedDecimal;
use mz_ore::cast::CastFrom;
use mz_repr::adt::date::Date;
use mz_repr::adt::jsonb::JsonbPacker;
use mz_repr::adt::numeric::Numeric;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, RelationDesc, Row, RowPacker, ScalarType, SharedRow};
use ordered_float::OrderedFloat;
use uuid::Uuid;

use crate::mask_nulls;

/// Type that can read out of an [`arrow::array::StructArray`] and into a [`Row`], given a
/// [`RelationDesc`].
///
/// The inverse of a [`crate::builder::ArrowBuilder`].
///
/// Note: When creating an [`ArrowReader`] we perform a "one-time downcast" of the child arrays
/// from the [`StructArray`] into `ColReader` enum variants. This is a much more verbose approach
/// than the alternative of downcasting from a `dyn arrow::array::Array` every time we read a
/// [`Row`], but it is _much_ more performant.
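///
/// ```ignore
/// // A minimal usage sketch (assumes `desc` and `array` were produced with matching schemas,
/// // e.g. by round-tripping through `ArrowBuilder`).
/// let reader = ArrowReader::new(&desc, array)?;
/// let mut rows = Vec::new();
/// let count = reader.read_all(&mut rows)?;
/// assert_eq!(count, rows.len());
/// ```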
pub struct ArrowReader {
    len: usize,
    readers: Vec<ColReader>,
}

impl ArrowReader {
    /// Create an [`ArrowReader`], validating that the provided [`RelationDesc`] and
    /// [`StructArray`] have a matching schema.
    ///
    /// The [`RelationDesc`] and [`StructArray`] need to uphold the following to be a valid pair:
    ///
    /// * Same number of columns.
    /// * Columns with matching names.
    /// * Columns of compatible types.
    ///
    /// TODO(cf2): Relax some of these restrictions by allowing users to map column names, omit
    /// columns, perform some lightweight casting, and match on column position rather than
    /// column name.
    /// TODO(cf2): Allow specifying an optional `arrow::Schema` for extra metadata.
    pub fn new(desc: &RelationDesc, array: StructArray) -> Result<Self, anyhow::Error> {
        let inner_columns = array.columns();
        let desc_columns = desc.typ().columns();

        if inner_columns.len() != desc_columns.len() {
            return Err(anyhow::anyhow!(
                "wrong number of columns {} vs {}",
                inner_columns.len(),
                desc_columns.len()
            ));
        }

        let mut readers = Vec::with_capacity(desc_columns.len());
        for (col_name, col_type) in desc.iter() {
            let column = array
                .column_by_name(col_name.as_str())
                .ok_or_else(|| anyhow::anyhow!("'{col_name}' not found"))?;
            let reader = scalar_type_and_array_to_reader(&col_type.scalar_type, Arc::clone(column))
                .context(col_name.clone())?;

            readers.push(reader);
        }

        Ok(ArrowReader {
            len: array.len(),
            readers,
        })
    }

    /// Read the value at `idx` into the provided `Row`.
    pub fn read(&self, idx: usize, row: &mut Row) -> Result<(), anyhow::Error> {
        let mut packer = row.packer();
        for reader in &self.readers {
            reader.read(idx, &mut packer).context(idx)?;
        }
        Ok(())
    }

    /// Read all of the values in this [`ArrowReader`] into `rows`.
    pub fn read_all(&self, rows: &mut Vec<Row>) -> Result<usize, anyhow::Error> {
        for idx in 0..self.len {
            let mut row = Row::default();
            self.read(idx, &mut row).context(idx)?;
            rows.push(row);
        }
        Ok(self.len)
    }
}

fn scalar_type_and_array_to_reader(
    scalar_type: &ScalarType,
    array: Arc<dyn Array>,
) -> Result<ColReader, anyhow::Error> {
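    // Helper to downcast a `dyn Array` to a concrete array type. Callers only invoke this after
    // matching on `array.data_type()`, so the downcast is infallible in practice.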
    fn downcast_array<T: arrow::array::Array + Clone + 'static>(array: Arc<dyn Array>) -> T {
        array
            .as_any()
            .downcast_ref::<T>()
            .expect("checked DataType")
            .clone()
    }

    match (scalar_type, array.data_type()) {
        (ScalarType::Bool, DataType::Boolean) => {
            Ok(ColReader::Boolean(downcast_array::<BooleanArray>(array)))
        }
        (ScalarType::Int16 | ScalarType::Int32 | ScalarType::Int64, DataType::Int8) => {
            let array = downcast_array::<Int8Array>(array);
            let cast: fn(i8) -> Datum<'static> = match scalar_type {
                ScalarType::Int16 => |x| Datum::Int16(i16::cast_from(x)),
                ScalarType::Int32 => |x| Datum::Int32(i32::cast_from(x)),
                ScalarType::Int64 => |x| Datum::Int64(i64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Int8 { array, cast })
        }
        (ScalarType::Int16, DataType::Int16) => {
            Ok(ColReader::Int16(downcast_array::<Int16Array>(array)))
        }
        (ScalarType::Int32, DataType::Int32) => {
            Ok(ColReader::Int32(downcast_array::<Int32Array>(array)))
        }
        (ScalarType::Int64, DataType::Int64) => {
            Ok(ColReader::Int64(downcast_array::<Int64Array>(array)))
        }
        (ScalarType::UInt16 | ScalarType::UInt32 | ScalarType::UInt64, DataType::UInt8) => {
            let array = downcast_array::<UInt8Array>(array);
            let cast: fn(u8) -> Datum<'static> = match scalar_type {
                ScalarType::UInt16 => |x| Datum::UInt16(u16::cast_from(x)),
                ScalarType::UInt32 => |x| Datum::UInt32(u32::cast_from(x)),
                ScalarType::UInt64 => |x| Datum::UInt64(u64::cast_from(x)),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::UInt8 { array, cast })
        }
        (ScalarType::UInt16, DataType::UInt16) => {
            Ok(ColReader::UInt16(downcast_array::<UInt16Array>(array)))
        }
        (ScalarType::UInt32, DataType::UInt32) => {
            Ok(ColReader::UInt32(downcast_array::<UInt32Array>(array)))
        }
        (ScalarType::UInt64, DataType::UInt64) => {
            Ok(ColReader::UInt64(downcast_array::<UInt64Array>(array)))
        }
        (ScalarType::Float32 | ScalarType::Float64, DataType::Float16) => {
            let array = downcast_array::<Float16Array>(array);
            let cast: fn(half::f16) -> Datum<'static> = match scalar_type {
                ScalarType::Float32 => |x| Datum::Float32(OrderedFloat::from(x.to_f32())),
                ScalarType::Float64 => |x| Datum::Float64(OrderedFloat::from(x.to_f64())),
                _ => unreachable!("checked above"),
            };
            Ok(ColReader::Float16 { array, cast })
        }
        (ScalarType::Float32, DataType::Float32) => {
            Ok(ColReader::Float32(downcast_array::<Float32Array>(array)))
        }
        (ScalarType::Float64, DataType::Float64) => {
            Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
        }
        // TODO(cf3): Consider the max_scale for numeric.
        (ScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
            use num_traits::Pow;

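            // Arrow decimals store unscaled integers (e.g. with scale 3, the value 1.234 is
            // stored as 1234), so we divide by 10^scale when reading each value back out.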
            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal128, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal128Array>(array);

            Ok(ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            })
        }
        // TODO(cf3): Consider the max_scale for numeric.
        (ScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
            use num_traits::Pow;

            let base = Numeric::from(10);
            let scale = Numeric::from(*scale);
            let scale_factor = base.pow(scale);

            let precision = usize::cast_from(*precision);
            // Don't use the context here, but make sure the precision is valid.
            let mut ctx = dec::Context::<Numeric>::default();
            ctx.set_precision(precision).map_err(|e| {
                anyhow::anyhow!("invalid precision from Decimal256, {precision}, {e}")
            })?;

            let array = downcast_array::<Decimal256Array>(array);

            Ok(ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            })
        }
        (ScalarType::Bytes, DataType::Binary) => {
            Ok(ColReader::Binary(downcast_array::<BinaryArray>(array)))
        }
        (ScalarType::Bytes, DataType::LargeBinary) => {
            let array = downcast_array::<LargeBinaryArray>(array);
            Ok(ColReader::LargeBinary(array))
        }
        (ScalarType::Bytes, DataType::FixedSizeBinary(_)) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array);
            Ok(ColReader::FixedSizeBinary(array))
        }
        (ScalarType::Bytes, DataType::BinaryView) => {
            let array = downcast_array::<BinaryViewArray>(array);
            Ok(ColReader::BinaryView(array))
        }
        (
            ScalarType::Uuid,
            DataType::Binary
            | DataType::BinaryView
            | DataType::LargeBinary
            | DataType::FixedSizeBinary(_),
        ) => {
            let reader = scalar_type_and_array_to_reader(&ScalarType::Bytes, array)
                .context("uuid reader")?;
            Ok(ColReader::Uuid(Box::new(reader)))
        }
        (ScalarType::String, DataType::Utf8) => {
            Ok(ColReader::String(downcast_array::<StringArray>(array)))
        }
        (ScalarType::String, DataType::LargeUtf8) => {
            let array = downcast_array::<LargeStringArray>(array);
            Ok(ColReader::LargeString(array))
        }
        (ScalarType::String, DataType::Utf8View) => {
            let array = downcast_array::<StringViewArray>(array);
            Ok(ColReader::StringView(array))
        }
        (ScalarType::Jsonb, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View) => {
            let reader = scalar_type_and_array_to_reader(&ScalarType::String, array)
                .context("json reader")?;
            Ok(ColReader::Jsonb(Box::new(reader)))
        }
        (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Second, None)) => {
            let array = downcast_array::<TimestampSecondArray>(array);
            Ok(ColReader::TimestampSecond(array))
        }
        (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Millisecond, None)) => {
            let array = downcast_array::<TimestampMillisecondArray>(array);
            Ok(ColReader::TimestampMillisecond(array))
        }
        (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Microsecond, None)) => {
            let array = downcast_array::<TimestampMicrosecondArray>(array);
            Ok(ColReader::TimestampMicrosecond(array))
        }
        (ScalarType::Timestamp { .. }, DataType::Timestamp(TimeUnit::Nanosecond, None)) => {
            let array = downcast_array::<TimestampNanosecondArray>(array);
            Ok(ColReader::TimestampNanosecond(array))
        }
        (ScalarType::Date, DataType::Date32) => {
            let array = downcast_array::<Date32Array>(array);
            Ok(ColReader::Date32(array))
        }
        (ScalarType::Date, DataType::Date64) => {
            let array = downcast_array::<Date64Array>(array);
            Ok(ColReader::Date64(array))
        }
        (ScalarType::Time, DataType::Time32(TimeUnit::Second)) => {
            let array = downcast_array::<Time32SecondArray>(array);
            Ok(ColReader::Time32Seconds(array))
        }
        (ScalarType::Time, DataType::Time32(TimeUnit::Millisecond)) => {
            let array = downcast_array::<Time32MillisecondArray>(array);
            Ok(ColReader::Time32Milliseconds(array))
        }
        (
            ScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::List(_),
        ) => {
            let array = downcast_array::<ListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("list")?;
            Ok(ColReader::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            ScalarType::List {
                element_type,
                custom_id: _,
            },
            DataType::LargeList(_),
        ) => {
            let array = downcast_array::<LargeListArray>(array);
            let inner_decoder =
                scalar_type_and_array_to_reader(element_type, Arc::clone(array.values()))
                    .context("large list")?;
            Ok(ColReader::LargeList {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            })
        }
        (
            ScalarType::Record {
                fields,
                custom_id: _,
            },
            DataType::Struct(_),
        ) => {
            let record_array = downcast_array::<StructArray>(array);
            let null_mask = record_array.nulls();

            let mut decoders = Vec::with_capacity(fields.len());
            for (name, typ) in fields.iter() {
                let inner_array = record_array
                    .column_by_name(name.as_str())
                    .ok_or_else(|| anyhow::anyhow!("missing name '{name}'"))?;
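                // Fold the struct's null mask into the child column so nested readers treat
                // every field of a null record as null.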
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = scalar_type_and_array_to_reader(&typ.scalar_type, inner_array)
                    .context(name.clone())?;

                decoders.push(Box::new(inner_decoder));
            }

            Ok(ColReader::Record {
                fields: decoders,
                nulls: null_mask.cloned(),
            })
        }
        other => anyhow::bail!("unsupported: {other:?}"),
    }
}

/// A "downcasted" version of [`arrow::array::Array`] that supports reading [`Datum`]s.
///
/// Note: While this is fairly verbose, one-time "downcasting" to an enum is _much_ more performant
/// than downcasting every time we read a [`Datum`].
enum ColReader {
    Boolean(arrow::array::BooleanArray),

    Int8 {
        array: arrow::array::Int8Array,
        cast: fn(i8) -> Datum<'static>,
    },
    Int16(arrow::array::Int16Array),
    Int32(arrow::array::Int32Array),
    Int64(arrow::array::Int64Array),

    UInt8 {
        array: arrow::array::UInt8Array,
        cast: fn(u8) -> Datum<'static>,
    },
    UInt16(arrow::array::UInt16Array),
    UInt32(arrow::array::UInt32Array),
    UInt64(arrow::array::UInt64Array),

    Float16 {
        array: arrow::array::Float16Array,
        cast: fn(half::f16) -> Datum<'static>,
    },
    Float32(arrow::array::Float32Array),
    Float64(arrow::array::Float64Array),

    Decimal128 {
        array: Decimal128Array,
        scale_factor: Numeric,
        precision: usize,
    },
    Decimal256 {
        array: Decimal256Array,
        scale_factor: Numeric,
        precision: usize,
    },

    Binary(arrow::array::BinaryArray),
    LargeBinary(arrow::array::LargeBinaryArray),
    FixedSizeBinary(arrow::array::FixedSizeBinaryArray),
    BinaryView(arrow::array::BinaryViewArray),
    Uuid(Box<ColReader>),

    String(arrow::array::StringArray),
    LargeString(arrow::array::LargeStringArray),
    StringView(arrow::array::StringViewArray),
    Jsonb(Box<ColReader>),

    TimestampSecond(arrow::array::TimestampSecondArray),
    TimestampMillisecond(arrow::array::TimestampMillisecondArray),
    TimestampMicrosecond(arrow::array::TimestampMicrosecondArray),
    TimestampNanosecond(arrow::array::TimestampNanosecondArray),

    Date32(Date32Array),
    Date64(Date64Array),

    Time32Seconds(Time32SecondArray),
    Time32Milliseconds(arrow::array::Time32MillisecondArray),

    List {
        offsets: OffsetBuffer<i32>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },
    LargeList {
        offsets: OffsetBuffer<i64>,
        values: Box<ColReader>,
        nulls: Option<NullBuffer>,
    },

    Record {
        fields: Vec<Box<ColReader>>,
        nulls: Option<NullBuffer>,
    },
}

impl ColReader {
    fn read(&self, idx: usize, packer: &mut RowPacker) -> Result<(), anyhow::Error> {
        let datum = match self {
            ColReader::Boolean(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|x| if x { Datum::True } else { Datum::False }),
            ColReader::Int8 { array, cast } => {
                array.is_valid(idx).then(|| array.value(idx)).map(cast)
            }
            ColReader::Int16(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::Int16),
            ColReader::Int32(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::Int32),
            ColReader::Int64(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::Int64),
            ColReader::UInt8 { array, cast } => {
                array.is_valid(idx).then(|| array.value(idx)).map(cast)
            }
            ColReader::UInt16(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::UInt16),
            ColReader::UInt32(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::UInt32),
            ColReader::UInt64(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::UInt64),
            ColReader::Float16 { array, cast } => {
                array.is_valid(idx).then(|| array.value(idx)).map(cast)
            }
            ColReader::Float32(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|x| Datum::Float32(OrderedFloat(x))),
            ColReader::Float64(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|x| Datum::Float64(OrderedFloat(x))),
            ColReader::Decimal128 {
                array,
                scale_factor,
                precision,
            } => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
                // Create a Numeric from our i128 with precision.
                let mut ctx = dec::Context::<Numeric>::default();
                ctx.set_precision(*precision).expect("checked before");
                let mut num = ctx.from_i128(x);

                // Scale the number.
                ctx.div(&mut num, scale_factor);

                Datum::Numeric(OrderedDecimal(num))
            }),
            ColReader::Decimal256 {
                array,
                scale_factor,
                precision,
            } => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|x| {
                    let s = x.to_string();

                    // Parse an i256 from its String representation.
                    //
                    // TODO(cf3): See if we can add support for 256-bit numbers to the `dec` crate.
                    let mut ctx = dec::Context::<Numeric>::default();
                    ctx.set_precision(*precision).expect("checked before");
                    let mut num = ctx
                        .parse(s)
                        .map_err(|e| anyhow::anyhow!("decimal out of range: {e}"))?;

                    // Scale the number.
                    ctx.div(&mut num, scale_factor);

                    Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
                })
                .transpose()?,
            ColReader::Binary(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::Bytes),
            ColReader::LargeBinary(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::Bytes),
            ColReader::FixedSizeBinary(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::Bytes),
            ColReader::BinaryView(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::Bytes),
            ColReader::Uuid(reader) => {
                // First read a binary value into a temp row, and later parse that as a UUID into
                // our actual Row Packer.
                let mut temp_row = SharedRow::get();
                temp_row
                    .pack_with(|temp_packer| reader.read(idx, temp_packer))
                    .context("uuid")?;
                let temp_row = temp_row.borrow();
                let slice = match temp_row.unpack_first() {
                    Datum::Bytes(slice) => slice,
                    Datum::Null => {
                        packer.push(Datum::Null);
                        return Ok(());
                    }
                    other => anyhow::bail!("expected Bytes, found {other:?}"),
                };

                let uuid = Uuid::from_slice(slice).context("parsing uuid")?;
                Some(Datum::Uuid(uuid))
            }
            ColReader::String(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::String),
            ColReader::LargeString(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::String),
            ColReader::StringView(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(Datum::String),
            ColReader::Jsonb(reader) => {
                // First read a string value into a temp row, and later parse that as JSON into our
                // actual Row Packer.
                let mut temp_row = SharedRow::get();
                temp_row
                    .pack_with(|temp_packer| reader.read(idx, temp_packer))
                    .context("jsonb")?;
                let temp_row = temp_row.borrow();
                let value = match temp_row.unpack_first() {
                    Datum::String(value) => value,
                    Datum::Null => {
                        packer.push(Datum::Null);
                        return Ok(());
                    }
                    other => anyhow::bail!("expected String, found {other:?}"),
                };

                JsonbPacker::new(packer)
                    .pack_str(value)
                    .context("roundtrip json")?;

                // Return early because we've already packed the necessary Datums.
                return Ok(());
            }
            ColReader::TimestampSecond(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|secs| {
                    let dt = DateTime::from_timestamp(secs, 0)
                        .ok_or_else(|| anyhow::anyhow!("invalid timestamp seconds {secs}"))?;
                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
                        .context("TimestampSeconds")?;
                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
                })
                .transpose()?,
            ColReader::TimestampMillisecond(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|millis| {
                    let dt = DateTime::from_timestamp_millis(millis).ok_or_else(|| {
                        anyhow::anyhow!("invalid timestamp milliseconds {millis}")
                    })?;
                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
                        .context("TimestampMillis")?;
                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
                })
                .transpose()?,
            ColReader::TimestampMicrosecond(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|micros| {
                    let dt = DateTime::from_timestamp_micros(micros).ok_or_else(|| {
                        anyhow::anyhow!("invalid timestamp microseconds {micros}")
                    })?;
                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
                        .context("TimestampMicros")?;
                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
                })
                .transpose()?,
            ColReader::TimestampNanosecond(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|nanos| {
                    let dt = DateTime::from_timestamp_nanos(nanos);
                    let dt = CheckedTimestamp::from_timestamplike(dt.naive_utc())
                        .context("TimestampNanos")?;
                    Ok::<_, anyhow::Error>(Datum::Timestamp(dt))
                })
                .transpose()?,
            ColReader::Date32(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|unix_days| {
                    let date = Date::from_unix_epoch(unix_days).context("date32")?;
                    Ok::<_, anyhow::Error>(Datum::Date(date))
                })
                .transpose()?,
            ColReader::Date64(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|unix_millis| {
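                    // Date64 stores milliseconds since the UNIX epoch; convert the value to a
                    // whole number of days before building a `Date`.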
                    let date = DateTime::from_timestamp_millis(unix_millis)
                        .ok_or_else(|| anyhow::anyhow!("invalid Date64 {unix_millis}"))?;
                    let unix_epoch = DateTime::from_timestamp(0, 0)
                        .expect("UNIX epoch")
                        .date_naive();
                    let delta = date.date_naive().signed_duration_since(unix_epoch);
                    let days: i32 = delta.num_days().try_into().context("date64")?;
                    let date = Date::from_unix_epoch(days).context("date64")?;
                    Ok::<_, anyhow::Error>(Datum::Date(date))
                })
                .transpose()?,
            ColReader::Time32Seconds(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|secs| {
                    let usecs: u32 = secs.try_into().context("time32 seconds")?;
                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, 0)
                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Seconds {secs}"))?;
                    Ok::<_, anyhow::Error>(Datum::Time(time))
                })
                .transpose()?,
            ColReader::Time32Milliseconds(array) => array
                .is_valid(idx)
                .then(|| array.value(idx))
                .map(|millis| {
                    let umillis: u32 = millis.try_into().context("time32 milliseconds")?;
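                    // Split milliseconds-since-midnight into whole seconds plus leftover
                    // nanoseconds, the form `NaiveTime` expects.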
                    let usecs = umillis / 1000;
                    let unanos = (umillis % 1000).saturating_mul(1_000_000);
                    let time = NaiveTime::from_num_seconds_from_midnight_opt(usecs, unanos)
                        .ok_or_else(|| anyhow::anyhow!("invalid Time32 Milliseconds {umillis}"))?;
                    Ok::<_, anyhow::Error>(Datum::Time(time))
                })
                .transpose()?,
            ColReader::List {
                offsets,
                values,
                nulls,
            } => {
                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
                if !is_valid {
                    packer.push(Datum::Null);
                    return Ok(());
                }

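                // Arrow list offsets delimit each element: entry `idx` owns the values in
                // `values[offsets[idx]..offsets[idx + 1]]`.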
                let start: usize = offsets[idx].try_into().context("list start offset")?;
                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;

                packer
                    .push_list_with(|packer| {
                        for idx in start..end {
                            values.read(idx, packer)?;
                        }
                        Ok::<_, anyhow::Error>(())
                    })
                    .context("pack list")?;

                // Return early because we've already packed the necessary Datums.
                return Ok(());
            }
            ColReader::LargeList {
                offsets,
                values,
                nulls,
            } => {
                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
                if !is_valid {
                    packer.push(Datum::Null);
                    return Ok(());
                }

                let start: usize = offsets[idx].try_into().context("list start offset")?;
                let end: usize = offsets[idx + 1].try_into().context("list end offset")?;

                packer
                    .push_list_with(|packer| {
                        for idx in start..end {
                            values.read(idx, packer)?;
                        }
                        Ok::<_, anyhow::Error>(())
                    })
                    .context("pack list")?;

                // Return early because we've already packed the necessary Datums.
                return Ok(());
            }
            ColReader::Record { fields, nulls } => {
                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
                if !is_valid {
                    packer.push(Datum::Null);
                    return Ok(());
                }

                packer
                    .push_list_with(|packer| {
                        for field in fields {
                            field.read(idx, packer)?;
                        }
                        Ok::<_, anyhow::Error>(())
                    })
                    .context("pack record")?;

                // Return early because we've already packed the necessary Datums.
                return Ok(());
            }
        };

        match datum {
            Some(d) => packer.push(d),
            None => packer.push(Datum::Null),
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use mz_ore::collections::CollectionExt;

    use super::*;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_reader() {
        let desc = RelationDesc::builder()
            .with_column("bool", ScalarType::Bool.nullable(true))
            .with_column("int4", ScalarType::Int32.nullable(true))
            .with_column("uint8", ScalarType::UInt64.nullable(true))
            .with_column("float32", ScalarType::Float32.nullable(true))
            .with_column("string", ScalarType::String.nullable(true))
            .with_column("bytes", ScalarType::Bytes.nullable(true))
            .with_column("uuid", ScalarType::Uuid.nullable(true))
            .with_column("json", ScalarType::Jsonb.nullable(true))
            .with_column(
                "list",
                ScalarType::List {
                    element_type: Box::new(ScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let mut og_row = Row::default();
        let mut packer = og_row.packer();

        packer.extend([
            Datum::True,
            Datum::Int32(42),
            Datum::UInt64(10000),
            Datum::Float32(OrderedFloat::from(-1.1f32)),
            Datum::String("hello world"),
            Datum::Bytes(b"1010101"),
            Datum::Uuid(uuid::Uuid::new_v4()),
        ]);
        JsonbPacker::new(&mut packer)
            .pack_serde_json(
                serde_json::json!({"code": 200, "email": "space_monkey@materialize.com"}),
            )
            .expect("failed to pack JSON");
        packer.push_list([Datum::UInt32(200), Datum::UInt32(300)]);

        let null_row = Row::pack(vec![Datum::Null; 9]);

        // Encode our data with our ArrowBuilder.
        let mut builder = crate::builder::ArrowBuilder::new(&desc, 2, 46).unwrap();
        builder.add_row(&og_row).unwrap();
        builder.add_row(&null_row).unwrap();
        let record_batch = builder.to_record_batch().unwrap();

        // Decode our data!
        let reader =
            ArrowReader::new(&desc, arrow::array::StructArray::from(record_batch)).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        assert_eq!(&og_row, &rnd_row);

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        assert_eq!(&null_row, &rnd_row);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal128() {
        let desc = RelationDesc::builder()
            .with_column("a", ScalarType::Numeric { max_scale: None }.nullable(true))
            .finish();

        let mut dec128 = arrow::array::Decimal128Builder::new();
        dec128 = dec128.with_precision_and_scale(12, 3).unwrap();

        // 1.234
        dec128.append_value(1234);
        dec128.append_null();
        // 100000000.009
        dec128.append_value(100000000009);

        let dec128 = dec128.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec128.data_type().clone(), true)),
            Arc::new(dec128) as arrow::array::ArrayRef,
        )]);

        // Decode our data!
        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn smoketest_decimal256() {
        let desc = RelationDesc::builder()
            .with_column("a", ScalarType::Numeric { max_scale: None }.nullable(true))
            .finish();

        let mut dec256 = arrow::array::Decimal256Builder::new();
        dec256 = dec256.with_precision_and_scale(12, 3).unwrap();

        // 1.234
        dec256.append_value(arrow::datatypes::i256::from(1234));
        dec256.append_null();
        // 100000000.009
        dec256.append_value(arrow::datatypes::i256::from(100000000009i64));

        let dec256 = dec256.finish();
        #[allow(clippy::as_conversions)]
        let batch = StructArray::from(vec![(
            Arc::new(Field::new("a", dec256.data_type().clone(), true)),
            Arc::new(dec256) as arrow::array::ArrayRef,
        )]);

        // Decode our data!
        let reader = ArrowReader::new(&desc, batch).unwrap();
        let mut rnd_row = Row::default();

        reader.read(0, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(1.234f64));

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(1, &mut rnd_row).unwrap();
        let num = rnd_row.into_element();
        assert_eq!(num, Datum::Null);

        // Create a packer to clear the row alloc.
        rnd_row.packer();

        reader.read(2, &mut rnd_row).unwrap();
        let num = rnd_row.into_element().unwrap_numeric();
        assert_eq!(num.0, Numeric::from(100000000.009f64));
    }
}