mz_repr/row/encode.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A permanent storage encoding for rows.
11//!
12//! See row.proto for details.
13
14use std::fmt::Debug;
15use std::ops::AddAssign;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use arrow::array::{
20    Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBufferBuilder,
21    BooleanBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder, Float32Array, Float32Builder,
22    Float64Array, Float64Builder, Int16Array, Int16Builder, Int32Array, Int32Builder, Int64Array,
23    Int64Builder, ListArray, ListBuilder, MapArray, StringArray, StringBuilder, StructArray,
24    UInt8Array, UInt8Builder, UInt16Array, UInt16Builder, UInt32Array, UInt32Builder, UInt64Array,
25    UInt64Builder, make_array,
26};
27use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
28use arrow::datatypes::{DataType, Field, Fields, ToByteSlice};
29use bytes::{BufMut, Bytes};
30use chrono::Timelike;
31use dec::{Context, Decimal, OrderedDecimal};
32use itertools::{EitherOrBoth, Itertools};
33use mz_ore::assert_none;
34use mz_ore::cast::CastFrom;
35use mz_persist_types::Codec;
36use mz_persist_types::arrow::ArrayOrd;
37use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, FixedSizeCodec, Schema};
38use mz_persist_types::stats::{
39    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, FixedSizeBytesStatsKind,
40    OptionStats, PrimitiveStats, StructStats,
41};
42use mz_proto::chrono::ProtoNaiveTime;
43use mz_proto::{ProtoType, RustType, TryFromProtoError};
44use prost::Message;
45use uuid::Uuid;
46
47use crate::adt::array::{ArrayDimension, PackedArrayDimension};
48use crate::adt::date::Date;
49use crate::adt::datetime::PackedNaiveTime;
50use crate::adt::interval::PackedInterval;
51use crate::adt::jsonb::{JsonbPacker, JsonbRef};
52use crate::adt::mz_acl_item::{PackedAclItem, PackedMzAclItem};
53use crate::adt::numeric::{Numeric, PackedNumeric};
54use crate::adt::range::{Range, RangeInner, RangeLowerBound, RangeUpperBound};
55use crate::adt::timestamp::{CheckedTimestamp, PackedNaiveDateTime};
56use crate::row::proto_datum::DatumType;
57use crate::row::{
58    ProtoArray, ProtoArrayDimension, ProtoDatum, ProtoDatumOther, ProtoDict, ProtoDictElement,
59    ProtoNumeric, ProtoRange, ProtoRangeInner, ProtoRow,
60};
61use crate::stats::{fixed_stats_from_column, numeric_stats_from_column, stats_for_json};
62use crate::{Datum, ProtoRelationDesc, RelationDesc, Row, RowPacker, SqlScalarType, Timestamp};
63
64// TODO(parkmycar): Benchmark the difference between `FixedSizeBinaryArray` and `BinaryArray`.
65//
66// `FixedSizeBinaryArray`s push empty bytes when a value is null, which for larger binary types
67// could result in poor performance.
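//
// Roughly (a sketch using the arrow builder APIs imported above):
//
//     let mut fixed = FixedSizeBinaryBuilder::new(16);
//     fixed.append_null(); // still reserves 16 zeroed bytes in the values buffer
//
//     let mut variable = BinaryBuilder::new();
//     variable.append_null(); // adds only an offset entry, no value bytes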
68#[allow(clippy::as_conversions)]
69mod fixed_binary_sizes {
70    use super::*;
71
72    pub const TIME_FIXED_BYTES: i32 = PackedNaiveTime::SIZE as i32;
73    pub const TIMESTAMP_FIXED_BYTES: i32 = PackedNaiveDateTime::SIZE as i32;
74    pub const INTERVAL_FIXED_BYTES: i32 = PackedInterval::SIZE as i32;
75    pub const ACL_ITEM_FIXED_BYTES: i32 = PackedAclItem::SIZE as i32;
76    pub const _MZ_ACL_ITEM_FIXED_BYTES: i32 = PackedMzAclItem::SIZE as i32;
77    pub const ARRAY_DIMENSION_FIXED_BYTES: i32 = PackedArrayDimension::SIZE as i32;
78
79    pub const UUID_FIXED_BYTES: i32 = 16;
80    static_assertions::const_assert_eq!(UUID_FIXED_BYTES as usize, std::mem::size_of::<Uuid>());
81}
82use fixed_binary_sizes::*;
83
84/// Returns true iff the ordering of the "raw" and Persist-encoded versions of this column would match:
85/// i.e. `sort(encode(column)) == encode(sort(column))`. This encoding has been designed so that this
86/// is true for many types.
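///
/// For example (a sketch; `encode` and `sort` stand in for the columnar encoding
/// defined in this module and are not real functions here):
///
/// ```text
/// let column = [3_i64, -1, 2];
/// // For an order-preserving type like Int64:
/// sort(encode(column)) == encode(sort(column))
/// // For Float32/Float64 this can fail: -0.0 and 0.0 compare equal as floats but
/// // encode to different bytes.
/// ```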
87pub fn preserves_order(scalar_type: &SqlScalarType) -> bool {
88    match scalar_type {
89        // These types have short, fixed-length encodings that are designed to sort identically.
90        SqlScalarType::Bool
91        | SqlScalarType::Int16
92        | SqlScalarType::Int32
93        | SqlScalarType::Int64
94        | SqlScalarType::UInt16
95        | SqlScalarType::UInt32
96        | SqlScalarType::UInt64
97        | SqlScalarType::Date
98        | SqlScalarType::Time
99        | SqlScalarType::Timestamp { .. }
100        | SqlScalarType::TimestampTz { .. }
101        | SqlScalarType::Interval
102        | SqlScalarType::Bytes
103        | SqlScalarType::String
104        | SqlScalarType::Uuid
105        | SqlScalarType::MzTimestamp
106        | SqlScalarType::MzAclItem
107        | SqlScalarType::AclItem => true,
108        // We sort records lexicographically; a record has a meaningful sort if all its fields do.
109        SqlScalarType::Record { fields, .. } => fields
110            .iter()
111            .all(|(_, field_type)| preserves_order(&field_type.scalar_type)),
112        // Our floating-point encoding preserves order generally, but differs when comparing
113        // -0 and 0. Opt these out for now.
114        SqlScalarType::Float32 | SqlScalarType::Float64 => false,
115        // Numeric is sensitive to similar ordering issues as floating point numbers, and requires
116        // some special handling we don't have yet.
117        SqlScalarType::Numeric { .. } => false,
118        // For all other types: either the encoding is known to not preserve ordering, or we
119        // don't yet care to make strong guarantees one way or the other.
120        SqlScalarType::PgLegacyChar
121        | SqlScalarType::PgLegacyName
122        | SqlScalarType::Char { .. }
123        | SqlScalarType::VarChar { .. }
124        | SqlScalarType::Jsonb
125        | SqlScalarType::Array(_)
126        | SqlScalarType::List { .. }
127        | SqlScalarType::Oid
128        | SqlScalarType::Map { .. }
129        | SqlScalarType::RegProc
130        | SqlScalarType::RegType
131        | SqlScalarType::RegClass
132        | SqlScalarType::Int2Vector
133        | SqlScalarType::Range { .. } => false,
134    }
135}
136
137/// An encoder for a column of [`Datum`]s.
138#[derive(Debug)]
139struct DatumEncoder {
140    nullable: bool,
141    encoder: DatumColumnEncoder,
142}
143
144impl DatumEncoder {
145    fn goodbytes(&self) -> usize {
146        self.encoder.goodbytes()
147    }
148
149    fn push(&mut self, datum: Datum) {
150        assert!(
151            !datum.is_null() || self.nullable,
152            "tried pushing Null into non-nullable column"
153        );
154        self.encoder.push(datum);
155    }
156
157    fn push_invalid(&mut self) {
158        self.encoder.push_invalid();
159    }
160
161    fn finish(self) -> ArrayRef {
162        self.encoder.finish()
163    }
164}
165
166/// An encoder for a single column of [`Datum`]s. To encode an entire row see
167/// [`RowColumnarEncoder`].
168///
169/// Note: We specifically structure the encoder as an enum instead of using trait objects because
170/// Datum encoding is an extremely hot path and downcasting objects is relatively slow.
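///
/// The intended flow, sketched (construction goes through the `Schema` machinery
/// further below rather than by hand; the encoder shown here is hypothetical):
///
/// ```text
/// let mut encoder = /* encoder matching SqlScalarType::Int64 */;
/// encoder.push(Datum::Int64(42));
/// encoder.push_invalid();                  // a SQL NULL
/// let array: ArrayRef = encoder.finish();  // one Arrow array for the column
/// ```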
171#[derive(Debug)]
172enum DatumColumnEncoder {
173    Bool(BooleanBuilder),
174    U8(UInt8Builder),
175    U16(UInt16Builder),
176    U32(UInt32Builder),
177    U64(UInt64Builder),
178    I16(Int16Builder),
179    I32(Int32Builder),
180    I64(Int64Builder),
181    F32(Float32Builder),
182    F64(Float64Builder),
183    Numeric {
184        /// The raw bytes so we can losslessly roundtrip Numerics.
185        binary_values: BinaryBuilder,
186        /// Also maintain a float64 approximation for sorting.
187        approx_values: Float64Builder,
188        /// Re-usable `libdecimal` context for conversions.
189        numeric_context: Context<Numeric>,
190    },
191    Bytes(BinaryBuilder),
192    String(StringBuilder),
193    Date(Int32Builder),
194    Time(FixedSizeBinaryBuilder),
195    Timestamp(FixedSizeBinaryBuilder),
196    TimestampTz(FixedSizeBinaryBuilder),
197    MzTimestamp(UInt64Builder),
198    Interval(FixedSizeBinaryBuilder),
199    Uuid(FixedSizeBinaryBuilder),
200    AclItem(FixedSizeBinaryBuilder),
201    MzAclItem(BinaryBuilder),
202    Range(BinaryBuilder),
203    /// Hand rolled "StringBuilder" that reduces the number of copies required
204    /// to serialize JSON.
205    ///
206    /// An alternative would be to use [`StringBuilder`] but that requires
207    /// serializing to an intermediary string, and then copying that
208    /// intermediary string into an underlying buffer.
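    ///
    /// Roughly, the difference (pseudo-code, not literal calls from this module):
    ///
    /// ```text
    /// // With a plain `StringBuilder`:
    /// let tmp = json.to_string();      // copy 1: into a temporary String
    /// builder.append_value(&tmp);      // copy 2: into the builder's buffer
    ///
    /// // With this encoder:
    /// json.to_writer(&mut buf)?;       // single copy: straight into `buf`
    /// offsets.push(buf.len() as i32);
    /// ```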
209    Jsonb {
210        /// Monotonically increasing offsets of each encoded segment.
211        offsets: Vec<i32>,
212        /// Buffer that contains UTF-8 encoded JSON.
213        buf: Vec<u8>,
214        /// Null entries, if any.
215        nulls: Option<BooleanBufferBuilder>,
216    },
217    Array {
218        /// Binary encoded `ArrayDimension`s.
219        dims: ListBuilder<FixedSizeBinaryBuilder>,
220        /// Lengths of each `Array` in this column.
221        val_lengths: Vec<usize>,
222        /// Contiguous array of underlying data.
223        vals: Box<DatumColumnEncoder>,
224        /// Null entries, if any.
225        nulls: Option<BooleanBufferBuilder>,
226    },
227    List {
228        /// Lengths of each `List` in this column.
229        lengths: Vec<usize>,
230        /// Contiguous array of underlying data.
231        values: Box<DatumColumnEncoder>,
232        /// Null entries, if any.
233        nulls: Option<BooleanBufferBuilder>,
234    },
235    Map {
236        /// Lengths of each `Map` in this column.
237        lengths: Vec<usize>,
238        /// Contiguous array of key data.
239        keys: StringBuilder,
240        /// Contiguous array of val data.
241        vals: Box<DatumColumnEncoder>,
242        /// Null entries, if any.
243        nulls: Option<BooleanBufferBuilder>,
244    },
245    Record {
246        /// Columns in the record.
247        fields: Vec<DatumEncoder>,
248        /// Null entries, if any.
249        nulls: Option<BooleanBufferBuilder>,
250        /// Number of values we've pushed into this builder thus far.
251        length: usize,
252    },
253    /// Special encoder for a [`SqlScalarType::Record`] that has no inner fields.
254    ///
255    /// We have a special case for this scenario because Arrow does not allow a
256    /// [`StructArray`] (which is what we normally use to encode a `Record`) with no fields.
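    ///
    /// Instead, an empty record is encoded as a `BooleanArray` (a sketch):
    ///
    /// ```text
    /// ROW()  -> builder.append_value(true)
    /// NULL   -> builder.append_null()
    /// ```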
257    RecordEmpty(BooleanBuilder),
258}
259
260impl DatumColumnEncoder {
261    fn goodbytes(&self) -> usize {
262        match self {
263            DatumColumnEncoder::Bool(a) => a.len(),
264            DatumColumnEncoder::U8(a) => a.values_slice().to_byte_slice().len(),
265            DatumColumnEncoder::U16(a) => a.values_slice().to_byte_slice().len(),
266            DatumColumnEncoder::U32(a) => a.values_slice().to_byte_slice().len(),
267            DatumColumnEncoder::U64(a) => a.values_slice().to_byte_slice().len(),
268            DatumColumnEncoder::I16(a) => a.values_slice().to_byte_slice().len(),
269            DatumColumnEncoder::I32(a) => a.values_slice().to_byte_slice().len(),
270            DatumColumnEncoder::I64(a) => a.values_slice().to_byte_slice().len(),
271            DatumColumnEncoder::F32(a) => a.values_slice().to_byte_slice().len(),
272            DatumColumnEncoder::F64(a) => a.values_slice().to_byte_slice().len(),
273            DatumColumnEncoder::Numeric {
274                binary_values,
275                approx_values,
276                ..
277            } => {
278                binary_values.values_slice().len()
279                    + approx_values.values_slice().to_byte_slice().len()
280            }
281            DatumColumnEncoder::Bytes(a) => a.values_slice().len(),
282            DatumColumnEncoder::String(a) => a.values_slice().len(),
283            DatumColumnEncoder::Date(a) => a.values_slice().to_byte_slice().len(),
284            DatumColumnEncoder::Time(a) => a.len() * PackedNaiveTime::SIZE,
285            DatumColumnEncoder::Timestamp(a) => a.len() * PackedNaiveDateTime::SIZE,
286            DatumColumnEncoder::TimestampTz(a) => a.len() * PackedNaiveDateTime::SIZE,
287            DatumColumnEncoder::MzTimestamp(a) => a.values_slice().to_byte_slice().len(),
288            DatumColumnEncoder::Interval(a) => a.len() * PackedInterval::SIZE,
289            DatumColumnEncoder::Uuid(a) => a.len() * size_of::<Uuid>(),
290            DatumColumnEncoder::AclItem(a) => a.len() * PackedAclItem::SIZE,
291            DatumColumnEncoder::MzAclItem(a) => a.values_slice().len(),
292            DatumColumnEncoder::Range(a) => a.values_slice().len(),
293            DatumColumnEncoder::Jsonb { buf, .. } => buf.len(),
294            DatumColumnEncoder::Array { dims, vals, .. } => {
295                dims.len() * PackedArrayDimension::SIZE + vals.goodbytes()
296            }
297            DatumColumnEncoder::List { values, .. } => values.goodbytes(),
298            DatumColumnEncoder::Map { keys, vals, .. } => {
299                keys.values_slice().len() + vals.goodbytes()
300            }
301            DatumColumnEncoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
302            DatumColumnEncoder::RecordEmpty(a) => a.len(),
303        }
304    }
305
306    fn push<'e, 'd>(&'e mut self, datum: Datum<'d>) {
307        match (self, datum) {
308            (DatumColumnEncoder::Bool(bool_builder), Datum::True) => {
309                bool_builder.append_value(true)
310            }
311            (DatumColumnEncoder::Bool(bool_builder), Datum::False) => {
312                bool_builder.append_value(false)
313            }
314            (DatumColumnEncoder::U8(builder), Datum::UInt8(val)) => builder.append_value(val),
315            (DatumColumnEncoder::U16(builder), Datum::UInt16(val)) => builder.append_value(val),
316            (DatumColumnEncoder::U32(builder), Datum::UInt32(val)) => builder.append_value(val),
317            (DatumColumnEncoder::U64(builder), Datum::UInt64(val)) => builder.append_value(val),
318            (DatumColumnEncoder::I16(builder), Datum::Int16(val)) => builder.append_value(val),
319            (DatumColumnEncoder::I32(builder), Datum::Int32(val)) => builder.append_value(val),
320            (DatumColumnEncoder::I64(builder), Datum::Int64(val)) => builder.append_value(val),
321            (DatumColumnEncoder::F32(builder), Datum::Float32(val)) => builder.append_value(*val),
322            (DatumColumnEncoder::F64(builder), Datum::Float64(val)) => builder.append_value(*val),
323            (
324                DatumColumnEncoder::Numeric {
325                    approx_values,
326                    binary_values,
327                    numeric_context,
328                },
329                Datum::Numeric(val),
330            ) => {
331                let float_approx = numeric_context.try_into_f64(val.0).unwrap_or_else(|_| {
332                    numeric_context.clear_status();
333                    if val.0.is_negative() {
334                        f64::NEG_INFINITY
335                    } else {
336                        f64::INFINITY
337                    }
338                });
339                let packed = PackedNumeric::from_value(val.0);
340
341                approx_values.append_value(float_approx);
342                binary_values.append_value(packed.as_bytes());
343            }
344            (DatumColumnEncoder::String(builder), Datum::String(val)) => builder.append_value(val),
345            (DatumColumnEncoder::Bytes(builder), Datum::Bytes(val)) => builder.append_value(val),
346            (DatumColumnEncoder::Date(builder), Datum::Date(val)) => {
347                builder.append_value(val.pg_epoch_days())
348            }
349            (DatumColumnEncoder::Time(builder), Datum::Time(val)) => {
350                let packed = PackedNaiveTime::from_value(val);
351                builder
352                    .append_value(packed.as_bytes())
353                    .expect("known correct size");
354            }
355            (DatumColumnEncoder::Timestamp(builder), Datum::Timestamp(val)) => {
356                let packed = PackedNaiveDateTime::from_value(val.to_naive());
357                builder
358                    .append_value(packed.as_bytes())
359                    .expect("known correct size");
360            }
361            (DatumColumnEncoder::TimestampTz(builder), Datum::TimestampTz(val)) => {
362                let packed = PackedNaiveDateTime::from_value(val.to_naive());
363                builder
364                    .append_value(packed.as_bytes())
365                    .expect("known correct size");
366            }
367            (DatumColumnEncoder::MzTimestamp(builder), Datum::MzTimestamp(val)) => {
368                builder.append_value(val.into());
369            }
370            (DatumColumnEncoder::Interval(builder), Datum::Interval(val)) => {
371                let packed = PackedInterval::from_value(val);
372                builder
373                    .append_value(packed.as_bytes())
374                    .expect("known correct size");
375            }
376            (DatumColumnEncoder::Uuid(builder), Datum::Uuid(val)) => builder
377                .append_value(val.as_bytes())
378                .expect("known correct size"),
379            (DatumColumnEncoder::AclItem(builder), Datum::AclItem(val)) => {
380                let packed = PackedAclItem::from_value(val);
381                builder
382                    .append_value(packed.as_bytes())
383                    .expect("known correct size");
384            }
385            (DatumColumnEncoder::MzAclItem(builder), Datum::MzAclItem(val)) => {
386                let packed = PackedMzAclItem::from_value(val);
387                builder.append_value(packed.as_bytes());
388            }
389            (DatumColumnEncoder::Range(builder), d @ Datum::Range(_)) => {
390                let proto = ProtoDatum::from(d);
391                let bytes = proto.encode_to_vec();
392                builder.append_value(&bytes);
393            }
394            (
395                DatumColumnEncoder::Jsonb {
396                    offsets,
397                    buf,
398                    nulls,
399                },
400                d @ Datum::JsonNull
401                | d @ Datum::True
402                | d @ Datum::False
403                | d @ Datum::Numeric(_)
404                | d @ Datum::String(_)
405                | d @ Datum::List(_)
406                | d @ Datum::Map(_),
407            ) => {
408                // TODO(parkmycar): Why do we need to re-borrow here?
409                let mut buf = buf;
410                let json = JsonbRef::from_datum(d);
411
412                // Serialize our JSON.
413                json.to_writer(&mut buf)
414                    .expect("failed to serialize Datum to jsonb");
415                let offset: i32 = buf.len().try_into().expect("wrote more than 4GB of JSON");
416                offsets.push(offset);
417
418                if let Some(nulls) = nulls {
419                    nulls.append(true);
420                }
421            }
422            (
423                DatumColumnEncoder::Array {
424                    dims,
425                    val_lengths,
426                    vals,
427                    nulls,
428                },
429                Datum::Array(array),
430            ) => {
431                // Store our array dimensions.
432                for dimension in array.dims() {
433                    let packed = PackedArrayDimension::from_value(dimension);
434                    dims.values()
435                        .append_value(packed.as_bytes())
436                        .expect("known correct size");
437                }
438                dims.append(true);
439
440                // Store the values of the array.
441                let mut count = 0;
442                for datum in &array.elements() {
443                    count += 1;
444                    vals.push(datum);
445                }
446                val_lengths.push(count);
447
448                if let Some(nulls) = nulls {
449                    nulls.append(true);
450                }
451            }
452            (
453                DatumColumnEncoder::List {
454                    lengths,
455                    values,
456                    nulls,
457                },
458                Datum::List(list),
459            ) => {
460                let mut count = 0;
461                for datum in &list {
462                    count += 1;
463                    values.push(datum);
464                }
465                lengths.push(count);
466
467                if let Some(nulls) = nulls {
468                    nulls.append(true);
469                }
470            }
471            (
472                DatumColumnEncoder::Map {
473                    lengths,
474                    keys,
475                    vals,
476                    nulls,
477                },
478                Datum::Map(map),
479            ) => {
480                let mut count = 0;
481                for (key, datum) in &map {
482                    count += 1;
483                    keys.append_value(key);
484                    vals.push(datum);
485                }
486                lengths.push(count);
487
488                if let Some(nulls) = nulls {
489                    nulls.append(true);
490                }
491            }
492            (
493                DatumColumnEncoder::Record {
494                    fields,
495                    nulls,
496                    length,
497                },
498                Datum::List(records),
499            ) => {
500                let mut count = 0;
501                // `zip_eq` will panic if the number of records != number of fields.
502                for (datum, encoder) in records.into_iter().zip_eq(fields.iter_mut()) {
503                    count += 1;
504                    encoder.push(datum);
505                }
506                assert_eq!(count, fields.len());
507
508                length.add_assign(1);
509                if let Some(nulls) = nulls.as_mut() {
510                    nulls.append(true);
511                }
512            }
513            (DatumColumnEncoder::RecordEmpty(builder), Datum::List(records)) => {
514                assert_none!(records.into_iter().next());
515                builder.append_value(true);
516            }
517            (encoder, Datum::Null) => encoder.push_invalid(),
518            (encoder, datum) => panic!("can't encode {datum:?} into {encoder:?}"),
519        }
520    }
521
522    fn push_invalid(&mut self) {
523        match self {
524            DatumColumnEncoder::Bool(builder) => builder.append_null(),
525            DatumColumnEncoder::U8(builder) => builder.append_null(),
526            DatumColumnEncoder::U16(builder) => builder.append_null(),
527            DatumColumnEncoder::U32(builder) => builder.append_null(),
528            DatumColumnEncoder::U64(builder) => builder.append_null(),
529            DatumColumnEncoder::I16(builder) => builder.append_null(),
530            DatumColumnEncoder::I32(builder) => builder.append_null(),
531            DatumColumnEncoder::I64(builder) => builder.append_null(),
532            DatumColumnEncoder::F32(builder) => builder.append_null(),
533            DatumColumnEncoder::F64(builder) => builder.append_null(),
534            DatumColumnEncoder::Numeric {
535                approx_values,
536                binary_values,
537                numeric_context: _,
538            } => {
539                approx_values.append_null();
540                binary_values.append_null();
541            }
542            DatumColumnEncoder::String(builder) => builder.append_null(),
543            DatumColumnEncoder::Bytes(builder) => builder.append_null(),
544            DatumColumnEncoder::Date(builder) => builder.append_null(),
545            DatumColumnEncoder::Time(builder) => builder.append_null(),
546            DatumColumnEncoder::Timestamp(builder) => builder.append_null(),
547            DatumColumnEncoder::TimestampTz(builder) => builder.append_null(),
548            DatumColumnEncoder::MzTimestamp(builder) => builder.append_null(),
549            DatumColumnEncoder::Interval(builder) => builder.append_null(),
550            DatumColumnEncoder::Uuid(builder) => builder.append_null(),
551            DatumColumnEncoder::AclItem(builder) => builder.append_null(),
552            DatumColumnEncoder::MzAclItem(builder) => builder.append_null(),
553            DatumColumnEncoder::Range(builder) => builder.append_null(),
554            DatumColumnEncoder::Jsonb {
555                offsets,
556                buf: _,
557                nulls,
558            } => {
559                let nulls = nulls.get_or_insert_with(|| {
560                    let mut buf = BooleanBufferBuilder::new(offsets.len());
561                    // The offsets buffer has one more value than there are elements.
562                    buf.append_n(offsets.len() - 1, true);
563                    buf
564                });
565
566                offsets.push(offsets.last().copied().unwrap_or(0));
567                nulls.append(false);
568            }
569            DatumColumnEncoder::Array {
570                dims,
571                val_lengths,
572                vals: _,
573                nulls,
574            } => {
575                let nulls = nulls.get_or_insert_with(|| {
576                    let mut buf = BooleanBufferBuilder::new(dims.len() + 1);
577                    buf.append_n(dims.len(), true);
578                    buf
579                });
580                dims.append_null();
581
582                val_lengths.push(0);
583                nulls.append(false);
584            }
585            DatumColumnEncoder::List {
586                lengths,
587                values: _,
588                nulls,
589            } => {
590                let nulls = nulls.get_or_insert_with(|| {
591                    let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
592                    buf.append_n(lengths.len(), true);
593                    buf
594                });
595
596                lengths.push(0);
597                nulls.append(false);
598            }
599            DatumColumnEncoder::Map {
600                lengths,
601                keys: _,
602                vals: _,
603                nulls,
604            } => {
605                let nulls = nulls.get_or_insert_with(|| {
606                    let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
607                    buf.append_n(lengths.len(), true);
608                    buf
609                });
610
611                lengths.push(0);
612                nulls.append(false);
613            }
614            DatumColumnEncoder::Record {
615                fields,
616                nulls,
617                length,
618            } => {
619                let nulls = nulls.get_or_insert_with(|| {
620                    let mut buf = BooleanBufferBuilder::new(*length + 1);
621                    buf.append_n(*length, true);
622                    buf
623                });
624                nulls.append(false);
625                length.add_assign(1);
626
627                for field in fields {
628                    field.push_invalid();
629                }
630            }
631            DatumColumnEncoder::RecordEmpty(builder) => builder.append_null(),
632        }
633    }
634
635    fn finish(self) -> ArrayRef {
636        match self {
637            DatumColumnEncoder::Bool(mut builder) => {
638                let array = builder.finish();
639                Arc::new(array)
640            }
641            DatumColumnEncoder::U8(mut builder) => {
642                let array = builder.finish();
643                Arc::new(array)
644            }
645            DatumColumnEncoder::U16(mut builder) => {
646                let array = builder.finish();
647                Arc::new(array)
648            }
649            DatumColumnEncoder::U32(mut builder) => {
650                let array = builder.finish();
651                Arc::new(array)
652            }
653            DatumColumnEncoder::U64(mut builder) => {
654                let array = builder.finish();
655                Arc::new(array)
656            }
657            DatumColumnEncoder::I16(mut builder) => {
658                let array = builder.finish();
659                Arc::new(array)
660            }
661            DatumColumnEncoder::I32(mut builder) => {
662                let array = builder.finish();
663                Arc::new(array)
664            }
665            DatumColumnEncoder::I64(mut builder) => {
666                let array = builder.finish();
667                Arc::new(array)
668            }
669            DatumColumnEncoder::F32(mut builder) => {
670                let array = builder.finish();
671                Arc::new(array)
672            }
673            DatumColumnEncoder::F64(mut builder) => {
674                let array = builder.finish();
675                Arc::new(array)
676            }
677            DatumColumnEncoder::Numeric {
678                mut approx_values,
679                mut binary_values,
680                numeric_context: _,
681            } => {
682                let approx_array = approx_values.finish();
683                let binary_array = binary_values.finish();
684
685                assert_eq!(approx_array.len(), binary_array.len());
686                // This is O(n) so we only enable it for debug assertions.
687                debug_assert_eq!(approx_array.logical_nulls(), binary_array.logical_nulls());
688
689                let fields = Fields::from(vec![
690                    Field::new("approx", approx_array.data_type().clone(), true),
691                    Field::new("binary", binary_array.data_type().clone(), true),
692                ]);
693                let nulls = approx_array.logical_nulls();
694                let array = StructArray::new(
695                    fields,
696                    vec![Arc::new(approx_array), Arc::new(binary_array)],
697                    nulls,
698                );
699                Arc::new(array)
700            }
701            DatumColumnEncoder::String(mut builder) => {
702                let array = builder.finish();
703                Arc::new(array)
704            }
705            DatumColumnEncoder::Bytes(mut builder) => {
706                let array = builder.finish();
707                Arc::new(array)
708            }
709            DatumColumnEncoder::Date(mut builder) => {
710                let array = builder.finish();
711                Arc::new(array)
712            }
713            DatumColumnEncoder::Time(mut builder) => {
714                let array = builder.finish();
715                Arc::new(array)
716            }
717            DatumColumnEncoder::Timestamp(mut builder) => {
718                let array = builder.finish();
719                Arc::new(array)
720            }
721            DatumColumnEncoder::TimestampTz(mut builder) => {
722                let array = builder.finish();
723                Arc::new(array)
724            }
725            DatumColumnEncoder::MzTimestamp(mut builder) => {
726                let array = builder.finish();
727                Arc::new(array)
728            }
729            DatumColumnEncoder::Interval(mut builder) => {
730                let array = builder.finish();
731                Arc::new(array)
732            }
733            DatumColumnEncoder::Uuid(mut builder) => {
734                let array = builder.finish();
735                Arc::new(array)
736            }
737            DatumColumnEncoder::AclItem(mut builder) => Arc::new(builder.finish()),
738            DatumColumnEncoder::MzAclItem(mut builder) => Arc::new(builder.finish()),
739            DatumColumnEncoder::Range(mut builder) => Arc::new(builder.finish()),
740            DatumColumnEncoder::Jsonb {
741                offsets,
742                buf,
743                mut nulls,
744            } => {
745                let values = Buffer::from_vec(buf);
746                let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
747                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
748                let array = StringArray::new(offsets, values, nulls);
749                Arc::new(array)
750            }
751            DatumColumnEncoder::Array {
752                mut dims,
753                val_lengths,
754                vals,
755                mut nulls,
756            } => {
757                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
758                let vals = vals.finish();
759
760                // Note: Values in an Array can always be Null, regardless of whether or not the
761                // column is nullable.
762                let field = Field::new_list_field(vals.data_type().clone(), true);
763                let val_offsets = OffsetBuffer::from_lengths(val_lengths);
764                let values =
765                    ListArray::new(Arc::new(field), val_offsets, Arc::new(vals), nulls.clone());
766
767                let dims = dims.finish();
768                assert_eq!(values.len(), dims.len());
769
770                // Note: The inner arrays are always nullable, and we let the higher-level array
771                // drive nullability for the entire column.
772                let fields = Fields::from(vec![
773                    Field::new("dims", dims.data_type().clone(), true),
774                    Field::new("vals", values.data_type().clone(), true),
775                ]);
776                let array = StructArray::new(fields, vec![Arc::new(dims), Arc::new(values)], nulls);
777
778                Arc::new(array)
779            }
780            DatumColumnEncoder::List {
781                lengths,
782                values,
783                mut nulls,
784            } => {
785                let values = values.finish();
786
787                // Note: Values in a List can always be Null, regardless of whether or not the
788                // column is nullable.
789                let field = Field::new_list_field(values.data_type().clone(), true);
790                let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
791                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
792
793                let array = ListArray::new(Arc::new(field), offsets, values, nulls);
794                Arc::new(array)
795            }
796            DatumColumnEncoder::Map {
797                lengths,
798                mut keys,
799                vals,
800                mut nulls,
801            } => {
802                let keys = keys.finish();
803                let vals = vals.finish();
804
805                let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
806                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
807
808                // Note: Values in a Map can always be Null, regardless of whether or not the
809                // column is nullable, but Keys cannot.
810                assert_none!(keys.logical_nulls());
811                let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
812                let val_field = Arc::new(Field::new("val", vals.data_type().clone(), true));
813                let fields = Fields::from(vec![Arc::clone(&key_field), Arc::clone(&val_field)]);
814                let entries = StructArray::new(fields, vec![Arc::new(keys), vals], None);
815
816                // Note: DatumMap is always sorted, and Arrow enforces that the inner 'map_entries'
817                // array can never be null.
818                let field = Field::new("map_entries", entries.data_type().clone(), false);
819                let array = ListArray::new(Arc::new(field), offsets, Arc::new(entries), nulls);
820                Arc::new(array)
821            }
822            DatumColumnEncoder::Record {
823                fields,
824                mut nulls,
825                length: _,
826            } => {
827                let (fields, arrays): (Vec<_>, Vec<_>) = fields
828                    .into_iter()
829                    .enumerate()
830                    .map(|(tag, encoder)| {
831                        // Note: We mark all columns as nullable at the Arrow/Parquet level because
832                        // it has a negligible performance difference, but it protects us from
833                        // unintended nullability changes in the columns of SQL objects.
834                        //
835                        // See: <https://github.com/MaterializeInc/database-issues/issues/2488>
836                        let nullable = true;
837                        let array = encoder.finish();
838                        let field =
839                            Field::new(tag.to_string(), array.data_type().clone(), nullable);
840                        (field, array)
841                    })
842                    .unzip();
843                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
844
845                let array = StructArray::new(Fields::from(fields), arrays, nulls);
846                Arc::new(array)
847            }
848            DatumColumnEncoder::RecordEmpty(mut builder) => Arc::new(builder.finish()),
849        }
850    }
851}
852
853/// A decoder for a column of [`Datum`]s.
854///
855/// Note: We specifically structure the decoder as an enum instead of using trait objects because
856/// Datum decoding is an extremely hot path and downcasting objects is relatively slow.
857#[derive(Debug)]
858enum DatumColumnDecoder {
859    Bool(BooleanArray),
860    U8(UInt8Array),
861    U16(UInt16Array),
862    U32(UInt32Array),
863    U64(UInt64Array),
864    I16(Int16Array),
865    I32(Int32Array),
866    I64(Int64Array),
867    F32(Float32Array),
868    F64(Float64Array),
869    Numeric(BinaryArray),
870    Bytes(BinaryArray),
871    String(StringArray),
872    Date(Int32Array),
873    Time(FixedSizeBinaryArray),
874    Timestamp(FixedSizeBinaryArray),
875    TimestampTz(FixedSizeBinaryArray),
876    MzTimestamp(UInt64Array),
877    Interval(FixedSizeBinaryArray),
878    Uuid(FixedSizeBinaryArray),
879    Json(StringArray),
880    Array {
881        dim_offsets: OffsetBuffer<i32>,
882        dims: FixedSizeBinaryArray,
883        val_offsets: OffsetBuffer<i32>,
884        vals: Box<DatumColumnDecoder>,
885        nulls: Option<NullBuffer>,
886    },
887    List {
888        offsets: OffsetBuffer<i32>,
889        values: Box<DatumColumnDecoder>,
890        nulls: Option<NullBuffer>,
891    },
892    Map {
893        offsets: OffsetBuffer<i32>,
894        keys: StringArray,
895        vals: Box<DatumColumnDecoder>,
896        nulls: Option<NullBuffer>,
897    },
898    RecordEmpty(BooleanArray),
899    Record {
900        fields: Vec<Box<DatumColumnDecoder>>,
901        nulls: Option<NullBuffer>,
902    },
903    Range(BinaryArray),
904    MzAclItem(BinaryArray),
905    AclItem(FixedSizeBinaryArray),
906}
907
908impl DatumColumnDecoder {
909    fn get<'a>(&'a self, idx: usize, packer: &'a mut RowPacker) {
910        let datum = match self {
911            DatumColumnDecoder::Bool(array) => array
912                .is_valid(idx)
913                .then(|| array.value(idx))
914                .map(|x| if x { Datum::True } else { Datum::False }),
915            DatumColumnDecoder::U8(array) => array
916                .is_valid(idx)
917                .then(|| array.value(idx))
918                .map(Datum::UInt8),
919            DatumColumnDecoder::U16(array) => array
920                .is_valid(idx)
921                .then(|| array.value(idx))
922                .map(Datum::UInt16),
923            DatumColumnDecoder::U32(array) => array
924                .is_valid(idx)
925                .then(|| array.value(idx))
926                .map(Datum::UInt32),
927            DatumColumnDecoder::U64(array) => array
928                .is_valid(idx)
929                .then(|| array.value(idx))
930                .map(Datum::UInt64),
931            DatumColumnDecoder::I16(array) => array
932                .is_valid(idx)
933                .then(|| array.value(idx))
934                .map(Datum::Int16),
935            DatumColumnDecoder::I32(array) => array
936                .is_valid(idx)
937                .then(|| array.value(idx))
938                .map(Datum::Int32),
939            DatumColumnDecoder::I64(array) => array
940                .is_valid(idx)
941                .then(|| array.value(idx))
942                .map(Datum::Int64),
943            DatumColumnDecoder::F32(array) => array
944                .is_valid(idx)
945                .then(|| array.value(idx))
946                .map(|x| Datum::Float32(ordered_float::OrderedFloat(x))),
947            DatumColumnDecoder::F64(array) => array
948                .is_valid(idx)
949                .then(|| array.value(idx))
950                .map(|x| Datum::Float64(ordered_float::OrderedFloat(x))),
951            DatumColumnDecoder::Numeric(array) => array.is_valid(idx).then(|| {
952                let val = array.value(idx);
953                let val = PackedNumeric::from_bytes(val)
954                    .expect("failed to roundtrip Numeric")
955                    .into_value();
956                Datum::Numeric(OrderedDecimal(val))
957            }),
958            DatumColumnDecoder::String(array) => array
959                .is_valid(idx)
960                .then(|| array.value(idx))
961                .map(Datum::String),
962            DatumColumnDecoder::Bytes(array) => array
963                .is_valid(idx)
964                .then(|| array.value(idx))
965                .map(Datum::Bytes),
966            DatumColumnDecoder::Date(array) => {
967                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
968                    let date = Date::from_pg_epoch(x).expect("failed to roundtrip");
969                    Datum::Date(date)
970                })
971            }
972            DatumColumnDecoder::Time(array) => {
973                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
974                    let packed = PackedNaiveTime::from_bytes(x).expect("failed to roundtrip time");
975                    Datum::Time(packed.into_value())
976                })
977            }
978            DatumColumnDecoder::Timestamp(array) => {
979                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
980                    let packed = PackedNaiveDateTime::from_bytes(x)
981                        .expect("failed to roundtrip PackedNaiveDateTime");
982                    let timestamp = CheckedTimestamp::from_timestamplike(packed.into_value())
983                        .expect("failed to roundtrip timestamp");
984                    Datum::Timestamp(timestamp)
985                })
986            }
987            DatumColumnDecoder::TimestampTz(array) => {
988                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
989                    let packed = PackedNaiveDateTime::from_bytes(x)
990                        .expect("failed to roundtrip PackedNaiveDateTime");
991                    let timestamp =
992                        CheckedTimestamp::from_timestamplike(packed.into_value().and_utc())
993                            .expect("failed to roundtrip timestamp");
994                    Datum::TimestampTz(timestamp)
995                })
996            }
997            DatumColumnDecoder::MzTimestamp(array) => array
998                .is_valid(idx)
999                .then(|| array.value(idx))
1000                .map(|x| Datum::MzTimestamp(Timestamp::from(x))),
1001            DatumColumnDecoder::Interval(array) => {
1002                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1003                    let packed =
1004                        PackedInterval::from_bytes(x).expect("failed to roundtrip interval");
1005                    Datum::Interval(packed.into_value())
1006                })
1007            }
1008            DatumColumnDecoder::Uuid(array) => {
1009                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1010                    let uuid = Uuid::from_slice(x).expect("failed to roundtrip uuid");
1011                    Datum::Uuid(uuid)
1012                })
1013            }
1014            DatumColumnDecoder::AclItem(array) => {
1015                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1016                    let packed =
1017                        PackedAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1018                    Datum::AclItem(packed.into_value())
1019                })
1020            }
1021            DatumColumnDecoder::MzAclItem(array) => {
1022                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1023                    let packed =
1024                        PackedMzAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1025                    Datum::MzAclItem(packed.into_value())
1026                })
1027            }
1028            DatumColumnDecoder::Range(array) => {
1029                let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1030                    packer.push(Datum::Null);
1031                    return;
1032                };
1033
1034                let proto = ProtoDatum::decode(val).expect("failed to roundtrip Range");
1035                packer
1036                    .try_push_proto(&proto)
1037                    .expect("failed to pack ProtoRange");
1038
1039                // Return early because we've already packed the necessary Datums.
1040                return;
1041            }
1042            DatumColumnDecoder::Json(array) => {
1043                let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1044                    packer.push(Datum::Null);
1045                    return;
1046                };
1047                JsonbPacker::new(packer)
1048                    .pack_str(val)
1049                    .expect("failed to roundtrip JSON");
1050
1051                // Return early because we've already packed the necessary Datums.
1052                return;
1053            }
1054            DatumColumnDecoder::Array {
1055                dim_offsets,
1056                dims,
1057                val_offsets,
1058                vals,
1059                nulls,
1060            } => {
1061                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1062                if !is_valid {
1063                    packer.push(Datum::Null);
1064                    return;
1065                }
1066
1067                let start: usize = dim_offsets[idx]
1068                    .try_into()
1069                    .expect("unexpected negative offset");
1070                let end: usize = dim_offsets[idx + 1]
1071                    .try_into()
1072                    .expect("unexpected negative offset");
1073                let dimensions = (start..end).map(|idx| {
1074                    PackedArrayDimension::from_bytes(dims.value(idx))
1075                        .expect("failed to roundtrip ArrayDimension")
1076                        .into_value()
1077                });
1078
1079                let start: usize = val_offsets[idx]
1080                    .try_into()
1081                    .expect("unexpected negative offset");
1082                let end: usize = val_offsets[idx + 1]
1083                    .try_into()
1084                    .expect("unexpected negative offset");
1085                packer
1086                    .push_array_with_row_major(dimensions, |packer| {
1087                        for x in start..end {
1088                            vals.get(x, packer);
1089                        }
1090                        // Return the number of Datums we just packed.
1091                        end - start
1092                    })
1093                    .expect("failed to pack Array");
1094
1095                // Return early because we've already packed the necessary Datums.
1096                return;
1097            }
1098            DatumColumnDecoder::List {
1099                offsets,
1100                values,
1101                nulls,
1102            } => {
1103                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1104                if !is_valid {
1105                    packer.push(Datum::Null);
1106                    return;
1107                }
1108
1109                let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1110                let end: usize = offsets[idx + 1]
1111                    .try_into()
1112                    .expect("unexpected negative offset");
1113
1114                packer.push_list_with(|packer| {
1115                    for idx in start..end {
1116                        values.get(idx, packer)
1117                    }
1118                });
1119
1120                // Return early because we've already packed the necessary Datums.
1121                return;
1122            }
1123            DatumColumnDecoder::Map {
1124                offsets,
1125                keys,
1126                vals,
1127                nulls,
1128            } => {
1129                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1130                if !is_valid {
1131                    packer.push(Datum::Null);
1132                    return;
1133                }
1134
1135                let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1136                let end: usize = offsets[idx + 1]
1137                    .try_into()
1138                    .expect("unexpected negative offset");
1139
1140                packer.push_dict_with(|packer| {
1141                    for idx in start..end {
1142                        packer.push(Datum::String(keys.value(idx)));
1143                        vals.get(idx, packer);
1144                    }
1145                });
1146
1147                // Return early because we've already packed the necessary Datums.
1148                return;
1149            }
1150            DatumColumnDecoder::RecordEmpty(array) => array.is_valid(idx).then(Datum::empty_list),
1151            DatumColumnDecoder::Record { fields, nulls } => {
1152                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1153                if !is_valid {
1154                    packer.push(Datum::Null);
1155                    return;
1156                }
1157
1159                packer.push_list_with(|packer| {
1160                    for field in fields {
1161                        field.get(idx, packer);
1162                    }
1163                });
1164
1165                // Return early because we've already packed the necessary Datums.
1166                return;
1167            }
1168        };
1169
1170        match datum {
1171            Some(d) => packer.push(d),
1172            None => packer.push(Datum::Null),
1173        }
1174    }
1175
1176    fn stats(&self) -> ColumnStatKinds {
1177        match self {
1178            DatumColumnDecoder::Bool(a) => PrimitiveStats::<bool>::from_column(a).into(),
1179            DatumColumnDecoder::U8(a) => PrimitiveStats::<u8>::from_column(a).into(),
1180            DatumColumnDecoder::U16(a) => PrimitiveStats::<u16>::from_column(a).into(),
1181            DatumColumnDecoder::U32(a) => PrimitiveStats::<u32>::from_column(a).into(),
1182            DatumColumnDecoder::U64(a) => PrimitiveStats::<u64>::from_column(a).into(),
1183            DatumColumnDecoder::I16(a) => PrimitiveStats::<i16>::from_column(a).into(),
1184            DatumColumnDecoder::I32(a) => PrimitiveStats::<i32>::from_column(a).into(),
1185            DatumColumnDecoder::I64(a) => PrimitiveStats::<i64>::from_column(a).into(),
1186            DatumColumnDecoder::F32(a) => PrimitiveStats::<f32>::from_column(a).into(),
1187            DatumColumnDecoder::F64(a) => PrimitiveStats::<f64>::from_column(a).into(),
1188            DatumColumnDecoder::Numeric(a) => numeric_stats_from_column(a),
1189            DatumColumnDecoder::String(a) => PrimitiveStats::<String>::from_column(a).into(),
1190            DatumColumnDecoder::Bytes(a) => PrimitiveStats::<Vec<u8>>::from_column(a).into(),
1191            DatumColumnDecoder::Date(a) => PrimitiveStats::<i32>::from_column(a).into(),
1192            DatumColumnDecoder::Time(a) => {
1193                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedTime)
1194            }
1195            DatumColumnDecoder::Timestamp(a) => {
1196                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1197            }
1198            DatumColumnDecoder::TimestampTz(a) => {
1199                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1200            }
1201            DatumColumnDecoder::MzTimestamp(a) => PrimitiveStats::<u64>::from_column(a).into(),
1202            DatumColumnDecoder::Interval(a) => {
1203                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedInterval)
1204            }
1205            DatumColumnDecoder::Uuid(a) => {
1206                fixed_stats_from_column(a, FixedSizeBytesStatsKind::Uuid)
1207            }
1208            DatumColumnDecoder::AclItem(_)
1209            | DatumColumnDecoder::MzAclItem(_)
1210            | DatumColumnDecoder::Range(_) => ColumnStatKinds::None,
1211            DatumColumnDecoder::Json(a) => stats_for_json(a.iter()).values,
1212            DatumColumnDecoder::Array { .. }
1213            | DatumColumnDecoder::List { .. }
1214            | DatumColumnDecoder::Map { .. }
1215            | DatumColumnDecoder::Record { .. }
1216            | DatumColumnDecoder::RecordEmpty(_) => ColumnStatKinds::None,
1217        }
1218    }
1219
1220    fn goodbytes(&self) -> usize {
1221        match self {
1222            DatumColumnDecoder::Bool(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1223            DatumColumnDecoder::U8(a) => ArrayOrd::UInt8(a.clone()).goodbytes(),
1224            DatumColumnDecoder::U16(a) => ArrayOrd::UInt16(a.clone()).goodbytes(),
1225            DatumColumnDecoder::U32(a) => ArrayOrd::UInt32(a.clone()).goodbytes(),
1226            DatumColumnDecoder::U64(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1227            DatumColumnDecoder::I16(a) => ArrayOrd::Int16(a.clone()).goodbytes(),
1228            DatumColumnDecoder::I32(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1229            DatumColumnDecoder::I64(a) => ArrayOrd::Int64(a.clone()).goodbytes(),
1230            DatumColumnDecoder::F32(a) => ArrayOrd::Float32(a.clone()).goodbytes(),
1231            DatumColumnDecoder::F64(a) => ArrayOrd::Float64(a.clone()).goodbytes(),
1232            DatumColumnDecoder::Numeric(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1233            DatumColumnDecoder::String(a) => ArrayOrd::String(a.clone()).goodbytes(),
1234            DatumColumnDecoder::Bytes(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1235            DatumColumnDecoder::Date(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1236            DatumColumnDecoder::Time(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1237            DatumColumnDecoder::Timestamp(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1238            DatumColumnDecoder::TimestampTz(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1239            DatumColumnDecoder::MzTimestamp(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1240            DatumColumnDecoder::Interval(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1241            DatumColumnDecoder::Uuid(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1242            DatumColumnDecoder::AclItem(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1243            DatumColumnDecoder::MzAclItem(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1244            DatumColumnDecoder::Range(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1245            DatumColumnDecoder::Json(a) => ArrayOrd::String(a.clone()).goodbytes(),
1246            DatumColumnDecoder::Array { dims, vals, .. } => {
1247                (dims.len() * PackedArrayDimension::SIZE) + vals.goodbytes()
1248            }
1249            DatumColumnDecoder::List { values, .. } => values.goodbytes(),
1250            DatumColumnDecoder::Map { keys, vals, .. } => {
1251                ArrayOrd::String(keys.clone()).goodbytes() + vals.goodbytes()
1252            }
1253            DatumColumnDecoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
1254            DatumColumnDecoder::RecordEmpty(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1255        }
1256    }
1257}
1258
1259impl Schema<Row> for RelationDesc {
1260    type ArrowColumn = arrow::array::StructArray;
1261    type Statistics = OptionStats<StructStats>;
1262
1263    type Decoder = RowColumnarDecoder;
1264    type Encoder = RowColumnarEncoder;
1265
1266    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1267        RowColumnarDecoder::new(col, self)
1268    }
1269
1270    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1271        RowColumnarEncoder::new(self)
1272            .ok_or_else(|| anyhow::anyhow!("Cannot encode a RelationDesc with no columns"))
1273    }
1274}
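
// A minimal, hedged usage sketch of the `Schema<Row>` impl above (illustrative only, not
// part of the original file): encode a batch of `Row`s into a `StructArray` and decode the
// first one back out. Assumes `rows` is non-empty and every row matches `desc`.
#[cfg(test)]
#[allow(dead_code)]
fn _schema_roundtrip_sketch(desc: &RelationDesc, rows: &[Row]) -> Result<Row, anyhow::Error> {
    let mut encoder = <RelationDesc as Schema<Row>>::encoder(desc)?;
    for row in rows {
        encoder.append(row);
    }
    // Finishing the encoder yields the columnar representation of all appended rows.
    let col = encoder.finish();

    let decoder = <RelationDesc as Schema<Row>>::decoder(desc, col)?;
    let mut decoded = Row::default();
    decoder.decode(0, &mut decoded);
    Ok(decoded)
}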
1275
1276/// A [`ColumnDecoder`] for a [`Row`].
1277#[derive(Debug)]
1278pub struct RowColumnarDecoder {
1279    /// The length of all columns in this decoder; this matches all child arrays and the null
1280    /// array, if present.
1281    len: usize,
1282    /// Field-specific information: the user-readable field name, the null count (or None if the
1283    /// column is non-nullable), and the decoder which wraps the column-specific array.
1284    decoders: Vec<(Arc<str>, Option<usize>, DatumColumnDecoder)>,
1285    /// The null buffer for this row, if present. (At time of writing, all rows are assumed to be
1286    /// logically nullable.)
1287    nullability: Option<NullBuffer>,
1288}
1289
1290/// Merge the provided null buffer with the existing array's null buffer, if any.
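///
/// For example (illustrative values only): given a parent mask of `[valid, null, valid]`
/// and a child null buffer of `[valid, valid, null]`, the merged buffer is
/// `[valid, null, null]`, i.e. an entry survives only if it is valid at both levels.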
1291fn mask_nulls(column: &ArrayRef, null_mask: Option<&NullBuffer>) -> ArrayRef {
1292    if null_mask.is_none() {
1293        Arc::clone(column)
1294    } else {
1295        // We calculate stats on the nested arrays, so make sure we don't count entries
1296        // that are masked off at a higher level.
1297        let nulls = NullBuffer::union(null_mask, column.nulls());
1298        let data = column
1299            .to_data()
1300            .into_builder()
1301            .nulls(nulls)
1302            .build()
1303            .expect("changed only null mask");
1304        make_array(data)
1305    }
1306}
1307
1308impl RowColumnarDecoder {
1309    /// Creates a [`RowColumnarDecoder`] that decodes from the provided [`StructArray`].
1310    ///
1311    /// Returns an error if the schema of the [`StructArray`] does not match
1312    /// the provided [`RelationDesc`].
1313    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1314        let inner_columns = col.columns();
1315        let desc_columns = desc.typ().columns();
1316
1317        if desc_columns.len() > inner_columns.len() {
1318            anyhow::bail!(
1319                "provided array has too few columns! {desc_columns:?} > {inner_columns:?}"
1320            );
1321        }
1322
1323        // For performance reasons we downcast just a single time.
1324        let mut decoders = Vec::with_capacity(desc_columns.len());
1325
1326        let null_mask = col.nulls();
1327
1328        // The columns of the `StructArray` are named with their column index.
1329        for (col_idx, col_name, col_type) in desc.iter_all() {
1330            let field_name = col_idx.to_stable_name();
1331            let column = col.column_by_name(&field_name).ok_or_else(|| {
1332                anyhow::anyhow!(
1333                    "StructArray did not contain column name {field_name}, found {:?}",
1334                    col.column_names()
1335                )
1336            })?;
1337            let column = mask_nulls(column, null_mask);
1338            let null_count = col_type.nullable.then(|| column.null_count());
1339            let decoder = array_to_decoder(&column, &col_type.scalar_type)?;
1340            decoders.push((col_name.as_str().into(), null_count, decoder));
1341        }
1342
1343        Ok(RowColumnarDecoder {
1344            len: col.len(),
1345            decoders,
1346            nullability: col.logical_nulls(),
1347        })
1348    }
1349
1350    /// Returns the number of null entries in this array of Row structs. This
1351    /// will be 0 when `Row` is encoded directly, but could be non-zero when it's
1352    /// used inside `SourceDataEncoder`.
1353    pub fn null_count(&self) -> usize {
1354        self.nullability.as_ref().map_or(0, |n| n.null_count())
1355    }
1356}
1357
1358impl ColumnDecoder<Row> for RowColumnarDecoder {
1359    fn decode(&self, idx: usize, val: &mut Row) {
1360        let mut packer = val.packer();
1361
1362        for (_, _, decoder) in &self.decoders {
1363            decoder.get(idx, &mut packer);
1364        }
1365    }
1366
1367    fn is_null(&self, idx: usize) -> bool {
1368        let Some(nullability) = self.nullability.as_ref() else {
1369            return false;
1370        };
1371        nullability.is_null(idx)
1372    }
1373
1374    fn goodbytes(&self) -> usize {
1375        let decoders_size: usize = self
1376            .decoders
1377            .iter()
1378            .map(|(_name, _null_count, decoder)| decoder.goodbytes())
1379            .sum();
1380
1381        decoders_size
1382            + self
1383                .nullability
1384                .as_ref()
1385                .map(|nulls| nulls.inner().inner().len())
1386                .unwrap_or(0)
1387    }
1388
1389    fn stats(&self) -> StructStats {
1390        StructStats {
1391            len: self.len,
1392            cols: self
1393                .decoders
1394                .iter()
1395                .map(|(name, null_count, decoder)| {
1396                    let name = name.to_string();
1397                    let stats = ColumnarStats {
1398                        nulls: null_count.map(|count| ColumnNullStats { count }),
1399                        values: decoder.stats(),
1400                    };
1401                    (name, stats)
1402                })
1403                .collect(),
1404        }
1405    }
1406}
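
// Hedged sketch (not part of the original file): reading a per-column null count back out
// of the `StructStats` produced by `RowColumnarDecoder::stats` above. Assumes `column` is
// the user-readable column name used as the stats key.
#[cfg(test)]
#[allow(dead_code)]
fn _column_null_count_sketch(decoder: &RowColumnarDecoder, column: &str) -> Option<usize> {
    let stats = decoder.stats();
    // `nulls` is `None` for non-nullable columns, so surface that as `None` here too.
    stats.cols.get(column).and_then(|s| s.nulls.map(|n| n.count))
}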
1407
1408/// A [`ColumnEncoder`] for a [`Row`].
1409#[derive(Debug)]
1410pub struct RowColumnarEncoder {
1411    encoders: Vec<DatumEncoder>,
1412    // TODO(parkmycar): Replace the `usize` with a `ColumnIdx` type.
1413    col_names: Vec<(usize, Arc<str>)>,
1414    // TODO(parkmycar): Optionally omit this.
1415    nullability: BooleanBufferBuilder,
1416}
1417
1418impl RowColumnarEncoder {
1419    /// Creates a [`RowColumnarEncoder`] for the provided [`RelationDesc`].
1420    ///
1421    /// Returns `None` if the provided [`RelationDesc`] has no columns.
1422    ///
1423    /// # Note
1424    /// Internally we represent a [`Row`] as a [`StructArray`] which is
1425    /// required to have at least one field. Instead of handling this case by
1426    /// adding some special "internal" column we let a higher level encoder
1427    /// (e.g. `SourceDataColumnarEncoder`) handle this case.
1428    pub fn new(desc: &RelationDesc) -> Option<Self> {
1429        if desc.typ().columns().is_empty() {
1430            return None;
1431        }
1432
1433        let (col_names, encoders): (Vec<_>, Vec<_>) = desc
1434            .iter_all()
1435            .map(|(col_idx, col_name, col_type)| {
1436                let encoder = scalar_type_to_encoder(&col_type.scalar_type)
1437                    .expect("failed to create encoder");
1438                let encoder = DatumEncoder {
1439                    nullable: col_type.nullable,
1440                    encoder,
1441                };
1442
1443                // We name the Fields in Parquet with the column index, but for
1444                // backwards compat use the column name for stats.
1445                let name = (col_idx.to_raw(), col_name.as_str().into());
1446
1447                (name, encoder)
1448            })
1449            .unzip();
1450
1451        Some(RowColumnarEncoder {
1452            encoders,
1453            col_names,
1454            nullability: BooleanBufferBuilder::new(100),
1455        })
1456    }
1457}
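
// Hedged sketch of the note above (illustrative only): a `RelationDesc` with no columns
// cannot back a `RowColumnarEncoder`, since the underlying `StructArray` needs at least
// one field; a higher-level encoder is expected to handle that case.
#[cfg(test)]
#[allow(dead_code)]
fn _empty_desc_sketch() {
    assert!(RowColumnarEncoder::new(&RelationDesc::empty()).is_none());
}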
1458
1459impl ColumnEncoder<Row> for RowColumnarEncoder {
1460    type FinishedColumn = StructArray;
1461
1462    fn goodbytes(&self) -> usize {
1463        self.encoders.iter().map(|e| e.goodbytes()).sum()
1464    }
1465
1466    fn append(&mut self, val: &Row) {
1467        let mut num_datums = 0;
1468        for (datum, encoder) in val.iter().zip_eq(self.encoders.iter_mut()) {
1469            encoder.push(datum);
1470            num_datums += 1;
1471        }
1472        assert_eq!(
1473            num_datums,
1474            self.encoders.len(),
1475            "tried to encode {val:?}, but only have {:?}",
1476            self.encoders
1477        );
1478
1479        self.nullability.append(true);
1480    }
1481
1482    fn append_null(&mut self) {
1483        for encoder in self.encoders.iter_mut() {
1484            encoder.push_invalid();
1485        }
1486        self.nullability.append(false);
1487    }
1488
1489    fn finish(self) -> Self::FinishedColumn {
1490        let RowColumnarEncoder {
1491            encoders,
1492            col_names,
1493            nullability,
1494            ..
1495        } = self;
1496
1497        let (arrays, fields): (Vec<_>, Vec<_>) = col_names
1498            .iter()
1499            .zip_eq(encoders)
1500            .map(|((col_idx, _col_name), encoder)| {
1501                // Note: We mark all columns as nullable at the Arrow/Parquet level because it has
1502                // a negligible performance difference, but it protects us from unintended
1503                // nullability changes in the columns of SQL objects.
1504                //
1505                // See: <https://github.com/MaterializeInc/database-issues/issues/2488>
1506                let nullable = true;
1507                let array = encoder.finish();
1508                let field = Field::new(col_idx.to_string(), array.data_type().clone(), nullable);
1509
1510                (array, field)
1511            })
1512            .multiunzip();
1513
1514        let null_buffer = NullBuffer::from(BooleanBuffer::from(nullability));
1515
1516        let array = StructArray::new(Fields::from(fields), arrays, Some(null_buffer));
1517
1518        array
1519    }
1520}
1521
1522/// Small helper function to downcast an [`Array`], returning an error on a type mismatch.
1523///
1524/// Note: it is _super_ important that we downcast as few times as possible. Datum encoding is a
1525/// very hot path and downcasting is relatively slow.
1526#[inline]
1527fn downcast_array<T: 'static>(array: &Arc<dyn Array>) -> Result<&T, anyhow::Error> {
1528    array
1529        .as_any()
1530        .downcast_ref::<T>()
1531        .ok_or_else(|| anyhow!("expected {}, found {array:?}", std::any::type_name::<T>()))
1532}
1533
1534/// Small helper function to convert an array into a [`DatumColumnDecoder`].
1535///
1536/// Note: it is _super_ important that we downcast as few times as possible. Datum encoding is a
1537/// very hot path and downcasting is relatively slow.
1538fn array_to_decoder(
1539    array: &Arc<dyn Array>,
1540    col_ty: &SqlScalarType,
1541) -> Result<DatumColumnDecoder, anyhow::Error> {
1542    let decoder = match (array.data_type(), col_ty) {
1543        (DataType::Boolean, SqlScalarType::Bool) => {
1544            let array = downcast_array::<BooleanArray>(array)?;
1545            DatumColumnDecoder::Bool(array.clone())
1546        }
1547        (DataType::UInt8, SqlScalarType::PgLegacyChar) => {
1548            let array = downcast_array::<UInt8Array>(array)?;
1549            DatumColumnDecoder::U8(array.clone())
1550        }
1551        (DataType::UInt16, SqlScalarType::UInt16) => {
1552            let array = downcast_array::<UInt16Array>(array)?;
1553            DatumColumnDecoder::U16(array.clone())
1554        }
1555        (
1556            DataType::UInt32,
1557            SqlScalarType::UInt32
1558            | SqlScalarType::Oid
1559            | SqlScalarType::RegClass
1560            | SqlScalarType::RegProc
1561            | SqlScalarType::RegType,
1562        ) => {
1563            let array = downcast_array::<UInt32Array>(array)?;
1564            DatumColumnDecoder::U32(array.clone())
1565        }
1566        (DataType::UInt64, SqlScalarType::UInt64) => {
1567            let array = downcast_array::<UInt64Array>(array)?;
1568            DatumColumnDecoder::U64(array.clone())
1569        }
1570        (DataType::Int16, SqlScalarType::Int16) => {
1571            let array = downcast_array::<Int16Array>(array)?;
1572            DatumColumnDecoder::I16(array.clone())
1573        }
1574        (DataType::Int32, SqlScalarType::Int32) => {
1575            let array = downcast_array::<Int32Array>(array)?;
1576            DatumColumnDecoder::I32(array.clone())
1577        }
1578        (DataType::Int64, SqlScalarType::Int64) => {
1579            let array = downcast_array::<Int64Array>(array)?;
1580            DatumColumnDecoder::I64(array.clone())
1581        }
1582        (DataType::Float32, SqlScalarType::Float32) => {
1583            let array = downcast_array::<Float32Array>(array)?;
1584            DatumColumnDecoder::F32(array.clone())
1585        }
1586        (DataType::Float64, SqlScalarType::Float64) => {
1587            let array = downcast_array::<Float64Array>(array)?;
1588            DatumColumnDecoder::F64(array.clone())
1589        }
1590        (DataType::Struct(_), SqlScalarType::Numeric { .. }) => {
1591            let array = downcast_array::<StructArray>(array)?;
1592            // Note: We only use the approx column for sorting, and ignore it
1593            // when decoding.
1594            let binary_values = array
1595                .column_by_name("binary")
1596                .expect("missing binary column");
1597
1598            let array = downcast_array::<BinaryArray>(binary_values)?;
1599            DatumColumnDecoder::Numeric(array.clone())
1600        }
1601        (
1602            DataType::Utf8,
1603            SqlScalarType::String
1604            | SqlScalarType::PgLegacyName
1605            | SqlScalarType::Char { .. }
1606            | SqlScalarType::VarChar { .. },
1607        ) => {
1608            let array = downcast_array::<StringArray>(array)?;
1609            DatumColumnDecoder::String(array.clone())
1610        }
1611        (DataType::Binary, SqlScalarType::Bytes) => {
1612            let array = downcast_array::<BinaryArray>(array)?;
1613            DatumColumnDecoder::Bytes(array.clone())
1614        }
1615        (DataType::Int32, SqlScalarType::Date) => {
1616            let array = downcast_array::<Int32Array>(array)?;
1617            DatumColumnDecoder::Date(array.clone())
1618        }
1619        (DataType::FixedSizeBinary(TIME_FIXED_BYTES), SqlScalarType::Time) => {
1620            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1621            DatumColumnDecoder::Time(array.clone())
1622        }
1623        (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), SqlScalarType::Timestamp { .. }) => {
1624            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1625            DatumColumnDecoder::Timestamp(array.clone())
1626        }
1627        (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), SqlScalarType::TimestampTz { .. }) => {
1628            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1629            DatumColumnDecoder::TimestampTz(array.clone())
1630        }
1631        (DataType::UInt64, SqlScalarType::MzTimestamp) => {
1632            let array = downcast_array::<UInt64Array>(array)?;
1633            DatumColumnDecoder::MzTimestamp(array.clone())
1634        }
1635        (DataType::FixedSizeBinary(INTERVAL_FIXED_BYTES), SqlScalarType::Interval) => {
1636            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1637            DatumColumnDecoder::Interval(array.clone())
1638        }
1639        (DataType::FixedSizeBinary(UUID_FIXED_BYTES), SqlScalarType::Uuid) => {
1640            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1641            DatumColumnDecoder::Uuid(array.clone())
1642        }
1643        (DataType::FixedSizeBinary(ACL_ITEM_FIXED_BYTES), SqlScalarType::AclItem) => {
1644            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1645            DatumColumnDecoder::AclItem(array.clone())
1646        }
1647        (DataType::Binary, SqlScalarType::MzAclItem) => {
1648            let array = downcast_array::<BinaryArray>(array)?;
1649            DatumColumnDecoder::MzAclItem(array.clone())
1650        }
1651        (DataType::Binary, SqlScalarType::Range { .. }) => {
1652            let array = downcast_array::<BinaryArray>(array)?;
1653            DatumColumnDecoder::Range(array.clone())
1654        }
1655        (DataType::Utf8, SqlScalarType::Jsonb) => {
1656            let array = downcast_array::<StringArray>(array)?;
1657            DatumColumnDecoder::Json(array.clone())
1658        }
1659        (DataType::Struct(_), s @ SqlScalarType::Array(_) | s @ SqlScalarType::Int2Vector) => {
1660            let element_type = match s {
1661                SqlScalarType::Array(inner) => inner,
1662                SqlScalarType::Int2Vector => &SqlScalarType::Int16,
1663                _ => unreachable!("checked above"),
1664            };
1665
1666            let array = downcast_array::<StructArray>(array)?;
1667            let nulls = array.nulls().cloned();
1668
1669            let dims = array
1670                .column_by_name("dims")
1671                .expect("missing dimensions column");
1672            let dims = downcast_array::<ListArray>(dims).cloned()?;
1673            let dim_offsets = dims.offsets().clone();
1674            let dims = downcast_array::<FixedSizeBinaryArray>(dims.values()).cloned()?;
1675
1676            let vals = array.column_by_name("vals").expect("missing values column");
1677            let vals = downcast_array::<ListArray>(vals)?;
1678            let val_offsets = vals.offsets().clone();
1679            let vals = array_to_decoder(vals.values(), element_type)?;
1680
1681            DatumColumnDecoder::Array {
1682                dim_offsets,
1683                dims,
1684                val_offsets,
1685                vals: Box::new(vals),
1686                nulls,
1687            }
1688        }
1689        (DataType::List(_), SqlScalarType::List { element_type, .. }) => {
1690            let array = downcast_array::<ListArray>(array)?;
1691            let inner_decoder = array_to_decoder(array.values(), &*element_type)?;
1692            DatumColumnDecoder::List {
1693                offsets: array.offsets().clone(),
1694                values: Box::new(inner_decoder),
1695                nulls: array.nulls().cloned(),
1696            }
1697        }
1698        (DataType::Map(_, true), SqlScalarType::Map { value_type, .. }) => {
1699            let array = downcast_array::<MapArray>(array)?;
1700            let keys = downcast_array::<StringArray>(array.keys())?;
1701            let vals = array_to_decoder(array.values(), value_type)?;
1702            DatumColumnDecoder::Map {
1703                offsets: array.offsets().clone(),
1704                keys: keys.clone(),
1705                vals: Box::new(vals),
1706                nulls: array.nulls().cloned(),
1707            }
1708        }
1709        (DataType::List(_), SqlScalarType::Map { value_type, .. }) => {
1710            let array: &ListArray = downcast_array(array)?;
1711            let entries: &StructArray = downcast_array(array.values())?;
1712            let [keys, values]: &[ArrayRef; 2] = entries.columns().try_into()?;
1713            let keys: &StringArray = downcast_array(keys)?;
1714            let vals: DatumColumnDecoder = array_to_decoder(values, value_type)?;
1715            DatumColumnDecoder::Map {
1716                offsets: array.offsets().clone(),
1717                keys: keys.clone(),
1718                vals: Box::new(vals),
1719                nulls: array.nulls().cloned(),
1720            }
1721        }
1722        (DataType::Boolean, SqlScalarType::Record { fields, .. }) if fields.is_empty() => {
1723            let empty_record_array = downcast_array::<BooleanArray>(array)?;
1724            DatumColumnDecoder::RecordEmpty(empty_record_array.clone())
1725        }
1726        (DataType::Struct(_), SqlScalarType::Record { fields, .. }) => {
1727            let record_array = downcast_array::<StructArray>(array)?;
1728            let null_mask = record_array.nulls();
1729            let mut decoders = Vec::with_capacity(fields.len());
1730            for (tag, (_name, col_type)) in fields.iter().enumerate() {
1731                let inner_array = record_array
1732                    .column_by_name(&tag.to_string())
1733                    .ok_or_else(|| anyhow::anyhow!("no column named '{tag}'"))?;
1734                let inner_array = mask_nulls(inner_array, null_mask);
1735                let inner_decoder = array_to_decoder(&inner_array, &col_type.scalar_type)?;
1736
1737                decoders.push(Box::new(inner_decoder));
1738            }
1739
1740            DatumColumnDecoder::Record {
1741                fields: decoders,
1742                nulls: record_array.nulls().cloned(),
1743            }
1744        }
1745        (x, y) => {
1746            let msg = format!("can't decode column of {x:?} for scalar type {y:?}");
1747            mz_ore::soft_panic_or_log!("{msg}");
1748            anyhow::bail!("{msg}");
1749        }
1750    };
1751
1752    Ok(decoder)
1753}
1754
1755/// Small helper function to create a [`DatumColumnEncoder`] from a [`SqlScalarType`].
1756fn scalar_type_to_encoder(col_ty: &SqlScalarType) -> Result<DatumColumnEncoder, anyhow::Error> {
1757    let encoder = match &col_ty {
1758        SqlScalarType::Bool => DatumColumnEncoder::Bool(BooleanBuilder::new()),
1759        SqlScalarType::PgLegacyChar => DatumColumnEncoder::U8(UInt8Builder::new()),
1760        SqlScalarType::UInt16 => DatumColumnEncoder::U16(UInt16Builder::new()),
1761        SqlScalarType::UInt32
1762        | SqlScalarType::Oid
1763        | SqlScalarType::RegClass
1764        | SqlScalarType::RegProc
1765        | SqlScalarType::RegType => DatumColumnEncoder::U32(UInt32Builder::new()),
1766        SqlScalarType::UInt64 => DatumColumnEncoder::U64(UInt64Builder::new()),
1767        SqlScalarType::Int16 => DatumColumnEncoder::I16(Int16Builder::new()),
1768        SqlScalarType::Int32 => DatumColumnEncoder::I32(Int32Builder::new()),
1769        SqlScalarType::Int64 => DatumColumnEncoder::I64(Int64Builder::new()),
1770        SqlScalarType::Float32 => DatumColumnEncoder::F32(Float32Builder::new()),
1771        SqlScalarType::Float64 => DatumColumnEncoder::F64(Float64Builder::new()),
1772        SqlScalarType::Numeric { .. } => DatumColumnEncoder::Numeric {
1773            approx_values: Float64Builder::new(),
1774            binary_values: BinaryBuilder::new(),
1775            numeric_context: crate::adt::numeric::cx_datum().clone(),
1776        },
1777        SqlScalarType::String
1778        | SqlScalarType::PgLegacyName
1779        | SqlScalarType::Char { .. }
1780        | SqlScalarType::VarChar { .. } => DatumColumnEncoder::String(StringBuilder::new()),
1781        SqlScalarType::Bytes => DatumColumnEncoder::Bytes(BinaryBuilder::new()),
1782        SqlScalarType::Date => DatumColumnEncoder::Date(Int32Builder::new()),
1783        SqlScalarType::Time => {
1784            DatumColumnEncoder::Time(FixedSizeBinaryBuilder::new(TIME_FIXED_BYTES))
1785        }
1786        SqlScalarType::Timestamp { .. } => {
1787            DatumColumnEncoder::Timestamp(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1788        }
1789        SqlScalarType::TimestampTz { .. } => {
1790            DatumColumnEncoder::TimestampTz(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1791        }
1792        SqlScalarType::MzTimestamp => DatumColumnEncoder::MzTimestamp(UInt64Builder::new()),
1793        SqlScalarType::Interval => {
1794            DatumColumnEncoder::Interval(FixedSizeBinaryBuilder::new(INTERVAL_FIXED_BYTES))
1795        }
1796        SqlScalarType::Uuid => {
1797            DatumColumnEncoder::Uuid(FixedSizeBinaryBuilder::new(UUID_FIXED_BYTES))
1798        }
1799        SqlScalarType::AclItem => {
1800            DatumColumnEncoder::AclItem(FixedSizeBinaryBuilder::new(ACL_ITEM_FIXED_BYTES))
1801        }
1802        SqlScalarType::MzAclItem => DatumColumnEncoder::MzAclItem(BinaryBuilder::new()),
1803        SqlScalarType::Range { .. } => DatumColumnEncoder::Range(BinaryBuilder::new()),
1804        SqlScalarType::Jsonb => DatumColumnEncoder::Jsonb {
1805            offsets: vec![0],
1806            buf: Vec::new(),
1807            nulls: None,
1808        },
1809        s @ SqlScalarType::Array(_) | s @ SqlScalarType::Int2Vector => {
1810            let element_type = match s {
1811                SqlScalarType::Array(inner) => inner,
1812                SqlScalarType::Int2Vector => &SqlScalarType::Int16,
1813                _ => unreachable!("checked above"),
1814            };
1815            let inner = scalar_type_to_encoder(element_type)?;
1816            DatumColumnEncoder::Array {
1817                dims: ListBuilder::new(FixedSizeBinaryBuilder::new(ARRAY_DIMENSION_FIXED_BYTES)),
1818                val_lengths: Vec::new(),
1819                vals: Box::new(inner),
1820                nulls: None,
1821            }
1822        }
1823        SqlScalarType::List { element_type, .. } => {
1824            let inner = scalar_type_to_encoder(&*element_type)?;
1825            DatumColumnEncoder::List {
1826                lengths: Vec::new(),
1827                values: Box::new(inner),
1828                nulls: None,
1829            }
1830        }
1831        SqlScalarType::Map { value_type, .. } => {
1832            let inner = scalar_type_to_encoder(&*value_type)?;
1833            DatumColumnEncoder::Map {
1834                lengths: Vec::new(),
1835                keys: StringBuilder::new(),
1836                vals: Box::new(inner),
1837                nulls: None,
1838            }
1839        }
1840        SqlScalarType::Record { fields, .. } if fields.is_empty() => {
1841            DatumColumnEncoder::RecordEmpty(BooleanBuilder::new())
1842        }
1843        SqlScalarType::Record { fields, .. } => {
1844            let encoders = fields
1845                .iter()
1846                .map(|(_name, ty)| {
1847                    scalar_type_to_encoder(&ty.scalar_type).map(|e| DatumEncoder {
1848                        nullable: ty.nullable,
1849                        encoder: e,
1850                    })
1851                })
1852                .collect::<Result<_, _>>()?;
1853
1854            DatumColumnEncoder::Record {
1855                fields: encoders,
1856                nulls: None,
1857                length: 0,
1858            }
1859        }
1860    };
1861    Ok(encoder)
1862}
1863
1864impl Codec for Row {
1865    type Storage = ProtoRow;
1866    type Schema = RelationDesc;
1867
1868    fn codec_name() -> String {
1869        "protobuf[Row]".into()
1870    }
1871
1872    /// Encodes a row into the permanent storage format.
1873    ///
1874    /// This perfectly round-trips through [Row::decode]. It's guaranteed to be
1875    /// readable by future versions of Materialize through v(TODO: Figure out
1876    /// our policy).
1877    fn encode<B>(&self, buf: &mut B)
1878    where
1879        B: BufMut,
1880    {
1881        self.into_proto()
1882            .encode(buf)
1883            .expect("no required fields means no initialization errors");
1884    }
1885
1886    /// Decodes a row from the permanent storage format.
1887    ///
1888    /// This perfectly round-trips through [Row::encode]. It can read rows
1889    /// encoded by historical versions of Materialize back to v(TODO: Figure out
1890    /// our policy).
1891    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Row, String> {
1892        // NB: We could easily implement this directly instead of via
1893        // `decode_from`, but do this so that we get maximal coverage of the
1894        // more complicated codepath.
1895        //
1896        // The length of the encoded ProtoRow (i.e. `buf.len()`) doesn't perfectly
1897        // predict the length of the resulting Row, but it's definitely
1898        // correlated, so probably a decent estimate.
1899        let mut row = Row::with_capacity(buf.len());
1900        <Self as Codec>::decode_from(&mut row, buf, &mut None, schema)?;
1901        Ok(row)
1902    }
1903
1904    fn decode_from<'a>(
1905        &mut self,
1906        buf: &'a [u8],
1907        storage: &mut Option<ProtoRow>,
1908        schema: &RelationDesc,
1909    ) -> Result<(), String> {
1910        let mut proto = storage.take().unwrap_or_default();
1911        proto.clear();
1912        proto.merge(buf).map_err(|err| err.to_string())?;
1913        let ret = self.decode_from_proto(&proto, schema);
1914        storage.replace(proto);
1915        ret
1916    }
1917
1918    fn validate(row: &Self, desc: &Self::Schema) -> Result<(), String> {
1919        for x in Itertools::zip_longest(desc.iter_types(), row.iter()) {
1920            match x {
1921                EitherOrBoth::Both(typ, datum) if datum.is_instance_of_sql(typ) => continue,
1922                _ => return Err(format!("row {:?} did not match desc {:?}", row, desc)),
1923            };
1924        }
1925        Ok(())
1926    }
1927
1928    fn encode_schema(schema: &Self::Schema) -> Bytes {
1929        schema.into_proto().encode_to_vec().into()
1930    }
1931
1932    fn decode_schema(buf: &Bytes) -> Self::Schema {
1933        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
1934        proto.into_rust().expect("valid schema")
1935    }
1936}
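
// Hedged sketch (not part of the original file) of the `Codec` impl above: encode a row
// into the permanent protobuf format and decode it back. Assumes `row` conforms to `desc`.
#[cfg(test)]
#[allow(dead_code)]
fn _codec_roundtrip_sketch(row: &Row, desc: &RelationDesc) -> Result<Row, String> {
    let mut buf = Vec::new();
    // `Vec<u8>` implements `BufMut`, so it can back the encode call directly.
    <Row as Codec>::encode(row, &mut buf);
    <Row as Codec>::decode(&buf, desc)
}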
1937
1938impl<'a> From<Datum<'a>> for ProtoDatum {
1939    fn from(x: Datum<'a>) -> Self {
1940        let datum_type = match x {
1941            Datum::False => DatumType::Other(ProtoDatumOther::False.into()),
1942            Datum::True => DatumType::Other(ProtoDatumOther::True.into()),
1943            Datum::Int16(x) => DatumType::Int16(x.into()),
1944            Datum::Int32(x) => DatumType::Int32(x),
1945            Datum::UInt8(x) => DatumType::Uint8(x.into()),
1946            Datum::UInt16(x) => DatumType::Uint16(x.into()),
1947            Datum::UInt32(x) => DatumType::Uint32(x),
1948            Datum::UInt64(x) => DatumType::Uint64(x),
1949            Datum::Int64(x) => DatumType::Int64(x),
1950            Datum::Float32(x) => DatumType::Float32(x.into_inner()),
1951            Datum::Float64(x) => DatumType::Float64(x.into_inner()),
1952            Datum::Date(x) => DatumType::Date(x.into_proto()),
1953            Datum::Time(x) => DatumType::Time(ProtoNaiveTime {
1954                secs: x.num_seconds_from_midnight(),
1955                frac: x.nanosecond(),
1956            }),
1957            Datum::Timestamp(x) => DatumType::Timestamp(x.into_proto()),
1958            Datum::TimestampTz(x) => DatumType::TimestampTz(x.into_proto()),
1959            Datum::Interval(x) => DatumType::Interval(x.into_proto()),
1960            Datum::Bytes(x) => DatumType::Bytes(Bytes::copy_from_slice(x)),
1961            Datum::String(x) => DatumType::String(x.to_owned()),
1962            Datum::Array(x) => DatumType::Array(ProtoArray {
1963                elements: Some(ProtoRow {
1964                    datums: x.elements().iter().map(|x| x.into()).collect(),
1965                }),
1966                dims: x
1967                    .dims()
1968                    .into_iter()
1969                    .map(|x| ProtoArrayDimension {
1970                        lower_bound: i64::cast_from(x.lower_bound),
1971                        length: u64::cast_from(x.length),
1972                    })
1973                    .collect(),
1974            }),
1975            Datum::List(x) => DatumType::List(ProtoRow {
1976                datums: x.iter().map(|x| x.into()).collect(),
1977            }),
1978            Datum::Map(x) => DatumType::Dict(ProtoDict {
1979                elements: x
1980                    .iter()
1981                    .map(|(k, v)| ProtoDictElement {
1982                        key: k.to_owned(),
1983                        val: Some(v.into()),
1984                    })
1985                    .collect(),
1986            }),
1987            Datum::Numeric(x) => {
1988                // TODO: Do we need this defensive clone?
1989                let mut x = x.0.clone();
1990                if let Some((bcd, scale)) = x.to_packed_bcd() {
1991                    DatumType::Numeric(ProtoNumeric { bcd, scale })
1992                } else if x.is_nan() {
1993                    DatumType::Other(ProtoDatumOther::NumericNaN.into())
1994                } else if x.is_infinite() {
1995                    if x.is_negative() {
1996                        DatumType::Other(ProtoDatumOther::NumericNegInf.into())
1997                    } else {
1998                        DatumType::Other(ProtoDatumOther::NumericPosInf.into())
1999                    }
2000                } else if x.is_special() {
2001                    panic!("internal error: unhandled special numeric value: {}", x);
2002                } else {
2003                    panic!(
2004                        "internal error: to_packed_bcd returned None for non-special value: {}",
2005                        x
2006                    )
2007                }
2008            }
2009            Datum::JsonNull => DatumType::Other(ProtoDatumOther::JsonNull.into()),
2010            Datum::Uuid(x) => DatumType::Uuid(x.as_bytes().to_vec()),
2011            Datum::MzTimestamp(x) => DatumType::MzTimestamp(x.into()),
2012            Datum::Dummy => DatumType::Other(ProtoDatumOther::Dummy.into()),
2013            Datum::Null => DatumType::Other(ProtoDatumOther::Null.into()),
2014            Datum::Range(super::Range { inner }) => DatumType::Range(Box::new(ProtoRange {
2015                inner: inner.map(|RangeInner { lower, upper }| {
2016                    Box::new(ProtoRangeInner {
2017                        lower_inclusive: lower.inclusive,
2018                        lower: lower.bound.map(|bound| Box::new(bound.datum().into())),
2019                        upper_inclusive: upper.inclusive,
2020                        upper: upper.bound.map(|bound| Box::new(bound.datum().into())),
2021                    })
2022                }),
2023            })),
2024            Datum::MzAclItem(x) => DatumType::MzAclItem(x.into_proto()),
2025            Datum::AclItem(x) => DatumType::AclItem(x.into_proto()),
2026        };
2027        ProtoDatum {
2028            datum_type: Some(datum_type),
2029        }
2030    }
2031}
2032
2033impl RowPacker<'_> {
2034    pub(crate) fn try_push_proto(&mut self, x: &ProtoDatum) -> Result<(), String> {
2035        match &x.datum_type {
2036            Some(DatumType::Other(o)) => match ProtoDatumOther::try_from(*o) {
2037                Ok(ProtoDatumOther::Unknown) => return Err("unknown datum type".into()),
2038                Ok(ProtoDatumOther::Null) => self.push(Datum::Null),
2039                Ok(ProtoDatumOther::False) => self.push(Datum::False),
2040                Ok(ProtoDatumOther::True) => self.push(Datum::True),
2041                Ok(ProtoDatumOther::JsonNull) => self.push(Datum::JsonNull),
2042                Ok(ProtoDatumOther::Dummy) => {
2043                    // We plan to remove the `Dummy` variant soon (materialize#17099). To prepare for that, we
2044                    // emit a log to Sentry here, to notify us of any instances that might have
2045                    // been made durable.
2046                    #[cfg(feature = "tracing")]
2047                    tracing::error!("protobuf decoding found Dummy datum");
2048                    self.push(Datum::Dummy);
2049                }
2050                Ok(ProtoDatumOther::NumericPosInf) => self.push(Datum::from(Numeric::infinity())),
2051                Ok(ProtoDatumOther::NumericNegInf) => self.push(Datum::from(-Numeric::infinity())),
2052                Ok(ProtoDatumOther::NumericNaN) => self.push(Datum::from(Numeric::nan())),
2053                Err(_) => return Err(format!("unknown datum type: {}", o)),
2054            },
2055            Some(DatumType::Int16(x)) => {
2056                let x = i16::try_from(*x)
2057                    .map_err(|_| format!("int16 field stored with out of range value: {}", *x))?;
2058                self.push(Datum::Int16(x))
2059            }
2060            Some(DatumType::Int32(x)) => self.push(Datum::Int32(*x)),
2061            Some(DatumType::Int64(x)) => self.push(Datum::Int64(*x)),
2062            Some(DatumType::Uint8(x)) => {
2063                let x = u8::try_from(*x)
2064                    .map_err(|_| format!("uint8 field stored with out of range value: {}", *x))?;
2065                self.push(Datum::UInt8(x))
2066            }
2067            Some(DatumType::Uint16(x)) => {
2068                let x = u16::try_from(*x)
2069                    .map_err(|_| format!("uint16 field stored with out of range value: {}", *x))?;
2070                self.push(Datum::UInt16(x))
2071            }
2072            Some(DatumType::Uint32(x)) => self.push(Datum::UInt32(*x)),
2073            Some(DatumType::Uint64(x)) => self.push(Datum::UInt64(*x)),
2074            Some(DatumType::Float32(x)) => self.push(Datum::Float32((*x).into())),
2075            Some(DatumType::Float64(x)) => self.push(Datum::Float64((*x).into())),
2076            Some(DatumType::Bytes(x)) => self.push(Datum::Bytes(x)),
2077            Some(DatumType::String(x)) => self.push(Datum::String(x)),
2078            Some(DatumType::Uuid(x)) => {
2079                // Uuid internally has a [u8; 16] so we'll have to do at least
2080                // one copy, but there's currently an additional one when the
2081                // Vec is created. Perhaps the protobuf Bytes support will let
2082                // us fix one of them.
2083                let u = Uuid::from_slice(x).map_err(|err| err.to_string())?;
2084                self.push(Datum::Uuid(u));
2085            }
2086            Some(DatumType::Date(x)) => self.push(Datum::Date(x.clone().into_rust()?)),
2087            Some(DatumType::Time(x)) => self.push(Datum::Time(x.clone().into_rust()?)),
2088            Some(DatumType::Timestamp(x)) => self.push(Datum::Timestamp(x.clone().into_rust()?)),
2089            Some(DatumType::TimestampTz(x)) => {
2090                self.push(Datum::TimestampTz(x.clone().into_rust()?))
2091            }
2092            Some(DatumType::Interval(x)) => self.push(Datum::Interval(
2093                x.clone()
2094                    .into_rust()
2095                    .map_err(|e: TryFromProtoError| e.to_string())?,
2096            )),
2097            Some(DatumType::List(x)) => self.push_list_with(|row| -> Result<(), String> {
2098                for d in x.datums.iter() {
2099                    row.try_push_proto(d)?;
2100                }
2101                Ok(())
2102            })?,
2103            Some(DatumType::Array(x)) => {
2104                let dims = x
2105                    .dims
2106                    .iter()
2107                    .map(|x| ArrayDimension {
2108                        lower_bound: isize::cast_from(x.lower_bound),
2109                        length: usize::cast_from(x.length),
2110                    })
2111                    .collect::<Vec<_>>();
2112                match x.elements.as_ref() {
2113                    None => self.try_push_array(&dims, [].iter()),
2114                    Some(elements) => {
2115                        // TODO: Could we avoid this Row alloc if we made a
2116                        // push_array_with?
2117                        let elements_row = Row::try_from(elements)?;
2118                        self.try_push_array(&dims, elements_row.iter())
2119                    }
2120                }
2121                .map_err(|err| err.to_string())?
2122            }
2123            Some(DatumType::Dict(x)) => self.push_dict_with(|row| -> Result<(), String> {
2124                for e in x.elements.iter() {
2125                    row.push(Datum::from(e.key.as_str()));
2126                    let val = e
2127                        .val
2128                        .as_ref()
2129                        .ok_or_else(|| format!("missing val for key: {}", e.key))?;
2130                    row.try_push_proto(val)?;
2131                }
2132                Ok(())
2133            })?,
2134            Some(DatumType::Numeric(x)) => {
2135                // Reminder that special values like NaN, PosInf, and NegInf are
2136                // represented as variants of ProtoDatumOther.
2137                let n = Decimal::from_packed_bcd(&x.bcd, x.scale).map_err(|err| err.to_string())?;
2138                self.push(Datum::from(n))
2139            }
2140            Some(DatumType::MzTimestamp(x)) => self.push(Datum::MzTimestamp((*x).into())),
2141            Some(DatumType::Range(inner)) => {
2142                let ProtoRange { inner } = &**inner;
2143                match inner {
2144                    None => self.push_range(Range { inner: None }).unwrap(),
2145                    Some(inner) => {
2146                        let ProtoRangeInner {
2147                            lower_inclusive,
2148                            lower,
2149                            upper_inclusive,
2150                            upper,
2151                        } = &**inner;
2152
2153                        self.push_range_with(
2154                            RangeLowerBound {
2155                                inclusive: *lower_inclusive,
2156                                bound: lower
2157                                    .as_ref()
2158                                    .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2159                            },
2160                            RangeUpperBound {
2161                                inclusive: *upper_inclusive,
2162                                bound: upper
2163                                    .as_ref()
2164                                    .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2165                            },
2166                        )
2167                        .expect("decoding ProtoRow must succeed");
2168                    }
2169                }
2170            }
2171            Some(DatumType::MzAclItem(x)) => self.push(Datum::MzAclItem(x.clone().into_rust()?)),
2172            Some(DatumType::AclItem(x)) => self.push(Datum::AclItem(x.clone().into_rust()?)),
2173            None => return Err("unknown datum type".into()),
2174        };
2175        Ok(())
2176    }
2177}
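
// Hedged sketch (not part of the original file) tying the two conversions above together:
// a `Datum` is converted to a `ProtoDatum` via `From`, and packed back into a `Row` with
// `try_push_proto`. Assumes `datum` is a value that survives the proto round trip.
#[cfg(test)]
#[allow(dead_code)]
fn _proto_datum_roundtrip_sketch(datum: Datum<'_>) -> Result<Row, String> {
    let proto: ProtoDatum = datum.into();
    let mut row = Row::default();
    row.packer().try_push_proto(&proto)?;
    Ok(row)
}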
2178
2179/// TODO: remove this in favor of [`RustType::from_proto`].
2180impl TryFrom<&ProtoRow> for Row {
2181    type Error = String;
2182
2183    fn try_from(x: &ProtoRow) -> Result<Self, Self::Error> {
2184        // TODO: Try to pre-size this.
2185        // see https://github.com/MaterializeInc/database-issues/issues/3640
2186        let mut row = Row::default();
2187        let mut packer = row.packer();
2188        for d in x.datums.iter() {
2189            packer.try_push_proto(d)?;
2190        }
2191        Ok(row)
2192    }
2193}
2194
2195impl RustType<ProtoRow> for Row {
2196    fn into_proto(&self) -> ProtoRow {
2197        let datums = self.iter().map(|x| x.into()).collect();
2198        ProtoRow { datums }
2199    }
2200
2201    fn from_proto(proto: ProtoRow) -> Result<Self, TryFromProtoError> {
2202        // TODO: Try to pre-size this.
2203        // see https://github.com/MaterializeInc/database-issues/issues/3640
2204        let mut row = Row::default();
2205        let mut packer = row.packer();
2206        for d in proto.datums.iter() {
2207            packer
2208                .try_push_proto(d)
2209                .map_err(TryFromProtoError::RowConversionError)?;
2210        }
2211        Ok(row)
2212    }
2213}
2214
2215#[cfg(test)]
2216mod tests {
2217    use std::collections::BTreeSet;
2218
2219    use arrow::array::{ArrayData, make_array};
2220    use arrow::compute::SortOptions;
2221    use arrow::datatypes::ArrowNativeType;
2222    use arrow::row::SortField;
2223    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2224    use mz_ore::assert_err;
2225    use mz_ore::collections::CollectionExt;
2226    use mz_persist::indexed::columnar::arrow::realloc_array;
2227    use mz_persist::metrics::ColumnarMetrics;
2228    use mz_persist_types::Codec;
2229    use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
2230    use mz_persist_types::columnar::{codec_to_schema, schema_to_codec};
2231    use mz_proto::{ProtoType, RustType};
2232    use proptest::prelude::*;
2233    use proptest::strategy::Strategy;
2234    use uuid::Uuid;
2235
2236    use super::*;
2237    use crate::adt::array::ArrayDimension;
2238    use crate::adt::interval::Interval;
2239    use crate::adt::numeric::Numeric;
2240    use crate::adt::timestamp::CheckedTimestamp;
2241    use crate::fixed_length::ToDatumIter;
2242    use crate::relation::arb_relation_desc;
2243    use crate::{ColumnName, RowArena, SqlColumnType, arb_datum_for_column, arb_row_for_relation};
2244    use crate::{Datum, RelationDesc, Row, SqlScalarType};
2245
2246    #[track_caller]
2247    fn roundtrip_datum<'a>(
2248        ty: SqlColumnType,
2249        datum: impl Iterator<Item = Datum<'a>>,
2250        metrics: &ColumnarMetrics,
2251    ) {
2252        let desc = RelationDesc::builder().with_column("a", ty).finish();
2253        let rows = datum.map(|d| Row::pack_slice(&[d])).collect();
2254        roundtrip_rows(&desc, rows, metrics)
2255    }
2256
2257    #[track_caller]
2258    fn roundtrip_rows(desc: &RelationDesc, rows: Vec<Row>, metrics: &ColumnarMetrics) {
2259        let mut encoder = <RelationDesc as Schema<Row>>::encoder(desc).unwrap();
2260        for row in &rows {
2261            encoder.append(row);
2262        }
2263        let col = encoder.finish();
2264
2265        // Exercise reallocating columns with lgalloc.
2266        let col = realloc_array(&col, metrics);
2267        // Exercise our ProtoArray format.
2268        {
2269            let proto = col.to_data().into_proto();
2270            let bytes = proto.encode_to_vec();
2271            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
2272            let array_data: ArrayData = proto.into_rust().unwrap();
2273
2274            let col_rnd = StructArray::from(array_data.clone());
2275            assert_eq!(col, col_rnd);
2276
2277            let col_dyn = arrow::array::make_array(array_data);
2278            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
2279            assert_eq!(&col, col_dyn);
2280        }
2281
2282        let decoder = <RelationDesc as Schema<Row>>::decoder(desc, col.clone()).unwrap();
2283        let stats = decoder.stats();
2284
2285        // Collect all of our lower and upper bounds.
2286        let arena = RowArena::new();
2287        let (stats, stat_nulls): (Vec<_>, Vec<_>) = desc
2288            .iter()
2289            .map(|(name, ty)| {
2290                let col_stats = stats.cols.get(name.as_str()).unwrap();
2291                let lower_upper =
2292                    crate::stats::col_values(&ty.scalar_type, &col_stats.values, &arena);
2293                let null_count = col_stats.nulls.map_or(0, |n| n.count);
2294
2295                (lower_upper, null_count)
2296            })
2297            .unzip();
2298        // Track how many nulls we saw for each column so we can assert stats match.
2299        let mut actual_nulls = vec![0usize; stats.len()];
2300
2301        let mut rnd_row = Row::default();
2302        for (idx, og_row) in rows.iter().enumerate() {
2303            decoder.decode(idx, &mut rnd_row);
2304            assert_eq!(og_row, &rnd_row);
2305
2306            // Check for each Datum in each Row that we're within our stats bounds.
2307            for (c_idx, (rnd_datum, ty)) in rnd_row.iter().zip_eq(desc.typ().columns()).enumerate()
2308            {
2309                let lower_upper = stats[c_idx];
2310
2311                // Assert our stat bounds are correct.
2312                if rnd_datum.is_null() {
2313                    actual_nulls[c_idx] += 1;
2314                } else if let Some((lower, upper)) = lower_upper {
2315                    assert!(rnd_datum >= lower, "{rnd_datum:?} is not >= {lower:?}");
2316                    assert!(rnd_datum <= upper, "{rnd_datum:?} is not <= {upper:?}");
2317                } else {
2318                    match &ty.scalar_type {
2319                        // JSON stats are handled separately.
2320                        SqlScalarType::Jsonb => (),
2321                        // We don't collect stats for these types.
2322                        SqlScalarType::AclItem
2323                        | SqlScalarType::MzAclItem
2324                        | SqlScalarType::Range { .. }
2325                        | SqlScalarType::Array(_)
2326                        | SqlScalarType::Map { .. }
2327                        | SqlScalarType::List { .. }
2328                        | SqlScalarType::Record { .. }
2329                        | SqlScalarType::Int2Vector => (),
2330                        other => panic!("should have collected stats for {other:?}"),
2331                    }
2332                }
2333            }
2334        }
2335
2336        // Validate that the null counts in our stats matched the actual counts.
2337        for (col_idx, (stats_count, actual_count)) in
2338            stat_nulls.iter().zip_eq(actual_nulls.iter()).enumerate()
2339        {
2340            assert_eq!(
2341                stats_count, actual_count,
2342                "column {col_idx} has incorrect number of nulls!"
2343            );
2344        }
2345
2346        // Validate that we can convert losslessly to codec and back
2347        let codec = schema_to_codec::<Row>(desc, &col).unwrap();
2348        let col2 = codec_to_schema::<Row>(desc, &codec).unwrap();
2349        assert_eq!(col2.as_ref(), &col);
2350
2351        // Validate that we only generate supported array types
2352        let converter = arrow::row::RowConverter::new(vec![SortField::new_with_options(
2353            col.data_type().clone(),
2354            SortOptions {
2355                descending: false,
2356                nulls_first: false,
2357            },
2358        )])
2359        .expect("sortable");
2360        let rows = converter
2361            .convert_columns(&[Arc::new(col.clone())])
2362            .expect("convertible");
2363        let mut row_vec = rows.iter().collect::<Vec<_>>();
2364        row_vec.sort();
2365        let row_col = converter
2366            .convert_rows(row_vec)
2367            .expect("convertible")
2368            .into_element();
2369        assert_eq!(row_col.len(), col.len());
2370
2371        let ord = ArrayOrd::new(&col);
2372        let mut indices = (0..u64::usize_as(col.len())).collect::<Vec<_>>();
2373        indices.sort_by_key(|i| ord.at(i.as_usize()));
2374        let indices = UInt64Array::from(indices);
2375        let ord_col = ::arrow::compute::take(&col, &indices, None).expect("takeable");
2376        assert_eq!(row_col.as_ref(), ord_col.as_ref());
2377
2378        // Check that our order matches the datum-native order when `preserves_order` is true.
2379        let ordered_prefix_len = desc
2380            .iter()
2381            .take_while(|(_, c)| preserves_order(&c.scalar_type))
2382            .count();
2383        let decoder = <RelationDesc as Schema<Row>>::decoder_any(desc, ord_col.as_ref()).unwrap();
2384        let rows = (0..ord_col.len()).map(|i| {
2385            let mut row = Row::default();
2386            decoder.decode(i, &mut row);
2387            row
2388        });
2389        for (a, b) in rows.tuple_windows() {
2390            let a_prefix = a.iter().take(ordered_prefix_len);
2391            let b_prefix = b.iter().take(ordered_prefix_len);
2392            assert!(
2393                a_prefix.cmp(b_prefix).is_le(),
2394                "ordering should be consistent on preserves_order columns: {:#?}\n{:?}\n{:?}",
2395                desc.iter().take(ordered_prefix_len).collect_vec(),
2396                a.to_datum_iter().take(ordered_prefix_len).collect_vec(),
2397                b.to_datum_iter().take(ordered_prefix_len).collect_vec()
2398            );
2399        }
2400
2401        // Check that our size estimates are consistent.
2402        assert_eq!(
2403            ord.goodbytes(),
2404            (0..col.len()).map(|i| ord.at(i).goodbytes()).sum::<usize>(),
2405            "total size should match the sum of the sizes at each index"
2406        );
2407
2408        // Check that our lower bounds work as expected.
2409        if !ord_col.is_empty() {
2410            let min_idx = indices.values()[0].as_usize();
2411            let lower_bound = ArrayBound::new(ord_col, min_idx);
2412            let max_encoded_len = 1000;
2413            if let Some(proto) = lower_bound.to_proto_lower(max_encoded_len) {
2414                assert!(
2415                    proto.encoded_len() <= max_encoded_len,
2416                    "should respect the max len"
2417                );
2418                let array_data = proto.into_rust().expect("valid array");
2419                let new_lower_bound = ArrayBound::new(make_array(array_data), 0);
2420                assert!(
2421                    new_lower_bound.get() <= lower_bound.get(),
2422                    "proto-roundtripped bound should be <= the original"
2423                );
2424            }
2425        }
2426    }
2427
2428    #[mz_ore::test]
2429    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2430    fn proptest_datums() {
2431        let strat = any::<SqlColumnType>().prop_flat_map(|ty| {
2432            proptest::collection::vec(arb_datum_for_column(ty.clone()), 0..16)
2433                .prop_map(move |d| (ty.clone(), d))
2434        });
2435        let metrics = ColumnarMetrics::disconnected();
2436
2437        proptest!(|((ty, datums) in strat)| {
2438            roundtrip_datum(ty.clone(), datums.iter().map(Datum::from), &metrics);
2439        })
2440    }
2441
2442    #[mz_ore::test]
2443    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2444    fn proptest_non_empty_relation_descs() {
2445        let strat = arb_relation_desc(1..8).prop_flat_map(|desc| {
2446            proptest::collection::vec(arb_row_for_relation(&desc), 0..12)
2447                .prop_map(move |rows| (desc.clone(), rows))
2448        });
2449        let metrics = ColumnarMetrics::disconnected();
2450
2451        proptest!(|((desc, rows) in strat)| {
2452            roundtrip_rows(&desc, rows, &metrics)
2453        })
2454    }
2455
2456    #[mz_ore::test]
2457    fn empty_relation_desc_returns_error() {
2458        let empty_desc = RelationDesc::empty();
2459        let result = <RelationDesc as Schema<Row>>::encoder(&empty_desc);
2460        assert_err!(result);
2461    }
2462
2463    #[mz_ore::test]
2464    fn smoketest_collections() {
2465        let mut row = Row::default();
2466        let mut packer = row.packer();
2467        let metrics = ColumnarMetrics::disconnected();
2468
2469        packer
2470            .try_push_array(
2471                &[ArrayDimension {
2472                    lower_bound: 0,
2473                    length: 3,
2474                }],
2475                [Datum::UInt32(4), Datum::UInt32(5), Datum::UInt32(6)],
2476            )
2477            .unwrap();
2478
2479        let array = row.unpack_first();
2480        roundtrip_datum(
2481            SqlScalarType::Array(Box::new(SqlScalarType::UInt32)).nullable(true),
2482            [array].into_iter(),
2483            &metrics,
2484        );
2485    }
2486
    #[mz_ore::test]
    fn smoketest_row() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Int64.nullable(true))
            .with_column("b", SqlScalarType::String.nullable(true))
            .with_column("c", SqlScalarType::Bool.nullable(true))
            .with_column(
                "d",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .with_column(
                "e",
                SqlScalarType::Map {
                    value_type: Box::new(SqlScalarType::Int16),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push(Datum::Int64(100));
            packer.push(Datum::String("hello world"));
            packer.push(Datum::True);
            packer.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
            packer.push_dict([("bar", Datum::Int16(9)), ("foo", Datum::Int16(3))]);
        }
        let mut og_row_2 = Row::default();
        {
            let mut packer = og_row_2.packer();
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
        }

        encoder.append(&og_row);
        encoder.append(&og_row_2);
        let col = encoder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();

        let mut rnd_row = Row::default();
        decoder.decode(0, &mut rnd_row);
        assert_eq!(og_row, rnd_row);

        let mut rnd_row = Row::default();
        decoder.decode(1, &mut rnd_row);
        assert_eq!(og_row_2, rnd_row);
    }

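    // Lists nested inside lists should roundtrip through the columnar encoding.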
    #[mz_ore::test]
    fn test_nested_list() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::List {
                        element_type: Box::new(SqlScalarType::Int64),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
                .nullable(false),
            )
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push_list_with(|inner| {
                inner.push_list([Datum::Int64(1), Datum::Int64(2)]);
                inner.push_list([Datum::Int64(5)]);
                inner.push_list([Datum::Int64(9), Datum::Int64(99), Datum::Int64(999)]);
            });
        }

        encoder.append(&og_row);
        let col = encoder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
        let mut rnd_row = Row::default();
        decoder.decode(0, &mut rnd_row);

        assert_eq!(og_row, rnd_row);
    }

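    // Records with nullable fields, including an entirely null record, should
    // roundtrip through the columnar encoding.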
    #[mz_ore::test]
    fn test_record() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Record {
                    fields: [
                        (
                            ColumnName::from("foo"),
                            SqlScalarType::Int64.nullable(false),
                        ),
                        (
                            ColumnName::from("bar"),
                            SqlScalarType::String.nullable(true),
                        ),
                        (
                            ColumnName::from("baz"),
                            SqlScalarType::List {
                                element_type: Box::new(SqlScalarType::UInt32),
                                custom_id: None,
                            }
                            .nullable(false),
                        ),
                    ]
                    .into(),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push_list_with(|inner| {
                inner.push(Datum::Int64(42));
                inner.push(Datum::Null);
                inner.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
            });
        }
        let null_row = Row::pack_slice(&[Datum::Null]);

        encoder.append(&og_row);
        encoder.append(&null_row);
        let col = encoder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
        let mut rnd_row = Row::default();

        decoder.decode(0, &mut rnd_row);
        assert_eq!(og_row, rnd_row);

        rnd_row.packer();
        decoder.decode(1, &mut rnd_row);
        assert_eq!(null_row, rnd_row);
    }

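    // Roundtrip a row containing a datum of nearly every type, plus nested
    // arrays, lists, and dicts, through the permanent storage encoding.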
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn roundtrip() {
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::False,
            Datum::True,
            Datum::Int16(1),
            Datum::Int32(2),
            Datum::Int64(3),
            Datum::Float32(4f32.into()),
            Datum::Float64(5f64.into()),
            Datum::Date(
                NaiveDate::from_ymd_opt(6, 7, 8)
                    .unwrap()
                    .try_into()
                    .unwrap(),
            ),
            Datum::Time(NaiveTime::from_hms_opt(9, 10, 11).unwrap()),
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(
                    NaiveDate::from_ymd_opt(12, 13 % 12, 14)
                        .unwrap()
                        .and_time(NaiveTime::from_hms_opt(15, 16, 17).unwrap()),
                )
                .unwrap(),
            ),
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                    NaiveDate::from_ymd_opt(18, 19 % 12, 20)
                        .unwrap()
                        .and_time(NaiveTime::from_hms_opt(21, 22, 23).unwrap()),
                    Utc,
                ))
                .unwrap(),
            ),
            Datum::Interval(Interval {
                months: 24,
                days: 42,
                micros: 25,
            }),
            Datum::Bytes(&[26, 27]),
            Datum::String("28"),
            Datum::from(Numeric::from(29)),
            Datum::from(Numeric::infinity()),
            Datum::from(-Numeric::infinity()),
            Datum::from(Numeric::nan()),
            Datum::JsonNull,
            Datum::Uuid(Uuid::from_u128(30)),
            Datum::Dummy,
            Datum::Null,
        ]);
        packer
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 2,
                    length: 2,
                }],
                vec![Datum::Int32(31), Datum::Int32(32)],
            )
            .expect("valid array");
        packer.push_list_with(|packer| {
            packer.push(Datum::String("33"));
            packer.push_list_with(|packer| {
                packer.push(Datum::String("34"));
                packer.push(Datum::String("35"));
            });
            packer.push(Datum::String("36"));
            packer.push(Datum::String("37"));
        });
        packer.push_dict_with(|row| {
            // Add a bunch of entries to the dict to ensure we don't pick up a
            // HashMap's random iteration order anywhere in the encode/decode path.
            let mut i = 38;
            for _ in 0..20 {
                row.push(Datum::String(&i.to_string()));
                row.push(Datum::Int32(i + 1));
                i += 2;
            }
        });

        let mut desc = RelationDesc::builder();
        for (idx, _) in row.iter().enumerate() {
            // HACK(parkmycar): We don't currently validate that the types in the
            // `RelationDesc` are correct, just the number of columns, so we can
            // fill in any type here.
            desc = desc.with_column(idx.to_string(), SqlScalarType::Int32.nullable(true));
        }
        let desc = desc.finish();

        let encoded = row.encode_to_vec();
        assert_eq!(Row::decode(&encoded, &desc), Ok(row));
    }

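    // Decoding with a projected RelationDesc (via apply_demand) should return
    // only the demanded columns.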
    #[mz_ore::test]
    fn smoketest_projection() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Int64.nullable(true))
            .with_column("b", SqlScalarType::String.nullable(true))
            .with_column("c", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push(Datum::Int64(100));
            packer.push(Datum::String("hello world"));
            packer.push(Datum::True);
        }
        let mut og_row_2 = Row::default();
        {
            let mut packer = og_row_2.packer();
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
        }

        encoder.append(&og_row);
        encoder.append(&og_row_2);
        let col = encoder.finish();

        let projected_desc = desc.apply_demand(&BTreeSet::from([0, 2]));

        let decoder = <RelationDesc as Schema<Row>>::decoder(&projected_desc, col).unwrap();

        let mut rnd_row = Row::default();
        decoder.decode(0, &mut rnd_row);
        let expected_row = Row::pack_slice(&[Datum::Int64(100), Datum::True]);
        assert_eq!(expected_row, rnd_row);

        let mut rnd_row = Row::default();
        decoder.decode(1, &mut rnd_row);
        let expected_row = Row::pack_slice(&[Datum::Null, Datum::Null]);
        assert_eq!(expected_row, rnd_row);
    }
}