Skip to main content

mz_repr/row/
encode.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A permanent storage encoding for rows.
11//!
12//! See row.proto for details.
13
14use std::fmt::Debug;
15use std::ops::AddAssign;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use arrow::array::{
20    Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBufferBuilder,
21    BooleanBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder, Float32Array, Float32Builder,
22    Float64Array, Float64Builder, Int16Array, Int16Builder, Int32Array, Int32Builder, Int64Array,
23    Int64Builder, ListArray, ListBuilder, MapArray, StringArray, StringBuilder, StructArray,
24    UInt8Array, UInt8Builder, UInt16Array, UInt16Builder, UInt32Array, UInt32Builder, UInt64Array,
25    UInt64Builder, make_array,
26};
27use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
28use arrow::datatypes::{DataType, Field, Fields, ToByteSlice};
29use bytes::{BufMut, Bytes};
30use chrono::Timelike;
31use dec::{Context, Decimal, OrderedDecimal};
32use itertools::{EitherOrBoth, Itertools};
33use mz_ore::assert_none;
34use mz_ore::cast::CastFrom;
35use mz_persist_types::Codec;
36use mz_persist_types::arrow::ArrayOrd;
37use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, FixedSizeCodec, Schema};
38use mz_persist_types::stats::{
39    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, FixedSizeBytesStatsKind,
40    OptionStats, PrimitiveStats, StructStats,
41};
42use mz_proto::chrono::ProtoNaiveTime;
43use mz_proto::{ProtoType, RustType, TryFromProtoError};
44use prost::Message;
45use uuid::Uuid;
46
47use crate::adt::array::{ArrayDimension, PackedArrayDimension};
48use crate::adt::date::Date;
49use crate::adt::datetime::PackedNaiveTime;
50use crate::adt::interval::PackedInterval;
51use crate::adt::jsonb::{JsonbPacker, JsonbRef};
52use crate::adt::mz_acl_item::{PackedAclItem, PackedMzAclItem};
53use crate::adt::numeric::{Numeric, PackedNumeric};
54use crate::adt::range::{Range, RangeInner, RangeLowerBound, RangeUpperBound};
55use crate::adt::timestamp::{CheckedTimestamp, PackedNaiveDateTime};
56use crate::row::proto_datum::DatumType;
57use crate::row::{
58    ProtoArray, ProtoArrayDimension, ProtoDatum, ProtoDatumOther, ProtoDict, ProtoDictElement,
59    ProtoNumeric, ProtoRange, ProtoRangeInner, ProtoRow,
60};
61use crate::stats::{fixed_stats_from_column, numeric_stats_from_column, stats_for_json};
62use crate::{Datum, ProtoRelationDesc, RelationDesc, Row, RowPacker, SqlScalarType, Timestamp};
63
64// TODO(parkmycar): Benchmark the difference between `FixedSizeBinaryArray` and `BinaryArray`.
65//
66// `FixedSizeBinaryArray`s push empty bytes when a value is null which for larger binary types
67// could result in poor performance.
68#[allow(clippy::as_conversions)]
69mod fixed_binary_sizes {
70    use super::*;
71
72    pub const TIME_FIXED_BYTES: i32 = PackedNaiveTime::SIZE as i32;
73    pub const TIMESTAMP_FIXED_BYTES: i32 = PackedNaiveDateTime::SIZE as i32;
74    pub const INTERVAL_FIXED_BYTES: i32 = PackedInterval::SIZE as i32;
75    pub const ACL_ITEM_FIXED_BYTES: i32 = PackedAclItem::SIZE as i32;
76    pub const _MZ_ACL_ITEM_FIXED_BYTES: i32 = PackedMzAclItem::SIZE as i32;
77    pub const ARRAY_DIMENSION_FIXED_BYTES: i32 = PackedArrayDimension::SIZE as i32;
78
79    pub const UUID_FIXED_BYTES: i32 = 16;
80    static_assertions::const_assert_eq!(UUID_FIXED_BYTES as usize, std::mem::size_of::<Uuid>());
81}
82use fixed_binary_sizes::*;
83
84/// Returns true iff the ordering of the "raw" and Persist-encoded versions of this columm would match:
85/// ie. `sort(encode(column)) == encode(sort(column))`. This encoding has been designed so that this
86/// is true for many types.
87pub fn preserves_order(scalar_type: &SqlScalarType) -> bool {
88    match scalar_type {
89        // These types have short, fixed-length encodings that are designed to sort identically.
90        SqlScalarType::Bool
91        | SqlScalarType::Int16
92        | SqlScalarType::Int32
93        | SqlScalarType::Int64
94        | SqlScalarType::UInt16
95        | SqlScalarType::UInt32
96        | SqlScalarType::UInt64
97        | SqlScalarType::Date
98        | SqlScalarType::Time
99        | SqlScalarType::Timestamp { .. }
100        | SqlScalarType::TimestampTz { .. }
101        | SqlScalarType::Interval
102        | SqlScalarType::Bytes
103        | SqlScalarType::String
104        | SqlScalarType::Uuid
105        | SqlScalarType::MzTimestamp
106        | SqlScalarType::MzAclItem
107        | SqlScalarType::AclItem => true,
108        // We sort records lexicographically; a record has a meaningful sort if all its fields do.
109        SqlScalarType::Record { fields, .. } => fields
110            .iter()
111            .all(|(_, field_type)| preserves_order(&field_type.scalar_type)),
112        // Our floating-point encoding preserves order generally, but differs when comparing
113        // -0 and 0. Opt these out for now.
114        SqlScalarType::Float32 | SqlScalarType::Float64 => false,
115        // Numeric is sensitive to similar ordering issues as floating point numbers, and requires
116        // some special handling we don't have yet.
117        SqlScalarType::Numeric { .. } => false,
118        // For all other types: either the encoding is known to not preserve ordering, or we
119        // don't yet care to make strong guarantees one way or the other.
120        SqlScalarType::PgLegacyChar
121        | SqlScalarType::PgLegacyName
122        | SqlScalarType::Char { .. }
123        | SqlScalarType::VarChar { .. }
124        | SqlScalarType::Jsonb
125        | SqlScalarType::Array(_)
126        | SqlScalarType::List { .. }
127        | SqlScalarType::Oid
128        | SqlScalarType::Map { .. }
129        | SqlScalarType::RegProc
130        | SqlScalarType::RegType
131        | SqlScalarType::RegClass
132        | SqlScalarType::Int2Vector
133        | SqlScalarType::Range { .. } => false,
134    }
135}
136
137/// An encoder for a column of [`Datum`]s.
138#[derive(Debug)]
139struct DatumEncoder {
140    nullable: bool,
141    encoder: DatumColumnEncoder,
142}
143
144impl DatumEncoder {
145    fn goodbytes(&self) -> usize {
146        self.encoder.goodbytes()
147    }
148
149    fn push(&mut self, datum: Datum) {
150        assert!(
151            !datum.is_null() || self.nullable,
152            "tried pushing Null into non-nullable column"
153        );
154        self.encoder.push(datum);
155    }
156
157    fn push_invalid(&mut self) {
158        self.encoder.push_invalid();
159    }
160
161    fn finish(self) -> ArrayRef {
162        self.encoder.finish()
163    }
164}
165
166/// An encoder for a single column of [`Datum`]s. To encode an entire row see
167/// [`RowColumnarEncoder`].
168///
169/// Note: We specifically structure the encoder as an enum instead of using trait objects because
170/// Datum encoding is an extremely hot path and downcasting objects is relatively slow.
171#[derive(Debug)]
172enum DatumColumnEncoder {
173    Bool(BooleanBuilder),
174    U8(UInt8Builder),
175    U16(UInt16Builder),
176    U32(UInt32Builder),
177    U64(UInt64Builder),
178    I16(Int16Builder),
179    I32(Int32Builder),
180    I64(Int64Builder),
181    F32(Float32Builder),
182    F64(Float64Builder),
183    Numeric {
184        /// The raw bytes so we can losslessly roundtrip Numerics.
185        binary_values: BinaryBuilder,
186        /// Also maintain a float64 approximation for sorting.
187        approx_values: Float64Builder,
188        /// Re-usable `libdecimal` context for conversions.
189        numeric_context: Context<Numeric>,
190    },
191    Bytes(BinaryBuilder),
192    String(StringBuilder),
193    Date(Int32Builder),
194    Time(FixedSizeBinaryBuilder),
195    Timestamp(FixedSizeBinaryBuilder),
196    TimestampTz(FixedSizeBinaryBuilder),
197    MzTimestamp(UInt64Builder),
198    Interval(FixedSizeBinaryBuilder),
199    Uuid(FixedSizeBinaryBuilder),
200    AclItem(FixedSizeBinaryBuilder),
201    MzAclItem(BinaryBuilder),
202    Range(BinaryBuilder),
203    /// Hand rolled "StringBuilder" that reduces the number of copies required
204    /// to serialize JSON.
205    ///
206    /// An alternative would be to use [`StringBuilder`] but that requires
207    /// serializing to an intermediary string, and then copying that
208    /// intermediary string into an underlying buffer.
209    Jsonb {
210        /// Monotonically increasing offsets of each encoded segment.
211        offsets: Vec<i32>,
212        /// Buffer that contains UTF-8 encoded JSON.
213        buf: Vec<u8>,
214        /// Null entries, if any.
215        nulls: Option<BooleanBufferBuilder>,
216    },
217    Array {
218        /// Binary encoded `ArrayDimension`s.
219        dims: ListBuilder<FixedSizeBinaryBuilder>,
220        /// Lengths of each `Array` in this column.
221        val_lengths: Vec<usize>,
222        /// Contiguous array of underlying data.
223        vals: Box<DatumColumnEncoder>,
224        /// Null entires, if any.
225        nulls: Option<BooleanBufferBuilder>,
226    },
227    List {
228        /// Lengths of each `List` in this column.
229        lengths: Vec<usize>,
230        /// Contiguous array of underlying data.
231        values: Box<DatumColumnEncoder>,
232        /// Null entires, if any.
233        nulls: Option<BooleanBufferBuilder>,
234    },
235    Map {
236        /// Lengths of each `Map` in this column
237        lengths: Vec<usize>,
238        /// Contiguous array of key data.
239        keys: StringBuilder,
240        /// Contiguous array of val data.
241        vals: Box<DatumColumnEncoder>,
242        /// Null entires, if any.
243        nulls: Option<BooleanBufferBuilder>,
244    },
245    Record {
246        /// Columns in the record.
247        fields: Vec<DatumEncoder>,
248        /// Null entries, if any.
249        nulls: Option<BooleanBufferBuilder>,
250        /// Number of values we've pushed into this builder thus far.
251        length: usize,
252    },
253    /// Special encoder for a [`SqlScalarType::Record`] that has no inner fields.
254    ///
255    /// We have a special case for this scenario because Arrow does not allow a
256    /// [`StructArray`] (what normally use to encod a `Record`) with no fields.
257    RecordEmpty(BooleanBuilder),
258}
259
260impl DatumColumnEncoder {
261    fn goodbytes(&self) -> usize {
262        match self {
263            DatumColumnEncoder::Bool(a) => a.len(),
264            DatumColumnEncoder::U8(a) => a.values_slice().to_byte_slice().len(),
265            DatumColumnEncoder::U16(a) => a.values_slice().to_byte_slice().len(),
266            DatumColumnEncoder::U32(a) => a.values_slice().to_byte_slice().len(),
267            DatumColumnEncoder::U64(a) => a.values_slice().to_byte_slice().len(),
268            DatumColumnEncoder::I16(a) => a.values_slice().to_byte_slice().len(),
269            DatumColumnEncoder::I32(a) => a.values_slice().to_byte_slice().len(),
270            DatumColumnEncoder::I64(a) => a.values_slice().to_byte_slice().len(),
271            DatumColumnEncoder::F32(a) => a.values_slice().to_byte_slice().len(),
272            DatumColumnEncoder::F64(a) => a.values_slice().to_byte_slice().len(),
273            DatumColumnEncoder::Numeric {
274                binary_values,
275                approx_values,
276                ..
277            } => {
278                binary_values.values_slice().len()
279                    + approx_values.values_slice().to_byte_slice().len()
280            }
281            DatumColumnEncoder::Bytes(a) => a.values_slice().len(),
282            DatumColumnEncoder::String(a) => a.values_slice().len(),
283            DatumColumnEncoder::Date(a) => a.values_slice().to_byte_slice().len(),
284            DatumColumnEncoder::Time(a) => a.len() * PackedNaiveTime::SIZE,
285            DatumColumnEncoder::Timestamp(a) => a.len() * PackedNaiveDateTime::SIZE,
286            DatumColumnEncoder::TimestampTz(a) => a.len() * PackedNaiveDateTime::SIZE,
287            DatumColumnEncoder::MzTimestamp(a) => a.values_slice().to_byte_slice().len(),
288            DatumColumnEncoder::Interval(a) => a.len() * PackedInterval::SIZE,
289            DatumColumnEncoder::Uuid(a) => a.len() * size_of::<Uuid>(),
290            DatumColumnEncoder::AclItem(a) => a.len() * PackedAclItem::SIZE,
291            DatumColumnEncoder::MzAclItem(a) => a.values_slice().len(),
292            DatumColumnEncoder::Range(a) => a.values_slice().len(),
293            DatumColumnEncoder::Jsonb { buf, .. } => buf.len(),
294            DatumColumnEncoder::Array { dims, vals, .. } => {
295                dims.len() * PackedArrayDimension::SIZE + vals.goodbytes()
296            }
297            DatumColumnEncoder::List { values, .. } => values.goodbytes(),
298            DatumColumnEncoder::Map { keys, vals, .. } => {
299                keys.values_slice().len() + vals.goodbytes()
300            }
301            DatumColumnEncoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
302            DatumColumnEncoder::RecordEmpty(a) => a.len(),
303        }
304    }
305
306    fn push<'e, 'd>(&'e mut self, datum: Datum<'d>) {
307        match (self, datum) {
308            (DatumColumnEncoder::Bool(bool_builder), Datum::True) => {
309                bool_builder.append_value(true)
310            }
311            (DatumColumnEncoder::Bool(bool_builder), Datum::False) => {
312                bool_builder.append_value(false)
313            }
314            (DatumColumnEncoder::U8(builder), Datum::UInt8(val)) => builder.append_value(val),
315            (DatumColumnEncoder::U16(builder), Datum::UInt16(val)) => builder.append_value(val),
316            (DatumColumnEncoder::U32(builder), Datum::UInt32(val)) => builder.append_value(val),
317            (DatumColumnEncoder::U64(builder), Datum::UInt64(val)) => builder.append_value(val),
318            (DatumColumnEncoder::I16(builder), Datum::Int16(val)) => builder.append_value(val),
319            (DatumColumnEncoder::I32(builder), Datum::Int32(val)) => builder.append_value(val),
320            (DatumColumnEncoder::I64(builder), Datum::Int64(val)) => builder.append_value(val),
321            (DatumColumnEncoder::F32(builder), Datum::Float32(val)) => builder.append_value(*val),
322            (DatumColumnEncoder::F64(builder), Datum::Float64(val)) => builder.append_value(*val),
323            (
324                DatumColumnEncoder::Numeric {
325                    approx_values,
326                    binary_values,
327                    numeric_context,
328                },
329                Datum::Numeric(val),
330            ) => {
331                let float_approx = numeric_context.try_into_f64(val.0).unwrap_or_else(|_| {
332                    numeric_context.clear_status();
333                    if val.0.is_negative() {
334                        f64::NEG_INFINITY
335                    } else {
336                        f64::INFINITY
337                    }
338                });
339                let packed = PackedNumeric::from_value(val.0);
340
341                approx_values.append_value(float_approx);
342                binary_values.append_value(packed.as_bytes());
343            }
344            (DatumColumnEncoder::String(builder), Datum::String(val)) => builder.append_value(val),
345            (DatumColumnEncoder::Bytes(builder), Datum::Bytes(val)) => builder.append_value(val),
346            (DatumColumnEncoder::Date(builder), Datum::Date(val)) => {
347                builder.append_value(val.pg_epoch_days())
348            }
349            (DatumColumnEncoder::Time(builder), Datum::Time(val)) => {
350                let packed = PackedNaiveTime::from_value(val);
351                builder
352                    .append_value(packed.as_bytes())
353                    .expect("known correct size");
354            }
355            (DatumColumnEncoder::Timestamp(builder), Datum::Timestamp(val)) => {
356                let packed = PackedNaiveDateTime::from_value(val.to_naive());
357                builder
358                    .append_value(packed.as_bytes())
359                    .expect("known correct size");
360            }
361            (DatumColumnEncoder::TimestampTz(builder), Datum::TimestampTz(val)) => {
362                let packed = PackedNaiveDateTime::from_value(val.to_naive());
363                builder
364                    .append_value(packed.as_bytes())
365                    .expect("known correct size");
366            }
367            (DatumColumnEncoder::MzTimestamp(builder), Datum::MzTimestamp(val)) => {
368                builder.append_value(val.into());
369            }
370            (DatumColumnEncoder::Interval(builder), Datum::Interval(val)) => {
371                let packed = PackedInterval::from_value(val);
372                builder
373                    .append_value(packed.as_bytes())
374                    .expect("known correct size");
375            }
376            (DatumColumnEncoder::Uuid(builder), Datum::Uuid(val)) => builder
377                .append_value(val.as_bytes())
378                .expect("known correct size"),
379            (DatumColumnEncoder::AclItem(builder), Datum::AclItem(val)) => {
380                let packed = PackedAclItem::from_value(val);
381                builder
382                    .append_value(packed.as_bytes())
383                    .expect("known correct size");
384            }
385            (DatumColumnEncoder::MzAclItem(builder), Datum::MzAclItem(val)) => {
386                let packed = PackedMzAclItem::from_value(val);
387                builder.append_value(packed.as_bytes());
388            }
389            (DatumColumnEncoder::Range(builder), d @ Datum::Range(_)) => {
390                let proto = ProtoDatum::from(d);
391                let bytes = proto.encode_to_vec();
392                builder.append_value(&bytes);
393            }
394            (
395                DatumColumnEncoder::Jsonb {
396                    offsets,
397                    buf,
398                    nulls,
399                },
400                d @ Datum::JsonNull
401                | d @ Datum::True
402                | d @ Datum::False
403                | d @ Datum::Numeric(_)
404                | d @ Datum::String(_)
405                | d @ Datum::List(_)
406                | d @ Datum::Map(_),
407            ) => {
408                // TODO(parkmycar): Why do we need to re-borrow here?
409                let mut buf = buf;
410                let json = JsonbRef::from_datum(d);
411
412                // Serialize our JSON.
413                json.to_writer(&mut buf)
414                    .expect("failed to serialize Datum to jsonb");
415                let offset: i32 = buf.len().try_into().expect("wrote more than 4GB of JSON");
416                offsets.push(offset);
417
418                if let Some(nulls) = nulls {
419                    nulls.append(true);
420                }
421            }
422            (
423                DatumColumnEncoder::Array {
424                    dims,
425                    val_lengths,
426                    vals,
427                    nulls,
428                },
429                Datum::Array(array),
430            ) => {
431                // Store our array dimensions.
432                for dimension in array.dims() {
433                    let packed = PackedArrayDimension::from_value(dimension);
434                    dims.values()
435                        .append_value(packed.as_bytes())
436                        .expect("known correct size");
437                }
438                dims.append(true);
439
440                // Store the values of the array.
441                let mut count = 0;
442                for datum in array.elements() {
443                    count += 1;
444                    vals.push(datum);
445                }
446                val_lengths.push(count);
447
448                if let Some(nulls) = nulls {
449                    nulls.append(true);
450                }
451            }
452            (
453                DatumColumnEncoder::List {
454                    lengths,
455                    values,
456                    nulls,
457                },
458                Datum::List(list),
459            ) => {
460                let mut count = 0;
461                for datum in list {
462                    count += 1;
463                    values.push(datum);
464                }
465                lengths.push(count);
466
467                if let Some(nulls) = nulls {
468                    nulls.append(true);
469                }
470            }
471            (
472                DatumColumnEncoder::Map {
473                    lengths,
474                    keys,
475                    vals,
476                    nulls,
477                },
478                Datum::Map(map),
479            ) => {
480                let mut count = 0;
481                for (key, datum) in &map {
482                    count += 1;
483                    keys.append_value(key);
484                    vals.push(datum);
485                }
486                lengths.push(count);
487
488                if let Some(nulls) = nulls {
489                    nulls.append(true);
490                }
491            }
492            (
493                DatumColumnEncoder::Record {
494                    fields,
495                    nulls,
496                    length,
497                },
498                Datum::List(records),
499            ) => {
500                let mut count = 0;
501                // `zip_eq` will panic if the number of records != number of fields.
502                for (datum, encoder) in records.into_iter().zip_eq(fields.iter_mut()) {
503                    count += 1;
504                    encoder.push(datum);
505                }
506                assert_eq!(count, fields.len());
507
508                length.add_assign(1);
509                if let Some(nulls) = nulls.as_mut() {
510                    nulls.append(true);
511                }
512            }
513            (DatumColumnEncoder::RecordEmpty(builder), Datum::List(records)) => {
514                assert_none!(records.into_iter().next());
515                builder.append_value(true);
516            }
517            (encoder, Datum::Null) => encoder.push_invalid(),
518            (encoder, datum) => panic!("can't encode {datum:?} into {encoder:?}"),
519        }
520    }
521
522    fn push_invalid(&mut self) {
523        match self {
524            DatumColumnEncoder::Bool(builder) => builder.append_null(),
525            DatumColumnEncoder::U8(builder) => builder.append_null(),
526            DatumColumnEncoder::U16(builder) => builder.append_null(),
527            DatumColumnEncoder::U32(builder) => builder.append_null(),
528            DatumColumnEncoder::U64(builder) => builder.append_null(),
529            DatumColumnEncoder::I16(builder) => builder.append_null(),
530            DatumColumnEncoder::I32(builder) => builder.append_null(),
531            DatumColumnEncoder::I64(builder) => builder.append_null(),
532            DatumColumnEncoder::F32(builder) => builder.append_null(),
533            DatumColumnEncoder::F64(builder) => builder.append_null(),
534            DatumColumnEncoder::Numeric {
535                approx_values,
536                binary_values,
537                numeric_context: _,
538            } => {
539                approx_values.append_null();
540                binary_values.append_null();
541            }
542            DatumColumnEncoder::String(builder) => builder.append_null(),
543            DatumColumnEncoder::Bytes(builder) => builder.append_null(),
544            DatumColumnEncoder::Date(builder) => builder.append_null(),
545            DatumColumnEncoder::Time(builder) => builder.append_null(),
546            DatumColumnEncoder::Timestamp(builder) => builder.append_null(),
547            DatumColumnEncoder::TimestampTz(builder) => builder.append_null(),
548            DatumColumnEncoder::MzTimestamp(builder) => builder.append_null(),
549            DatumColumnEncoder::Interval(builder) => builder.append_null(),
550            DatumColumnEncoder::Uuid(builder) => builder.append_null(),
551            DatumColumnEncoder::AclItem(builder) => builder.append_null(),
552            DatumColumnEncoder::MzAclItem(builder) => builder.append_null(),
553            DatumColumnEncoder::Range(builder) => builder.append_null(),
554            DatumColumnEncoder::Jsonb {
555                offsets,
556                buf: _,
557                nulls,
558            } => {
559                let nulls = nulls.get_or_insert_with(|| {
560                    let mut buf = BooleanBufferBuilder::new(offsets.len());
561                    // The offsets buffer has one more value than there are elements.
562                    buf.append_n(offsets.len() - 1, true);
563                    buf
564                });
565
566                offsets.push(offsets.last().copied().unwrap_or(0));
567                nulls.append(false);
568            }
569            DatumColumnEncoder::Array {
570                dims,
571                val_lengths,
572                vals: _,
573                nulls,
574            } => {
575                let nulls = nulls.get_or_insert_with(|| {
576                    let mut buf = BooleanBufferBuilder::new(dims.len() + 1);
577                    buf.append_n(dims.len(), true);
578                    buf
579                });
580                dims.append_null();
581
582                val_lengths.push(0);
583                nulls.append(false);
584            }
585            DatumColumnEncoder::List {
586                lengths,
587                values: _,
588                nulls,
589            } => {
590                let nulls = nulls.get_or_insert_with(|| {
591                    let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
592                    buf.append_n(lengths.len(), true);
593                    buf
594                });
595
596                lengths.push(0);
597                nulls.append(false);
598            }
599            DatumColumnEncoder::Map {
600                lengths,
601                keys: _,
602                vals: _,
603                nulls,
604            } => {
605                let nulls = nulls.get_or_insert_with(|| {
606                    let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
607                    buf.append_n(lengths.len(), true);
608                    buf
609                });
610
611                lengths.push(0);
612                nulls.append(false);
613            }
614            DatumColumnEncoder::Record {
615                fields,
616                nulls,
617                length,
618            } => {
619                let nulls = nulls.get_or_insert_with(|| {
620                    let mut buf = BooleanBufferBuilder::new(*length + 1);
621                    buf.append_n(*length, true);
622                    buf
623                });
624                nulls.append(false);
625                length.add_assign(1);
626
627                for field in fields {
628                    field.push_invalid();
629                }
630            }
631            DatumColumnEncoder::RecordEmpty(builder) => builder.append_null(),
632        }
633    }
634
635    fn finish(self) -> ArrayRef {
636        match self {
637            DatumColumnEncoder::Bool(mut builder) => {
638                let array = builder.finish();
639                Arc::new(array)
640            }
641            DatumColumnEncoder::U8(mut builder) => {
642                let array = builder.finish();
643                Arc::new(array)
644            }
645            DatumColumnEncoder::U16(mut builder) => {
646                let array = builder.finish();
647                Arc::new(array)
648            }
649            DatumColumnEncoder::U32(mut builder) => {
650                let array = builder.finish();
651                Arc::new(array)
652            }
653            DatumColumnEncoder::U64(mut builder) => {
654                let array = builder.finish();
655                Arc::new(array)
656            }
657            DatumColumnEncoder::I16(mut builder) => {
658                let array = builder.finish();
659                Arc::new(array)
660            }
661            DatumColumnEncoder::I32(mut builder) => {
662                let array = builder.finish();
663                Arc::new(array)
664            }
665            DatumColumnEncoder::I64(mut builder) => {
666                let array = builder.finish();
667                Arc::new(array)
668            }
669            DatumColumnEncoder::F32(mut builder) => {
670                let array = builder.finish();
671                Arc::new(array)
672            }
673            DatumColumnEncoder::F64(mut builder) => {
674                let array = builder.finish();
675                Arc::new(array)
676            }
677            DatumColumnEncoder::Numeric {
678                mut approx_values,
679                mut binary_values,
680                numeric_context: _,
681            } => {
682                let approx_array = approx_values.finish();
683                let binary_array = binary_values.finish();
684
685                assert_eq!(approx_array.len(), binary_array.len());
686                // This is O(n) so we only enable it for debug assertions.
687                debug_assert_eq!(approx_array.logical_nulls(), binary_array.logical_nulls());
688
689                let fields = Fields::from(vec![
690                    Field::new("approx", approx_array.data_type().clone(), true),
691                    Field::new("binary", binary_array.data_type().clone(), true),
692                ]);
693                let nulls = approx_array.logical_nulls();
694                let array = StructArray::new(
695                    fields,
696                    vec![Arc::new(approx_array), Arc::new(binary_array)],
697                    nulls,
698                );
699                Arc::new(array)
700            }
701            DatumColumnEncoder::String(mut builder) => {
702                let array = builder.finish();
703                Arc::new(array)
704            }
705            DatumColumnEncoder::Bytes(mut builder) => {
706                let array = builder.finish();
707                Arc::new(array)
708            }
709            DatumColumnEncoder::Date(mut builder) => {
710                let array = builder.finish();
711                Arc::new(array)
712            }
713            DatumColumnEncoder::Time(mut builder) => {
714                let array = builder.finish();
715                Arc::new(array)
716            }
717            DatumColumnEncoder::Timestamp(mut builder) => {
718                let array = builder.finish();
719                Arc::new(array)
720            }
721            DatumColumnEncoder::TimestampTz(mut builder) => {
722                let array = builder.finish();
723                Arc::new(array)
724            }
725            DatumColumnEncoder::MzTimestamp(mut builder) => {
726                let array = builder.finish();
727                Arc::new(array)
728            }
729            DatumColumnEncoder::Interval(mut builder) => {
730                let array = builder.finish();
731                Arc::new(array)
732            }
733            DatumColumnEncoder::Uuid(mut builder) => {
734                let array = builder.finish();
735                Arc::new(array)
736            }
737            DatumColumnEncoder::AclItem(mut builder) => Arc::new(builder.finish()),
738            DatumColumnEncoder::MzAclItem(mut builder) => Arc::new(builder.finish()),
739            DatumColumnEncoder::Range(mut builder) => Arc::new(builder.finish()),
740            DatumColumnEncoder::Jsonb {
741                offsets,
742                buf,
743                mut nulls,
744            } => {
745                let values = Buffer::from_vec(buf);
746                let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
747                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
748                let array = StringArray::new(offsets, values, nulls);
749                Arc::new(array)
750            }
751            DatumColumnEncoder::Array {
752                mut dims,
753                val_lengths,
754                vals,
755                mut nulls,
756            } => {
757                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
758                let vals = vals.finish();
759
760                // Note: Values in an Array can always be Null, regardless of whether or not the
761                // column is nullable.
762                let field = Field::new_list_field(vals.data_type().clone(), true);
763                let val_offsets = OffsetBuffer::from_lengths(val_lengths);
764                let values =
765                    ListArray::new(Arc::new(field), val_offsets, Arc::new(vals), nulls.clone());
766
767                let dims = dims.finish();
768                assert_eq!(values.len(), dims.len());
769
770                // Note: The inner arrays are always nullable, and we let the higher-level array
771                // drive nullability for the entire column.
772                let fields = Fields::from(vec![
773                    Field::new("dims", dims.data_type().clone(), true),
774                    Field::new("vals", values.data_type().clone(), true),
775                ]);
776                let array = StructArray::new(fields, vec![Arc::new(dims), Arc::new(values)], nulls);
777
778                Arc::new(array)
779            }
780            DatumColumnEncoder::List {
781                lengths,
782                values,
783                mut nulls,
784            } => {
785                let values = values.finish();
786
787                // Note: Values in an Array can always be Null, regardless of whether or not the
788                // column is nullable.
789                let field = Field::new_list_field(values.data_type().clone(), true);
790                let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
791                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
792
793                let array = ListArray::new(Arc::new(field), offsets, values, nulls);
794                Arc::new(array)
795            }
796            DatumColumnEncoder::Map {
797                lengths,
798                mut keys,
799                vals,
800                mut nulls,
801            } => {
802                let keys = keys.finish();
803                let vals = vals.finish();
804
805                let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
806                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
807
808                // Note: Values in an Map can always be Null, regardless of whether or not the
809                // column is nullable, but Keys cannot.
810                assert_none!(keys.logical_nulls());
811                let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
812                let val_field = Arc::new(Field::new("val", vals.data_type().clone(), true));
813                let fields = Fields::from(vec![Arc::clone(&key_field), Arc::clone(&val_field)]);
814                let entries = StructArray::new(fields, vec![Arc::new(keys), vals], None);
815
816                // Note: DatumMap is always sorted, and Arrow enforces that the inner 'map_entries'
817                // array can never be null.
818                let field = Field::new("map_entries", entries.data_type().clone(), false);
819                let array = ListArray::new(Arc::new(field), offsets, Arc::new(entries), nulls);
820                Arc::new(array)
821            }
822            DatumColumnEncoder::Record {
823                fields,
824                mut nulls,
825                length: _,
826            } => {
827                let (fields, arrays): (Vec<_>, Vec<_>) = fields
828                    .into_iter()
829                    .enumerate()
830                    .map(|(tag, encoder)| {
831                        // Note: We mark all columns as nullable at the Arrow/Parquet level because
832                        // it has a negligible performance difference, but it protects us from
833                        // unintended nullability changes in the columns of SQL objects.
834                        //
835                        // See: <https://github.com/MaterializeInc/database-issues/issues/2488>
836                        let nullable = true;
837                        let array = encoder.finish();
838                        let field =
839                            Field::new(tag.to_string(), array.data_type().clone(), nullable);
840                        (field, array)
841                    })
842                    .unzip();
843                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
844
845                let array = StructArray::new(Fields::from(fields), arrays, nulls);
846                Arc::new(array)
847            }
848            DatumColumnEncoder::RecordEmpty(mut builder) => Arc::new(builder.finish()),
849        }
850    }
851}
852
/// A decoder for a column of [`Datum`]s.
///
/// Note: We specifically structure the decoder as an enum instead of using trait objects because
/// Datum decoding is an extremely hot path and downcasting objects is relatively slow.
#[derive(Debug)]
enum DatumColumnDecoder {
    /// Booleans, decoded to [`Datum::True`] / [`Datum::False`].
    Bool(BooleanArray),
    /// Unsigned 8-bit integers.
    U8(UInt8Array),
    /// Unsigned 16-bit integers.
    U16(UInt16Array),
    /// Unsigned 32-bit integers.
    U32(UInt32Array),
    /// Unsigned 64-bit integers.
    U64(UInt64Array),
    /// Signed 16-bit integers.
    I16(Int16Array),
    /// Signed 32-bit integers.
    I32(Int32Array),
    /// Signed 64-bit integers.
    I64(Int64Array),
    /// 32-bit floats, wrapped in `OrderedFloat` on decode.
    F32(Float32Array),
    /// 64-bit floats, wrapped in `OrderedFloat` on decode.
    F64(Float64Array),
    /// Numerics, each entry holding the bytes of a `PackedNumeric`.
    Numeric(BinaryArray),
    /// Raw byte strings.
    Bytes(BinaryArray),
    /// UTF-8 strings.
    String(StringArray),
    /// Dates, each entry being the value passed to `Date::from_pg_epoch`.
    Date(Int32Array),
    /// Times, each entry holding the bytes of a `PackedNaiveTime`.
    Time(FixedSizeBinaryArray),
    /// Timestamps, each entry holding the bytes of a `PackedNaiveDateTime`.
    Timestamp(FixedSizeBinaryArray),
    /// Timestamps with time zone, stored like [`DatumColumnDecoder::Timestamp`] and interpreted
    /// as UTC on decode.
    TimestampTz(FixedSizeBinaryArray),
    /// Materialize timestamps, converted with `Timestamp::from` on decode.
    MzTimestamp(UInt64Array),
    /// Intervals, each entry holding the bytes of a `PackedInterval`.
    Interval(FixedSizeBinaryArray),
    /// UUIDs, each entry holding the raw UUID bytes.
    Uuid(FixedSizeBinaryArray),
    /// JSON documents stored as text and re-packed with a `JsonbPacker` on decode.
    Json(StringArray),
    /// SQL arrays.
    Array {
        /// Offsets into `dims`: element `i` owns `dims[dim_offsets[i]..dim_offsets[i + 1]]`.
        dim_offsets: OffsetBuffer<i32>,
        /// One `PackedArrayDimension` per dimension of every array.
        dims: FixedSizeBinaryArray,
        /// Offsets into `vals`: element `i` owns `vals[val_offsets[i]..val_offsets[i + 1]]`.
        val_offsets: OffsetBuffer<i32>,
        /// Decoder for the flattened array elements.
        vals: Box<DatumColumnDecoder>,
        /// Validity of the arrays themselves; `None` means every array is non-null.
        nulls: Option<NullBuffer>,
    },
    /// SQL lists.
    List {
        /// Offsets into `values`: list `i` owns `values[offsets[i]..offsets[i + 1]]`.
        offsets: OffsetBuffer<i32>,
        /// Decoder for the flattened list elements.
        values: Box<DatumColumnDecoder>,
        /// Validity of the lists themselves; `None` means every list is non-null.
        nulls: Option<NullBuffer>,
    },
    /// SQL maps with string keys.
    Map {
        /// Offsets into `keys`/`vals`: map `i` owns entries `offsets[i]..offsets[i + 1]`.
        offsets: OffsetBuffer<i32>,
        /// Map keys, read directly as strings on decode.
        keys: StringArray,
        /// Decoder for the map values, aligned with `keys`.
        vals: Box<DatumColumnDecoder>,
        /// Validity of the maps themselves; `None` means every map is non-null.
        nulls: Option<NullBuffer>,
    },
    /// Records with no fields; only validity is meaningful, and a valid entry decodes to an
    /// empty list.
    RecordEmpty(BooleanArray),
    /// Records with at least one field, decoded as a list of the field values.
    Record {
        /// One decoder per record field, in field order.
        fields: Vec<Box<DatumColumnDecoder>>,
        /// Validity of the records themselves; `None` means every record is non-null.
        nulls: Option<NullBuffer>,
    },
    /// Ranges, each entry holding a protobuf-encoded `ProtoDatum`.
    Range(BinaryArray),
    /// `MzAclItem`s, each entry holding the bytes of a `PackedMzAclItem`.
    MzAclItem(BinaryArray),
    /// `AclItem`s, each entry holding the bytes of a `PackedAclItem`.
    AclItem(FixedSizeBinaryArray),
}
907
908impl DatumColumnDecoder {
909    fn get<'a>(&'a self, idx: usize, packer: &'a mut RowPacker) {
910        let datum = match self {
911            DatumColumnDecoder::Bool(array) => array
912                .is_valid(idx)
913                .then(|| array.value(idx))
914                .map(|x| if x { Datum::True } else { Datum::False }),
915            DatumColumnDecoder::U8(array) => array
916                .is_valid(idx)
917                .then(|| array.value(idx))
918                .map(Datum::UInt8),
919            DatumColumnDecoder::U16(array) => array
920                .is_valid(idx)
921                .then(|| array.value(idx))
922                .map(Datum::UInt16),
923            DatumColumnDecoder::U32(array) => array
924                .is_valid(idx)
925                .then(|| array.value(idx))
926                .map(Datum::UInt32),
927            DatumColumnDecoder::U64(array) => array
928                .is_valid(idx)
929                .then(|| array.value(idx))
930                .map(Datum::UInt64),
931            DatumColumnDecoder::I16(array) => array
932                .is_valid(idx)
933                .then(|| array.value(idx))
934                .map(Datum::Int16),
935            DatumColumnDecoder::I32(array) => array
936                .is_valid(idx)
937                .then(|| array.value(idx))
938                .map(Datum::Int32),
939            DatumColumnDecoder::I64(array) => array
940                .is_valid(idx)
941                .then(|| array.value(idx))
942                .map(Datum::Int64),
943            DatumColumnDecoder::F32(array) => array
944                .is_valid(idx)
945                .then(|| array.value(idx))
946                .map(|x| Datum::Float32(ordered_float::OrderedFloat(x))),
947            DatumColumnDecoder::F64(array) => array
948                .is_valid(idx)
949                .then(|| array.value(idx))
950                .map(|x| Datum::Float64(ordered_float::OrderedFloat(x))),
951            DatumColumnDecoder::Numeric(array) => array.is_valid(idx).then(|| {
952                let val = array.value(idx);
953                let val = PackedNumeric::from_bytes(val)
954                    .expect("failed to roundtrip Numeric")
955                    .into_value();
956                Datum::Numeric(OrderedDecimal(val))
957            }),
958            DatumColumnDecoder::String(array) => array
959                .is_valid(idx)
960                .then(|| array.value(idx))
961                .map(Datum::String),
962            DatumColumnDecoder::Bytes(array) => array
963                .is_valid(idx)
964                .then(|| array.value(idx))
965                .map(Datum::Bytes),
966            DatumColumnDecoder::Date(array) => {
967                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
968                    let date = Date::from_pg_epoch(x).expect("failed to roundtrip");
969                    Datum::Date(date)
970                })
971            }
972            DatumColumnDecoder::Time(array) => {
973                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
974                    let packed = PackedNaiveTime::from_bytes(x).expect("failed to roundtrip time");
975                    Datum::Time(packed.into_value())
976                })
977            }
978            DatumColumnDecoder::Timestamp(array) => {
979                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
980                    let packed = PackedNaiveDateTime::from_bytes(x)
981                        .expect("failed to roundtrip PackedNaiveDateTime");
982                    let timestamp = CheckedTimestamp::from_timestamplike(packed.into_value())
983                        .expect("failed to roundtrip timestamp");
984                    Datum::Timestamp(timestamp)
985                })
986            }
987            DatumColumnDecoder::TimestampTz(array) => {
988                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
989                    let packed = PackedNaiveDateTime::from_bytes(x)
990                        .expect("failed to roundtrip PackedNaiveDateTime");
991                    let timestamp =
992                        CheckedTimestamp::from_timestamplike(packed.into_value().and_utc())
993                            .expect("failed to roundtrip timestamp");
994                    Datum::TimestampTz(timestamp)
995                })
996            }
997            DatumColumnDecoder::MzTimestamp(array) => array
998                .is_valid(idx)
999                .then(|| array.value(idx))
1000                .map(|x| Datum::MzTimestamp(Timestamp::from(x))),
1001            DatumColumnDecoder::Interval(array) => {
1002                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1003                    let packed =
1004                        PackedInterval::from_bytes(x).expect("failed to roundtrip interval");
1005                    Datum::Interval(packed.into_value())
1006                })
1007            }
1008            DatumColumnDecoder::Uuid(array) => {
1009                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1010                    let uuid = Uuid::from_slice(x).expect("failed to roundtrip uuid");
1011                    Datum::Uuid(uuid)
1012                })
1013            }
1014            DatumColumnDecoder::AclItem(array) => {
1015                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1016                    let packed =
1017                        PackedAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1018                    Datum::AclItem(packed.into_value())
1019                })
1020            }
1021            DatumColumnDecoder::MzAclItem(array) => {
1022                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1023                    let packed =
1024                        PackedMzAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1025                    Datum::MzAclItem(packed.into_value())
1026                })
1027            }
1028            DatumColumnDecoder::Range(array) => {
1029                let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1030                    packer.push(Datum::Null);
1031                    return;
1032                };
1033
1034                let proto = ProtoDatum::decode(val).expect("failed to roundtrip Range");
1035                packer
1036                    .try_push_proto(&proto)
1037                    .expect("failed to pack ProtoRange");
1038
1039                // Return early because we've already packed the necessary Datums.
1040                return;
1041            }
1042            DatumColumnDecoder::Json(array) => {
1043                let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1044                    packer.push(Datum::Null);
1045                    return;
1046                };
1047                JsonbPacker::new(packer)
1048                    .pack_str(val)
1049                    .expect("failed to roundtrip JSON");
1050
1051                // Return early because we've already packed the necessary Datums.
1052                return;
1053            }
1054            DatumColumnDecoder::Array {
1055                dim_offsets,
1056                dims,
1057                val_offsets,
1058                vals,
1059                nulls,
1060            } => {
1061                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1062                if !is_valid {
1063                    packer.push(Datum::Null);
1064                    return;
1065                }
1066
1067                let start: usize = dim_offsets[idx]
1068                    .try_into()
1069                    .expect("unexpected negative offset");
1070                let end: usize = dim_offsets[idx + 1]
1071                    .try_into()
1072                    .expect("unexpected negative offset");
1073                let dimensions = (start..end).map(|idx| {
1074                    PackedArrayDimension::from_bytes(dims.value(idx))
1075                        .expect("failed to roundtrip ArrayDimension")
1076                        .into_value()
1077                });
1078
1079                let start: usize = val_offsets[idx]
1080                    .try_into()
1081                    .expect("unexpected negative offset");
1082                let end: usize = val_offsets[idx + 1]
1083                    .try_into()
1084                    .expect("unexpected negative offset");
1085                packer
1086                    .push_array_with_row_major(dimensions, |packer| {
1087                        for x in start..end {
1088                            vals.get(x, packer);
1089                        }
1090                        // Return the numer of Datums we just packed.
1091                        end - start
1092                    })
1093                    .expect("failed to pack Array");
1094
1095                // Return early because we've already packed the necessary Datums.
1096                return;
1097            }
1098            DatumColumnDecoder::List {
1099                offsets,
1100                values,
1101                nulls,
1102            } => {
1103                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1104                if !is_valid {
1105                    packer.push(Datum::Null);
1106                    return;
1107                }
1108
1109                let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1110                let end: usize = offsets[idx + 1]
1111                    .try_into()
1112                    .expect("unexpected negative offset");
1113
1114                packer.push_list_with(|packer| {
1115                    for idx in start..end {
1116                        values.get(idx, packer)
1117                    }
1118                });
1119
1120                // Return early because we've already packed the necessary Datums.
1121                return;
1122            }
1123            DatumColumnDecoder::Map {
1124                offsets,
1125                keys,
1126                vals,
1127                nulls,
1128            } => {
1129                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1130                if !is_valid {
1131                    packer.push(Datum::Null);
1132                    return;
1133                }
1134
1135                let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1136                let end: usize = offsets[idx + 1]
1137                    .try_into()
1138                    .expect("unexpected negative offset");
1139
1140                packer.push_dict_with(|packer| {
1141                    for idx in start..end {
1142                        packer.push(Datum::String(keys.value(idx)));
1143                        vals.get(idx, packer);
1144                    }
1145                });
1146
1147                // Return early because we've already packed the necessary Datums.
1148                return;
1149            }
1150            DatumColumnDecoder::RecordEmpty(array) => array.is_valid(idx).then(Datum::empty_list),
1151            DatumColumnDecoder::Record { fields, nulls } => {
1152                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1153                if !is_valid {
1154                    packer.push(Datum::Null);
1155                    return;
1156                }
1157
1158                // let mut datums = Vec::with_capacity(fields.len());
1159                packer.push_list_with(|packer| {
1160                    for field in fields {
1161                        field.get(idx, packer);
1162                    }
1163                });
1164
1165                // Return early because we've already packed the necessary Datums.
1166                return;
1167            }
1168        };
1169
1170        match datum {
1171            Some(d) => packer.push(d),
1172            None => packer.push(Datum::Null),
1173        }
1174    }
1175
1176    fn stats(&self) -> ColumnStatKinds {
1177        match self {
1178            DatumColumnDecoder::Bool(a) => PrimitiveStats::<bool>::from_column(a).into(),
1179            DatumColumnDecoder::U8(a) => PrimitiveStats::<u8>::from_column(a).into(),
1180            DatumColumnDecoder::U16(a) => PrimitiveStats::<u16>::from_column(a).into(),
1181            DatumColumnDecoder::U32(a) => PrimitiveStats::<u32>::from_column(a).into(),
1182            DatumColumnDecoder::U64(a) => PrimitiveStats::<u64>::from_column(a).into(),
1183            DatumColumnDecoder::I16(a) => PrimitiveStats::<i16>::from_column(a).into(),
1184            DatumColumnDecoder::I32(a) => PrimitiveStats::<i32>::from_column(a).into(),
1185            DatumColumnDecoder::I64(a) => PrimitiveStats::<i64>::from_column(a).into(),
1186            DatumColumnDecoder::F32(a) => PrimitiveStats::<f32>::from_column(a).into(),
1187            DatumColumnDecoder::F64(a) => PrimitiveStats::<f64>::from_column(a).into(),
1188            DatumColumnDecoder::Numeric(a) => numeric_stats_from_column(a),
1189            DatumColumnDecoder::String(a) => PrimitiveStats::<String>::from_column(a).into(),
1190            DatumColumnDecoder::Bytes(a) => PrimitiveStats::<Vec<u8>>::from_column(a).into(),
1191            DatumColumnDecoder::Date(a) => PrimitiveStats::<i32>::from_column(a).into(),
1192            DatumColumnDecoder::Time(a) => {
1193                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedTime)
1194            }
1195            DatumColumnDecoder::Timestamp(a) => {
1196                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1197            }
1198            DatumColumnDecoder::TimestampTz(a) => {
1199                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1200            }
1201            DatumColumnDecoder::MzTimestamp(a) => PrimitiveStats::<u64>::from_column(a).into(),
1202            DatumColumnDecoder::Interval(a) => {
1203                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedInterval)
1204            }
1205            DatumColumnDecoder::Uuid(a) => {
1206                fixed_stats_from_column(a, FixedSizeBytesStatsKind::Uuid)
1207            }
1208            DatumColumnDecoder::AclItem(_)
1209            | DatumColumnDecoder::MzAclItem(_)
1210            | DatumColumnDecoder::Range(_) => ColumnStatKinds::None,
1211            DatumColumnDecoder::Json(a) => stats_for_json(a.iter()).values,
1212            DatumColumnDecoder::Array { .. }
1213            | DatumColumnDecoder::List { .. }
1214            | DatumColumnDecoder::Map { .. }
1215            | DatumColumnDecoder::Record { .. }
1216            | DatumColumnDecoder::RecordEmpty(_) => ColumnStatKinds::None,
1217        }
1218    }
1219
1220    fn goodbytes(&self) -> usize {
1221        match self {
1222            DatumColumnDecoder::Bool(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1223            DatumColumnDecoder::U8(a) => ArrayOrd::UInt8(a.clone()).goodbytes(),
1224            DatumColumnDecoder::U16(a) => ArrayOrd::UInt16(a.clone()).goodbytes(),
1225            DatumColumnDecoder::U32(a) => ArrayOrd::UInt32(a.clone()).goodbytes(),
1226            DatumColumnDecoder::U64(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1227            DatumColumnDecoder::I16(a) => ArrayOrd::Int16(a.clone()).goodbytes(),
1228            DatumColumnDecoder::I32(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1229            DatumColumnDecoder::I64(a) => ArrayOrd::Int64(a.clone()).goodbytes(),
1230            DatumColumnDecoder::F32(a) => ArrayOrd::Float32(a.clone()).goodbytes(),
1231            DatumColumnDecoder::F64(a) => ArrayOrd::Float64(a.clone()).goodbytes(),
1232            DatumColumnDecoder::Numeric(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1233            DatumColumnDecoder::String(a) => ArrayOrd::String(a.clone()).goodbytes(),
1234            DatumColumnDecoder::Bytes(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1235            DatumColumnDecoder::Date(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1236            DatumColumnDecoder::Time(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1237            DatumColumnDecoder::Timestamp(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1238            DatumColumnDecoder::TimestampTz(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1239            DatumColumnDecoder::MzTimestamp(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1240            DatumColumnDecoder::Interval(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1241            DatumColumnDecoder::Uuid(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1242            DatumColumnDecoder::AclItem(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1243            DatumColumnDecoder::MzAclItem(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1244            DatumColumnDecoder::Range(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1245            DatumColumnDecoder::Json(a) => ArrayOrd::String(a.clone()).goodbytes(),
1246            DatumColumnDecoder::Array { dims, vals, .. } => {
1247                (dims.len() * PackedArrayDimension::SIZE) + vals.goodbytes()
1248            }
1249            DatumColumnDecoder::List { values, .. } => values.goodbytes(),
1250            DatumColumnDecoder::Map { keys, vals, .. } => {
1251                ArrayOrd::String(keys.clone()).goodbytes() + vals.goodbytes()
1252            }
1253            DatumColumnDecoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
1254            DatumColumnDecoder::RecordEmpty(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1255        }
1256    }
1257}
1258
1259impl Schema<Row> for RelationDesc {
1260    type ArrowColumn = arrow::array::StructArray;
1261    type Statistics = OptionStats<StructStats>;
1262
1263    type Decoder = RowColumnarDecoder;
1264    type Encoder = RowColumnarEncoder;
1265
1266    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1267        RowColumnarDecoder::new(col, self)
1268    }
1269
1270    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1271        RowColumnarEncoder::new(self)
1272            .ok_or_else(|| anyhow::anyhow!("Cannot encode a RelationDesc with no columns"))
1273    }
1274}
1275
/// A [`ColumnDecoder`] for a [`Row`].
///
/// Holds one [`DatumColumnDecoder`] per column of the [`RelationDesc`] it was built from, in
/// column order. Decoding a row packs one value from each decoder in turn.
#[derive(Debug)]
pub struct RowColumnarDecoder {
    /// The length of all columns in this decoder; matching all child arrays and the null array
    /// if present.
    len: usize,
    /// Field-specific information: the user-readable field name, the null count (or None if the
    /// column is non-nullable), and the decoder which wraps the column-specific array.
    ///
    /// The null count is taken after the outer struct's nulls have been merged into the column.
    decoders: Vec<(Arc<str>, Option<usize>, DatumColumnDecoder)>,
    /// The null buffer of the struct array holding these rows, if present. (At time of writing,
    /// all rows are assumed to be logically nullable.)
    nullability: Option<NullBuffer>,
}
1289
1290/// Merge the provided null buffer with the existing array's null buffer, if any.
1291fn mask_nulls(column: &ArrayRef, null_mask: Option<&NullBuffer>) -> ArrayRef {
1292    if null_mask.is_none() {
1293        Arc::clone(column)
1294    } else {
1295        // We calculate stats on the nested arrays, so make sure we don't count entries
1296        // that are masked off at a higher level.
1297        let nulls = NullBuffer::union(null_mask, column.nulls());
1298        let data = column
1299            .to_data()
1300            .into_builder()
1301            .nulls(nulls)
1302            .build()
1303            .expect("changed only null mask");
1304        make_array(data)
1305    }
1306}
1307
1308impl RowColumnarDecoder {
1309    /// Creates a [`RowColumnarDecoder`] that decodes from the provided [`StructArray`].
1310    ///
1311    /// Returns an error if the schema of the [`StructArray`] does not match
1312    /// the provided [`RelationDesc`].
1313    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1314        let inner_columns = col.columns();
1315        let desc_columns = desc.typ().columns();
1316
1317        if desc_columns.len() > inner_columns.len() {
1318            anyhow::bail!(
1319                "provided array has too few columns! {desc_columns:?} > {inner_columns:?}"
1320            );
1321        }
1322
1323        // For performance reasons we downcast just a single time.
1324        let mut decoders = Vec::with_capacity(desc_columns.len());
1325
1326        let null_mask = col.nulls();
1327
1328        // The columns of the `StructArray` are named with their column index.
1329        for (col_idx, col_name, col_type) in desc.iter_all() {
1330            let field_name = col_idx.to_stable_name();
1331            let column = col.column_by_name(&field_name).ok_or_else(|| {
1332                anyhow::anyhow!(
1333                    "StructArray did not contain column name {field_name}, found {:?}",
1334                    col.column_names()
1335                )
1336            })?;
1337            let column = mask_nulls(column, null_mask);
1338            let null_count = col_type.nullable.then(|| column.null_count());
1339            let decoder = array_to_decoder(&column, &col_type.scalar_type)?;
1340            decoders.push((col_name.as_str().into(), null_count, decoder));
1341        }
1342
1343        Ok(RowColumnarDecoder {
1344            len: col.len(),
1345            decoders,
1346            nullability: col.logical_nulls(),
1347        })
1348    }
1349
1350    // Returns the number of null entries in this array of Row structs. This
1351    // will be 0 when `Row` is encoded directly, but could be non-zero when it's
1352    // used inside `SourceDataEncoder`.
1353    pub fn null_count(&self) -> usize {
1354        self.nullability.as_ref().map_or(0, |n| n.null_count())
1355    }
1356}
1357
1358impl ColumnDecoder<Row> for RowColumnarDecoder {
1359    fn decode(&self, idx: usize, val: &mut Row) {
1360        let mut packer = val.packer();
1361
1362        for (_, _, decoder) in &self.decoders {
1363            decoder.get(idx, &mut packer);
1364        }
1365    }
1366
1367    fn is_null(&self, idx: usize) -> bool {
1368        let Some(nullability) = self.nullability.as_ref() else {
1369            return false;
1370        };
1371        nullability.is_null(idx)
1372    }
1373
1374    fn goodbytes(&self) -> usize {
1375        let decoders_size: usize = self
1376            .decoders
1377            .iter()
1378            .map(|(_name, _null_count, decoder)| decoder.goodbytes())
1379            .sum();
1380
1381        decoders_size
1382            + self
1383                .nullability
1384                .as_ref()
1385                .map(|nulls| nulls.inner().inner().len())
1386                .unwrap_or(0)
1387    }
1388
1389    fn stats(&self) -> StructStats {
1390        StructStats {
1391            len: self.len,
1392            cols: self
1393                .decoders
1394                .iter()
1395                .map(|(name, null_count, decoder)| {
1396                    let name = name.to_string();
1397                    let stats = ColumnarStats {
1398                        nulls: null_count.map(|count| ColumnNullStats { count }),
1399                        values: decoder.stats(),
1400                    };
1401                    (name, stats)
1402                })
1403                .collect(),
1404        }
1405    }
1406}
1407
/// A [`ColumnEncoder`] for a [`Row`].
///
/// Created by [`RowColumnarEncoder::new`] from a [`RelationDesc`] with at least one column.
#[derive(Debug)]
pub struct RowColumnarEncoder {
    /// One encoder per column of the [`RelationDesc`], in column order.
    encoders: Vec<DatumEncoder>,
    /// For each column, its raw column index and its user-facing name (the name is kept for
    /// stats, for backwards compatibility).
    // TODO(parkmycar): Replace the `usize` with a `ColumnIdx` type.
    col_names: Vec<(usize, Arc<str>)>,
    /// Row-level validity bits. NOTE(review): presumably one bit per appended row; the code that
    /// writes it is outside this view, so confirm.
    // TODO(parkmycar): Optionally omit this.
    nullability: BooleanBufferBuilder,
}
1417
1418impl RowColumnarEncoder {
1419    /// Creates a [`RowColumnarEncoder`] for the provided [`RelationDesc`].
1420    ///
1421    /// Returns `None` if the provided [`RelationDesc`] has no columns.
1422    ///
1423    /// # Note
1424    /// Internally we represent a [`Row`] as a [`StructArray`] which is
1425    /// required to have at least one field. Instead of handling this case by
1426    /// adding some special "internal" column we let a higher level encoder
1427    /// (e.g. `SourceDataColumnarEncoder`) handle this case.
1428    pub fn new(desc: &RelationDesc) -> Option<Self> {
1429        if desc.typ().columns().is_empty() {
1430            return None;
1431        }
1432
1433        let (col_names, encoders): (Vec<_>, Vec<_>) = desc
1434            .iter_all()
1435            .map(|(col_idx, col_name, col_type)| {
1436                let encoder = scalar_type_to_encoder(&col_type.scalar_type)
1437                    .expect("failed to create encoder");
1438                let encoder = DatumEncoder {
1439                    nullable: col_type.nullable,
1440                    encoder,
1441                };
1442
1443                // We name the Fields in Parquet with the column index, but for
1444                // backwards compat use the column name for stats.
1445                let name = (col_idx.to_raw(), col_name.as_str().into());
1446
1447                (name, encoder)
1448            })
1449            .unzip();
1450
1451        Some(RowColumnarEncoder {
1452            encoders,
1453            col_names,
1454            nullability: BooleanBufferBuilder::new(100),
1455        })
1456    }
1457}
1458
1459impl ColumnEncoder<Row> for RowColumnarEncoder {
1460    type FinishedColumn = StructArray;
1461
1462    fn goodbytes(&self) -> usize {
1463        self.encoders.iter().map(|e| e.goodbytes()).sum()
1464    }
1465
1466    fn append(&mut self, val: &Row) {
1467        let mut num_datums = 0;
1468        for (datum, encoder) in val.iter().zip_eq(self.encoders.iter_mut()) {
1469            encoder.push(datum);
1470            num_datums += 1;
1471        }
1472        assert_eq!(
1473            num_datums,
1474            self.encoders.len(),
1475            "tried to encode {val:?}, but only have {:?}",
1476            self.encoders
1477        );
1478
1479        self.nullability.append(true);
1480    }
1481
1482    fn append_null(&mut self) {
1483        for encoder in self.encoders.iter_mut() {
1484            encoder.push_invalid();
1485        }
1486        self.nullability.append(false);
1487    }
1488
1489    fn finish(self) -> Self::FinishedColumn {
1490        let RowColumnarEncoder {
1491            encoders,
1492            col_names,
1493            nullability,
1494            ..
1495        } = self;
1496
1497        let (arrays, fields): (Vec<_>, Vec<_>) = col_names
1498            .iter()
1499            .zip_eq(encoders)
1500            .map(|((col_idx, _col_name), encoder)| {
1501                // Note: We mark all columns as nullable at the Arrow/Parquet level because it has
1502                // a negligible performance difference, but it protects us from unintended
1503                // nullability changes in the columns of SQL objects.
1504                //
1505                // See: <https://github.com/MaterializeInc/database-issues/issues/2488>
1506                let nullable = true;
1507                let array = encoder.finish();
1508                let field = Field::new(col_idx.to_string(), array.data_type().clone(), nullable);
1509
1510                (array, field)
1511            })
1512            .multiunzip();
1513
1514        let null_buffer = NullBuffer::from(BooleanBuffer::from(nullability));
1515
1516        let array = StructArray::new(Fields::from(fields), arrays, Some(null_buffer));
1517
1518        array
1519    }
1520}
1521
1522/// Small helper method to make downcasting an [`Array`] return an error.
1523///
1524/// Note: it is _super_ important that we downcast as few times as possible. Datum encoding is a
1525/// very hot path and downcasting is relatively slow
1526#[inline]
1527fn downcast_array<T: 'static>(array: &Arc<dyn Array>) -> Result<&T, anyhow::Error> {
1528    array
1529        .as_any()
1530        .downcast_ref::<T>()
1531        .ok_or_else(|| anyhow!("expected {}, found {array:?}", std::any::type_name::<T>()))
1532}
1533
1534/// Small helper function to downcast from an array to a [`DatumColumnDecoder`].
1535///
1536/// Note: it is _super_ important that we downcast as few times as possible. Datum encoding is a
1537/// very hot path and downcasting is relatively slow
1538fn array_to_decoder(
1539    array: &Arc<dyn Array>,
1540    col_ty: &SqlScalarType,
1541) -> Result<DatumColumnDecoder, anyhow::Error> {
1542    let decoder = match (array.data_type(), col_ty) {
1543        (DataType::Boolean, SqlScalarType::Bool) => {
1544            let array = downcast_array::<BooleanArray>(array)?;
1545            DatumColumnDecoder::Bool(array.clone())
1546        }
1547        (DataType::UInt8, SqlScalarType::PgLegacyChar) => {
1548            let array = downcast_array::<UInt8Array>(array)?;
1549            DatumColumnDecoder::U8(array.clone())
1550        }
1551        (DataType::UInt16, SqlScalarType::UInt16) => {
1552            let array = downcast_array::<UInt16Array>(array)?;
1553            DatumColumnDecoder::U16(array.clone())
1554        }
1555        (
1556            DataType::UInt32,
1557            SqlScalarType::UInt32
1558            | SqlScalarType::Oid
1559            | SqlScalarType::RegClass
1560            | SqlScalarType::RegProc
1561            | SqlScalarType::RegType,
1562        ) => {
1563            let array = downcast_array::<UInt32Array>(array)?;
1564            DatumColumnDecoder::U32(array.clone())
1565        }
1566        (DataType::UInt64, SqlScalarType::UInt64) => {
1567            let array = downcast_array::<UInt64Array>(array)?;
1568            DatumColumnDecoder::U64(array.clone())
1569        }
1570        (DataType::Int16, SqlScalarType::Int16) => {
1571            let array = downcast_array::<Int16Array>(array)?;
1572            DatumColumnDecoder::I16(array.clone())
1573        }
1574        (DataType::Int32, SqlScalarType::Int32) => {
1575            let array = downcast_array::<Int32Array>(array)?;
1576            DatumColumnDecoder::I32(array.clone())
1577        }
1578        (DataType::Int64, SqlScalarType::Int64) => {
1579            let array = downcast_array::<Int64Array>(array)?;
1580            DatumColumnDecoder::I64(array.clone())
1581        }
1582        (DataType::Float32, SqlScalarType::Float32) => {
1583            let array = downcast_array::<Float32Array>(array)?;
1584            DatumColumnDecoder::F32(array.clone())
1585        }
1586        (DataType::Float64, SqlScalarType::Float64) => {
1587            let array = downcast_array::<Float64Array>(array)?;
1588            DatumColumnDecoder::F64(array.clone())
1589        }
1590        (DataType::Struct(_), SqlScalarType::Numeric { .. }) => {
1591            let array = downcast_array::<StructArray>(array)?;
1592            // Note: We only use the approx column for sorting, and ignore it
1593            // when decoding.
1594            let binary_values = array
1595                .column_by_name("binary")
1596                .expect("missing binary column");
1597
1598            let array = downcast_array::<BinaryArray>(binary_values)?;
1599            DatumColumnDecoder::Numeric(array.clone())
1600        }
1601        (
1602            DataType::Utf8,
1603            SqlScalarType::String
1604            | SqlScalarType::PgLegacyName
1605            | SqlScalarType::Char { .. }
1606            | SqlScalarType::VarChar { .. },
1607        ) => {
1608            let array = downcast_array::<StringArray>(array)?;
1609            DatumColumnDecoder::String(array.clone())
1610        }
1611        (DataType::Binary, SqlScalarType::Bytes) => {
1612            let array = downcast_array::<BinaryArray>(array)?;
1613            DatumColumnDecoder::Bytes(array.clone())
1614        }
1615        (DataType::Int32, SqlScalarType::Date) => {
1616            let array = downcast_array::<Int32Array>(array)?;
1617            DatumColumnDecoder::Date(array.clone())
1618        }
1619        (DataType::FixedSizeBinary(TIME_FIXED_BYTES), SqlScalarType::Time) => {
1620            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1621            DatumColumnDecoder::Time(array.clone())
1622        }
1623        (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), SqlScalarType::Timestamp { .. }) => {
1624            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1625            DatumColumnDecoder::Timestamp(array.clone())
1626        }
1627        (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), SqlScalarType::TimestampTz { .. }) => {
1628            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1629            DatumColumnDecoder::TimestampTz(array.clone())
1630        }
1631        (DataType::UInt64, SqlScalarType::MzTimestamp) => {
1632            let array = downcast_array::<UInt64Array>(array)?;
1633            DatumColumnDecoder::MzTimestamp(array.clone())
1634        }
1635        (DataType::FixedSizeBinary(INTERVAL_FIXED_BYTES), SqlScalarType::Interval) => {
1636            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1637            DatumColumnDecoder::Interval(array.clone())
1638        }
1639        (DataType::FixedSizeBinary(UUID_FIXED_BYTES), SqlScalarType::Uuid) => {
1640            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1641            DatumColumnDecoder::Uuid(array.clone())
1642        }
1643        (DataType::FixedSizeBinary(ACL_ITEM_FIXED_BYTES), SqlScalarType::AclItem) => {
1644            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1645            DatumColumnDecoder::AclItem(array.clone())
1646        }
1647        (DataType::Binary, SqlScalarType::MzAclItem) => {
1648            let array = downcast_array::<BinaryArray>(array)?;
1649            DatumColumnDecoder::MzAclItem(array.clone())
1650        }
1651        (DataType::Binary, SqlScalarType::Range { .. }) => {
1652            let array = downcast_array::<BinaryArray>(array)?;
1653            DatumColumnDecoder::Range(array.clone())
1654        }
1655        (DataType::Utf8, SqlScalarType::Jsonb) => {
1656            let array = downcast_array::<StringArray>(array)?;
1657            DatumColumnDecoder::Json(array.clone())
1658        }
1659        (DataType::Struct(_), s @ SqlScalarType::Array(_) | s @ SqlScalarType::Int2Vector) => {
1660            let element_type = match s {
1661                SqlScalarType::Array(inner) => inner,
1662                SqlScalarType::Int2Vector => &SqlScalarType::Int16,
1663                _ => unreachable!("checked above"),
1664            };
1665
1666            let array = downcast_array::<StructArray>(array)?;
1667            let nulls = array.nulls().cloned();
1668
1669            let dims = array
1670                .column_by_name("dims")
1671                .expect("missing dimensions column");
1672            let dims = downcast_array::<ListArray>(dims).cloned()?;
1673            let dim_offsets = dims.offsets().clone();
1674            let dims = downcast_array::<FixedSizeBinaryArray>(dims.values()).cloned()?;
1675
1676            let vals = array.column_by_name("vals").expect("missing values column");
1677            let vals = downcast_array::<ListArray>(vals)?;
1678            let val_offsets = vals.offsets().clone();
1679            let vals = array_to_decoder(vals.values(), element_type)?;
1680
1681            DatumColumnDecoder::Array {
1682                dim_offsets,
1683                dims,
1684                val_offsets,
1685                vals: Box::new(vals),
1686                nulls,
1687            }
1688        }
1689        (DataType::List(_), SqlScalarType::List { element_type, .. }) => {
1690            let array = downcast_array::<ListArray>(array)?;
1691            let inner_decoder = array_to_decoder(array.values(), &*element_type)?;
1692            DatumColumnDecoder::List {
1693                offsets: array.offsets().clone(),
1694                values: Box::new(inner_decoder),
1695                nulls: array.nulls().cloned(),
1696            }
1697        }
1698        (DataType::Map(_, true), SqlScalarType::Map { value_type, .. }) => {
1699            let array = downcast_array::<MapArray>(array)?;
1700            let keys = downcast_array::<StringArray>(array.keys())?;
1701            let vals = array_to_decoder(array.values(), value_type)?;
1702            DatumColumnDecoder::Map {
1703                offsets: array.offsets().clone(),
1704                keys: keys.clone(),
1705                vals: Box::new(vals),
1706                nulls: array.nulls().cloned(),
1707            }
1708        }
1709        (DataType::List(_), SqlScalarType::Map { value_type, .. }) => {
1710            let array: &ListArray = downcast_array(array)?;
1711            let entries: &StructArray = downcast_array(array.values())?;
1712            let [keys, values]: &[ArrayRef; 2] = entries.columns().try_into()?;
1713            let keys: &StringArray = downcast_array(keys)?;
1714            let vals: DatumColumnDecoder = array_to_decoder(values, value_type)?;
1715            DatumColumnDecoder::Map {
1716                offsets: array.offsets().clone(),
1717                keys: keys.clone(),
1718                vals: Box::new(vals),
1719                nulls: array.nulls().cloned(),
1720            }
1721        }
1722        (DataType::Boolean, SqlScalarType::Record { fields, .. }) if fields.is_empty() => {
1723            let empty_record_array = downcast_array::<BooleanArray>(array)?;
1724            DatumColumnDecoder::RecordEmpty(empty_record_array.clone())
1725        }
1726        (DataType::Struct(_), SqlScalarType::Record { fields, .. }) => {
1727            let record_array = downcast_array::<StructArray>(array)?;
1728            let null_mask = record_array.nulls();
1729            let mut decoders = Vec::with_capacity(fields.len());
1730            for (tag, (_name, col_type)) in fields.iter().enumerate() {
1731                let inner_array = record_array
1732                    .column_by_name(&tag.to_string())
1733                    .ok_or_else(|| anyhow::anyhow!("no column named '{tag}'"))?;
1734                let inner_array = mask_nulls(inner_array, null_mask);
1735                let inner_decoder = array_to_decoder(&inner_array, &col_type.scalar_type)?;
1736
1737                decoders.push(Box::new(inner_decoder));
1738            }
1739
1740            DatumColumnDecoder::Record {
1741                fields: decoders,
1742                nulls: record_array.nulls().cloned(),
1743            }
1744        }
1745        (x, y) => {
1746            let msg = format!("can't decode column of {x:?} for scalar type {y:?}");
1747            mz_ore::soft_panic_or_log!("{msg}");
1748            anyhow::bail!("{msg}");
1749        }
1750    };
1751
1752    Ok(decoder)
1753}
1754
1755/// Small helper function to create a [`DatumColumnEncoder`] from a [`SqlScalarType`]
1756fn scalar_type_to_encoder(col_ty: &SqlScalarType) -> Result<DatumColumnEncoder, anyhow::Error> {
1757    let encoder = match &col_ty {
1758        SqlScalarType::Bool => DatumColumnEncoder::Bool(BooleanBuilder::new()),
1759        SqlScalarType::PgLegacyChar => DatumColumnEncoder::U8(UInt8Builder::new()),
1760        SqlScalarType::UInt16 => DatumColumnEncoder::U16(UInt16Builder::new()),
1761        SqlScalarType::UInt32
1762        | SqlScalarType::Oid
1763        | SqlScalarType::RegClass
1764        | SqlScalarType::RegProc
1765        | SqlScalarType::RegType => DatumColumnEncoder::U32(UInt32Builder::new()),
1766        SqlScalarType::UInt64 => DatumColumnEncoder::U64(UInt64Builder::new()),
1767        SqlScalarType::Int16 => DatumColumnEncoder::I16(Int16Builder::new()),
1768        SqlScalarType::Int32 => DatumColumnEncoder::I32(Int32Builder::new()),
1769        SqlScalarType::Int64 => DatumColumnEncoder::I64(Int64Builder::new()),
1770        SqlScalarType::Float32 => DatumColumnEncoder::F32(Float32Builder::new()),
1771        SqlScalarType::Float64 => DatumColumnEncoder::F64(Float64Builder::new()),
1772        SqlScalarType::Numeric { .. } => DatumColumnEncoder::Numeric {
1773            approx_values: Float64Builder::new(),
1774            binary_values: BinaryBuilder::new(),
1775            numeric_context: crate::adt::numeric::cx_datum().clone(),
1776        },
1777        SqlScalarType::String
1778        | SqlScalarType::PgLegacyName
1779        | SqlScalarType::Char { .. }
1780        | SqlScalarType::VarChar { .. } => DatumColumnEncoder::String(StringBuilder::new()),
1781        SqlScalarType::Bytes => DatumColumnEncoder::Bytes(BinaryBuilder::new()),
1782        SqlScalarType::Date => DatumColumnEncoder::Date(Int32Builder::new()),
1783        SqlScalarType::Time => {
1784            DatumColumnEncoder::Time(FixedSizeBinaryBuilder::new(TIME_FIXED_BYTES))
1785        }
1786        SqlScalarType::Timestamp { .. } => {
1787            DatumColumnEncoder::Timestamp(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1788        }
1789        SqlScalarType::TimestampTz { .. } => {
1790            DatumColumnEncoder::TimestampTz(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1791        }
1792        SqlScalarType::MzTimestamp => DatumColumnEncoder::MzTimestamp(UInt64Builder::new()),
1793        SqlScalarType::Interval => {
1794            DatumColumnEncoder::Interval(FixedSizeBinaryBuilder::new(INTERVAL_FIXED_BYTES))
1795        }
1796        SqlScalarType::Uuid => {
1797            DatumColumnEncoder::Uuid(FixedSizeBinaryBuilder::new(UUID_FIXED_BYTES))
1798        }
1799        SqlScalarType::AclItem => {
1800            DatumColumnEncoder::AclItem(FixedSizeBinaryBuilder::new(ACL_ITEM_FIXED_BYTES))
1801        }
1802        SqlScalarType::MzAclItem => DatumColumnEncoder::MzAclItem(BinaryBuilder::new()),
1803        SqlScalarType::Range { .. } => DatumColumnEncoder::Range(BinaryBuilder::new()),
1804        SqlScalarType::Jsonb => DatumColumnEncoder::Jsonb {
1805            offsets: vec![0],
1806            buf: Vec::new(),
1807            nulls: None,
1808        },
1809        s @ SqlScalarType::Array(_) | s @ SqlScalarType::Int2Vector => {
1810            let element_type = match s {
1811                SqlScalarType::Array(inner) => inner,
1812                SqlScalarType::Int2Vector => &SqlScalarType::Int16,
1813                _ => unreachable!("checked above"),
1814            };
1815            let inner = scalar_type_to_encoder(element_type)?;
1816            DatumColumnEncoder::Array {
1817                dims: ListBuilder::new(FixedSizeBinaryBuilder::new(ARRAY_DIMENSION_FIXED_BYTES)),
1818                val_lengths: Vec::new(),
1819                vals: Box::new(inner),
1820                nulls: None,
1821            }
1822        }
1823        SqlScalarType::List { element_type, .. } => {
1824            let inner = scalar_type_to_encoder(&*element_type)?;
1825            DatumColumnEncoder::List {
1826                lengths: Vec::new(),
1827                values: Box::new(inner),
1828                nulls: None,
1829            }
1830        }
1831        SqlScalarType::Map { value_type, .. } => {
1832            let inner = scalar_type_to_encoder(&*value_type)?;
1833            DatumColumnEncoder::Map {
1834                lengths: Vec::new(),
1835                keys: StringBuilder::new(),
1836                vals: Box::new(inner),
1837                nulls: None,
1838            }
1839        }
1840        SqlScalarType::Record { fields, .. } if fields.is_empty() => {
1841            DatumColumnEncoder::RecordEmpty(BooleanBuilder::new())
1842        }
1843        SqlScalarType::Record { fields, .. } => {
1844            let encoders = fields
1845                .iter()
1846                .map(|(_name, ty)| {
1847                    scalar_type_to_encoder(&ty.scalar_type).map(|e| DatumEncoder {
1848                        nullable: ty.nullable,
1849                        encoder: e,
1850                    })
1851                })
1852                .collect::<Result<_, _>>()?;
1853
1854            DatumColumnEncoder::Record {
1855                fields: encoders,
1856                nulls: None,
1857                length: 0,
1858            }
1859        }
1860    };
1861    Ok(encoder)
1862}
1863
1864impl Codec for Row {
1865    type Storage = ProtoRow;
1866    type Schema = RelationDesc;
1867
1868    fn codec_name() -> String {
1869        "protobuf[Row]".into()
1870    }
1871
1872    /// Encodes a row into the permanent storage format.
1873    ///
1874    /// This perfectly round-trips through [Row::decode]. It's guaranteed to be
1875    /// readable by future versions of Materialize through v(TODO: Figure out
1876    /// our policy).
1877    fn encode<B>(&self, buf: &mut B)
1878    where
1879        B: BufMut,
1880    {
1881        self.into_proto()
1882            .encode(buf)
1883            .expect("no required fields means no initialization errors");
1884    }
1885
1886    /// Decodes a row from the permanent storage format.
1887    ///
1888    /// This perfectly round-trips through [Row::encode]. It can read rows
1889    /// encoded by historical versions of Materialize back to v(TODO: Figure out
1890    /// our policy).
1891    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Row, String> {
1892        // NB: We could easily implement this directly instead of via
1893        // `decode_from`, but do this so that we get maximal coverage of the
1894        // more complicated codepath.
1895        //
1896        // The length of the encoded ProtoRow (i.e. `buf.len()`) doesn't perfect
1897        // predict the length of the resulting Row, but it's definitely
1898        // correlated, so probably a decent estimate.
1899        let mut row = Row::with_capacity(buf.len());
1900        <Self as Codec>::decode_from(&mut row, buf, &mut None, schema)?;
1901        Ok(row)
1902    }
1903
1904    fn decode_from<'a>(
1905        &mut self,
1906        buf: &'a [u8],
1907        storage: &mut Option<ProtoRow>,
1908        schema: &RelationDesc,
1909    ) -> Result<(), String> {
1910        let mut proto = storage.take().unwrap_or_default();
1911        proto.clear();
1912        proto.merge(buf).map_err(|err| err.to_string())?;
1913        let ret = self.decode_from_proto(&proto, schema);
1914        storage.replace(proto);
1915        ret
1916    }
1917
1918    fn validate(row: &Self, desc: &Self::Schema) -> Result<(), String> {
1919        for x in Itertools::zip_longest(desc.iter_types(), row.iter()) {
1920            match x {
1921                EitherOrBoth::Both(typ, datum) if datum.is_instance_of_sql(typ) => continue,
1922                _ => return Err(format!("row {:?} did not match desc {:?}", row, desc)),
1923            };
1924        }
1925        Ok(())
1926    }
1927
1928    fn encode_schema(schema: &Self::Schema) -> Bytes {
1929        schema.into_proto().encode_to_vec().into()
1930    }
1931
1932    fn decode_schema(buf: &Bytes) -> Self::Schema {
1933        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
1934        proto.into_rust().expect("valid schema")
1935    }
1936}
1937
/// Converts a [`Datum`] into its protobuf representation.
///
/// Datums without a payload (booleans, SQL/JSON nulls, `Dummy`, and the
/// non-finite numeric values) are encoded as `ProtoDatumOther` tags; every
/// other variant maps onto a dedicated `DatumType` case.
impl<'a> From<Datum<'a>> for ProtoDatum {
    /// # Panics
    ///
    /// Panics if a numeric value cannot be packed as BCD and is not NaN or
    /// infinite.
    fn from(x: Datum<'a>) -> Self {
        let datum_type = match x {
            Datum::False => DatumType::Other(ProtoDatumOther::False.into()),
            Datum::True => DatumType::Other(ProtoDatumOther::True.into()),
            // Narrow integers are widened via `.into()`; the decoder
            // (`try_push_proto`) range-checks them on the way back.
            Datum::Int16(x) => DatumType::Int16(x.into()),
            Datum::Int32(x) => DatumType::Int32(x),
            Datum::UInt8(x) => DatumType::Uint8(x.into()),
            Datum::UInt16(x) => DatumType::Uint16(x.into()),
            Datum::UInt32(x) => DatumType::Uint32(x),
            Datum::UInt64(x) => DatumType::Uint64(x),
            Datum::Int64(x) => DatumType::Int64(x),
            Datum::Float32(x) => DatumType::Float32(x.into_inner()),
            Datum::Float64(x) => DatumType::Float64(x.into_inner()),
            Datum::Date(x) => DatumType::Date(x.into_proto()),
            // Times are stored as whole seconds since midnight plus the
            // nanosecond component.
            Datum::Time(x) => DatumType::Time(ProtoNaiveTime {
                secs: x.num_seconds_from_midnight(),
                frac: x.nanosecond(),
            }),
            Datum::Timestamp(x) => DatumType::Timestamp(x.into_proto()),
            Datum::TimestampTz(x) => DatumType::TimestampTz(x.into_proto()),
            Datum::Interval(x) => DatumType::Interval(x.into_proto()),
            Datum::Bytes(x) => DatumType::Bytes(Bytes::copy_from_slice(x)),
            Datum::String(x) => DatumType::String(x.to_owned()),
            // Arrays carry their flattened elements plus one entry per
            // dimension (lower bound and length).
            Datum::Array(x) => DatumType::Array(ProtoArray {
                elements: Some(ProtoRow {
                    datums: x.elements().iter().map(|x| x.into()).collect(),
                }),
                dims: x
                    .dims()
                    .into_iter()
                    .map(|x| ProtoArrayDimension {
                        lower_bound: i64::cast_from(x.lower_bound),
                        length: u64::cast_from(x.length),
                    })
                    .collect(),
            }),
            Datum::List(x) => DatumType::List(ProtoRow {
                datums: x.iter().map(|x| x.into()).collect(),
            }),
            Datum::Map(x) => DatumType::Dict(ProtoDict {
                elements: x
                    .iter()
                    .map(|(k, v)| ProtoDictElement {
                        key: k.to_owned(),
                        val: Some(v.into()),
                    })
                    .collect(),
            }),
            // Finite numerics are packed as BCD; NaN and +/-infinity, which
            // have no BCD form, become tags. The order of these checks matters:
            // packing is attempted first.
            Datum::Numeric(x) => {
                // TODO: Do we need this defensive clone?
                let mut x = x.0.clone();
                if let Some((bcd, scale)) = x.to_packed_bcd() {
                    DatumType::Numeric(ProtoNumeric { bcd, scale })
                } else if x.is_nan() {
                    DatumType::Other(ProtoDatumOther::NumericNaN.into())
                } else if x.is_infinite() {
                    if x.is_negative() {
                        DatumType::Other(ProtoDatumOther::NumericNegInf.into())
                    } else {
                        DatumType::Other(ProtoDatumOther::NumericPosInf.into())
                    }
                } else if x.is_special() {
                    panic!("internal error: unhandled special numeric value: {}", x);
                } else {
                    panic!(
                        "internal error: to_packed_bcd returned None for non-special value: {}",
                        x
                    )
                }
            }
            Datum::JsonNull => DatumType::Other(ProtoDatumOther::JsonNull.into()),
            Datum::Uuid(x) => DatumType::Uuid(x.as_bytes().to_vec()),
            Datum::MzTimestamp(x) => DatumType::MzTimestamp(x.into()),
            Datum::Dummy => DatumType::Other(ProtoDatumOther::Dummy.into()),
            Datum::Null => DatumType::Other(ProtoDatumOther::Null.into()),
            // Ranges recursively encode their (optional) bound datums; an empty
            // range has no `inner`.
            Datum::Range(super::Range { inner }) => DatumType::Range(Box::new(ProtoRange {
                inner: inner.map(|RangeInner { lower, upper }| {
                    Box::new(ProtoRangeInner {
                        lower_inclusive: lower.inclusive,
                        lower: lower.bound.map(|bound| Box::new(bound.datum().into())),
                        upper_inclusive: upper.inclusive,
                        upper: upper.bound.map(|bound| Box::new(bound.datum().into())),
                    })
                }),
            })),
            Datum::MzAclItem(x) => DatumType::MzAclItem(x.into_proto()),
            Datum::AclItem(x) => DatumType::AclItem(x.into_proto()),
        };
        ProtoDatum {
            datum_type: Some(datum_type),
        }
    }
}
2032
2033impl RowPacker<'_> {
2034    pub(crate) fn try_push_proto(&mut self, x: &ProtoDatum) -> Result<(), String> {
2035        match &x.datum_type {
2036            Some(DatumType::Other(o)) => match ProtoDatumOther::try_from(*o) {
2037                Ok(ProtoDatumOther::Unknown) => return Err("unknown datum type".into()),
2038                Ok(ProtoDatumOther::Null) => self.push(Datum::Null),
2039                Ok(ProtoDatumOther::False) => self.push(Datum::False),
2040                Ok(ProtoDatumOther::True) => self.push(Datum::True),
2041                Ok(ProtoDatumOther::JsonNull) => self.push(Datum::JsonNull),
2042                Ok(ProtoDatumOther::Dummy) => {
2043                    // We plan to remove the `Dummy` variant soon (materialize#17099). To prepare for that, we
2044                    // emit a log to Sentry here, to notify us of any instances that might have
2045                    // been made durable.
2046                    #[cfg(feature = "tracing")]
2047                    tracing::error!("protobuf decoding found Dummy datum");
2048                    self.push(Datum::Dummy);
2049                }
2050                Ok(ProtoDatumOther::NumericPosInf) => self.push(Datum::from(Numeric::infinity())),
2051                Ok(ProtoDatumOther::NumericNegInf) => self.push(Datum::from(-Numeric::infinity())),
2052                Ok(ProtoDatumOther::NumericNaN) => self.push(Datum::from(Numeric::nan())),
2053                Err(_) => return Err(format!("unknown datum type: {}", o)),
2054            },
2055            Some(DatumType::Int16(x)) => {
2056                let x = i16::try_from(*x)
2057                    .map_err(|_| format!("int16 field stored with out of range value: {}", *x))?;
2058                self.push(Datum::Int16(x))
2059            }
2060            Some(DatumType::Int32(x)) => self.push(Datum::Int32(*x)),
2061            Some(DatumType::Int64(x)) => self.push(Datum::Int64(*x)),
2062            Some(DatumType::Uint8(x)) => {
2063                let x = u8::try_from(*x)
2064                    .map_err(|_| format!("uint8 field stored with out of range value: {}", *x))?;
2065                self.push(Datum::UInt8(x))
2066            }
2067            Some(DatumType::Uint16(x)) => {
2068                let x = u16::try_from(*x)
2069                    .map_err(|_| format!("uint16 field stored with out of range value: {}", *x))?;
2070                self.push(Datum::UInt16(x))
2071            }
2072            Some(DatumType::Uint32(x)) => self.push(Datum::UInt32(*x)),
2073            Some(DatumType::Uint64(x)) => self.push(Datum::UInt64(*x)),
2074            Some(DatumType::Float32(x)) => self.push(Datum::Float32((*x).into())),
2075            Some(DatumType::Float64(x)) => self.push(Datum::Float64((*x).into())),
2076            Some(DatumType::Bytes(x)) => self.push(Datum::Bytes(x)),
2077            Some(DatumType::String(x)) => self.push(Datum::String(x)),
2078            Some(DatumType::Uuid(x)) => {
2079                // Uuid internally has a [u8; 16] so we'll have to do at least
2080                // one copy, but there's currently an additional one when the
2081                // Vec is created. Perhaps the protobuf Bytes support will let
2082                // us fix one of them.
2083                let u = Uuid::from_slice(x).map_err(|err| err.to_string())?;
2084                self.push(Datum::Uuid(u));
2085            }
2086            Some(DatumType::Date(x)) => self.push(Datum::Date(x.clone().into_rust()?)),
2087            Some(DatumType::Time(x)) => self.push(Datum::Time(x.clone().into_rust()?)),
2088            Some(DatumType::Timestamp(x)) => self.push(Datum::Timestamp(x.clone().into_rust()?)),
2089            Some(DatumType::TimestampTz(x)) => {
2090                self.push(Datum::TimestampTz(x.clone().into_rust()?))
2091            }
2092            Some(DatumType::Interval(x)) => self.push(Datum::Interval(
2093                x.clone()
2094                    .into_rust()
2095                    .map_err(|e: TryFromProtoError| e.to_string())?,
2096            )),
2097            Some(DatumType::List(x)) => self.push_list_with(|row| -> Result<(), String> {
2098                for d in x.datums.iter() {
2099                    row.try_push_proto(d)?;
2100                }
2101                Ok(())
2102            })?,
2103            Some(DatumType::Array(x)) => {
2104                let dims = x
2105                    .dims
2106                    .iter()
2107                    .map(|x| ArrayDimension {
2108                        lower_bound: isize::cast_from(x.lower_bound),
2109                        length: usize::cast_from(x.length),
2110                    })
2111                    .collect::<Vec<_>>();
2112                match x.elements.as_ref() {
2113                    None => self.try_push_array(&dims, [].iter()),
2114                    Some(elements) => {
2115                        // TODO: Could we avoid this Row alloc if we made a
2116                        // push_array_with?
2117                        let elements_row = Row::try_from(elements)?;
2118                        self.try_push_array(&dims, elements_row.iter())
2119                    }
2120                }
2121                .map_err(|err| err.to_string())?
2122            }
2123            Some(DatumType::Dict(x)) => self.push_dict_with(|row| -> Result<(), String> {
2124                let mut prev_key: Option<&str> = None;
2125                for e in x.elements.iter() {
2126                    // Map keys must be unique and strictly ascending; iterating a
2127                    // map that violates this trips a debug_assert. A crafted
2128                    // proto can, so reject it as a decode error here instead.
2129                    if let Some(prev) = prev_key
2130                        && e.key.as_str() <= prev
2131                    {
2132                        return Err(format!(
2133                            "dict keys must be unique and in ascending order, \
2134                             but {:?} came after {:?}",
2135                            e.key, prev,
2136                        ));
2137                    }
2138                    prev_key = Some(e.key.as_str());
2139                    row.push(Datum::from(e.key.as_str()));
2140                    let val = e
2141                        .val
2142                        .as_ref()
2143                        .ok_or_else(|| format!("missing val for key: {}", e.key))?;
2144                    row.try_push_proto(val)?;
2145                }
2146                Ok(())
2147            })?,
2148            Some(DatumType::Numeric(x)) => {
2149                // Reminder that special values like NaN, PosInf, and NegInf are
2150                // represented as variants of ProtoDatumOther.
2151                //
2152                // `decPackedToNumber` (called via `Decimal::from_packed_bcd`)
2153                // doesn't bounds-check its input and segfaults on empty bcd.
2154                // That is reachable from untrusted proto bytes, so we reject
2155                // before descending into the FFI.
2156                if x.bcd.is_empty() {
2157                    return Err("ProtoNumeric.bcd is empty".to_string());
2158                }
2159                let n = Decimal::from_packed_bcd(&x.bcd, x.scale).map_err(|err| err.to_string())?;
2160                self.push(Datum::from(n))
2161            }
2162            Some(DatumType::MzTimestamp(x)) => self.push(Datum::MzTimestamp((*x).into())),
2163            Some(DatumType::Range(inner)) => {
2164                let ProtoRange { inner } = &**inner;
2165                match inner {
2166                    None => self.push_range(Range { inner: None }).unwrap(),
2167                    Some(inner) => {
2168                        let ProtoRangeInner {
2169                            lower_inclusive,
2170                            lower,
2171                            upper_inclusive,
2172                            upper,
2173                        } = &**inner;
2174
2175                        // Range bounds must not be `Datum::Null`, because
2176                        // `push_range_with` panics on that invariant. Reject
2177                        // untrusted proto bytes that would push a `Null` bound
2178                        // before calling it.
2179                        let is_null_proto = |d: &ProtoDatum| {
2180                            matches!(
2181                                d.datum_type,
2182                                Some(DatumType::Other(o))
2183                                    if ProtoDatumOther::try_from(o)
2184                                        == Ok(ProtoDatumOther::Null)
2185                            )
2186                        };
2187                        if lower.as_deref().is_some_and(is_null_proto)
2188                            || upper.as_deref().is_some_and(is_null_proto)
2189                        {
2190                            return Err("range bound cannot be Null".into());
2191                        }
2192
2193                        self.push_range_with(
2194                            RangeLowerBound {
2195                                inclusive: *lower_inclusive,
2196                                bound: lower
2197                                    .as_ref()
2198                                    .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2199                            },
2200                            RangeUpperBound {
2201                                inclusive: *upper_inclusive,
2202                                bound: upper
2203                                    .as_ref()
2204                                    .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2205                            },
2206                        )
2207                        .map_err(|err| err.to_string())?;
2208                    }
2209                }
2210            }
2211            Some(DatumType::MzAclItem(x)) => self.push(Datum::MzAclItem(x.clone().into_rust()?)),
2212            Some(DatumType::AclItem(x)) => self.push(Datum::AclItem(x.clone().into_rust()?)),
2213            None => return Err("unknown datum type".into()),
2214        };
2215        Ok(())
2216    }
2217}
2218
2219/// TODO: remove this in favor of [`RustType::from_proto`].
2220impl TryFrom<&ProtoRow> for Row {
2221    type Error = String;
2222
2223    fn try_from(x: &ProtoRow) -> Result<Self, Self::Error> {
2224        // TODO: Try to pre-size this.
2225        // see https://github.com/MaterializeInc/database-issues/issues/3640
2226        let mut row = Row::default();
2227        let mut packer = row.packer();
2228        for d in x.datums.iter() {
2229            packer.try_push_proto(d)?;
2230        }
2231        Ok(row)
2232    }
2233}
2234
2235impl RustType<ProtoRow> for Row {
2236    fn into_proto(&self) -> ProtoRow {
2237        let datums = self.iter().map(|x| x.into()).collect();
2238        ProtoRow { datums }
2239    }
2240
2241    fn from_proto(proto: ProtoRow) -> Result<Self, TryFromProtoError> {
2242        // TODO: Try to pre-size this.
2243        // see https://github.com/MaterializeInc/database-issues/issues/3640
2244        let mut row = Row::default();
2245        let mut packer = row.packer();
2246        for d in proto.datums.iter() {
2247            packer
2248                .try_push_proto(d)
2249                .map_err(TryFromProtoError::RowConversionError)?;
2250        }
2251        Ok(row)
2252    }
2253}
2254
// Tests for the `Row` encodings: decoding `ProtoRow`s (including malformed
// fuzz-derived inputs), round-tripping rows through the columnar
// `Schema<Row>` encoder/decoder, and the `Codec` byte encoding.
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use arrow::array::{ArrayData, make_array};
    use arrow::compute::SortOptions;
    use arrow::datatypes::ArrowNativeType;
    use arrow::row::SortField;
    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
    use mz_ore::assert_err;
    use mz_ore::collections::CollectionExt;
    use mz_persist::indexed::columnar::arrow::realloc_array;
    use mz_persist::metrics::ColumnarMetrics;
    use mz_persist_types::Codec;
    use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
    use mz_persist_types::columnar::{codec_to_schema, schema_to_codec};
    use mz_proto::{ProtoType, RustType};
    use proptest::prelude::*;
    use proptest::strategy::Strategy;
    use uuid::Uuid;

    use super::*;
    use crate::adt::array::ArrayDimension;
    use crate::adt::interval::Interval;
    use crate::adt::numeric::Numeric;
    use crate::adt::timestamp::CheckedTimestamp;
    use crate::relation::arb_relation_desc;
    use crate::{ColumnName, RowArena, SqlColumnType, arb_datum_for_column, arb_row_for_relation};
    use crate::{Datum, RelationDesc, Row, SqlScalarType};

    #[mz_ore::test]
    fn proto_row_invalid_range_is_error() {
        // A ProtoRow with a range whose bounds have inconsistent datum kinds
        // (or a null/extra bound) must decode to an error, not panic. The range
        // packer used to `assert!` these invariants. Regression for the
        // row_proto_roundtrip cargo-fuzz finding.
        use prost::Message;
        let bytes: &[u8] = &[
            0x0a, 0x03, 0xaa, 0x01, 0x00, 0x0a, 0x03, 0xaa, 0x01, 0x00, 0x0a, 0x03, 0xa2, 0x01,
            0x00, 0x0a, 0x03, 0xaa, 0x01, 0x00, 0x0a, 0x20, 0xfa, 0x01, 0x1d, 0x1d, 0x9f, 0x00,
            0x00, 0x00, 0xaa, 0x01, 0x00, 0x0a, 0x13, 0xf8, 0x01, 0x08, 0xaa, 0x0a, 0x03, 0xba,
            0x01, 0x00, 0x22, 0x03, 0xba, 0x01, 0x00, 0x12, 0x03, 0xaa, 0x01, 0x00, 0x0a, 0x03,
            0xaa, 0x01, 0x00,
        ];
        // The bytes are valid protobuf; only the row they describe is invalid.
        let proto = ProtoRow::decode(bytes).expect("crash input decodes as a proto");
        let result: Result<Row, _> = proto.into_rust();
        assert_err!(result);
    }

    #[mz_ore::test]
    fn proto_row_unordered_dict_keys_is_error() {
        // A ProtoRow with a dict whose keys are duplicated or not in ascending
        // order must decode to an error, not panic. Iterating such a map trips
        // a debug_assert. Regression for the row_proto_roundtrip cargo-fuzz
        // finding.
        use prost::Message;
        let bytes: &[u8] = &[
            0x0a, 0x32, 0x18, 0x4e, 0x18, 0x18, 0x68, 0x4e, 0xe8, 0x68, 0x57, 0xba, 0x01, 0x0a,
            0x0a, 0x08, 0x60, 0xff, 0xff, 0x10, 0x12, 0x02, 0x10, 0x10, 0x99, 0x68, 0x0a, 0x18,
            0x18, 0x4e, 0x18, 0x18, 0x68, 0x4e, 0xe8, 0x5b, 0x18, 0x68, 0x57, 0xba, 0x01, 0x0a,
            0x0a, 0x08, 0x60, 0xff, 0xff, 0x10, 0x12, 0x02, 0x18, 0x10,
        ];
        // The bytes are valid protobuf; only the row they describe is invalid.
        let proto = ProtoRow::decode(bytes).expect("crash input decodes as a proto");
        let result: Result<Row, _> = proto.into_rust();
        assert_err!(result);
    }

    /// Wraps each datum yielded by `datum` in a single-column row (column `a`
    /// of type `ty`) and checks the resulting rows with [`roundtrip_rows`].
    #[track_caller]
    fn roundtrip_datum<'a>(
        ty: SqlColumnType,
        datum: impl Iterator<Item = Datum<'a>>,
        metrics: &ColumnarMetrics,
    ) {
        let desc = RelationDesc::builder().with_column("a", ty).finish();
        let rows = datum.map(|d| Row::pack_slice(&[d])).collect();
        roundtrip_rows(&desc, rows, metrics)
    }

    /// Encodes `rows` with the columnar `Schema<Row>` encoder for `desc` and
    /// asserts a series of invariants on the resulting column:
    ///
    /// - it is unchanged by lgalloc reallocation and a `ProtoArrayData` round trip;
    /// - every row decodes back to itself;
    /// - each non-null datum lies within the decoder's stats bounds, and the
    ///   per-column null counts match the stats;
    /// - it converts losslessly to the codec format and back;
    /// - arrow's `RowConverter` sort order agrees with the `ArrayOrd` order;
    /// - decoded rows are ordered on the leading `preserves_order` columns;
    /// - `ArrayOrd::goodbytes` of the whole column equals the per-index sum;
    /// - a proto-encoded lower bound of the minimum value respects the length
    ///   limit and is `<=` the original value.
    ///
    /// Panics (via assertions) on any violation.
    #[track_caller]
    fn roundtrip_rows(desc: &RelationDesc, rows: Vec<Row>, metrics: &ColumnarMetrics) {
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(desc).unwrap();
        for row in &rows {
            encoder.append(row);
        }
        let col = encoder.finish();

        // Exercise reallocating columns with lgalloc.
        let col = realloc_array(&col, metrics);
        // Exercise our ProtoArray format.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        let decoder = <RelationDesc as Schema<Row>>::decoder(desc, col.clone()).unwrap();
        let stats = decoder.stats();

        // Collect all of our lower and upper bounds.
        let arena = RowArena::new();
        let (stats, stat_nulls): (Vec<_>, Vec<_>) = desc
            .iter()
            .map(|(name, ty)| {
                let col_stats = stats.cols.get(name.as_str()).unwrap();
                let lower_upper =
                    crate::stats::col_values(&ty.scalar_type, &col_stats.values, &arena);
                let null_count = col_stats.nulls.map_or(0, |n| n.count);

                (lower_upper, null_count)
            })
            .unzip();
        // Track how many nulls we saw for each column so we can assert stats match.
        let mut actual_nulls = vec![0usize; stats.len()];

        let mut rnd_row = Row::default();
        for (idx, og_row) in rows.iter().enumerate() {
            decoder.decode(idx, &mut rnd_row);
            assert_eq!(og_row, &rnd_row);

            // Check for each Datum in each Row that we're within our stats bounds.
            for (c_idx, (rnd_datum, ty)) in rnd_row.iter().zip_eq(desc.typ().columns()).enumerate()
            {
                let lower_upper = stats[c_idx];

                // Assert our stat bounds are correct.
                if rnd_datum.is_null() {
                    actual_nulls[c_idx] += 1;
                } else if let Some((lower, upper)) = lower_upper {
                    assert!(rnd_datum >= lower, "{rnd_datum:?} is not >= {lower:?}");
                    assert!(rnd_datum <= upper, "{rnd_datum:?} is not <= {upper:?}");
                } else {
                    match &ty.scalar_type {
                        // JSON stats are handled separately.
                        SqlScalarType::Jsonb => (),
                        // We don't collect stats for these types.
                        SqlScalarType::AclItem
                        | SqlScalarType::MzAclItem
                        | SqlScalarType::Range { .. }
                        | SqlScalarType::Array(_)
                        | SqlScalarType::Map { .. }
                        | SqlScalarType::List { .. }
                        | SqlScalarType::Record { .. }
                        | SqlScalarType::Int2Vector => (),
                        other => panic!("should have collected stats for {other:?}"),
                    }
                }
            }
        }

        // Validate that the null counts in our stats matched the actual counts.
        for (col_idx, (stats_count, actual_count)) in
            stat_nulls.iter().zip_eq(actual_nulls.iter()).enumerate()
        {
            assert_eq!(
                stats_count, actual_count,
                "column {col_idx} has incorrect number of nulls!"
            );
        }

        // Validate that we can convert losslessly to codec and back
        let codec = schema_to_codec::<Row>(desc, &col).unwrap();
        let col2 = codec_to_schema::<Row>(desc, &codec).unwrap();
        assert_eq!(col2.as_ref(), &col);

        // Validate that we only generate supported array types
        let converter = arrow::row::RowConverter::new(vec![SortField::new_with_options(
            col.data_type().clone(),
            SortOptions {
                descending: false,
                nulls_first: false,
            },
        )])
        .expect("sortable");
        let rows = converter
            .convert_columns(&[Arc::new(col.clone())])
            .expect("convertible");
        let mut row_vec = rows.iter().collect::<Vec<_>>();
        row_vec.sort();
        let row_col = converter
            .convert_rows(row_vec)
            .expect("convertible")
            .into_element();
        assert_eq!(row_col.len(), col.len());

        // Sort the column by `ArrayOrd` and check it matches the order produced
        // by arrow's row format above.
        let ord = ArrayOrd::new(&col);
        let mut indices = (0..u64::usize_as(col.len())).collect::<Vec<_>>();
        indices.sort_by_key(|i| ord.at(i.as_usize()));
        let indices = UInt64Array::from(indices);
        let ord_col = ::arrow::compute::take(&col, &indices, None).expect("takeable");
        assert_eq!(row_col.as_ref(), ord_col.as_ref());

        // Check that our order matches the datum-native order when `preserves_order` is true.
        let ordered_prefix_len = desc
            .iter()
            .take_while(|(_, c)| preserves_order(&c.scalar_type))
            .count();
        let decoder = <RelationDesc as Schema<Row>>::decoder_any(desc, ord_col.as_ref()).unwrap();
        let rows = (0..ord_col.len()).map(|i| {
            let mut row = Row::default();
            decoder.decode(i, &mut row);
            row
        });
        for (a, b) in rows.tuple_windows() {
            let a_prefix = a.iter().take(ordered_prefix_len);
            let b_prefix = b.iter().take(ordered_prefix_len);
            assert!(
                a_prefix.cmp(b_prefix).is_le(),
                "ordering should be consistent on preserves_order columns: {:#?}\n{:?}\n{:?}",
                desc.iter().take(ordered_prefix_len).collect_vec(),
                a.iter().take(ordered_prefix_len).collect_vec(),
                b.iter().take(ordered_prefix_len).collect_vec()
            );
        }

        // Check that our size estimates are consistent.
        assert_eq!(
            ord.goodbytes(),
            (0..col.len()).map(|i| ord.at(i).goodbytes()).sum::<usize>(),
            "total size should match the sum of the sizes at each index"
        );

        // Check that our lower bounds work as expected.
        if !ord_col.is_empty() {
            // `ord_col` is `col` taken in ascending `ArrayOrd` order, so its
            // minimum is at index 0. (`indices.values()[0]` is the position of
            // the minimum in the *unsorted* `col`, so it must not be used to
            // index `ord_col`.)
            let min_idx = 0;
            let lower_bound = ArrayBound::new(ord_col, min_idx);
            let max_encoded_len = 1000;
            if let Some(proto) = lower_bound.to_proto_lower(max_encoded_len) {
                assert!(
                    proto.encoded_len() <= max_encoded_len,
                    "should respect the max len"
                );
                let array_data = proto.into_rust().expect("valid array");
                let new_lower_bound = ArrayBound::new(make_array(array_data), 0);
                assert!(
                    new_lower_bound.get() <= lower_bound.get(),
                    "proto-roundtripped bound should be <= the original"
                );
            }
        }
    }

    // Property test: an arbitrary column type with 0..16 arbitrary datums of
    // that type must pass `roundtrip_datum`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn proptest_datums() {
        let strat = any::<SqlColumnType>().prop_flat_map(|ty| {
            proptest::collection::vec(arb_datum_for_column(ty.clone()), 0..16)
                .prop_map(move |d| (ty.clone(), d))
        });
        let metrics = ColumnarMetrics::disconnected();

        proptest!(|((ty, datums) in strat)| {
            roundtrip_datum(ty.clone(), datums.iter().map(Datum::from), &metrics);
        })
    }

    // Property test: arbitrary relation descs (the `1..8` argument presumably
    // bounds the column count — confirm against `arb_relation_desc`) with
    // 0..12 arbitrary rows must pass `roundtrip_rows`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn proptest_non_empty_relation_descs() {
        let strat = arb_relation_desc(1..8).prop_flat_map(|desc| {
            proptest::collection::vec(arb_row_for_relation(&desc), 0..12)
                .prop_map(move |rows| (desc.clone(), rows))
        });
        let metrics = ColumnarMetrics::disconnected();

        proptest!(|((desc, rows) in strat)| {
            roundtrip_rows(&desc, rows, &metrics)
        })
    }

    // Building a columnar encoder for a desc with no columns must fail.
    #[mz_ore::test]
    fn empty_relation_desc_returns_error() {
        let empty_desc = RelationDesc::empty();
        let result = <RelationDesc as Schema<Row>>::encoder(&empty_desc);
        assert_err!(result);
    }

    // A one-dimensional `UInt32` array datum round-trips through the columnar
    // encoding.
    #[mz_ore::test]
    fn smoketest_collections() {
        let mut row = Row::default();
        let mut packer = row.packer();
        let metrics = ColumnarMetrics::disconnected();

        packer
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 0,
                    length: 3,
                }],
                [Datum::UInt32(4), Datum::UInt32(5), Datum::UInt32(6)],
            )
            .unwrap();

        let array = row.unpack_first();
        roundtrip_datum(
            SqlScalarType::Array(Box::new(SqlScalarType::UInt32)).nullable(true),
            [array].into_iter(),
            &metrics,
        );
    }

    // A row with scalar, list and map columns, plus an all-null row, decode back
    // to the rows that were encoded.
    #[mz_ore::test]
    fn smoketest_row() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Int64.nullable(true))
            .with_column("b", SqlScalarType::String.nullable(true))
            .with_column("c", SqlScalarType::Bool.nullable(true))
            .with_column(
                "d",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .with_column(
                "e",
                SqlScalarType::Map {
                    value_type: Box::new(SqlScalarType::Int16),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push(Datum::Int64(100));
            packer.push(Datum::String("hello world"));
            packer.push(Datum::True);
            packer.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
            packer.push_dict([("bar", Datum::Int16(9)), ("foo", Datum::Int16(3))]);
        }
        let mut og_row_2 = Row::default();
        {
            let mut packer = og_row_2.packer();
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
        }

        encoder.append(&og_row);
        encoder.append(&og_row_2);
        let col = encoder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();

        let mut rnd_row = Row::default();
        decoder.decode(0, &mut rnd_row);
        assert_eq!(og_row, rnd_row);

        let mut rnd_row = Row::default();
        decoder.decode(1, &mut rnd_row);
        assert_eq!(og_row_2, rnd_row);
    }

    // A list of `Int64` lists (non-nullable) round-trips through the columnar
    // encoding.
    #[mz_ore::test]
    fn test_nested_list() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::List {
                        element_type: Box::new(SqlScalarType::Int64),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
                .nullable(false),
            )
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push_list_with(|inner| {
                inner.push_list([Datum::Int64(1), Datum::Int64(2)]);
                inner.push_list([Datum::Int64(5)]);
                inner.push_list([Datum::Int64(9), Datum::Int64(99), Datum::Int64(999)]);
            });
        }

        encoder.append(&og_row);
        let col = encoder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
        let mut rnd_row = Row::default();
        decoder.decode(0, &mut rnd_row);

        assert_eq!(og_row, rnd_row);
    }

    // A nullable record column round-trips both a populated value (packed with
    // `push_list_with`, one datum per field) and a `Null`.
    #[mz_ore::test]
    fn test_record() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Record {
                    fields: [
                        (
                            ColumnName::from("foo"),
                            SqlScalarType::Int64.nullable(false),
                        ),
                        (
                            ColumnName::from("bar"),
                            SqlScalarType::String.nullable(true),
                        ),
                        (
                            ColumnName::from("baz"),
                            SqlScalarType::List {
                                element_type: Box::new(SqlScalarType::UInt32),
                                custom_id: None,
                            }
                            .nullable(false),
                        ),
                    ]
                    .into(),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push_list_with(|inner| {
                inner.push(Datum::Int64(42));
                inner.push(Datum::Null);
                inner.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
            });
        }
        let null_row = Row::pack_slice(&[Datum::Null]);

        encoder.append(&og_row);
        encoder.append(&null_row);
        let col = encoder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
        let mut rnd_row = Row::default();

        decoder.decode(0, &mut rnd_row);
        assert_eq!(og_row, rnd_row);

        // NOTE(review): called only for its side effect on `rnd_row`
        // (presumably clearing it before the next decode) — confirm.
        rnd_row.packer();
        decoder.decode(1, &mut rnd_row);
        assert_eq!(null_row, rnd_row);
    }

    // A row covering many datum kinds (scalars, numerics incl. special values,
    // array, nested list, dict) encodes to bytes and decodes back unchanged via
    // `Row::decode`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn roundtrip() {
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::False,
            Datum::True,
            Datum::Int16(1),
            Datum::Int32(2),
            Datum::Int64(3),
            Datum::Float32(4f32.into()),
            Datum::Float64(5f64.into()),
            Datum::Date(
                NaiveDate::from_ymd_opt(6, 7, 8)
                    .unwrap()
                    .try_into()
                    .unwrap(),
            ),
            Datum::Time(NaiveTime::from_hms_opt(9, 10, 11).unwrap()),
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(
                    NaiveDate::from_ymd_opt(12, 13 % 12, 14)
                        .unwrap()
                        .and_time(NaiveTime::from_hms_opt(15, 16, 17).unwrap()),
                )
                .unwrap(),
            ),
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                    NaiveDate::from_ymd_opt(18, 19 % 12, 20)
                        .unwrap()
                        .and_time(NaiveTime::from_hms_opt(21, 22, 23).unwrap()),
                    Utc,
                ))
                .unwrap(),
            ),
            Datum::Interval(Interval {
                months: 24,
                days: 42,
                micros: 25,
            }),
            Datum::Bytes(&[26, 27]),
            Datum::String("28"),
            Datum::from(Numeric::from(29)),
            Datum::from(Numeric::infinity()),
            Datum::from(-Numeric::infinity()),
            Datum::from(Numeric::nan()),
            Datum::JsonNull,
            Datum::Uuid(Uuid::from_u128(30)),
            Datum::Dummy,
            Datum::Null,
        ]);
        packer
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 2,
                    length: 2,
                }],
                vec![Datum::Int32(31), Datum::Int32(32)],
            )
            .expect("valid array");
        packer.push_list_with(|packer| {
            packer.push(Datum::String("33"));
            packer.push_list_with(|packer| {
                packer.push(Datum::String("34"));
                packer.push(Datum::String("35"));
            });
            packer.push(Datum::String("36"));
            packer.push(Datum::String("37"));
        });
        packer.push_dict_with(|row| {
            // Add a bunch of data to the hash to ensure we don't get a
            // HashMap's random iteration anywhere in the encode/decode path.
            let mut i = 38;
            for _ in 0..20 {
                row.push(Datum::String(&i.to_string()));
                row.push(Datum::Int32(i + 1));
                i += 2;
            }
        });

        let mut desc = RelationDesc::builder();
        for (idx, _) in row.iter().enumerate() {
            // HACK(parkmycar): We don't currently validate the types of the `RelationDesc` are
            // correct, just the number of columns. So we can fill in any type here.
            desc = desc.with_column(idx.to_string(), SqlScalarType::Int32.nullable(true));
        }
        let desc = desc.finish();

        let encoded = row.encode_to_vec();
        assert_eq!(Row::decode(&encoded, &desc), Ok(row));
    }

    // Decoding with a desc projected (via `apply_demand`) to columns 0 and 2
    // yields rows containing only those columns.
    #[mz_ore::test]
    fn smoketest_projection() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Int64.nullable(true))
            .with_column("b", SqlScalarType::String.nullable(true))
            .with_column("c", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push(Datum::Int64(100));
            packer.push(Datum::String("hello world"));
            packer.push(Datum::True);
        }
        let mut og_row_2 = Row::default();
        {
            let mut packer = og_row_2.packer();
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
        }

        encoder.append(&og_row);
        encoder.append(&og_row_2);
        let col = encoder.finish();

        let projected_desc = desc.apply_demand(&BTreeSet::from([0, 2]));

        let decoder = <RelationDesc as Schema<Row>>::decoder(&projected_desc, col).unwrap();

        let mut rnd_row = Row::default();
        decoder.decode(0, &mut rnd_row);
        let expected_row = Row::pack_slice(&[Datum::Int64(100), Datum::True]);
        assert_eq!(expected_row, rnd_row);

        let mut rnd_row = Row::default();
        decoder.decode(1, &mut rnd_row);
        let expected_row = Row::pack_slice(&[Datum::Null, Datum::Null]);
        assert_eq!(expected_row, rnd_row);
    }
}