mz_repr/row/
encode.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A permanent storage encoding for rows.
11//!
12//! See row.proto for details.
13
14use std::fmt::Debug;
15use std::ops::AddAssign;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use arrow::array::{
20    Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBufferBuilder,
21    BooleanBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder, Float32Array, Float32Builder,
22    Float64Array, Float64Builder, Int16Array, Int16Builder, Int32Array, Int32Builder, Int64Array,
23    Int64Builder, ListArray, ListBuilder, MapArray, StringArray, StringBuilder, StructArray,
24    UInt8Array, UInt8Builder, UInt16Array, UInt16Builder, UInt32Array, UInt32Builder, UInt64Array,
25    UInt64Builder, make_array,
26};
27use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
28use arrow::datatypes::{DataType, Field, Fields, ToByteSlice};
29use bytes::{BufMut, Bytes};
30use chrono::Timelike;
31use dec::{Context, Decimal, OrderedDecimal};
32use itertools::{EitherOrBoth, Itertools};
33use mz_ore::assert_none;
34use mz_ore::cast::CastFrom;
35use mz_persist_types::Codec;
36use mz_persist_types::arrow::ArrayOrd;
37use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, FixedSizeCodec, Schema};
38use mz_persist_types::stats::{
39    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, FixedSizeBytesStatsKind,
40    OptionStats, PrimitiveStats, StructStats,
41};
42use mz_proto::chrono::ProtoNaiveTime;
43use mz_proto::{ProtoType, RustType, TryFromProtoError};
44use prost::Message;
45use timely::Container;
46use uuid::Uuid;
47
48use crate::adt::array::{ArrayDimension, PackedArrayDimension};
49use crate::adt::date::Date;
50use crate::adt::datetime::PackedNaiveTime;
51use crate::adt::interval::PackedInterval;
52use crate::adt::jsonb::{JsonbPacker, JsonbRef};
53use crate::adt::mz_acl_item::{PackedAclItem, PackedMzAclItem};
54use crate::adt::numeric::{Numeric, PackedNumeric};
55use crate::adt::range::{Range, RangeInner, RangeLowerBound, RangeUpperBound};
56use crate::adt::timestamp::{CheckedTimestamp, PackedNaiveDateTime};
57use crate::row::proto_datum::DatumType;
58use crate::row::{
59    ProtoArray, ProtoArrayDimension, ProtoDatum, ProtoDatumOther, ProtoDict, ProtoDictElement,
60    ProtoNumeric, ProtoRange, ProtoRangeInner, ProtoRow,
61};
62use crate::stats::{fixed_stats_from_column, numeric_stats_from_column, stats_for_json};
63use crate::{Datum, ProtoRelationDesc, RelationDesc, Row, RowPacker, ScalarType, Timestamp};
64
65// TODO(parkmycar): Benchmark the difference between `FixedSizeBinaryArray` and `BinaryArray`.
66//
67// `FixedSizeBinaryArray`s push empty bytes when a value is null which for larger binary types
68// could result in poor performance.
69#[allow(clippy::as_conversions)]
70mod fixed_binary_sizes {
71    use super::*;
72
73    pub const TIME_FIXED_BYTES: i32 = PackedNaiveTime::SIZE as i32;
74    pub const TIMESTAMP_FIXED_BYTES: i32 = PackedNaiveDateTime::SIZE as i32;
75    pub const INTERVAL_FIXED_BYTES: i32 = PackedInterval::SIZE as i32;
76    pub const ACL_ITEM_FIXED_BYTES: i32 = PackedAclItem::SIZE as i32;
77    pub const _MZ_ACL_ITEM_FIXED_BYTES: i32 = PackedMzAclItem::SIZE as i32;
78    pub const ARRAY_DIMENSION_FIXED_BYTES: i32 = PackedArrayDimension::SIZE as i32;
79
80    pub const UUID_FIXED_BYTES: i32 = 16;
81    static_assertions::const_assert_eq!(UUID_FIXED_BYTES as usize, std::mem::size_of::<Uuid>());
82}
83use fixed_binary_sizes::*;
84
85/// Returns true iff the ordering of the "raw" and Persist-encoded versions of this columm would match:
86/// ie. `sort(encode(column)) == encode(sort(column))`. This encoding has been designed so that this
87/// is true for many types.
88pub fn preserves_order(scalar_type: &ScalarType) -> bool {
89    match scalar_type {
90        // These types have short, fixed-length encodings that are designed to sort identically.
91        ScalarType::Bool
92        | ScalarType::Int16
93        | ScalarType::Int32
94        | ScalarType::Int64
95        | ScalarType::UInt16
96        | ScalarType::UInt32
97        | ScalarType::UInt64
98        | ScalarType::Date
99        | ScalarType::Time
100        | ScalarType::Timestamp { .. }
101        | ScalarType::TimestampTz { .. }
102        | ScalarType::Interval
103        | ScalarType::Bytes
104        | ScalarType::String
105        | ScalarType::Uuid
106        | ScalarType::MzTimestamp
107        | ScalarType::MzAclItem
108        | ScalarType::AclItem => true,
109        // We sort records lexicographically; a record has a meaningful sort if all its fields do.
110        ScalarType::Record { fields, .. } => fields
111            .iter()
112            .all(|(_, field_type)| preserves_order(&field_type.scalar_type)),
113        // Our floating-point encoding preserves order generally, but differs when comparing
114        // -0 and 0. Opt these out for now.
115        ScalarType::Float32 | ScalarType::Float64 => false,
116        // Numeric is sensitive to similar ordering issues as floating point numbers, and requires
117        // some special handling we don't have yet.
118        ScalarType::Numeric { .. } => false,
119        // For all other types: either the encoding is known to not preserve ordering, or we
120        // don't yet care to make strong guarantees one way or the other.
121        ScalarType::PgLegacyChar
122        | ScalarType::PgLegacyName
123        | ScalarType::Char { .. }
124        | ScalarType::VarChar { .. }
125        | ScalarType::Jsonb
126        | ScalarType::Array(_)
127        | ScalarType::List { .. }
128        | ScalarType::Oid
129        | ScalarType::Map { .. }
130        | ScalarType::RegProc
131        | ScalarType::RegType
132        | ScalarType::RegClass
133        | ScalarType::Int2Vector
134        | ScalarType::Range { .. } => false,
135    }
136}
137
138/// An encoder for a column of [`Datum`]s.
139#[derive(Debug)]
140struct DatumEncoder {
141    nullable: bool,
142    encoder: DatumColumnEncoder,
143}
144
145impl DatumEncoder {
146    fn goodbytes(&self) -> usize {
147        self.encoder.goodbytes()
148    }
149
150    fn push(&mut self, datum: Datum) {
151        assert!(
152            !datum.is_null() || self.nullable,
153            "tried pushing Null into non-nullable column"
154        );
155        self.encoder.push(datum);
156    }
157
158    fn push_invalid(&mut self) {
159        self.encoder.push_invalid();
160    }
161
162    fn finish(self) -> ArrayRef {
163        self.encoder.finish()
164    }
165}
166
167/// An encoder for a single column of [`Datum`]s. To encode an entire row see
168/// [`RowColumnarEncoder`].
169///
170/// Note: We specifically structure the encoder as an enum instead of using trait objects because
171/// Datum encoding is an extremely hot path and downcasting objects is relatively slow.
172#[derive(Debug)]
173enum DatumColumnEncoder {
174    Bool(BooleanBuilder),
175    U8(UInt8Builder),
176    U16(UInt16Builder),
177    U32(UInt32Builder),
178    U64(UInt64Builder),
179    I16(Int16Builder),
180    I32(Int32Builder),
181    I64(Int64Builder),
182    F32(Float32Builder),
183    F64(Float64Builder),
184    Numeric {
185        /// The raw bytes so we can losslessly roundtrip Numerics.
186        binary_values: BinaryBuilder,
187        /// Also maintain a float64 approximation for sorting.
188        approx_values: Float64Builder,
189        /// Re-usable `libdecimal` context for conversions.
190        numeric_context: Context<Numeric>,
191    },
192    Bytes(BinaryBuilder),
193    String(StringBuilder),
194    Date(Int32Builder),
195    Time(FixedSizeBinaryBuilder),
196    Timestamp(FixedSizeBinaryBuilder),
197    TimestampTz(FixedSizeBinaryBuilder),
198    MzTimestamp(UInt64Builder),
199    Interval(FixedSizeBinaryBuilder),
200    Uuid(FixedSizeBinaryBuilder),
201    AclItem(FixedSizeBinaryBuilder),
202    MzAclItem(BinaryBuilder),
203    Range(BinaryBuilder),
204    /// Hand rolled "StringBuilder" that reduces the number of copies required
205    /// to serialize JSON.
206    ///
207    /// An alternative would be to use [`StringBuilder`] but that requires
208    /// serializing to an intermediary string, and then copying that
209    /// intermediary string into an underlying buffer.
210    Jsonb {
211        /// Monotonically increasing offsets of each encoded segment.
212        offsets: Vec<i32>,
213        /// Buffer that contains UTF-8 encoded JSON.
214        buf: Vec<u8>,
215        /// Null entries, if any.
216        nulls: Option<BooleanBufferBuilder>,
217    },
218    Array {
219        /// Binary encoded `ArrayDimension`s.
220        dims: ListBuilder<FixedSizeBinaryBuilder>,
221        /// Lengths of each `Array` in this column.
222        val_lengths: Vec<usize>,
223        /// Contiguous array of underlying data.
224        vals: Box<DatumColumnEncoder>,
225        /// Null entires, if any.
226        nulls: Option<BooleanBufferBuilder>,
227    },
228    List {
229        /// Lengths of each `List` in this column.
230        lengths: Vec<usize>,
231        /// Contiguous array of underlying data.
232        values: Box<DatumColumnEncoder>,
233        /// Null entires, if any.
234        nulls: Option<BooleanBufferBuilder>,
235    },
236    Map {
237        /// Lengths of each `Map` in this column
238        lengths: Vec<usize>,
239        /// Contiguous array of key data.
240        keys: StringBuilder,
241        /// Contiguous array of val data.
242        vals: Box<DatumColumnEncoder>,
243        /// Null entires, if any.
244        nulls: Option<BooleanBufferBuilder>,
245    },
246    Record {
247        /// Columns in the record.
248        fields: Vec<DatumEncoder>,
249        /// Null entries, if any.
250        nulls: Option<BooleanBufferBuilder>,
251        /// Number of values we've pushed into this builder thus far.
252        length: usize,
253    },
254    /// Special encoder for a [`ScalarType::Record`] that has no inner fields.
255    ///
256    /// We have a special case for this scenario because Arrow does not allow a
257    /// [`StructArray`] (what normally use to encod a `Record`) with no fields.
258    RecordEmpty(BooleanBuilder),
259}
260
261impl DatumColumnEncoder {
262    fn goodbytes(&self) -> usize {
263        match self {
264            DatumColumnEncoder::Bool(a) => a.len(),
265            DatumColumnEncoder::U8(a) => a.values_slice().to_byte_slice().len(),
266            DatumColumnEncoder::U16(a) => a.values_slice().to_byte_slice().len(),
267            DatumColumnEncoder::U32(a) => a.values_slice().to_byte_slice().len(),
268            DatumColumnEncoder::U64(a) => a.values_slice().to_byte_slice().len(),
269            DatumColumnEncoder::I16(a) => a.values_slice().to_byte_slice().len(),
270            DatumColumnEncoder::I32(a) => a.values_slice().to_byte_slice().len(),
271            DatumColumnEncoder::I64(a) => a.values_slice().to_byte_slice().len(),
272            DatumColumnEncoder::F32(a) => a.values_slice().to_byte_slice().len(),
273            DatumColumnEncoder::F64(a) => a.values_slice().to_byte_slice().len(),
274            DatumColumnEncoder::Numeric {
275                binary_values,
276                approx_values,
277                ..
278            } => {
279                binary_values.values_slice().len()
280                    + approx_values.values_slice().to_byte_slice().len()
281            }
282            DatumColumnEncoder::Bytes(a) => a.values_slice().len(),
283            DatumColumnEncoder::String(a) => a.values_slice().len(),
284            DatumColumnEncoder::Date(a) => a.values_slice().to_byte_slice().len(),
285            DatumColumnEncoder::Time(a) => a.len() * PackedNaiveTime::SIZE,
286            DatumColumnEncoder::Timestamp(a) => a.len() * PackedNaiveDateTime::SIZE,
287            DatumColumnEncoder::TimestampTz(a) => a.len() * PackedNaiveDateTime::SIZE,
288            DatumColumnEncoder::MzTimestamp(a) => a.values_slice().to_byte_slice().len(),
289            DatumColumnEncoder::Interval(a) => a.len() * PackedInterval::SIZE,
290            DatumColumnEncoder::Uuid(a) => a.len() * size_of::<Uuid>(),
291            DatumColumnEncoder::AclItem(a) => a.len() * PackedAclItem::SIZE,
292            DatumColumnEncoder::MzAclItem(a) => a.values_slice().len(),
293            DatumColumnEncoder::Range(a) => a.values_slice().len(),
294            DatumColumnEncoder::Jsonb { buf, .. } => buf.len(),
295            DatumColumnEncoder::Array { dims, vals, .. } => {
296                dims.len() * PackedArrayDimension::SIZE + vals.goodbytes()
297            }
298            DatumColumnEncoder::List { values, .. } => values.goodbytes(),
299            DatumColumnEncoder::Map { keys, vals, .. } => {
300                keys.values_slice().len() + vals.goodbytes()
301            }
302            DatumColumnEncoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
303            DatumColumnEncoder::RecordEmpty(a) => a.len(),
304        }
305    }
306
307    fn push<'e, 'd>(&'e mut self, datum: Datum<'d>) {
308        match (self, datum) {
309            (DatumColumnEncoder::Bool(bool_builder), Datum::True) => {
310                bool_builder.append_value(true)
311            }
312            (DatumColumnEncoder::Bool(bool_builder), Datum::False) => {
313                bool_builder.append_value(false)
314            }
315            (DatumColumnEncoder::U8(builder), Datum::UInt8(val)) => builder.append_value(val),
316            (DatumColumnEncoder::U16(builder), Datum::UInt16(val)) => builder.append_value(val),
317            (DatumColumnEncoder::U32(builder), Datum::UInt32(val)) => builder.append_value(val),
318            (DatumColumnEncoder::U64(builder), Datum::UInt64(val)) => builder.append_value(val),
319            (DatumColumnEncoder::I16(builder), Datum::Int16(val)) => builder.append_value(val),
320            (DatumColumnEncoder::I32(builder), Datum::Int32(val)) => builder.append_value(val),
321            (DatumColumnEncoder::I64(builder), Datum::Int64(val)) => builder.append_value(val),
322            (DatumColumnEncoder::F32(builder), Datum::Float32(val)) => builder.append_value(*val),
323            (DatumColumnEncoder::F64(builder), Datum::Float64(val)) => builder.append_value(*val),
324            (
325                DatumColumnEncoder::Numeric {
326                    approx_values,
327                    binary_values,
328                    numeric_context,
329                },
330                Datum::Numeric(val),
331            ) => {
332                let float_approx = numeric_context.try_into_f64(val.0).unwrap_or_else(|_| {
333                    numeric_context.clear_status();
334                    if val.0.is_negative() {
335                        f64::NEG_INFINITY
336                    } else {
337                        f64::INFINITY
338                    }
339                });
340                let packed = PackedNumeric::from_value(val.0);
341
342                approx_values.append_value(float_approx);
343                binary_values.append_value(packed.as_bytes());
344            }
345            (DatumColumnEncoder::String(builder), Datum::String(val)) => builder.append_value(val),
346            (DatumColumnEncoder::Bytes(builder), Datum::Bytes(val)) => builder.append_value(val),
347            (DatumColumnEncoder::Date(builder), Datum::Date(val)) => {
348                builder.append_value(val.pg_epoch_days())
349            }
350            (DatumColumnEncoder::Time(builder), Datum::Time(val)) => {
351                let packed = PackedNaiveTime::from_value(val);
352                builder
353                    .append_value(packed.as_bytes())
354                    .expect("known correct size");
355            }
356            (DatumColumnEncoder::Timestamp(builder), Datum::Timestamp(val)) => {
357                let packed = PackedNaiveDateTime::from_value(val.to_naive());
358                builder
359                    .append_value(packed.as_bytes())
360                    .expect("known correct size");
361            }
362            (DatumColumnEncoder::TimestampTz(builder), Datum::TimestampTz(val)) => {
363                let packed = PackedNaiveDateTime::from_value(val.to_naive());
364                builder
365                    .append_value(packed.as_bytes())
366                    .expect("known correct size");
367            }
368            (DatumColumnEncoder::MzTimestamp(builder), Datum::MzTimestamp(val)) => {
369                builder.append_value(val.into());
370            }
371            (DatumColumnEncoder::Interval(builder), Datum::Interval(val)) => {
372                let packed = PackedInterval::from_value(val);
373                builder
374                    .append_value(packed.as_bytes())
375                    .expect("known correct size");
376            }
377            (DatumColumnEncoder::Uuid(builder), Datum::Uuid(val)) => builder
378                .append_value(val.as_bytes())
379                .expect("known correct size"),
380            (DatumColumnEncoder::AclItem(builder), Datum::AclItem(val)) => {
381                let packed = PackedAclItem::from_value(val);
382                builder
383                    .append_value(packed.as_bytes())
384                    .expect("known correct size");
385            }
386            (DatumColumnEncoder::MzAclItem(builder), Datum::MzAclItem(val)) => {
387                let packed = PackedMzAclItem::from_value(val);
388                builder.append_value(packed.as_bytes());
389            }
390            (DatumColumnEncoder::Range(builder), d @ Datum::Range(_)) => {
391                let proto = ProtoDatum::from(d);
392                let bytes = proto.encode_to_vec();
393                builder.append_value(&bytes);
394            }
395            (
396                DatumColumnEncoder::Jsonb {
397                    offsets,
398                    buf,
399                    nulls,
400                },
401                d @ Datum::JsonNull
402                | d @ Datum::True
403                | d @ Datum::False
404                | d @ Datum::Numeric(_)
405                | d @ Datum::String(_)
406                | d @ Datum::List(_)
407                | d @ Datum::Map(_),
408            ) => {
409                // TODO(parkmycar): Why do we need to re-borrow here?
410                let mut buf = buf;
411                let json = JsonbRef::from_datum(d);
412
413                // Serialize our JSON.
414                json.to_writer(&mut buf)
415                    .expect("failed to serialize Datum to jsonb");
416                let offset: i32 = buf.len().try_into().expect("wrote more than 4GB of JSON");
417                offsets.push(offset);
418
419                if let Some(nulls) = nulls {
420                    nulls.append(true);
421                }
422            }
423            (
424                DatumColumnEncoder::Array {
425                    dims,
426                    val_lengths,
427                    vals,
428                    nulls,
429                },
430                Datum::Array(array),
431            ) => {
432                // Store our array dimensions.
433                for dimension in array.dims() {
434                    let packed = PackedArrayDimension::from_value(dimension);
435                    dims.values()
436                        .append_value(packed.as_bytes())
437                        .expect("known correct size");
438                }
439                dims.append(true);
440
441                // Store the values of the array.
442                let mut count = 0;
443                for datum in &array.elements() {
444                    count += 1;
445                    vals.push(datum);
446                }
447                val_lengths.push(count);
448
449                if let Some(nulls) = nulls {
450                    nulls.append(true);
451                }
452            }
453            (
454                DatumColumnEncoder::List {
455                    lengths,
456                    values,
457                    nulls,
458                },
459                Datum::List(list),
460            ) => {
461                let mut count = 0;
462                for datum in &list {
463                    count += 1;
464                    values.push(datum);
465                }
466                lengths.push(count);
467
468                if let Some(nulls) = nulls {
469                    nulls.append(true);
470                }
471            }
472            (
473                DatumColumnEncoder::Map {
474                    lengths,
475                    keys,
476                    vals,
477                    nulls,
478                },
479                Datum::Map(map),
480            ) => {
481                let mut count = 0;
482                for (key, datum) in &map {
483                    count += 1;
484                    keys.append_value(key);
485                    vals.push(datum);
486                }
487                lengths.push(count);
488
489                if let Some(nulls) = nulls {
490                    nulls.append(true);
491                }
492            }
493            (
494                DatumColumnEncoder::Record {
495                    fields,
496                    nulls,
497                    length,
498                },
499                Datum::List(records),
500            ) => {
501                let mut count = 0;
502                // `zip_eq` will panic if the number of records != number of fields.
503                for (datum, encoder) in records.into_iter().zip_eq(fields.iter_mut()) {
504                    count += 1;
505                    encoder.push(datum);
506                }
507                assert_eq!(count, fields.len());
508
509                length.add_assign(1);
510                if let Some(nulls) = nulls.as_mut() {
511                    nulls.append(true);
512                }
513            }
514            (DatumColumnEncoder::RecordEmpty(builder), Datum::List(records)) => {
515                assert_none!(records.into_iter().next());
516                builder.append_value(true);
517            }
518            (encoder, Datum::Null) => encoder.push_invalid(),
519            (encoder, datum) => panic!("can't encode {datum:?} into {encoder:?}"),
520        }
521    }
522
523    fn push_invalid(&mut self) {
524        match self {
525            DatumColumnEncoder::Bool(builder) => builder.append_null(),
526            DatumColumnEncoder::U8(builder) => builder.append_null(),
527            DatumColumnEncoder::U16(builder) => builder.append_null(),
528            DatumColumnEncoder::U32(builder) => builder.append_null(),
529            DatumColumnEncoder::U64(builder) => builder.append_null(),
530            DatumColumnEncoder::I16(builder) => builder.append_null(),
531            DatumColumnEncoder::I32(builder) => builder.append_null(),
532            DatumColumnEncoder::I64(builder) => builder.append_null(),
533            DatumColumnEncoder::F32(builder) => builder.append_null(),
534            DatumColumnEncoder::F64(builder) => builder.append_null(),
535            DatumColumnEncoder::Numeric {
536                approx_values,
537                binary_values,
538                numeric_context: _,
539            } => {
540                approx_values.append_null();
541                binary_values.append_null();
542            }
543            DatumColumnEncoder::String(builder) => builder.append_null(),
544            DatumColumnEncoder::Bytes(builder) => builder.append_null(),
545            DatumColumnEncoder::Date(builder) => builder.append_null(),
546            DatumColumnEncoder::Time(builder) => builder.append_null(),
547            DatumColumnEncoder::Timestamp(builder) => builder.append_null(),
548            DatumColumnEncoder::TimestampTz(builder) => builder.append_null(),
549            DatumColumnEncoder::MzTimestamp(builder) => builder.append_null(),
550            DatumColumnEncoder::Interval(builder) => builder.append_null(),
551            DatumColumnEncoder::Uuid(builder) => builder.append_null(),
552            DatumColumnEncoder::AclItem(builder) => builder.append_null(),
553            DatumColumnEncoder::MzAclItem(builder) => builder.append_null(),
554            DatumColumnEncoder::Range(builder) => builder.append_null(),
555            DatumColumnEncoder::Jsonb {
556                offsets,
557                buf: _,
558                nulls,
559            } => {
560                let nulls = nulls.get_or_insert_with(|| {
561                    let mut buf = BooleanBufferBuilder::new(offsets.len());
562                    // The offsets buffer has one more value than there are elements.
563                    buf.append_n(offsets.len() - 1, true);
564                    buf
565                });
566
567                offsets.push(offsets.last().copied().unwrap_or(0));
568                nulls.append(false);
569            }
570            DatumColumnEncoder::Array {
571                dims,
572                val_lengths,
573                vals: _,
574                nulls,
575            } => {
576                let nulls = nulls.get_or_insert_with(|| {
577                    let mut buf = BooleanBufferBuilder::new(dims.len() + 1);
578                    buf.append_n(dims.len(), true);
579                    buf
580                });
581                dims.append_null();
582
583                val_lengths.push(0);
584                nulls.append(false);
585            }
586            DatumColumnEncoder::List {
587                lengths,
588                values: _,
589                nulls,
590            } => {
591                let nulls = nulls.get_or_insert_with(|| {
592                    let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
593                    buf.append_n(lengths.len(), true);
594                    buf
595                });
596
597                lengths.push(0);
598                nulls.append(false);
599            }
600            DatumColumnEncoder::Map {
601                lengths,
602                keys: _,
603                vals: _,
604                nulls,
605            } => {
606                let nulls = nulls.get_or_insert_with(|| {
607                    let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
608                    buf.append_n(lengths.len(), true);
609                    buf
610                });
611
612                lengths.push(0);
613                nulls.append(false);
614            }
615            DatumColumnEncoder::Record {
616                fields,
617                nulls,
618                length,
619            } => {
620                let nulls = nulls.get_or_insert_with(|| {
621                    let mut buf = BooleanBufferBuilder::new(*length + 1);
622                    buf.append_n(*length, true);
623                    buf
624                });
625                nulls.append(false);
626                length.add_assign(1);
627
628                for field in fields {
629                    field.push_invalid();
630                }
631            }
632            DatumColumnEncoder::RecordEmpty(builder) => builder.append_null(),
633        }
634    }
635
636    fn finish(self) -> ArrayRef {
637        match self {
638            DatumColumnEncoder::Bool(mut builder) => {
639                let array = builder.finish();
640                Arc::new(array)
641            }
642            DatumColumnEncoder::U8(mut builder) => {
643                let array = builder.finish();
644                Arc::new(array)
645            }
646            DatumColumnEncoder::U16(mut builder) => {
647                let array = builder.finish();
648                Arc::new(array)
649            }
650            DatumColumnEncoder::U32(mut builder) => {
651                let array = builder.finish();
652                Arc::new(array)
653            }
654            DatumColumnEncoder::U64(mut builder) => {
655                let array = builder.finish();
656                Arc::new(array)
657            }
658            DatumColumnEncoder::I16(mut builder) => {
659                let array = builder.finish();
660                Arc::new(array)
661            }
662            DatumColumnEncoder::I32(mut builder) => {
663                let array = builder.finish();
664                Arc::new(array)
665            }
666            DatumColumnEncoder::I64(mut builder) => {
667                let array = builder.finish();
668                Arc::new(array)
669            }
670            DatumColumnEncoder::F32(mut builder) => {
671                let array = builder.finish();
672                Arc::new(array)
673            }
674            DatumColumnEncoder::F64(mut builder) => {
675                let array = builder.finish();
676                Arc::new(array)
677            }
678            DatumColumnEncoder::Numeric {
679                mut approx_values,
680                mut binary_values,
681                numeric_context: _,
682            } => {
683                let approx_array = approx_values.finish();
684                let binary_array = binary_values.finish();
685
686                assert_eq!(approx_array.len(), binary_array.len());
687                // This is O(n) so we only enable it for debug assertions.
688                debug_assert_eq!(approx_array.logical_nulls(), binary_array.logical_nulls());
689
690                let fields = Fields::from(vec![
691                    Field::new("approx", approx_array.data_type().clone(), true),
692                    Field::new("binary", binary_array.data_type().clone(), true),
693                ]);
694                let nulls = approx_array.logical_nulls();
695                let array = StructArray::new(
696                    fields,
697                    vec![Arc::new(approx_array), Arc::new(binary_array)],
698                    nulls,
699                );
700                Arc::new(array)
701            }
702            DatumColumnEncoder::String(mut builder) => {
703                let array = builder.finish();
704                Arc::new(array)
705            }
706            DatumColumnEncoder::Bytes(mut builder) => {
707                let array = builder.finish();
708                Arc::new(array)
709            }
710            DatumColumnEncoder::Date(mut builder) => {
711                let array = builder.finish();
712                Arc::new(array)
713            }
714            DatumColumnEncoder::Time(mut builder) => {
715                let array = builder.finish();
716                Arc::new(array)
717            }
718            DatumColumnEncoder::Timestamp(mut builder) => {
719                let array = builder.finish();
720                Arc::new(array)
721            }
722            DatumColumnEncoder::TimestampTz(mut builder) => {
723                let array = builder.finish();
724                Arc::new(array)
725            }
726            DatumColumnEncoder::MzTimestamp(mut builder) => {
727                let array = builder.finish();
728                Arc::new(array)
729            }
730            DatumColumnEncoder::Interval(mut builder) => {
731                let array = builder.finish();
732                Arc::new(array)
733            }
734            DatumColumnEncoder::Uuid(mut builder) => {
735                let array = builder.finish();
736                Arc::new(array)
737            }
738            DatumColumnEncoder::AclItem(mut builder) => Arc::new(builder.finish()),
739            DatumColumnEncoder::MzAclItem(mut builder) => Arc::new(builder.finish()),
740            DatumColumnEncoder::Range(mut builder) => Arc::new(builder.finish()),
741            DatumColumnEncoder::Jsonb {
742                offsets,
743                buf,
744                mut nulls,
745            } => {
746                let values = Buffer::from_vec(buf);
747                let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
748                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
749                let array = StringArray::new(offsets, values, nulls);
750                Arc::new(array)
751            }
752            DatumColumnEncoder::Array {
753                mut dims,
754                val_lengths,
755                vals,
756                mut nulls,
757            } => {
758                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
759                let vals = vals.finish();
760
761                // Note: Values in an Array can always be Null, regardless of whether or not the
762                // column is nullable.
763                let field = Field::new_list_field(vals.data_type().clone(), true);
764                let val_offsets = OffsetBuffer::from_lengths(val_lengths);
765                let values =
766                    ListArray::new(Arc::new(field), val_offsets, Arc::new(vals), nulls.clone());
767
768                let dims = dims.finish();
769                assert_eq!(values.len(), dims.len());
770
771                // Note: The inner arrays are always nullable, and we let the higher-level array
772                // drive nullability for the entire column.
773                let fields = Fields::from(vec![
774                    Field::new("dims", dims.data_type().clone(), true),
775                    Field::new("vals", values.data_type().clone(), true),
776                ]);
777                let array = StructArray::new(fields, vec![Arc::new(dims), Arc::new(values)], nulls);
778
779                Arc::new(array)
780            }
781            DatumColumnEncoder::List {
782                lengths,
783                values,
784                mut nulls,
785            } => {
786                let values = values.finish();
787
788                // Note: Values in an Array can always be Null, regardless of whether or not the
789                // column is nullable.
790                let field = Field::new_list_field(values.data_type().clone(), true);
791                let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
792                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
793
794                let array = ListArray::new(Arc::new(field), offsets, values, nulls);
795                Arc::new(array)
796            }
797            DatumColumnEncoder::Map {
798                lengths,
799                mut keys,
800                vals,
801                mut nulls,
802            } => {
803                let keys = keys.finish();
804                let vals = vals.finish();
805
806                let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
807                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
808
809                // Note: Values in an Map can always be Null, regardless of whether or not the
810                // column is nullable, but Keys cannot.
811                assert_none!(keys.logical_nulls());
812                let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
813                let val_field = Arc::new(Field::new("val", vals.data_type().clone(), true));
814                let fields = Fields::from(vec![Arc::clone(&key_field), Arc::clone(&val_field)]);
815                let entries = StructArray::new(fields, vec![Arc::new(keys), vals], None);
816
817                // Note: DatumMap is always sorted, and Arrow enforces that the inner 'map_entries'
818                // array can never be null.
819                let field = Field::new("map_entries", entries.data_type().clone(), false);
820                let array = ListArray::new(Arc::new(field), offsets, Arc::new(entries), nulls);
821                Arc::new(array)
822            }
823            DatumColumnEncoder::Record {
824                fields,
825                mut nulls,
826                length: _,
827            } => {
828                let (fields, arrays): (Vec<_>, Vec<_>) = fields
829                    .into_iter()
830                    .enumerate()
831                    .map(|(tag, encoder)| {
832                        // Note: We mark all columns as nullable at the Arrow/Parquet level because
833                        // it has a negligible performance difference, but it protects us from
834                        // unintended nullability changes in the columns of SQL objects.
835                        //
836                        // See: <https://github.com/MaterializeInc/database-issues/issues/2488>
837                        let nullable = true;
838                        let array = encoder.finish();
839                        let field =
840                            Field::new(tag.to_string(), array.data_type().clone(), nullable);
841                        (field, array)
842                    })
843                    .unzip();
844                let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
845
846                let array = StructArray::new(Fields::from(fields), arrays, nulls);
847                Arc::new(array)
848            }
849            DatumColumnEncoder::RecordEmpty(mut builder) => Arc::new(builder.finish()),
850        }
851    }
852}
853
854/// A decoder for a column of [`Datum`]s.
855///
856/// Note: We specifically structure the decoder as an enum instead of using trait objects because
857/// Datum decoding is an extremely hot path and downcasting objects is relatively slow.
858#[derive(Debug)]
859enum DatumColumnDecoder {
860    Bool(BooleanArray),
861    U8(UInt8Array),
862    U16(UInt16Array),
863    U32(UInt32Array),
864    U64(UInt64Array),
865    I16(Int16Array),
866    I32(Int32Array),
867    I64(Int64Array),
868    F32(Float32Array),
869    F64(Float64Array),
870    Numeric(BinaryArray),
871    Bytes(BinaryArray),
872    String(StringArray),
873    Date(Int32Array),
874    Time(FixedSizeBinaryArray),
875    Timestamp(FixedSizeBinaryArray),
876    TimestampTz(FixedSizeBinaryArray),
877    MzTimestamp(UInt64Array),
878    Interval(FixedSizeBinaryArray),
879    Uuid(FixedSizeBinaryArray),
880    Json(StringArray),
881    Array {
882        dim_offsets: OffsetBuffer<i32>,
883        dims: FixedSizeBinaryArray,
884        val_offsets: OffsetBuffer<i32>,
885        vals: Box<DatumColumnDecoder>,
886        nulls: Option<NullBuffer>,
887    },
888    List {
889        offsets: OffsetBuffer<i32>,
890        values: Box<DatumColumnDecoder>,
891        nulls: Option<NullBuffer>,
892    },
893    Map {
894        offsets: OffsetBuffer<i32>,
895        keys: StringArray,
896        vals: Box<DatumColumnDecoder>,
897        nulls: Option<NullBuffer>,
898    },
899    RecordEmpty(BooleanArray),
900    Record {
901        fields: Vec<Box<DatumColumnDecoder>>,
902        nulls: Option<NullBuffer>,
903    },
904    Range(BinaryArray),
905    MzAclItem(BinaryArray),
906    AclItem(FixedSizeBinaryArray),
907}
908
909impl DatumColumnDecoder {
910    fn get<'a>(&'a self, idx: usize, packer: &'a mut RowPacker) {
911        let datum = match self {
912            DatumColumnDecoder::Bool(array) => array
913                .is_valid(idx)
914                .then(|| array.value(idx))
915                .map(|x| if x { Datum::True } else { Datum::False }),
916            DatumColumnDecoder::U8(array) => array
917                .is_valid(idx)
918                .then(|| array.value(idx))
919                .map(Datum::UInt8),
920            DatumColumnDecoder::U16(array) => array
921                .is_valid(idx)
922                .then(|| array.value(idx))
923                .map(Datum::UInt16),
924            DatumColumnDecoder::U32(array) => array
925                .is_valid(idx)
926                .then(|| array.value(idx))
927                .map(Datum::UInt32),
928            DatumColumnDecoder::U64(array) => array
929                .is_valid(idx)
930                .then(|| array.value(idx))
931                .map(Datum::UInt64),
932            DatumColumnDecoder::I16(array) => array
933                .is_valid(idx)
934                .then(|| array.value(idx))
935                .map(Datum::Int16),
936            DatumColumnDecoder::I32(array) => array
937                .is_valid(idx)
938                .then(|| array.value(idx))
939                .map(Datum::Int32),
940            DatumColumnDecoder::I64(array) => array
941                .is_valid(idx)
942                .then(|| array.value(idx))
943                .map(Datum::Int64),
944            DatumColumnDecoder::F32(array) => array
945                .is_valid(idx)
946                .then(|| array.value(idx))
947                .map(|x| Datum::Float32(ordered_float::OrderedFloat(x))),
948            DatumColumnDecoder::F64(array) => array
949                .is_valid(idx)
950                .then(|| array.value(idx))
951                .map(|x| Datum::Float64(ordered_float::OrderedFloat(x))),
952            DatumColumnDecoder::Numeric(array) => array.is_valid(idx).then(|| {
953                let val = array.value(idx);
954                let val = PackedNumeric::from_bytes(val)
955                    .expect("failed to roundtrip Numeric")
956                    .into_value();
957                Datum::Numeric(OrderedDecimal(val))
958            }),
959            DatumColumnDecoder::String(array) => array
960                .is_valid(idx)
961                .then(|| array.value(idx))
962                .map(Datum::String),
963            DatumColumnDecoder::Bytes(array) => array
964                .is_valid(idx)
965                .then(|| array.value(idx))
966                .map(Datum::Bytes),
967            DatumColumnDecoder::Date(array) => {
968                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
969                    let date = Date::from_pg_epoch(x).expect("failed to roundtrip");
970                    Datum::Date(date)
971                })
972            }
973            DatumColumnDecoder::Time(array) => {
974                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
975                    let packed = PackedNaiveTime::from_bytes(x).expect("failed to roundtrip time");
976                    Datum::Time(packed.into_value())
977                })
978            }
979            DatumColumnDecoder::Timestamp(array) => {
980                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
981                    let packed = PackedNaiveDateTime::from_bytes(x)
982                        .expect("failed to roundtrip PackedNaiveDateTime");
983                    let timestamp = CheckedTimestamp::from_timestamplike(packed.into_value())
984                        .expect("failed to roundtrip timestamp");
985                    Datum::Timestamp(timestamp)
986                })
987            }
988            DatumColumnDecoder::TimestampTz(array) => {
989                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
990                    let packed = PackedNaiveDateTime::from_bytes(x)
991                        .expect("failed to roundtrip PackedNaiveDateTime");
992                    let timestamp =
993                        CheckedTimestamp::from_timestamplike(packed.into_value().and_utc())
994                            .expect("failed to roundtrip timestamp");
995                    Datum::TimestampTz(timestamp)
996                })
997            }
998            DatumColumnDecoder::MzTimestamp(array) => array
999                .is_valid(idx)
1000                .then(|| array.value(idx))
1001                .map(|x| Datum::MzTimestamp(Timestamp::from(x))),
1002            DatumColumnDecoder::Interval(array) => {
1003                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1004                    let packed =
1005                        PackedInterval::from_bytes(x).expect("failed to roundtrip interval");
1006                    Datum::Interval(packed.into_value())
1007                })
1008            }
1009            DatumColumnDecoder::Uuid(array) => {
1010                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1011                    let uuid = Uuid::from_slice(x).expect("failed to roundtrip uuid");
1012                    Datum::Uuid(uuid)
1013                })
1014            }
1015            DatumColumnDecoder::AclItem(array) => {
1016                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1017                    let packed =
1018                        PackedAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1019                    Datum::AclItem(packed.into_value())
1020                })
1021            }
1022            DatumColumnDecoder::MzAclItem(array) => {
1023                array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1024                    let packed =
1025                        PackedMzAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1026                    Datum::MzAclItem(packed.into_value())
1027                })
1028            }
1029            DatumColumnDecoder::Range(array) => {
1030                let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1031                    packer.push(Datum::Null);
1032                    return;
1033                };
1034
1035                let proto = ProtoDatum::decode(val).expect("failed to roundtrip Range");
1036                packer
1037                    .try_push_proto(&proto)
1038                    .expect("failed to pack ProtoRange");
1039
1040                // Return early because we've already packed the necessary Datums.
1041                return;
1042            }
1043            DatumColumnDecoder::Json(array) => {
1044                let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1045                    packer.push(Datum::Null);
1046                    return;
1047                };
1048                JsonbPacker::new(packer)
1049                    .pack_str(val)
1050                    .expect("failed to roundtrip JSON");
1051
1052                // Return early because we've already packed the necessary Datums.
1053                return;
1054            }
1055            DatumColumnDecoder::Array {
1056                dim_offsets,
1057                dims,
1058                val_offsets,
1059                vals,
1060                nulls,
1061            } => {
1062                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1063                if !is_valid {
1064                    packer.push(Datum::Null);
1065                    return;
1066                }
1067
1068                let start: usize = dim_offsets[idx]
1069                    .try_into()
1070                    .expect("unexpected negative offset");
1071                let end: usize = dim_offsets[idx + 1]
1072                    .try_into()
1073                    .expect("unexpected negative offset");
1074                let dimensions = (start..end).map(|idx| {
1075                    PackedArrayDimension::from_bytes(dims.value(idx))
1076                        .expect("failed to roundtrip ArrayDimension")
1077                        .into_value()
1078                });
1079
1080                let start: usize = val_offsets[idx]
1081                    .try_into()
1082                    .expect("unexpected negative offset");
1083                let end: usize = val_offsets[idx + 1]
1084                    .try_into()
1085                    .expect("unexpected negative offset");
1086                packer
1087                    .push_array_with_row_major(dimensions, |packer| {
1088                        for x in start..end {
1089                            vals.get(x, packer);
1090                        }
1091                        // Return the numer of Datums we just packed.
1092                        end - start
1093                    })
1094                    .expect("failed to pack Array");
1095
1096                // Return early because we've already packed the necessary Datums.
1097                return;
1098            }
1099            DatumColumnDecoder::List {
1100                offsets,
1101                values,
1102                nulls,
1103            } => {
1104                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1105                if !is_valid {
1106                    packer.push(Datum::Null);
1107                    return;
1108                }
1109
1110                let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1111                let end: usize = offsets[idx + 1]
1112                    .try_into()
1113                    .expect("unexpected negative offset");
1114
1115                packer.push_list_with(|packer| {
1116                    for idx in start..end {
1117                        values.get(idx, packer)
1118                    }
1119                });
1120
1121                // Return early because we've already packed the necessary Datums.
1122                return;
1123            }
1124            DatumColumnDecoder::Map {
1125                offsets,
1126                keys,
1127                vals,
1128                nulls,
1129            } => {
1130                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1131                if !is_valid {
1132                    packer.push(Datum::Null);
1133                    return;
1134                }
1135
1136                let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1137                let end: usize = offsets[idx + 1]
1138                    .try_into()
1139                    .expect("unexpected negative offset");
1140
1141                packer.push_dict_with(|packer| {
1142                    for idx in start..end {
1143                        packer.push(Datum::String(keys.value(idx)));
1144                        vals.get(idx, packer);
1145                    }
1146                });
1147
1148                // Return early because we've already packed the necessary Datums.
1149                return;
1150            }
1151            DatumColumnDecoder::RecordEmpty(array) => array.is_valid(idx).then(Datum::empty_list),
1152            DatumColumnDecoder::Record { fields, nulls } => {
1153                let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1154                if !is_valid {
1155                    packer.push(Datum::Null);
1156                    return;
1157                }
1158
1159                // let mut datums = Vec::with_capacity(fields.len());
1160                packer.push_list_with(|packer| {
1161                    for field in fields {
1162                        field.get(idx, packer);
1163                    }
1164                });
1165
1166                // Return early because we've already packed the necessary Datums.
1167                return;
1168            }
1169        };
1170
1171        match datum {
1172            Some(d) => packer.push(d),
1173            None => packer.push(Datum::Null),
1174        }
1175    }
1176
1177    fn stats(&self) -> ColumnStatKinds {
1178        match self {
1179            DatumColumnDecoder::Bool(a) => PrimitiveStats::<bool>::from_column(a).into(),
1180            DatumColumnDecoder::U8(a) => PrimitiveStats::<u8>::from_column(a).into(),
1181            DatumColumnDecoder::U16(a) => PrimitiveStats::<u16>::from_column(a).into(),
1182            DatumColumnDecoder::U32(a) => PrimitiveStats::<u32>::from_column(a).into(),
1183            DatumColumnDecoder::U64(a) => PrimitiveStats::<u64>::from_column(a).into(),
1184            DatumColumnDecoder::I16(a) => PrimitiveStats::<i16>::from_column(a).into(),
1185            DatumColumnDecoder::I32(a) => PrimitiveStats::<i32>::from_column(a).into(),
1186            DatumColumnDecoder::I64(a) => PrimitiveStats::<i64>::from_column(a).into(),
1187            DatumColumnDecoder::F32(a) => PrimitiveStats::<f32>::from_column(a).into(),
1188            DatumColumnDecoder::F64(a) => PrimitiveStats::<f64>::from_column(a).into(),
1189            DatumColumnDecoder::Numeric(a) => numeric_stats_from_column(a),
1190            DatumColumnDecoder::String(a) => PrimitiveStats::<String>::from_column(a).into(),
1191            DatumColumnDecoder::Bytes(a) => PrimitiveStats::<Vec<u8>>::from_column(a).into(),
1192            DatumColumnDecoder::Date(a) => PrimitiveStats::<i32>::from_column(a).into(),
1193            DatumColumnDecoder::Time(a) => {
1194                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedTime)
1195            }
1196            DatumColumnDecoder::Timestamp(a) => {
1197                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1198            }
1199            DatumColumnDecoder::TimestampTz(a) => {
1200                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1201            }
1202            DatumColumnDecoder::MzTimestamp(a) => PrimitiveStats::<u64>::from_column(a).into(),
1203            DatumColumnDecoder::Interval(a) => {
1204                fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedInterval)
1205            }
1206            DatumColumnDecoder::Uuid(a) => {
1207                fixed_stats_from_column(a, FixedSizeBytesStatsKind::Uuid)
1208            }
1209            DatumColumnDecoder::AclItem(_)
1210            | DatumColumnDecoder::MzAclItem(_)
1211            | DatumColumnDecoder::Range(_) => ColumnStatKinds::None,
1212            DatumColumnDecoder::Json(a) => stats_for_json(a.iter()).values,
1213            DatumColumnDecoder::Array { .. }
1214            | DatumColumnDecoder::List { .. }
1215            | DatumColumnDecoder::Map { .. }
1216            | DatumColumnDecoder::Record { .. }
1217            | DatumColumnDecoder::RecordEmpty(_) => ColumnStatKinds::None,
1218        }
1219    }
1220
1221    fn goodbytes(&self) -> usize {
1222        match self {
1223            DatumColumnDecoder::Bool(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1224            DatumColumnDecoder::U8(a) => ArrayOrd::UInt8(a.clone()).goodbytes(),
1225            DatumColumnDecoder::U16(a) => ArrayOrd::UInt16(a.clone()).goodbytes(),
1226            DatumColumnDecoder::U32(a) => ArrayOrd::UInt32(a.clone()).goodbytes(),
1227            DatumColumnDecoder::U64(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1228            DatumColumnDecoder::I16(a) => ArrayOrd::Int16(a.clone()).goodbytes(),
1229            DatumColumnDecoder::I32(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1230            DatumColumnDecoder::I64(a) => ArrayOrd::Int64(a.clone()).goodbytes(),
1231            DatumColumnDecoder::F32(a) => ArrayOrd::Float32(a.clone()).goodbytes(),
1232            DatumColumnDecoder::F64(a) => ArrayOrd::Float64(a.clone()).goodbytes(),
1233            DatumColumnDecoder::Numeric(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1234            DatumColumnDecoder::String(a) => ArrayOrd::String(a.clone()).goodbytes(),
1235            DatumColumnDecoder::Bytes(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1236            DatumColumnDecoder::Date(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1237            DatumColumnDecoder::Time(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1238            DatumColumnDecoder::Timestamp(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1239            DatumColumnDecoder::TimestampTz(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1240            DatumColumnDecoder::MzTimestamp(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1241            DatumColumnDecoder::Interval(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1242            DatumColumnDecoder::Uuid(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1243            DatumColumnDecoder::AclItem(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1244            DatumColumnDecoder::MzAclItem(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1245            DatumColumnDecoder::Range(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1246            DatumColumnDecoder::Json(a) => ArrayOrd::String(a.clone()).goodbytes(),
1247            DatumColumnDecoder::Array { dims, vals, .. } => {
1248                (dims.len() * PackedArrayDimension::SIZE) + vals.goodbytes()
1249            }
1250            DatumColumnDecoder::List { values, .. } => values.goodbytes(),
1251            DatumColumnDecoder::Map { keys, vals, .. } => {
1252                ArrayOrd::String(keys.clone()).goodbytes() + vals.goodbytes()
1253            }
1254            DatumColumnDecoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
1255            DatumColumnDecoder::RecordEmpty(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1256        }
1257    }
1258}
1259
1260impl Schema<Row> for RelationDesc {
1261    type ArrowColumn = arrow::array::StructArray;
1262    type Statistics = OptionStats<StructStats>;
1263
1264    type Decoder = RowColumnarDecoder;
1265    type Encoder = RowColumnarEncoder;
1266
1267    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1268        RowColumnarDecoder::new(col, self)
1269    }
1270
1271    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1272        RowColumnarEncoder::new(self)
1273            .ok_or_else(|| anyhow::anyhow!("Cannot encode a RelationDesc with no columns"))
1274    }
1275}
1276
/// A [`ColumnDecoder`] for a [`Row`].
///
/// Wraps the child arrays of a [`StructArray`], one [`DatumColumnDecoder`] per column, so that
/// a [`Row`] can be reassembled by decoding the same index from every column in order.
#[derive(Debug)]
pub struct RowColumnarDecoder {
    /// The length of all columns in this decoder; matching all child arrays and the null array
    /// if present.
    len: usize,
    /// Field-specific information, one entry per column: the user-readable field name (used to
    /// key the column's stats), the null count (or None if the column is non-nullable), and the
    /// decoder which wraps the column-specific array.
    decoders: Vec<(Arc<str>, Option<usize>, DatumColumnDecoder)>,
    /// The null buffer for this row, if present. (At time of writing, all rows are assumed to be
    /// logically nullable.)
    nullability: Option<NullBuffer>,
}
1290
1291/// Merge the provided null buffer with the existing array's null buffer, if any.
1292fn mask_nulls(column: &ArrayRef, null_mask: Option<&NullBuffer>) -> ArrayRef {
1293    if null_mask.is_none() {
1294        Arc::clone(column)
1295    } else {
1296        // We calculate stats on the nested arrays, so make sure we don't count entries
1297        // that are masked off at a higher level.
1298        let nulls = NullBuffer::union(null_mask, column.nulls());
1299        let data = column
1300            .to_data()
1301            .into_builder()
1302            .nulls(nulls)
1303            .build()
1304            .expect("changed only null mask");
1305        make_array(data)
1306    }
1307}
1308
1309impl RowColumnarDecoder {
1310    /// Creates a [`RowColumnarDecoder`] that decodes from the provided [`StructArray`].
1311    ///
1312    /// Returns an error if the schema of the [`StructArray`] does not match
1313    /// the provided [`RelationDesc`].
1314    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1315        let inner_columns = col.columns();
1316        let desc_columns = desc.typ().columns();
1317
1318        if desc_columns.len() > inner_columns.len() {
1319            anyhow::bail!(
1320                "provided array has too few columns! {desc_columns:?} > {inner_columns:?}"
1321            );
1322        }
1323
1324        // For performance reasons we downcast just a single time.
1325        let mut decoders = Vec::with_capacity(desc_columns.len());
1326
1327        let null_mask = col.nulls();
1328
1329        // The columns of the `StructArray` are named with their column index.
1330        for (col_idx, col_name, col_type) in desc.iter_all() {
1331            let field_name = col_idx.to_stable_name();
1332            let column = col.column_by_name(&field_name).ok_or_else(|| {
1333                anyhow::anyhow!(
1334                    "StructArray did not contain column name {field_name}, found {:?}",
1335                    col.column_names()
1336                )
1337            })?;
1338            let column = mask_nulls(column, null_mask);
1339            let null_count = col_type.nullable.then(|| column.null_count());
1340            let decoder = array_to_decoder(&column, &col_type.scalar_type)?;
1341            decoders.push((col_name.as_str().into(), null_count, decoder));
1342        }
1343
1344        Ok(RowColumnarDecoder {
1345            len: col.len(),
1346            decoders,
1347            nullability: col.logical_nulls(),
1348        })
1349    }
1350
1351    // Returns the number of null entries in this array of Row structs. This
1352    // will be 0 when `Row` is encoded directly, but could be non-zero when it's
1353    // used inside `SourceDataEncoder`.
1354    pub fn null_count(&self) -> usize {
1355        self.nullability.as_ref().map_or(0, |n| n.null_count())
1356    }
1357}
1358
1359impl ColumnDecoder<Row> for RowColumnarDecoder {
1360    fn decode(&self, idx: usize, val: &mut Row) {
1361        let mut packer = val.packer();
1362
1363        for (_, _, decoder) in &self.decoders {
1364            decoder.get(idx, &mut packer);
1365        }
1366    }
1367
1368    fn is_null(&self, idx: usize) -> bool {
1369        let Some(nullability) = self.nullability.as_ref() else {
1370            return false;
1371        };
1372        nullability.is_null(idx)
1373    }
1374
1375    fn goodbytes(&self) -> usize {
1376        let decoders_size: usize = self
1377            .decoders
1378            .iter()
1379            .map(|(_name, _null_count, decoder)| decoder.goodbytes())
1380            .sum();
1381
1382        decoders_size
1383            + self
1384                .nullability
1385                .as_ref()
1386                .map(|nulls| nulls.inner().inner().len())
1387                .unwrap_or(0)
1388    }
1389
1390    fn stats(&self) -> StructStats {
1391        StructStats {
1392            len: self.len,
1393            cols: self
1394                .decoders
1395                .iter()
1396                .map(|(name, null_count, decoder)| {
1397                    let name = name.to_string();
1398                    let stats = ColumnarStats {
1399                        nulls: null_count.map(|count| ColumnNullStats { count }),
1400                        values: decoder.stats(),
1401                    };
1402                    (name, stats)
1403                })
1404                .collect(),
1405        }
1406    }
1407}
1408
/// A [`ColumnEncoder`] for a [`Row`].
#[derive(Debug)]
pub struct RowColumnarEncoder {
    /// One encoder per column; the datums of an appended [`Row`] are fed to these in order.
    encoders: Vec<DatumEncoder>,
    /// For each column, its raw column index paired with its column name.
    // TODO(parkmycar): Replace the `usize` with a `ColumnIdx` type.
    col_names: Vec<(usize, Arc<str>)>,
    /// Row-level validity: `true` is appended for every appended row and `false` for every
    /// appended null.
    // TODO(parkmycar): Optionally omit this.
    nullability: BooleanBufferBuilder,
}
1418
1419impl RowColumnarEncoder {
1420    /// Creates a [`RowColumnarEncoder`] for the provided [`RelationDesc`].
1421    ///
1422    /// Returns `None` if the provided [`RelationDesc`] has no columns.
1423    ///
1424    /// # Note
1425    /// Internally we represent a [`Row`] as a [`StructArray`] which is
1426    /// required to have at least one field. Instead of handling this case by
1427    /// adding some special "internal" column we let a higher level encoder
1428    /// (e.g. `SourceDataColumnarEncoder`) handle this case.
1429    pub fn new(desc: &RelationDesc) -> Option<Self> {
1430        if desc.typ().columns().is_empty() {
1431            return None;
1432        }
1433
1434        let (col_names, encoders): (Vec<_>, Vec<_>) = desc
1435            .iter_all()
1436            .map(|(col_idx, col_name, col_type)| {
1437                let encoder = scalar_type_to_encoder(&col_type.scalar_type)
1438                    .expect("failed to create encoder");
1439                let encoder = DatumEncoder {
1440                    nullable: col_type.nullable,
1441                    encoder,
1442                };
1443
1444                // We name the Fields in Parquet with the column index, but for
1445                // backwards compat use the column name for stats.
1446                let name = (col_idx.to_raw(), col_name.as_str().into());
1447
1448                (name, encoder)
1449            })
1450            .unzip();
1451
1452        Some(RowColumnarEncoder {
1453            encoders,
1454            col_names,
1455            nullability: BooleanBufferBuilder::new(100),
1456        })
1457    }
1458}
1459
1460impl ColumnEncoder<Row> for RowColumnarEncoder {
1461    type FinishedColumn = StructArray;
1462
1463    fn goodbytes(&self) -> usize {
1464        self.encoders.iter().map(|e| e.goodbytes()).sum()
1465    }
1466
1467    fn append(&mut self, val: &Row) {
1468        let mut num_datums = 0;
1469        for (datum, encoder) in val.iter().zip(self.encoders.iter_mut()) {
1470            encoder.push(datum);
1471            num_datums += 1;
1472        }
1473        assert_eq!(
1474            num_datums,
1475            self.encoders.len(),
1476            "tried to encode {val:?}, but only have {:?}",
1477            self.encoders
1478        );
1479
1480        self.nullability.append(true);
1481    }
1482
1483    fn append_null(&mut self) {
1484        for encoder in self.encoders.iter_mut() {
1485            encoder.push_invalid();
1486        }
1487        self.nullability.append(false);
1488    }
1489
1490    fn finish(self) -> Self::FinishedColumn {
1491        let RowColumnarEncoder {
1492            encoders,
1493            col_names,
1494            nullability,
1495            ..
1496        } = self;
1497
1498        let (arrays, fields): (Vec<_>, Vec<_>) = col_names
1499            .iter()
1500            .zip_eq(encoders)
1501            .map(|((col_idx, _col_name), encoder)| {
1502                // Note: We mark all columns as nullable at the Arrow/Parquet level because it has
1503                // a negligible performance difference, but it protects us from unintended
1504                // nullability changes in the columns of SQL objects.
1505                //
1506                // See: <https://github.com/MaterializeInc/database-issues/issues/2488>
1507                let nullable = true;
1508                let array = encoder.finish();
1509                let field = Field::new(col_idx.to_string(), array.data_type().clone(), nullable);
1510
1511                (array, field)
1512            })
1513            .multiunzip();
1514
1515        let null_buffer = NullBuffer::from(BooleanBuffer::from(nullability));
1516
1517        let array = StructArray::new(Fields::from(fields), arrays, Some(null_buffer));
1518
1519        array
1520    }
1521}
1522
1523/// Small helper method to make downcasting an [`Array`] return an error.
1524///
1525/// Note: it is _super_ important that we downcast as few times as possible. Datum encoding is a
1526/// very hot path and downcasting is relatively slow
1527#[inline]
1528fn downcast_array<T: 'static>(array: &Arc<dyn Array>) -> Result<&T, anyhow::Error> {
1529    array
1530        .as_any()
1531        .downcast_ref::<T>()
1532        .ok_or_else(|| anyhow!("expected {}, found {array:?}", std::any::type_name::<T>()))
1533}
1534
1535/// Small helper function to downcast from an array to a [`DatumColumnDecoder`].
1536///
1537/// Note: it is _super_ important that we downcast as few times as possible. Datum encoding is a
1538/// very hot path and downcasting is relatively slow
1539fn array_to_decoder(
1540    array: &Arc<dyn Array>,
1541    col_ty: &ScalarType,
1542) -> Result<DatumColumnDecoder, anyhow::Error> {
1543    let decoder = match (array.data_type(), col_ty) {
1544        (DataType::Boolean, ScalarType::Bool) => {
1545            let array = downcast_array::<BooleanArray>(array)?;
1546            DatumColumnDecoder::Bool(array.clone())
1547        }
1548        (DataType::UInt8, ScalarType::PgLegacyChar) => {
1549            let array = downcast_array::<UInt8Array>(array)?;
1550            DatumColumnDecoder::U8(array.clone())
1551        }
1552        (DataType::UInt16, ScalarType::UInt16) => {
1553            let array = downcast_array::<UInt16Array>(array)?;
1554            DatumColumnDecoder::U16(array.clone())
1555        }
1556        (
1557            DataType::UInt32,
1558            ScalarType::UInt32
1559            | ScalarType::Oid
1560            | ScalarType::RegClass
1561            | ScalarType::RegProc
1562            | ScalarType::RegType,
1563        ) => {
1564            let array = downcast_array::<UInt32Array>(array)?;
1565            DatumColumnDecoder::U32(array.clone())
1566        }
1567        (DataType::UInt64, ScalarType::UInt64) => {
1568            let array = downcast_array::<UInt64Array>(array)?;
1569            DatumColumnDecoder::U64(array.clone())
1570        }
1571        (DataType::Int16, ScalarType::Int16) => {
1572            let array = downcast_array::<Int16Array>(array)?;
1573            DatumColumnDecoder::I16(array.clone())
1574        }
1575        (DataType::Int32, ScalarType::Int32) => {
1576            let array = downcast_array::<Int32Array>(array)?;
1577            DatumColumnDecoder::I32(array.clone())
1578        }
1579        (DataType::Int64, ScalarType::Int64) => {
1580            let array = downcast_array::<Int64Array>(array)?;
1581            DatumColumnDecoder::I64(array.clone())
1582        }
1583        (DataType::Float32, ScalarType::Float32) => {
1584            let array = downcast_array::<Float32Array>(array)?;
1585            DatumColumnDecoder::F32(array.clone())
1586        }
1587        (DataType::Float64, ScalarType::Float64) => {
1588            let array = downcast_array::<Float64Array>(array)?;
1589            DatumColumnDecoder::F64(array.clone())
1590        }
1591        (DataType::Struct(_), ScalarType::Numeric { .. }) => {
1592            let array = downcast_array::<StructArray>(array)?;
1593            // Note: We only use the approx column for sorting, and ignore it
1594            // when decoding.
1595            let binary_values = array
1596                .column_by_name("binary")
1597                .expect("missing binary column");
1598
1599            let array = downcast_array::<BinaryArray>(binary_values)?;
1600            DatumColumnDecoder::Numeric(array.clone())
1601        }
1602        (
1603            DataType::Utf8,
1604            ScalarType::String
1605            | ScalarType::PgLegacyName
1606            | ScalarType::Char { .. }
1607            | ScalarType::VarChar { .. },
1608        ) => {
1609            let array = downcast_array::<StringArray>(array)?;
1610            DatumColumnDecoder::String(array.clone())
1611        }
1612        (DataType::Binary, ScalarType::Bytes) => {
1613            let array = downcast_array::<BinaryArray>(array)?;
1614            DatumColumnDecoder::Bytes(array.clone())
1615        }
1616        (DataType::Int32, ScalarType::Date) => {
1617            let array = downcast_array::<Int32Array>(array)?;
1618            DatumColumnDecoder::Date(array.clone())
1619        }
1620        (DataType::FixedSizeBinary(TIME_FIXED_BYTES), ScalarType::Time) => {
1621            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1622            DatumColumnDecoder::Time(array.clone())
1623        }
1624        (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), ScalarType::Timestamp { .. }) => {
1625            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1626            DatumColumnDecoder::Timestamp(array.clone())
1627        }
1628        (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), ScalarType::TimestampTz { .. }) => {
1629            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1630            DatumColumnDecoder::TimestampTz(array.clone())
1631        }
1632        (DataType::UInt64, ScalarType::MzTimestamp) => {
1633            let array = downcast_array::<UInt64Array>(array)?;
1634            DatumColumnDecoder::MzTimestamp(array.clone())
1635        }
1636        (DataType::FixedSizeBinary(INTERVAL_FIXED_BYTES), ScalarType::Interval) => {
1637            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1638            DatumColumnDecoder::Interval(array.clone())
1639        }
1640        (DataType::FixedSizeBinary(UUID_FIXED_BYTES), ScalarType::Uuid) => {
1641            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1642            DatumColumnDecoder::Uuid(array.clone())
1643        }
1644        (DataType::FixedSizeBinary(ACL_ITEM_FIXED_BYTES), ScalarType::AclItem) => {
1645            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1646            DatumColumnDecoder::AclItem(array.clone())
1647        }
1648        (DataType::Binary, ScalarType::MzAclItem) => {
1649            let array = downcast_array::<BinaryArray>(array)?;
1650            DatumColumnDecoder::MzAclItem(array.clone())
1651        }
1652        (DataType::Binary, ScalarType::Range { .. }) => {
1653            let array = downcast_array::<BinaryArray>(array)?;
1654            DatumColumnDecoder::Range(array.clone())
1655        }
1656        (DataType::Utf8, ScalarType::Jsonb) => {
1657            let array = downcast_array::<StringArray>(array)?;
1658            DatumColumnDecoder::Json(array.clone())
1659        }
1660        (DataType::Struct(_), s @ ScalarType::Array(_) | s @ ScalarType::Int2Vector) => {
1661            let element_type = match s {
1662                ScalarType::Array(inner) => inner,
1663                ScalarType::Int2Vector => &ScalarType::Int16,
1664                _ => unreachable!("checked above"),
1665            };
1666
1667            let array = downcast_array::<StructArray>(array)?;
1668            let nulls = array.nulls().cloned();
1669
1670            let dims = array
1671                .column_by_name("dims")
1672                .expect("missing dimensions column");
1673            let dims = downcast_array::<ListArray>(dims).cloned()?;
1674            let dim_offsets = dims.offsets().clone();
1675            let dims = downcast_array::<FixedSizeBinaryArray>(dims.values()).cloned()?;
1676
1677            let vals = array.column_by_name("vals").expect("missing values column");
1678            let vals = downcast_array::<ListArray>(vals)?;
1679            let val_offsets = vals.offsets().clone();
1680            let vals = array_to_decoder(vals.values(), element_type)?;
1681
1682            DatumColumnDecoder::Array {
1683                dim_offsets,
1684                dims,
1685                val_offsets,
1686                vals: Box::new(vals),
1687                nulls,
1688            }
1689        }
1690        (DataType::List(_), ScalarType::List { element_type, .. }) => {
1691            let array = downcast_array::<ListArray>(array)?;
1692            let inner_decoder = array_to_decoder(array.values(), &*element_type)?;
1693            DatumColumnDecoder::List {
1694                offsets: array.offsets().clone(),
1695                values: Box::new(inner_decoder),
1696                nulls: array.nulls().cloned(),
1697            }
1698        }
1699        (DataType::Map(_, true), ScalarType::Map { value_type, .. }) => {
1700            let array = downcast_array::<MapArray>(array)?;
1701            let keys = downcast_array::<StringArray>(array.keys())?;
1702            let vals = array_to_decoder(array.values(), value_type)?;
1703            DatumColumnDecoder::Map {
1704                offsets: array.offsets().clone(),
1705                keys: keys.clone(),
1706                vals: Box::new(vals),
1707                nulls: array.nulls().cloned(),
1708            }
1709        }
1710        (DataType::List(_), ScalarType::Map { value_type, .. }) => {
1711            let array: &ListArray = downcast_array(array)?;
1712            let entries: &StructArray = downcast_array(array.values())?;
1713            let [keys, values]: &[ArrayRef; 2] = entries.columns().try_into()?;
1714            let keys: &StringArray = downcast_array(keys)?;
1715            let vals: DatumColumnDecoder = array_to_decoder(values, value_type)?;
1716            DatumColumnDecoder::Map {
1717                offsets: array.offsets().clone(),
1718                keys: keys.clone(),
1719                vals: Box::new(vals),
1720                nulls: array.nulls().cloned(),
1721            }
1722        }
1723        (DataType::Boolean, ScalarType::Record { fields, .. }) if fields.is_empty() => {
1724            let empty_record_array = downcast_array::<BooleanArray>(array)?;
1725            DatumColumnDecoder::RecordEmpty(empty_record_array.clone())
1726        }
1727        (DataType::Struct(_), ScalarType::Record { fields, .. }) => {
1728            let record_array = downcast_array::<StructArray>(array)?;
1729            let null_mask = record_array.nulls();
1730            let mut decoders = Vec::with_capacity(fields.len());
1731            for (tag, (_name, col_type)) in fields.iter().enumerate() {
1732                let inner_array = record_array
1733                    .column_by_name(&tag.to_string())
1734                    .ok_or_else(|| anyhow::anyhow!("no column named '{tag}'"))?;
1735                let inner_array = mask_nulls(inner_array, null_mask);
1736                let inner_decoder = array_to_decoder(&inner_array, &col_type.scalar_type)?;
1737
1738                decoders.push(Box::new(inner_decoder));
1739            }
1740
1741            DatumColumnDecoder::Record {
1742                fields: decoders,
1743                nulls: record_array.nulls().cloned(),
1744            }
1745        }
1746        (x, y) => {
1747            let msg = format!("can't decode column of {x:?} for scalar type {y:?}");
1748            mz_ore::soft_panic_or_log!("{msg}");
1749            anyhow::bail!("{msg}");
1750        }
1751    };
1752
1753    Ok(decoder)
1754}
1755
1756/// Small helper function to create a [`DatumColumnEncoder`] from a [`ScalarType`]
1757fn scalar_type_to_encoder(col_ty: &ScalarType) -> Result<DatumColumnEncoder, anyhow::Error> {
1758    let encoder = match &col_ty {
1759        ScalarType::Bool => DatumColumnEncoder::Bool(BooleanBuilder::new()),
1760        ScalarType::PgLegacyChar => DatumColumnEncoder::U8(UInt8Builder::new()),
1761        ScalarType::UInt16 => DatumColumnEncoder::U16(UInt16Builder::new()),
1762        ScalarType::UInt32
1763        | ScalarType::Oid
1764        | ScalarType::RegClass
1765        | ScalarType::RegProc
1766        | ScalarType::RegType => DatumColumnEncoder::U32(UInt32Builder::new()),
1767        ScalarType::UInt64 => DatumColumnEncoder::U64(UInt64Builder::new()),
1768        ScalarType::Int16 => DatumColumnEncoder::I16(Int16Builder::new()),
1769        ScalarType::Int32 => DatumColumnEncoder::I32(Int32Builder::new()),
1770        ScalarType::Int64 => DatumColumnEncoder::I64(Int64Builder::new()),
1771        ScalarType::Float32 => DatumColumnEncoder::F32(Float32Builder::new()),
1772        ScalarType::Float64 => DatumColumnEncoder::F64(Float64Builder::new()),
1773        ScalarType::Numeric { .. } => DatumColumnEncoder::Numeric {
1774            approx_values: Float64Builder::new(),
1775            binary_values: BinaryBuilder::new(),
1776            numeric_context: crate::adt::numeric::cx_datum().clone(),
1777        },
1778        ScalarType::String
1779        | ScalarType::PgLegacyName
1780        | ScalarType::Char { .. }
1781        | ScalarType::VarChar { .. } => DatumColumnEncoder::String(StringBuilder::new()),
1782        ScalarType::Bytes => DatumColumnEncoder::Bytes(BinaryBuilder::new()),
1783        ScalarType::Date => DatumColumnEncoder::Date(Int32Builder::new()),
1784        ScalarType::Time => DatumColumnEncoder::Time(FixedSizeBinaryBuilder::new(TIME_FIXED_BYTES)),
1785        ScalarType::Timestamp { .. } => {
1786            DatumColumnEncoder::Timestamp(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1787        }
1788        ScalarType::TimestampTz { .. } => {
1789            DatumColumnEncoder::TimestampTz(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1790        }
1791        ScalarType::MzTimestamp => DatumColumnEncoder::MzTimestamp(UInt64Builder::new()),
1792        ScalarType::Interval => {
1793            DatumColumnEncoder::Interval(FixedSizeBinaryBuilder::new(INTERVAL_FIXED_BYTES))
1794        }
1795        ScalarType::Uuid => DatumColumnEncoder::Uuid(FixedSizeBinaryBuilder::new(UUID_FIXED_BYTES)),
1796        ScalarType::AclItem => {
1797            DatumColumnEncoder::AclItem(FixedSizeBinaryBuilder::new(ACL_ITEM_FIXED_BYTES))
1798        }
1799        ScalarType::MzAclItem => DatumColumnEncoder::MzAclItem(BinaryBuilder::new()),
1800        ScalarType::Range { .. } => DatumColumnEncoder::Range(BinaryBuilder::new()),
1801        ScalarType::Jsonb => DatumColumnEncoder::Jsonb {
1802            offsets: vec![0],
1803            buf: Vec::new(),
1804            nulls: None,
1805        },
1806        s @ ScalarType::Array(_) | s @ ScalarType::Int2Vector => {
1807            let element_type = match s {
1808                ScalarType::Array(inner) => inner,
1809                ScalarType::Int2Vector => &ScalarType::Int16,
1810                _ => unreachable!("checked above"),
1811            };
1812            let inner = scalar_type_to_encoder(element_type)?;
1813            DatumColumnEncoder::Array {
1814                dims: ListBuilder::new(FixedSizeBinaryBuilder::new(ARRAY_DIMENSION_FIXED_BYTES)),
1815                val_lengths: Vec::new(),
1816                vals: Box::new(inner),
1817                nulls: None,
1818            }
1819        }
1820        ScalarType::List { element_type, .. } => {
1821            let inner = scalar_type_to_encoder(&*element_type)?;
1822            DatumColumnEncoder::List {
1823                lengths: Vec::new(),
1824                values: Box::new(inner),
1825                nulls: None,
1826            }
1827        }
1828        ScalarType::Map { value_type, .. } => {
1829            let inner = scalar_type_to_encoder(&*value_type)?;
1830            DatumColumnEncoder::Map {
1831                lengths: Vec::new(),
1832                keys: StringBuilder::new(),
1833                vals: Box::new(inner),
1834                nulls: None,
1835            }
1836        }
1837        ScalarType::Record { fields, .. } if fields.is_empty() => {
1838            DatumColumnEncoder::RecordEmpty(BooleanBuilder::new())
1839        }
1840        ScalarType::Record { fields, .. } => {
1841            let encoders = fields
1842                .iter()
1843                .map(|(_name, ty)| {
1844                    scalar_type_to_encoder(&ty.scalar_type).map(|e| DatumEncoder {
1845                        nullable: ty.nullable,
1846                        encoder: e,
1847                    })
1848                })
1849                .collect::<Result<_, _>>()?;
1850
1851            DatumColumnEncoder::Record {
1852                fields: encoders,
1853                nulls: None,
1854                length: 0,
1855            }
1856        }
1857    };
1858    Ok(encoder)
1859}
1860
1861impl Codec for Row {
1862    type Storage = ProtoRow;
1863    type Schema = RelationDesc;
1864
1865    fn codec_name() -> String {
1866        "protobuf[Row]".into()
1867    }
1868
1869    /// Encodes a row into the permanent storage format.
1870    ///
1871    /// This perfectly round-trips through [Row::decode]. It's guaranteed to be
1872    /// readable by future versions of Materialize through v(TODO: Figure out
1873    /// our policy).
1874    fn encode<B>(&self, buf: &mut B)
1875    where
1876        B: BufMut,
1877    {
1878        self.into_proto()
1879            .encode(buf)
1880            .expect("no required fields means no initialization errors");
1881    }
1882
1883    /// Decodes a row from the permanent storage format.
1884    ///
1885    /// This perfectly round-trips through [Row::encode]. It can read rows
1886    /// encoded by historical versions of Materialize back to v(TODO: Figure out
1887    /// our policy).
1888    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Row, String> {
1889        // NB: We could easily implement this directly instead of via
1890        // `decode_from`, but do this so that we get maximal coverage of the
1891        // more complicated codepath.
1892        //
1893        // The length of the encoded ProtoRow (i.e. `buf.len()`) doesn't perfect
1894        // predict the length of the resulting Row, but it's definitely
1895        // correlated, so probably a decent estimate.
1896        let mut row = Row::with_capacity(buf.len());
1897        <Self as Codec>::decode_from(&mut row, buf, &mut None, schema)?;
1898        Ok(row)
1899    }
1900
1901    fn decode_from<'a>(
1902        &mut self,
1903        buf: &'a [u8],
1904        storage: &mut Option<ProtoRow>,
1905        schema: &RelationDesc,
1906    ) -> Result<(), String> {
1907        let mut proto = storage.take().unwrap_or_default();
1908        proto.clear();
1909        proto.merge(buf).map_err(|err| err.to_string())?;
1910        let ret = self.decode_from_proto(&proto, schema);
1911        storage.replace(proto);
1912        ret
1913    }
1914
1915    fn validate(row: &Self, desc: &Self::Schema) -> Result<(), String> {
1916        for x in Itertools::zip_longest(desc.iter_types(), row.iter()) {
1917            match x {
1918                EitherOrBoth::Both(typ, datum) if datum.is_instance_of(typ) => continue,
1919                _ => return Err(format!("row {:?} did not match desc {:?}", row, desc)),
1920            };
1921        }
1922        Ok(())
1923    }
1924
1925    fn encode_schema(schema: &Self::Schema) -> Bytes {
1926        schema.into_proto().encode_to_vec().into()
1927    }
1928
1929    fn decode_schema(buf: &Bytes) -> Self::Schema {
1930        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
1931        proto.into_rust().expect("valid schema")
1932    }
1933}
1934
1935impl<'a> From<Datum<'a>> for ProtoDatum {
1936    fn from(x: Datum<'a>) -> Self {
1937        let datum_type = match x {
1938            Datum::False => DatumType::Other(ProtoDatumOther::False.into()),
1939            Datum::True => DatumType::Other(ProtoDatumOther::True.into()),
1940            Datum::Int16(x) => DatumType::Int16(x.into()),
1941            Datum::Int32(x) => DatumType::Int32(x),
1942            Datum::UInt8(x) => DatumType::Uint8(x.into()),
1943            Datum::UInt16(x) => DatumType::Uint16(x.into()),
1944            Datum::UInt32(x) => DatumType::Uint32(x),
1945            Datum::UInt64(x) => DatumType::Uint64(x),
1946            Datum::Int64(x) => DatumType::Int64(x),
1947            Datum::Float32(x) => DatumType::Float32(x.into_inner()),
1948            Datum::Float64(x) => DatumType::Float64(x.into_inner()),
1949            Datum::Date(x) => DatumType::Date(x.into_proto()),
1950            Datum::Time(x) => DatumType::Time(ProtoNaiveTime {
1951                secs: x.num_seconds_from_midnight(),
1952                frac: x.nanosecond(),
1953            }),
1954            Datum::Timestamp(x) => DatumType::Timestamp(x.into_proto()),
1955            Datum::TimestampTz(x) => DatumType::TimestampTz(x.into_proto()),
1956            Datum::Interval(x) => DatumType::Interval(x.into_proto()),
1957            Datum::Bytes(x) => DatumType::Bytes(Bytes::copy_from_slice(x)),
1958            Datum::String(x) => DatumType::String(x.to_owned()),
1959            Datum::Array(x) => DatumType::Array(ProtoArray {
1960                elements: Some(ProtoRow {
1961                    datums: x.elements().iter().map(|x| x.into()).collect(),
1962                }),
1963                dims: x
1964                    .dims()
1965                    .into_iter()
1966                    .map(|x| ProtoArrayDimension {
1967                        lower_bound: i64::cast_from(x.lower_bound),
1968                        length: u64::cast_from(x.length),
1969                    })
1970                    .collect(),
1971            }),
1972            Datum::List(x) => DatumType::List(ProtoRow {
1973                datums: x.iter().map(|x| x.into()).collect(),
1974            }),
1975            Datum::Map(x) => DatumType::Dict(ProtoDict {
1976                elements: x
1977                    .iter()
1978                    .map(|(k, v)| ProtoDictElement {
1979                        key: k.to_owned(),
1980                        val: Some(v.into()),
1981                    })
1982                    .collect(),
1983            }),
1984            Datum::Numeric(x) => {
1985                // TODO: Do we need this defensive clone?
1986                let mut x = x.0.clone();
1987                if let Some((bcd, scale)) = x.to_packed_bcd() {
1988                    DatumType::Numeric(ProtoNumeric { bcd, scale })
1989                } else if x.is_nan() {
1990                    DatumType::Other(ProtoDatumOther::NumericNaN.into())
1991                } else if x.is_infinite() {
1992                    if x.is_negative() {
1993                        DatumType::Other(ProtoDatumOther::NumericNegInf.into())
1994                    } else {
1995                        DatumType::Other(ProtoDatumOther::NumericPosInf.into())
1996                    }
1997                } else if x.is_special() {
1998                    panic!("internal error: unhandled special numeric value: {}", x);
1999                } else {
2000                    panic!(
2001                        "internal error: to_packed_bcd returned None for non-special value: {}",
2002                        x
2003                    )
2004                }
2005            }
2006            Datum::JsonNull => DatumType::Other(ProtoDatumOther::JsonNull.into()),
2007            Datum::Uuid(x) => DatumType::Uuid(x.as_bytes().to_vec()),
2008            Datum::MzTimestamp(x) => DatumType::MzTimestamp(x.into()),
2009            Datum::Dummy => DatumType::Other(ProtoDatumOther::Dummy.into()),
2010            Datum::Null => DatumType::Other(ProtoDatumOther::Null.into()),
2011            Datum::Range(super::Range { inner }) => DatumType::Range(Box::new(ProtoRange {
2012                inner: inner.map(|RangeInner { lower, upper }| {
2013                    Box::new(ProtoRangeInner {
2014                        lower_inclusive: lower.inclusive,
2015                        lower: lower.bound.map(|bound| Box::new(bound.datum().into())),
2016                        upper_inclusive: upper.inclusive,
2017                        upper: upper.bound.map(|bound| Box::new(bound.datum().into())),
2018                    })
2019                }),
2020            })),
2021            Datum::MzAclItem(x) => DatumType::MzAclItem(x.into_proto()),
2022            Datum::AclItem(x) => DatumType::AclItem(x.into_proto()),
2023        };
2024        ProtoDatum {
2025            datum_type: Some(datum_type),
2026        }
2027    }
2028}
2029
2030impl RowPacker<'_> {
2031    pub(crate) fn try_push_proto(&mut self, x: &ProtoDatum) -> Result<(), String> {
2032        match &x.datum_type {
2033            Some(DatumType::Other(o)) => match ProtoDatumOther::try_from(*o) {
2034                Ok(ProtoDatumOther::Unknown) => return Err("unknown datum type".into()),
2035                Ok(ProtoDatumOther::Null) => self.push(Datum::Null),
2036                Ok(ProtoDatumOther::False) => self.push(Datum::False),
2037                Ok(ProtoDatumOther::True) => self.push(Datum::True),
2038                Ok(ProtoDatumOther::JsonNull) => self.push(Datum::JsonNull),
2039                Ok(ProtoDatumOther::Dummy) => {
2040                    // We plan to remove the `Dummy` variant soon (materialize#17099). To prepare for that, we
2041                    // emit a log to Sentry here, to notify us of any instances that might have
2042                    // been made durable.
2043                    #[cfg(feature = "tracing")]
2044                    tracing::error!("protobuf decoding found Dummy datum");
2045                    self.push(Datum::Dummy);
2046                }
2047                Ok(ProtoDatumOther::NumericPosInf) => self.push(Datum::from(Numeric::infinity())),
2048                Ok(ProtoDatumOther::NumericNegInf) => self.push(Datum::from(-Numeric::infinity())),
2049                Ok(ProtoDatumOther::NumericNaN) => self.push(Datum::from(Numeric::nan())),
2050                Err(_) => return Err(format!("unknown datum type: {}", o)),
2051            },
2052            Some(DatumType::Int16(x)) => {
2053                let x = i16::try_from(*x)
2054                    .map_err(|_| format!("int16 field stored with out of range value: {}", *x))?;
2055                self.push(Datum::Int16(x))
2056            }
2057            Some(DatumType::Int32(x)) => self.push(Datum::Int32(*x)),
2058            Some(DatumType::Int64(x)) => self.push(Datum::Int64(*x)),
2059            Some(DatumType::Uint8(x)) => {
2060                let x = u8::try_from(*x)
2061                    .map_err(|_| format!("uint8 field stored with out of range value: {}", *x))?;
2062                self.push(Datum::UInt8(x))
2063            }
2064            Some(DatumType::Uint16(x)) => {
2065                let x = u16::try_from(*x)
2066                    .map_err(|_| format!("uint16 field stored with out of range value: {}", *x))?;
2067                self.push(Datum::UInt16(x))
2068            }
2069            Some(DatumType::Uint32(x)) => self.push(Datum::UInt32(*x)),
2070            Some(DatumType::Uint64(x)) => self.push(Datum::UInt64(*x)),
2071            Some(DatumType::Float32(x)) => self.push(Datum::Float32((*x).into())),
2072            Some(DatumType::Float64(x)) => self.push(Datum::Float64((*x).into())),
2073            Some(DatumType::Bytes(x)) => self.push(Datum::Bytes(x)),
2074            Some(DatumType::String(x)) => self.push(Datum::String(x)),
2075            Some(DatumType::Uuid(x)) => {
2076                // Uuid internally has a [u8; 16] so we'll have to do at least
2077                // one copy, but there's currently an additional one when the
2078                // Vec is created. Perhaps the protobuf Bytes support will let
2079                // us fix one of them.
2080                let u = Uuid::from_slice(x).map_err(|err| err.to_string())?;
2081                self.push(Datum::Uuid(u));
2082            }
2083            Some(DatumType::Date(x)) => self.push(Datum::Date(x.clone().into_rust()?)),
2084            Some(DatumType::Time(x)) => self.push(Datum::Time(x.clone().into_rust()?)),
2085            Some(DatumType::Timestamp(x)) => self.push(Datum::Timestamp(x.clone().into_rust()?)),
2086            Some(DatumType::TimestampTz(x)) => {
2087                self.push(Datum::TimestampTz(x.clone().into_rust()?))
2088            }
2089            Some(DatumType::Interval(x)) => self.push(Datum::Interval(
2090                x.clone()
2091                    .into_rust()
2092                    .map_err(|e: TryFromProtoError| e.to_string())?,
2093            )),
2094            Some(DatumType::List(x)) => self.push_list_with(|row| -> Result<(), String> {
2095                for d in x.datums.iter() {
2096                    row.try_push_proto(d)?;
2097                }
2098                Ok(())
2099            })?,
2100            Some(DatumType::Array(x)) => {
2101                let dims = x
2102                    .dims
2103                    .iter()
2104                    .map(|x| ArrayDimension {
2105                        lower_bound: isize::cast_from(x.lower_bound),
2106                        length: usize::cast_from(x.length),
2107                    })
2108                    .collect::<Vec<_>>();
2109                match x.elements.as_ref() {
2110                    None => self.try_push_array(&dims, [].iter()),
2111                    Some(elements) => {
2112                        // TODO: Could we avoid this Row alloc if we made a
2113                        // push_array_with?
2114                        let elements_row = Row::try_from(elements)?;
2115                        self.try_push_array(&dims, elements_row.iter())
2116                    }
2117                }
2118                .map_err(|err| err.to_string())?
2119            }
2120            Some(DatumType::Dict(x)) => self.push_dict_with(|row| -> Result<(), String> {
2121                for e in x.elements.iter() {
2122                    row.push(Datum::from(e.key.as_str()));
2123                    let val = e
2124                        .val
2125                        .as_ref()
2126                        .ok_or_else(|| format!("missing val for key: {}", e.key))?;
2127                    row.try_push_proto(val)?;
2128                }
2129                Ok(())
2130            })?,
2131            Some(DatumType::Numeric(x)) => {
2132                // Reminder that special values like NaN, PosInf, and NegInf are
2133                // represented as variants of ProtoDatumOther.
2134                let n = Decimal::from_packed_bcd(&x.bcd, x.scale).map_err(|err| err.to_string())?;
2135                self.push(Datum::from(n))
2136            }
2137            Some(DatumType::MzTimestamp(x)) => self.push(Datum::MzTimestamp((*x).into())),
2138            Some(DatumType::Range(inner)) => {
2139                let ProtoRange { inner } = &**inner;
2140                match inner {
2141                    None => self.push_range(Range { inner: None }).unwrap(),
2142                    Some(inner) => {
2143                        let ProtoRangeInner {
2144                            lower_inclusive,
2145                            lower,
2146                            upper_inclusive,
2147                            upper,
2148                        } = &**inner;
2149
2150                        self.push_range_with(
2151                            RangeLowerBound {
2152                                inclusive: *lower_inclusive,
2153                                bound: lower
2154                                    .as_ref()
2155                                    .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2156                            },
2157                            RangeUpperBound {
2158                                inclusive: *upper_inclusive,
2159                                bound: upper
2160                                    .as_ref()
2161                                    .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2162                            },
2163                        )
2164                        .expect("decoding ProtoRow must succeed");
2165                    }
2166                }
2167            }
2168            Some(DatumType::MzAclItem(x)) => self.push(Datum::MzAclItem(x.clone().into_rust()?)),
2169            Some(DatumType::AclItem(x)) => self.push(Datum::AclItem(x.clone().into_rust()?)),
2170            None => return Err("unknown datum type".into()),
2171        };
2172        Ok(())
2173    }
2174}
2175
2176/// TODO: remove this in favor of [`RustType::from_proto`].
2177impl TryFrom<&ProtoRow> for Row {
2178    type Error = String;
2179
2180    fn try_from(x: &ProtoRow) -> Result<Self, Self::Error> {
2181        // TODO: Try to pre-size this.
2182        // see https://github.com/MaterializeInc/database-issues/issues/3640
2183        let mut row = Row::default();
2184        let mut packer = row.packer();
2185        for d in x.datums.iter() {
2186            packer.try_push_proto(d)?;
2187        }
2188        Ok(row)
2189    }
2190}
2191
2192impl RustType<ProtoRow> for Row {
2193    fn into_proto(&self) -> ProtoRow {
2194        let datums = self.iter().map(|x| x.into()).collect();
2195        ProtoRow { datums }
2196    }
2197
2198    fn from_proto(proto: ProtoRow) -> Result<Self, TryFromProtoError> {
2199        // TODO: Try to pre-size this.
2200        // see https://github.com/MaterializeInc/database-issues/issues/3640
2201        let mut row = Row::default();
2202        let mut packer = row.packer();
2203        for d in proto.datums.iter() {
2204            packer
2205                .try_push_proto(d)
2206                .map_err(TryFromProtoError::RowConversionError)?;
2207        }
2208        Ok(row)
2209    }
2210}
2211
2212#[cfg(test)]
2213mod tests {
2214    use std::collections::BTreeSet;
2215
2216    use arrow::array::{ArrayData, make_array};
2217    use arrow::compute::SortOptions;
2218    use arrow::datatypes::ArrowNativeType;
2219    use arrow::row::SortField;
2220    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2221    use mz_ore::assert_err;
2222    use mz_ore::collections::CollectionExt;
2223    use mz_persist::indexed::columnar::arrow::realloc_array;
2224    use mz_persist::metrics::ColumnarMetrics;
2225    use mz_persist_types::Codec;
2226    use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
2227    use mz_persist_types::columnar::{codec_to_schema, schema_to_codec};
2228    use mz_proto::{ProtoType, RustType};
2229    use proptest::prelude::*;
2230    use proptest::strategy::Strategy;
2231    use uuid::Uuid;
2232
2233    use super::*;
2234    use crate::adt::array::ArrayDimension;
2235    use crate::adt::interval::Interval;
2236    use crate::adt::numeric::Numeric;
2237    use crate::adt::timestamp::CheckedTimestamp;
2238    use crate::fixed_length::ToDatumIter;
2239    use crate::relation::arb_relation_desc;
2240    use crate::{ColumnName, ColumnType, RowArena, arb_datum_for_column, arb_row_for_relation};
2241    use crate::{Datum, RelationDesc, Row, ScalarType};
2242
2243    #[track_caller]
2244    fn roundtrip_datum<'a>(
2245        ty: ColumnType,
2246        datum: impl Iterator<Item = Datum<'a>>,
2247        metrics: &ColumnarMetrics,
2248    ) {
2249        let desc = RelationDesc::builder().with_column("a", ty).finish();
2250        let rows = datum.map(|d| Row::pack_slice(&[d])).collect();
2251        roundtrip_rows(&desc, rows, metrics)
2252    }
2253
    /// Encodes `rows` with the columnar encoder for `desc` and checks that the resulting column
    /// survives a series of round trips.
    ///
    /// In order, this asserts that:
    /// * the column is unchanged by a trip through `ProtoArrayData`,
    /// * decoding every index yields the original row,
    /// * every non-null decoded datum lies within the lower/upper bounds reported by the
    ///   decoder's stats, and the reported null counts match the nulls actually decoded,
    /// * converting to codec form and back is lossless,
    /// * sorting via arrow's `RowConverter` agrees with sorting via `ArrayOrd`,
    /// * that order agrees with datum order on the leading `preserves_order` columns,
    /// * `goodbytes` for the whole column equals the sum over all indices, and
    /// * a proto-encoded lower bound respects the size limit and is `<=` the bound it was
    ///   built from.
    ///
    /// Panics as soon as any of these checks fails.
    #[track_caller]
    fn roundtrip_rows(desc: &RelationDesc, rows: Vec<Row>, metrics: &ColumnarMetrics) {
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(desc).unwrap();
        for row in &rows {
            encoder.append(row);
        }
        let col = encoder.finish();

        // Exercise reallocating columns with lgalloc.
        let col = realloc_array(&col, metrics);
        // Exercise our ProtoArray format.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            // Rebuild the column both statically (`StructArray::from`) and dynamically
            // (`make_array` + downcast); both must equal the original.
            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        let decoder = <RelationDesc as Schema<Row>>::decoder(desc, col.clone()).unwrap();
        let stats = decoder.stats();

        // Collect all of our lower and upper bounds.
        let arena = RowArena::new();
        // From here on `stats` is shadowed: it holds one optional (lower, upper) pair per
        // column, while `stat_nulls` holds the null count reported for that column (0 if the
        // stats carry no null information).
        let (stats, stat_nulls): (Vec<_>, Vec<_>) = desc
            .iter()
            .map(|(name, ty)| {
                let col_stats = stats.cols.get(name.as_str()).unwrap();
                let lower_upper =
                    crate::stats::col_values(&ty.scalar_type, &col_stats.values, &arena);
                let null_count = col_stats.nulls.map_or(0, |n| n.count);

                (lower_upper, null_count)
            })
            .unzip();
        // Track how many nulls we saw for each column so we can assert stats match.
        let mut actual_nulls = vec![0usize; stats.len()];

        // A single row is reused as the decode target for every index.
        let mut rnd_row = Row::default();
        for (idx, og_row) in rows.iter().enumerate() {
            decoder.decode(idx, &mut rnd_row);
            assert_eq!(og_row, &rnd_row);

            // Check for each Datum in each Row that we're within our stats bounds.
            for (c_idx, (rnd_datum, ty)) in rnd_row.iter().zip_eq(desc.typ().columns()).enumerate()
            {
                let lower_upper = stats[c_idx];

                // Assert our stat bounds are correct.
                if rnd_datum.is_null() {
                    actual_nulls[c_idx] += 1;
                } else if let Some((lower, upper)) = lower_upper {
                    assert!(rnd_datum >= lower, "{rnd_datum:?} is not >= {lower:?}");
                    assert!(rnd_datum <= upper, "{rnd_datum:?} is not <= {upper:?}");
                } else {
                    // No bounds for a non-null datum: only acceptable for the types listed
                    // below; anything else is treated as missing stats.
                    match &ty.scalar_type {
                        // JSON stats are handled separately.
                        ScalarType::Jsonb => (),
                        // We don't collect stats for these types.
                        ScalarType::AclItem
                        | ScalarType::MzAclItem
                        | ScalarType::Range { .. }
                        | ScalarType::Array(_)
                        | ScalarType::Map { .. }
                        | ScalarType::List { .. }
                        | ScalarType::Record { .. }
                        | ScalarType::Int2Vector => (),
                        other => panic!("should have collected stats for {other:?}"),
                    }
                }
            }
        }

        // Validate that the null counts in our stats matched the actual counts.
        for (col_idx, (stats_count, actual_count)) in
            stat_nulls.iter().zip_eq(actual_nulls.iter()).enumerate()
        {
            assert_eq!(
                stats_count, actual_count,
                "column {col_idx} has incorrect number of nulls!"
            );
        }

        // Validate that we can convert losslessly to codec and back
        let codec = schema_to_codec::<Row>(desc, &col).unwrap();
        let col2 = codec_to_schema::<Row>(desc, &codec).unwrap();
        assert_eq!(col2.as_ref(), &col);

        // Validate that we only generate supported array types
        let converter = arrow::row::RowConverter::new(vec![SortField::new_with_options(
            col.data_type().clone(),
            SortOptions {
                descending: false,
                nulls_first: false,
            },
        )])
        .expect("sortable");
        let rows = converter
            .convert_columns(&[Arc::new(col.clone())])
            .expect("convertible");
        // Sort the arrow row encoding and convert it back into a single column.
        let mut row_vec = rows.iter().collect::<Vec<_>>();
        row_vec.sort();
        let row_col = converter
            .convert_rows(row_vec)
            .expect("convertible")
            .into_element();
        assert_eq!(row_col.len(), col.len());

        // Sort the same column a second way, by ordering indices with `ArrayOrd` and gathering
        // with `take`; the two sorted columns must be identical.
        let ord = ArrayOrd::new(&col);
        let mut indices = (0..u64::usize_as(col.len())).collect::<Vec<_>>();
        indices.sort_by_key(|i| ord.at(i.as_usize()));
        let indices = UInt64Array::from(indices);
        let ord_col = ::arrow::compute::take(&col, &indices, None).expect("takeable");
        assert_eq!(row_col.as_ref(), ord_col.as_ref());

        // Check that our order matches the datum-native order when `preserves_order` is true.
        let ordered_prefix_len = desc
            .iter()
            .take_while(|(_, c)| preserves_order(&c.scalar_type))
            .count();
        let decoder = <RelationDesc as Schema<Row>>::decoder_any(desc, ord_col.as_ref()).unwrap();
        let rows = (0..ord_col.len()).map(|i| {
            let mut row = Row::default();
            decoder.decode(i, &mut row);
            row
        });
        // Compare each adjacent pair of decoded rows on the order-preserving prefix only.
        for (a, b) in rows.tuple_windows() {
            let a_prefix = a.iter().take(ordered_prefix_len);
            let b_prefix = b.iter().take(ordered_prefix_len);
            assert!(
                a_prefix.cmp(b_prefix).is_le(),
                "ordering should be consistent on preserves_order columns: {:#?}\n{:?}\n{:?}",
                desc.iter().take(ordered_prefix_len).collect_vec(),
                a.to_datum_iter().take(ordered_prefix_len).collect_vec(),
                b.to_datum_iter().take(ordered_prefix_len).collect_vec()
            );
        }

        // Check that our size estimates are consistent.
        assert_eq!(
            ord.goodbytes(),
            (0..col.len()).map(|i| ord.at(i).goodbytes()).sum::<usize>(),
            "total size should match the sum of the sizes at each index"
        );

        // Check that our lower bounds work as expected.
        if !ord_col.is_empty() {
            // NOTE(review): `indices.values()[0]` is the position of the smallest element in
            // `col`, but it is used below to index `ord_col`, which is already sorted. The
            // assertions hold for whichever element is picked, but if the intent is to test
            // the minimum element this presumably wants index 0 of `ord_col` -- confirm.
            let min_idx = indices.values()[0].as_usize();
            let lower_bound = ArrayBound::new(ord_col, min_idx);
            let max_encoded_len = 1000;
            if let Some(proto) = lower_bound.to_proto_lower(max_encoded_len) {
                assert!(
                    proto.encoded_len() <= max_encoded_len,
                    "should respect the max len"
                );
                let array_data = proto.into_rust().expect("valid array");
                let new_lower_bound = ArrayBound::new(make_array(array_data), 0);
                assert!(
                    new_lower_bound.get() <= lower_bound.get(),
                    "proto-roundtripped bound should be <= the original"
                );
            }
        }
    }
2424
2425    #[mz_ore::test]
2426    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2427    fn proptest_datums() {
2428        let strat = any::<ColumnType>().prop_flat_map(|ty| {
2429            proptest::collection::vec(arb_datum_for_column(ty.clone()), 0..16)
2430                .prop_map(move |d| (ty.clone(), d))
2431        });
2432        let metrics = ColumnarMetrics::disconnected();
2433
2434        proptest!(|((ty, datums) in strat)| {
2435            roundtrip_datum(ty.clone(), datums.iter().map(Datum::from), &metrics);
2436        })
2437    }
2438
2439    #[mz_ore::test]
2440    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2441    fn proptest_non_empty_relation_descs() {
2442        let strat = arb_relation_desc(1..8).prop_flat_map(|desc| {
2443            proptest::collection::vec(arb_row_for_relation(&desc), 0..12)
2444                .prop_map(move |rows| (desc.clone(), rows))
2445        });
2446        let metrics = ColumnarMetrics::disconnected();
2447
2448        proptest!(|((desc, rows) in strat)| {
2449            roundtrip_rows(&desc, rows, &metrics)
2450        })
2451    }
2452
2453    #[mz_ore::test]
2454    fn empty_relation_desc_returns_error() {
2455        let empty_desc = RelationDesc::empty();
2456        let result = <RelationDesc as Schema<Row>>::encoder(&empty_desc);
2457        assert_err!(result);
2458    }
2459
2460    #[mz_ore::test]
2461    fn smoketest_collections() {
2462        let mut row = Row::default();
2463        let mut packer = row.packer();
2464        let metrics = ColumnarMetrics::disconnected();
2465
2466        packer
2467            .try_push_array(
2468                &[ArrayDimension {
2469                    lower_bound: 0,
2470                    length: 3,
2471                }],
2472                [Datum::UInt32(4), Datum::UInt32(5), Datum::UInt32(6)],
2473            )
2474            .unwrap();
2475
2476        let array = row.unpack_first();
2477        roundtrip_datum(
2478            ScalarType::Array(Box::new(ScalarType::UInt32)).nullable(true),
2479            [array].into_iter(),
2480            &metrics,
2481        );
2482    }
2483
2484    #[mz_ore::test]
2485    fn smoketest_row() {
2486        let desc = RelationDesc::builder()
2487            .with_column("a", ScalarType::Int64.nullable(true))
2488            .with_column("b", ScalarType::String.nullable(true))
2489            .with_column("c", ScalarType::Bool.nullable(true))
2490            .with_column(
2491                "d",
2492                ScalarType::List {
2493                    element_type: Box::new(ScalarType::UInt32),
2494                    custom_id: None,
2495                }
2496                .nullable(true),
2497            )
2498            .with_column(
2499                "e",
2500                ScalarType::Map {
2501                    value_type: Box::new(ScalarType::Int16),
2502                    custom_id: None,
2503                }
2504                .nullable(true),
2505            )
2506            .finish();
2507        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();
2508
2509        let mut og_row = Row::default();
2510        {
2511            let mut packer = og_row.packer();
2512            packer.push(Datum::Int64(100));
2513            packer.push(Datum::String("hello world"));
2514            packer.push(Datum::True);
2515            packer.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
2516            packer.push_dict([("bar", Datum::Int16(9)), ("foo", Datum::Int16(3))]);
2517        }
2518        let mut og_row_2 = Row::default();
2519        {
2520            let mut packer = og_row_2.packer();
2521            packer.push(Datum::Null);
2522            packer.push(Datum::Null);
2523            packer.push(Datum::Null);
2524            packer.push(Datum::Null);
2525            packer.push(Datum::Null);
2526        }
2527
2528        encoder.append(&og_row);
2529        encoder.append(&og_row_2);
2530        let col = encoder.finish();
2531
2532        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
2533
2534        let mut rnd_row = Row::default();
2535        decoder.decode(0, &mut rnd_row);
2536        assert_eq!(og_row, rnd_row);
2537
2538        let mut rnd_row = Row::default();
2539        decoder.decode(1, &mut rnd_row);
2540        assert_eq!(og_row_2, rnd_row);
2541    }
2542
2543    #[mz_ore::test]
2544    fn test_nested_list() {
2545        let desc = RelationDesc::builder()
2546            .with_column(
2547                "a",
2548                ScalarType::List {
2549                    element_type: Box::new(ScalarType::List {
2550                        element_type: Box::new(ScalarType::Int64),
2551                        custom_id: None,
2552                    }),
2553                    custom_id: None,
2554                }
2555                .nullable(false),
2556            )
2557            .finish();
2558        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();
2559
2560        let mut og_row = Row::default();
2561        {
2562            let mut packer = og_row.packer();
2563            packer.push_list_with(|inner| {
2564                inner.push_list([Datum::Int64(1), Datum::Int64(2)]);
2565                inner.push_list([Datum::Int64(5)]);
2566                inner.push_list([Datum::Int64(9), Datum::Int64(99), Datum::Int64(999)]);
2567            });
2568        }
2569
2570        encoder.append(&og_row);
2571        let col = encoder.finish();
2572
2573        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
2574        let mut rnd_row = Row::default();
2575        decoder.decode(0, &mut rnd_row);
2576
2577        assert_eq!(og_row, rnd_row);
2578    }
2579
2580    #[mz_ore::test]
2581    fn test_record() {
2582        let desc = RelationDesc::builder()
2583            .with_column(
2584                "a",
2585                ScalarType::Record {
2586                    fields: [
2587                        (ColumnName::from("foo"), ScalarType::Int64.nullable(false)),
2588                        (ColumnName::from("bar"), ScalarType::String.nullable(true)),
2589                        (
2590                            ColumnName::from("baz"),
2591                            ScalarType::List {
2592                                element_type: Box::new(ScalarType::UInt32),
2593                                custom_id: None,
2594                            }
2595                            .nullable(false),
2596                        ),
2597                    ]
2598                    .into(),
2599                    custom_id: None,
2600                }
2601                .nullable(true),
2602            )
2603            .finish();
2604        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();
2605
2606        let mut og_row = Row::default();
2607        {
2608            let mut packer = og_row.packer();
2609            packer.push_list_with(|inner| {
2610                inner.push(Datum::Int64(42));
2611                inner.push(Datum::Null);
2612                inner.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
2613            });
2614        }
2615        let null_row = Row::pack_slice(&[Datum::Null]);
2616
2617        encoder.append(&og_row);
2618        encoder.append(&null_row);
2619        let col = encoder.finish();
2620
2621        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
2622        let mut rnd_row = Row::default();
2623
2624        decoder.decode(0, &mut rnd_row);
2625        assert_eq!(og_row, rnd_row);
2626
2627        rnd_row.packer();
2628        decoder.decode(1, &mut rnd_row);
2629        assert_eq!(null_row, rnd_row);
2630    }
2631
2632    #[mz_ore::test]
2633    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
2634    fn roundtrip() {
2635        let mut row = Row::default();
2636        let mut packer = row.packer();
2637        packer.extend([
2638            Datum::False,
2639            Datum::True,
2640            Datum::Int16(1),
2641            Datum::Int32(2),
2642            Datum::Int64(3),
2643            Datum::Float32(4f32.into()),
2644            Datum::Float64(5f64.into()),
2645            Datum::Date(
2646                NaiveDate::from_ymd_opt(6, 7, 8)
2647                    .unwrap()
2648                    .try_into()
2649                    .unwrap(),
2650            ),
2651            Datum::Time(NaiveTime::from_hms_opt(9, 10, 11).unwrap()),
2652            Datum::Timestamp(
2653                CheckedTimestamp::from_timestamplike(
2654                    NaiveDate::from_ymd_opt(12, 13 % 12, 14)
2655                        .unwrap()
2656                        .and_time(NaiveTime::from_hms_opt(15, 16, 17).unwrap()),
2657                )
2658                .unwrap(),
2659            ),
2660            Datum::TimestampTz(
2661                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
2662                    NaiveDate::from_ymd_opt(18, 19 % 12, 20)
2663                        .unwrap()
2664                        .and_time(NaiveTime::from_hms_opt(21, 22, 23).unwrap()),
2665                    Utc,
2666                ))
2667                .unwrap(),
2668            ),
2669            Datum::Interval(Interval {
2670                months: 24,
2671                days: 42,
2672                micros: 25,
2673            }),
2674            Datum::Bytes(&[26, 27]),
2675            Datum::String("28"),
2676            Datum::from(Numeric::from(29)),
2677            Datum::from(Numeric::infinity()),
2678            Datum::from(-Numeric::infinity()),
2679            Datum::from(Numeric::nan()),
2680            Datum::JsonNull,
2681            Datum::Uuid(Uuid::from_u128(30)),
2682            Datum::Dummy,
2683            Datum::Null,
2684        ]);
2685        packer
2686            .try_push_array(
2687                &[ArrayDimension {
2688                    lower_bound: 2,
2689                    length: 2,
2690                }],
2691                vec![Datum::Int32(31), Datum::Int32(32)],
2692            )
2693            .expect("valid array");
2694        packer.push_list_with(|packer| {
2695            packer.push(Datum::String("33"));
2696            packer.push_list_with(|packer| {
2697                packer.push(Datum::String("34"));
2698                packer.push(Datum::String("35"));
2699            });
2700            packer.push(Datum::String("36"));
2701            packer.push(Datum::String("37"));
2702        });
2703        packer.push_dict_with(|row| {
2704            // Add a bunch of data to the hash to ensure we don't get a
2705            // HashMap's random iteration anywhere in the encode/decode path.
2706            let mut i = 38;
2707            for _ in 0..20 {
2708                row.push(Datum::String(&i.to_string()));
2709                row.push(Datum::Int32(i + 1));
2710                i += 2;
2711            }
2712        });
2713
2714        let mut desc = RelationDesc::builder();
2715        for (idx, _) in row.iter().enumerate() {
2716            // HACK(parkmycar): We don't currently validate the types of the `RelationDesc` are
2717            // correct, just the number of columns. So we can fill in any type here.
2718            desc = desc.with_column(idx.to_string(), ScalarType::Int32.nullable(true));
2719        }
2720        let desc = desc.finish();
2721
2722        let encoded = row.encode_to_vec();
2723        assert_eq!(Row::decode(&encoded, &desc), Ok(row));
2724    }
2725
2726    #[mz_ore::test]
2727    fn smoketest_projection() {
2728        let desc = RelationDesc::builder()
2729            .with_column("a", ScalarType::Int64.nullable(true))
2730            .with_column("b", ScalarType::String.nullable(true))
2731            .with_column("c", ScalarType::Bool.nullable(true))
2732            .finish();
2733        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();
2734
2735        let mut og_row = Row::default();
2736        {
2737            let mut packer = og_row.packer();
2738            packer.push(Datum::Int64(100));
2739            packer.push(Datum::String("hello world"));
2740            packer.push(Datum::True);
2741        }
2742        let mut og_row_2 = Row::default();
2743        {
2744            let mut packer = og_row_2.packer();
2745            packer.push(Datum::Null);
2746            packer.push(Datum::Null);
2747            packer.push(Datum::Null);
2748        }
2749
2750        encoder.append(&og_row);
2751        encoder.append(&og_row_2);
2752        let col = encoder.finish();
2753
2754        let projected_desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2755
2756        let decoder = <RelationDesc as Schema<Row>>::decoder(&projected_desc, col).unwrap();
2757
2758        let mut rnd_row = Row::default();
2759        decoder.decode(0, &mut rnd_row);
2760        let expected_row = Row::pack_slice(&[Datum::Int64(100), Datum::True]);
2761        assert_eq!(expected_row, rnd_row);
2762
2763        let mut rnd_row = Row::default();
2764        decoder.decode(1, &mut rnd_row);
2765        let expected_row = Row::pack_slice(&[Datum::Null, Datum::Null]);
2766        assert_eq!(expected_row, rnd_row);
2767    }
2768}