Skip to main content

mz_repr/
row.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::hash::{Hash, Hasher};
16use std::marker::PhantomData;
17use std::mem::{size_of, transmute};
18use std::ops::Deref;
19use std::str;
20
21use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
22use compact_bytes::CompactBytes;
23use mz_ore::cast::{CastFrom, ReinterpretCast};
24use mz_ore::soft_assert_no_log;
25use mz_ore::vec::Vector;
26use mz_persist_types::Codec64;
27use num_enum::{IntoPrimitive, TryFromPrimitive};
28use ordered_float::OrderedFloat;
29#[cfg(any(test, feature = "proptest"))]
30use proptest::prelude::*;
31#[cfg(any(test, feature = "proptest"))]
32use proptest::strategy::{BoxedStrategy, Strategy};
33use serde::{Deserialize, Serialize};
34use uuid::Uuid;
35
36use crate::adt::array::{
37    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
38};
39use crate::adt::date::Date;
40use crate::adt::interval::Interval;
41use crate::adt::mz_acl_item::{AclItem, MzAclItem};
42use crate::adt::numeric;
43use crate::adt::numeric::Numeric;
44use crate::adt::range::{
45    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
46};
47use crate::adt::timestamp::CheckedTimestamp;
48#[cfg(any(test, feature = "proptest"))]
49use crate::scalar::arb_datum;
50use crate::scalar::{DatumKind, SqlScalarType};
51use crate::{Datum, RelationDesc, Timestamp};
52
53pub(crate) mod encode;
54pub mod iter;
55
56include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
57
/// A packed representation for `Datum`s.
///
/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
/// is laid out in memory like this:
///
///   tag: 3
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
///   data: 0 0 0 42
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
///
/// For a total of 32 bytes! The second set of padding is needed in case we were
/// to write a 16-byte datum into this location. The first set of padding is
/// needed to align that hypothetical decimal to a 16 bytes boundary.
///
/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
/// for the first set of padding by only providing access to the `Datum`s via
/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
/// avoid the need for the second set of padding by not providing mutable access
/// to the `Datum`. Instead, `Row` is append-only.
///
/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
/// can be created. If that is not possible, consider using the row buffer
/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
/// receive a copy of that row, leaving behind the original allocation to pack
/// future rows.
///
/// Creating a row via [`Row::pack_slice`]:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
/// ```
///
/// `Row`s can be unpacked by iterating over them:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
/// ```
///
/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to create a
/// `Vec<Datum>`:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// let datums = row.unpack();
/// assert_eq!(datums[1], Datum::Int32(1));
/// ```
///
/// # Performance
///
/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
/// avoids the allocations involved in `Row::new()`.
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// The encoded datums, stored back to back without padding.
    data: CompactBytes,
}
118
119impl Row {
120    const SIZE: usize = CompactBytes::MAX_INLINE;
121
122    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
123    /// and validates the decoding against a provided [`RelationDesc`].
124    pub fn decode_from_proto(
125        &mut self,
126        proto: &ProtoRow,
127        desc: &RelationDesc,
128    ) -> Result<(), String> {
129        let mut packer = self.packer();
130        for (col_idx, _, _) in desc.iter_all() {
131            let d = match proto.datums.get(col_idx.to_raw()) {
132                Some(x) => x,
133                None => {
134                    packer.push(Datum::Null);
135                    continue;
136                }
137            };
138            packer.try_push_proto(d)?;
139        }
140
141        Ok(())
142    }
143
144    /// Allocate an empty `Row` with a pre-allocated capacity.
145    #[inline]
146    pub fn with_capacity(cap: usize) -> Self {
147        Self {
148            data: CompactBytes::with_capacity(cap),
149        }
150    }
151
152    /// Create an empty `Row`.
153    #[inline]
154    pub const fn empty() -> Self {
155        Self {
156            data: CompactBytes::empty(),
157        }
158    }
159
160    /// Creates a new row from supplied bytes.
161    ///
162    /// # Safety
163    ///
164    /// This method relies on `data` being an appropriate row encoding, and can
165    /// result in unsafety if this is not the case.
166    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
167        Row {
168            data: CompactBytes::new(data),
169        }
170    }
171
172    /// Constructs a [`RowPacker`] that will pack datums into this row's
173    /// allocation.
174    ///
175    /// This method clears the existing contents of the row, but retains the
176    /// allocation.
177    pub fn packer(&mut self) -> RowPacker<'_> {
178        self.clear();
179        RowPacker { row: self }
180    }
181
182    /// Take some `Datum`s and pack them into a `Row`.
183    ///
184    /// This method builds a `Row` by repeatedly increasing the backing
185    /// allocation. If the contents of the iterator are known ahead of
186    /// time, consider [`Row::with_capacity`] to right-size the allocation
187    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
188    /// This avoids the repeated allocation resizing and copying.
189    pub fn pack<'a, I, D>(iter: I) -> Row
190    where
191        I: IntoIterator<Item = D>,
192        D: Borrow<Datum<'a>>,
193    {
194        let mut row = Row::default();
195        row.packer().extend(iter);
196        row
197    }
198
199    /// Use `self` to pack `iter`, and then clone the result.
200    ///
201    /// This is a convenience method meant to reduce boilerplate around row
202    /// formation.
203    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
204    where
205        I: IntoIterator<Item = D>,
206        D: Borrow<Datum<'a>>,
207    {
208        self.packer().extend(iter);
209        self.clone()
210    }
211
212    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
213    /// error, in which case the packing operation is aborted and the error
214    /// returned.
215    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
216    where
217        I: IntoIterator<Item = Result<D, E>>,
218        D: Borrow<Datum<'a>>,
219    {
220        let mut row = Row::default();
221        row.packer().try_extend(iter)?;
222        Ok(row)
223    }
224
225    /// Pack a slice of `Datum`s into a `Row`.
226    ///
227    /// This method has the advantage over `pack` that it can determine the required
228    /// allocation before packing the elements, ensuring only one allocation and no
229    /// redundant copies required.
230    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
231        // Pre-allocate the needed number of bytes.
232        let mut row = Row::with_capacity(datums_size(slice.iter()));
233        row.packer().extend(slice.iter());
234        row
235    }
236
237    /// Returns the total amount of bytes used by this row.
238    pub fn byte_len(&self) -> usize {
239        let heap_size = if self.data.spilled() {
240            self.data.len()
241        } else {
242            0
243        };
244        let inline_size = std::mem::size_of::<Self>();
245        inline_size.saturating_add(heap_size)
246    }
247
248    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
249    pub fn data_len(&self) -> usize {
250        self.data.len()
251    }
252
253    /// Returns the total capacity in bytes used by this row.
254    pub fn byte_capacity(&self) -> usize {
255        self.data.capacity()
256    }
257
258    /// Extracts a Row slice containing the entire [`Row`].
259    #[inline]
260    pub fn as_row_ref(&self) -> &RowRef {
261        // SAFETY: `Row` contains valid row data, by construction.
262        unsafe { RowRef::from_slice(self.data.as_slice()) }
263    }
264
265    /// Clear the contents of the [`Row`], leaving any allocation in place.
266    #[inline]
267    fn clear(&mut self) {
268        self.data.clear();
269    }
270}
271
272impl Borrow<RowRef> for Row {
273    #[inline]
274    fn borrow(&self) -> &RowRef {
275        self.as_row_ref()
276    }
277}
278
279impl AsRef<RowRef> for Row {
280    #[inline]
281    fn as_ref(&self) -> &RowRef {
282        self.as_row_ref()
283    }
284}
285
286impl Deref for Row {
287    type Target = RowRef;
288
289    #[inline]
290    fn deref(&self) -> &Self::Target {
291        self.as_row_ref()
292    }
293}
294
// Nothing depends on Row being exactly 24, we just want to add visibility to the size.
// This is a compile-time check: the build fails if `size_of::<Row>()` stops being 24 bytes,
// so a size change has to be acknowledged by updating the constant here.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
297
298impl Clone for Row {
299    fn clone(&self) -> Self {
300        Row {
301            data: self.data.clone(),
302        }
303    }
304
305    fn clone_from(&mut self, source: &Self) {
306        self.data.clone_from(&source.data);
307    }
308}
309
310// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
311impl std::hash::Hash for Row {
312    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
313        self.as_row_ref().hash(state)
314    }
315}
316
#[cfg(any(test, feature = "proptest"))]
impl Arbitrary for Row {
    type Parameters = prop::collection::SizeRange;
    type Strategy = BoxedStrategy<Row>;

    /// Generates rows by drawing a vector of arbitrary datums, whose length
    /// falls within `size`, and packing them in order into a fresh `Row`.
    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
        let datum_vecs = prop::collection::vec(arb_datum(true), size);
        datum_vecs
            .prop_map(|items| {
                let mut row = Row::default();
                let mut packer = row.packer();
                for item in &items {
                    // Convert the generated value into a borrowed `Datum` before packing it.
                    packer.push(Into::<Datum<'_>>::into(item));
                }
                row
            })
            .boxed()
    }
}
336
337impl PartialOrd for Row {
338    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
339        Some(self.cmp(other))
340    }
341}
342
343impl Ord for Row {
344    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
345        self.as_ref().cmp(other.as_ref())
346    }
347}
348
// `Columnation` support for `Row`: rows whose data has spilled are copied into a
// region and a `Row` pointing at the region bytes is returned.
#[allow(missing_debug_implementations)]
mod columnation {
    use columnation::{Columnation, Region};
    use mz_ore::region::LgAllocRegion;

    use crate::Row;

    /// Region allocation for `Row` data.
    ///
    /// Content bytes are stored in stable contiguous memory locations,
    /// and then a `Row` referencing them is falsified.
    pub struct RowStack {
        /// Byte region that receives the copied row contents.
        region: LgAllocRegion<u8>,
    }

    impl RowStack {
        /// Region size limit in bytes: `2 << 20` is 2 MiB. Also used to cap
        /// reservation requests.
        const LIMIT: usize = 2 << 20;
    }

    // Implement `Default` manually to specify a region allocation limit.
    impl Default for RowStack {
        fn default() -> Self {
            Self {
                // Limit the region size to 2MiB.
                region: LgAllocRegion::with_limit(Self::LIMIT),
            }
        }
    }

    impl Columnation for Row {
        type InnerRegion = RowStack;
    }

    impl Region for RowStack {
        type Item = Row;
        /// Clears the underlying byte region.
        #[inline]
        fn clear(&mut self) {
            self.region.clear();
        }
        /// Copies `item` into this region.
        ///
        /// Rows whose data has spilled have their bytes copied into the region, and
        /// the returned `Row` is assembled from raw parts pointing at that copy.
        /// Rows that have not spilled are simply cloned.
        #[inline(always)]
        unsafe fn copy(&mut self, item: &Row) -> Row {
            if item.data.spilled() {
                let bytes = self.region.copy_slice(&item.data[..]);
                Row {
                    // NOTE(review): the copied slice holds `item.data.len()` bytes, but the
                    // capacity passed here is the source row's capacity, which may be larger.
                    // Presumably harmless as long as the forged row is never grown or
                    // dropped — confirm against `CompactBytes::from_raw_parts`.
                    data: compact_bytes::CompactBytes::from_raw_parts(
                        bytes.as_mut_ptr(),
                        item.data.len(),
                        item.data.capacity(),
                    ),
                }
            } else {
                item.clone()
            }
        }

        /// Reserves room for the spilled data of `items`, capped at `LIMIT` bytes.
        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Only spilled rows are copied into the region, so only they count.
            let size = items
                .filter(|row| row.data.spilled())
                .map(|row| row.data.len())
                .sum();
            let size = std::cmp::min(size, Self::LIMIT);
            self.region.reserve(size);
        }

        /// Reserves room for the combined length of `regions`, capped at `LIMIT` bytes.
        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let size = regions.map(|r| r.region.len()).sum();
            let size = std::cmp::min(size, Self::LIMIT);
            self.region.reserve(size);
        }

        /// Forwards heap-size reporting to the underlying region.
        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.region.heap_size(callback)
        }
    }
}
432
// `columnar` support for `Row`: rows are stored as one concatenated byte buffer
// plus a list of end offsets.
mod columnar {
    use columnar::common::PushIndexAs;
    use columnar::{
        AsBytes, Borrow, Clear, Columnar, Container, FromBytes, Index, IndexAs, Len, Push,
    };
    use mz_ore::cast::CastFrom;
    use std::ops::Range;

    use crate::{Row, RowRef};

    /// Columnar container for rows.
    ///
    /// The bytes of all rows are concatenated in `values`. Entry `i` of `bounds`
    /// is the offset in `values` at which row `i` ends; row `i` therefore starts
    /// at entry `i - 1`, or at `0` for the first row.
    #[derive(
        Copy,
        Clone,
        Debug,
        Default,
        PartialEq,
        serde::Serialize,
        serde::Deserialize
    )]
    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
        /// Bounds container; provides indexed access to offsets.
        bounds: BC,
        /// Values container; provides slice access to bytes.
        values: VC,
    }

    impl Columnar for Row {
        /// Replaces the contents of `self` with the bytes of `other`.
        #[inline(always)]
        fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
            self.clear();
            self.data.extend_from_slice(other.data());
        }
        /// Produces an owned `Row` from the borrowed row.
        #[inline(always)]
        fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
            other.to_owned()
        }
        type Container = Rows;
        /// Shortens the lifetime of a borrowed row; no data is touched.
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
        where
            Self: 'a,
        {
            thing
        }
    }

    impl<BC: PushIndexAs<u64>> Borrow for Rows<BC, Vec<u8>> {
        type Ref<'a> = &'a RowRef;
        type Borrowed<'a>
            = Rows<BC::Borrowed<'a>, &'a [u8]>
        where
            Self: 'a;
        /// Borrows both the bounds and the values containers.
        #[inline(always)]
        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
            Rows {
                bounds: self.bounds.borrow(),
                values: self.values.borrow(),
            }
        }
        /// Shortens the lifetime of a borrowed container.
        #[inline(always)]
        fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
        where
            Self: 'a,
        {
            Rows {
                bounds: BC::reborrow(item.bounds),
                values: item.values,
            }
        }

        /// Shortens the lifetime of a borrowed row.
        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
        where
            Self: 'a,
        {
            item
        }
    }

    impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
        /// Appends the rows of `other` selected by `range` to `self`.
        ///
        /// The bytes are copied in one step; the bounds are then copied either
        /// verbatim or shifted so that they are relative to `self.values`.
        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
            if !range.is_empty() {
                // Imported bounds will be relative to this starting offset.
                let values_len: u64 = self.values.len().try_into().expect("must fit");

                // Push all bytes that we can, all at once.
                // The first imported row starts where the row before it ends, or at 0.
                let other_lower = if range.start == 0 {
                    0
                } else {
                    other.bounds.index_as(range.start - 1)
                };
                let other_upper = other.bounds.index_as(range.end - 1);
                self.values.extend_from_self(
                    other.values,
                    usize::try_from(other_lower).expect("must fit")
                        ..usize::try_from(other_upper).expect("must fit"),
                );

                // Each bound needs to be shifted by `values_len - other_lower`.
                if values_len == other_lower {
                    // The shift is zero, so the bounds can be copied unchanged.
                    self.bounds.extend_from_self(other.bounds, range);
                } else {
                    for index in range {
                        let shifted = other.bounds.index_as(index) - other_lower + values_len;
                        self.bounds.push(&shifted)
                    }
                }
            }
        }
        /// Reserves capacity in both containers for the contents of `selves`.
        fn reserve_for<'a, I>(&mut self, selves: I)
        where
            Self: 'a,
            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
        {
            self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
            self.values.reserve_for(selves.map(|r| r.values));
        }
    }

    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
        const SLICE_COUNT: usize = BC::SLICE_COUNT + VC::SLICE_COUNT;
        /// Returns byte slice `index`: the slices of `bounds` come first, followed
        /// by the slices of `values`.
        #[inline(always)]
        fn get_byte_slice(&self, index: usize) -> (u64, &'a [u8]) {
            debug_assert!(index < Self::SLICE_COUNT);
            if index < BC::SLICE_COUNT {
                self.bounds.get_byte_slice(index)
            } else {
                self.values.get_byte_slice(index - BC::SLICE_COUNT)
            }
        }
    }
    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
        const SLICE_COUNT: usize = BC::SLICE_COUNT + VC::SLICE_COUNT;
        /// Rebuilds the container from byte slices, reading `bounds` before
        /// `values` to mirror the order used by `get_byte_slice`.
        #[inline(always)]
        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
            Self {
                bounds: FromBytes::from_bytes(bytes),
                values: FromBytes::from_bytes(bytes),
            }
        }
    }

    impl<BC: Len, VC> Len for Rows<BC, VC> {
        /// The number of rows, which equals the number of bounds.
        #[inline(always)]
        fn len(&self) -> usize {
            self.bounds.len()
        }
    }

    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
        type Ref = &'a RowRef;
        /// Returns row `index` as a `RowRef` over its byte range in `values`.
        #[inline(always)]
        fn get(&self, index: usize) -> Self::Ref {
            // A row starts where the previous row ends; the first row starts at 0.
            let lower = if index == 0 {
                0
            } else {
                self.bounds.index_as(index - 1)
            };
            let upper = self.bounds.index_as(index);
            let lower = usize::cast_from(lower);
            let upper = usize::cast_from(upper);
            // SAFETY: self.values contains only valid row data, and self.bounds delimits only
            // ranges that correspond to the original rows.
            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
        }
    }
    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
        type Ref = &'a RowRef;
        /// Returns row `index` as a `RowRef` over its byte range in `values`.
        #[inline(always)]
        fn get(&self, index: usize) -> Self::Ref {
            // A row starts where the previous row ends; the first row starts at 0.
            let lower = if index == 0 {
                0
            } else {
                self.bounds.index_as(index - 1)
            };
            let upper = self.bounds.index_as(index);
            let lower = usize::cast_from(lower);
            let upper = usize::cast_from(upper);
            // SAFETY: self.values contains only valid row data, and self.bounds delimits only
            // ranges that correspond to the original rows.
            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
        }
    }

    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
        /// Appends the row's bytes to `values` and records the new end offset.
        #[inline(always)]
        fn push(&mut self, item: &Row) {
            self.values.extend_from_slice(item.data.as_slice());
            self.bounds.push(u64::cast_from(self.values.len()));
        }
    }
    impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
        /// Appends the row's bytes to `values` and records the new end offset.
        #[inline(always)]
        fn push(&mut self, item: &RowRef) {
            self.values.extend_from_slice(item.data());
            self.bounds.push(&u64::cast_from(self.values.len()));
        }
    }
    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
        /// Clears both the bounds and the values containers.
        #[inline(always)]
        fn clear(&mut self) {
            self.bounds.clear();
            self.values.clear();
        }
    }
}
638
/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
///
/// The type is a `#[repr(transparent)]` wrapper around `[u8]`, which is what
/// allows [`RowRef::from_slice`] to reinterpret a `&[u8]` as a `&RowRef`.
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);
645
646impl RowRef {
647    /// Create a [`RowRef`] from a slice of data.
648    ///
649    /// # Safety
650    ///
651    /// We do not check that the provided slice is valid [`Row`] data; the caller is required to
652    /// ensure this.
653    pub unsafe fn from_slice(row: &[u8]) -> &RowRef {
654        #[allow(clippy::as_conversions)]
655        let ptr = row as *const [u8] as *const RowRef;
656        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
657        unsafe { &*ptr }
658    }
659
660    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
661    pub fn unpack(&self) -> Vec<Datum<'_>> {
662        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
663        let len = self.iter().count();
664        let mut vec = Vec::with_capacity(len);
665        vec.extend(self.iter());
666        vec
667    }
668
669    /// Return the first [`Datum`] in `self`
670    ///
671    /// Panics if the [`RowRef`] is empty.
672    pub fn unpack_first(&self) -> Datum<'_> {
673        self.iter().next().unwrap()
674    }
675
676    /// Iterate the [`Datum`] elements of the [`RowRef`].
677    pub fn iter(&self) -> DatumListIter<'_> {
678        DatumListIter { data: &self.0 }
679    }
680
681    /// Return the byte length of this [`RowRef`].
682    pub fn byte_len(&self) -> usize {
683        self.0.len()
684    }
685
686    /// For debugging only.
687    pub fn data(&self) -> &[u8] {
688        &self.0
689    }
690
691    /// True iff there is no data in this [`RowRef`].
692    pub fn is_empty(&self) -> bool {
693        self.0.is_empty()
694    }
695}
696
697impl ToOwned for RowRef {
698    type Owned = Row;
699
700    fn to_owned(&self) -> Self::Owned {
701        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
702        unsafe { Row::from_bytes_unchecked(&self.0) }
703    }
704}
705
706impl<'a> IntoIterator for &'a RowRef {
707    type Item = Datum<'a>;
708    type IntoIter = DatumListIter<'a>;
709
710    fn into_iter(self) -> DatumListIter<'a> {
711        DatumListIter { data: &self.0 }
712    }
713}
714
715/// These implementations order first by length, and then by slice contents.
716/// This allows many comparisons to complete without dereferencing memory.
717/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
718impl PartialOrd for RowRef {
719    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
720        Some(self.cmp(other))
721    }
722}
723
724impl Ord for RowRef {
725    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
726        match self.0.len().cmp(&other.0.len()) {
727            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
728            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
729            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
730        }
731    }
732}
733
734impl fmt::Debug for RowRef {
735    /// Debug representation using the internal datums
736    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
737        f.write_str("RowRef{")?;
738        f.debug_list().entries(&*self).finish()?;
739        f.write_str("}")
740    }
741}
742
/// Packs datums into a [`Row`].
///
/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
/// row. A packing operation always starts from scratch: the existing contents
/// of the underlying row are cleared.
///
/// To complete a packing operation, drop the `RowPacker`.
#[derive(Debug)]
pub struct RowPacker<'a> {
    /// The row being packed into, borrowed mutably for as long as the packer lives.
    row: &'a mut Row,
}
754
/// Infallible conversion from a [`Datum`] to a typed value.
///
/// Used by [`DatumList::typed_iter`] to yield elements as `T` rather than
/// raw `Datum`s. At runtime, `T` is always `Datum<'a>`, so the conversion
/// is identity.
///
/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
/// behind the generic type parameter and type erasure.
///
/// This trait is sealed and cannot be implemented outside of this crate.
pub trait FromDatum<'a>:
    Sized + PartialEq + std::borrow::Borrow<Datum<'a>> + sealed::Sealed
{
    /// Converts `datum` into `Self`.
    fn from_datum(datum: Datum<'a>) -> Self;
}
770
// Private module holding the `Sealed` supertrait of `FromDatum`. The module is
// not `pub`, so the trait cannot be named, and therefore cannot be implemented,
// from outside this crate.
mod sealed {
    use crate::Datum;

    /// Marker supertrait that restricts who may implement `FromDatum`.
    pub trait Sealed {}
    impl<'a> Sealed for Datum<'a> {}
}
777
// The identity conversion: a `Datum` is already a `Datum`.
impl<'a> FromDatum<'a> for Datum<'a> {
    /// Returns `datum` unchanged.
    #[inline]
    fn from_datum(datum: Datum<'a>) -> Self {
        datum
    }
}
784
/// Iterator over the [`Datum`]s encoded in a byte slice.
///
/// Returned by [`RowRef::iter`] and by the `IntoIterator` impl for `&RowRef`.
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    /// Encoded datum bytes.
    // NOTE(review): presumably only the not-yet-consumed remainder — confirm
    // against the `Iterator` impl.
    data: &'a [u8],
}
789
/// A [`DatumListIter`] paired with a phantom element type `T`.
// NOTE(review): presumably the iterator behind `DatumList::typed_iter`, yielding
// `T` via `FromDatum` — confirm against its `Iterator` impl.
#[derive(Debug, Clone)]
pub struct DatumListTypedIter<'a, T> {
    /// The untyped iterator that walks the encoded bytes.
    inner: DatumListIter<'a>,
    /// Records `T` without storing a value of that type.
    _phantom: PhantomData<fn() -> T>,
}
795
/// Iterator state over an encoded dictionary.
// NOTE(review): presumably yields `(key, value)` pairs as `DatumMap::iter` does —
// confirm against the `Iterator` impl.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    /// Encoded dictionary bytes.
    data: &'a [u8],
    /// The previously seen key, if any.
    // NOTE(review): presumably used to check that keys arrive in sorted order — confirm.
    prev_key: Option<&'a str>,
}
801
/// A [`DatumDictIter`] paired with a phantom value type `T`.
// NOTE(review): presumably yields values as `T` via `FromDatum` — confirm against
// its `Iterator` impl.
#[derive(Debug, Clone)]
pub struct DatumDictTypedIter<'a, T> {
    /// The untyped iterator that walks the encoded bytes.
    inner: DatumDictIter<'a>,
    /// Records `T` without storing a value of that type.
    _phantom: PhantomData<fn() -> T>,
}
807
/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval` that need to
/// create complex `Datum`s but don't have a `Row` to put them in yet.
#[derive(Debug)]
pub struct RowArena {
    // A stack of byte regions, used as a bump allocator. Bytes handed to
    // `push_bytes` are *copied* into the active (last) region and a reference
    // into that region is returned.
    //
    // The invariant that keeps returned references valid for the arena's
    // lifetime is that a region is never reallocated once it holds data: when
    // the active region lacks spare capacity for a push we allocate a *new*,
    // larger region rather than growing the current one (which would move its
    // bytes and dangle outstanding references). The outer `Vec` may itself
    // reallocate as regions are added, but that only moves the `Vec<u8>`
    // headers, not the heap buffers they own, so references remain valid.
    //
    // `clear` retains only the largest region (emptied) to right-size the arena
    // for reuse; reusing one region across `clear` cycles makes a steady-state
    // workload (e.g. decoding rows one at a time) allocation-free.
    inner: RefCell<Vec<Vec<u8>>>,
    // A single recycled scratch buffer backing `RowArena::writer`. A writer takes ownership of this
    // buffer (or allocates a fresh one if absent), builds into it, and on drop returns it here for
    // the next writer to reuse, so building values incrementally does not allocate per use once the
    // buffer reaches its high-water mark. Holding `Option` (rather than the buffer directly) means
    // `writer` borrows this cell only transiently, to take and return the buffer, never across the
    // writer's lifetime. That keeps nested writers sound: a writer obtained while another is live
    // finds the slot empty and allocates its own buffer instead of double-borrowing.
    scratch: RefCell<Option<Vec<u8>>>,
}
836
// `DatumList` and `DatumMap` are defined here rather than near `Datum` because we need private access to the unsafe `data` field.
838
/// A sequence of Datums
///
/// The type parameter `T` represents the element type of the list. It is a
/// phantom parameter that carries no runtime data — the actual elements are
/// stored as serialized bytes and `T` is not enforced at runtime. It is up
/// to the caller to ensure `T` matches the actual element type. The default
/// `T = Datum<'a>` means existing code that writes `DatumList<'a>` continues
/// to work unchanged.
///
/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
/// behind the generic type parameter.
pub struct DatumList<'a, T = Datum<'a>> {
    /// Points at the serialized datums
    data: &'a [u8],
    /// Records the element type `T` without storing a value of that type.
    _phantom: PhantomData<fn() -> T>,
}
855
856impl<'a, T> DatumList<'a, T> {
857    /// Private constructor. All `DatumList` values should be created through
858    /// this function to keep the `PhantomData` bookkeeping in one place.
859    pub(crate) fn new(data: &'a [u8]) -> Self {
860        DatumList {
861            data,
862            _phantom: PhantomData,
863        }
864    }
865}
866
867impl<'a, T> Clone for DatumList<'a, T> {
868    fn clone(&self) -> Self {
869        *self
870    }
871}
872
873impl<'a, T> Copy for DatumList<'a, T> {}
874
875impl<'a, T> Debug for DatumList<'a, T> {
876    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
877        f.debug_list().entries(self.iter()).finish()
878    }
879}
880
881impl<'a, T> PartialEq for DatumList<'a, T> {
882    #[inline(always)]
883    fn eq(&self, other: &DatumList<'a, T>) -> bool {
884        self.iter().eq(other.iter())
885    }
886}
887
888impl<'a, T> Eq for DatumList<'a, T> {}
889
890impl<'a, T> Hash for DatumList<'a, T> {
891    #[inline(always)]
892    fn hash<H: Hasher>(&self, state: &mut H) {
893        for d in self.iter() {
894            d.hash(state);
895        }
896    }
897}
898
899impl<T> Ord for DatumList<'_, T> {
900    #[inline(always)]
901    fn cmp(&self, other: &DatumList<'_, T>) -> Ordering {
902        self.iter().cmp(other.iter())
903    }
904}
905
906impl<T> PartialOrd for DatumList<'_, T> {
907    #[inline(always)]
908    fn partial_cmp(&self, other: &DatumList<'_, T>) -> Option<Ordering> {
909        Some(self.cmp(other))
910    }
911}
912
/// A mapping from string keys to Datums
///
/// The type parameter `T` represents the value type of the map. It is a
/// phantom parameter — the actual values are stored as serialized bytes and
/// `T` is not enforced at runtime. It is up to the caller to ensure `T`
/// matches the actual value type. The default `T = Datum<'a>` means existing
/// code that writes `DatumMap<'a>` continues to work unchanged.
///
/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
/// behind the generic type parameter.
pub struct DatumMap<'a, T = Datum<'a>> {
    /// Points at the serialized datums, which should be sorted in key order
    data: &'a [u8],
    /// Records the value type `T` without storing a value of that type.
    _phantom: PhantomData<fn() -> T>,
}
928
929impl<'a, T> DatumMap<'a, T> {
930    /// Private constructor. All `DatumMap` values should be created through
931    /// this function to keep the `PhantomData` bookkeeping in one place.
932    pub(crate) fn new(data: &'a [u8]) -> Self {
933        DatumMap {
934            data,
935            _phantom: PhantomData,
936        }
937    }
938}
939
940impl<'a, T> Clone for DatumMap<'a, T> {
941    fn clone(&self) -> Self {
942        *self
943    }
944}
945
946impl<'a, T> Copy for DatumMap<'a, T> {}
947
948impl<'a, T> PartialEq for DatumMap<'a, T> {
949    #[inline(always)]
950    fn eq(&self, other: &DatumMap<'a, T>) -> bool {
951        self.iter().eq(other.iter())
952    }
953}
954
955impl<'a, T> Eq for DatumMap<'a, T> {}
956
957impl<'a, T> Hash for DatumMap<'a, T> {
958    #[inline(always)]
959    fn hash<H: Hasher>(&self, state: &mut H) {
960        for (k, v) in self.iter() {
961            k.hash(state);
962            v.hash(state);
963        }
964    }
965}
966
967impl<'a, T> Ord for DatumMap<'a, T> {
968    #[inline(always)]
969    fn cmp(&self, other: &DatumMap<'a, T>) -> Ordering {
970        self.iter().cmp(other.iter())
971    }
972}
973
974impl<'a, T> PartialOrd for DatumMap<'a, T> {
975    #[inline(always)]
976    fn partial_cmp(&self, other: &DatumMap<'a, T>) -> Option<Ordering> {
977        Some(self.cmp(other))
978    }
979}
980
981impl<'a> crate::scalar::SqlContainerType for DatumList<'a, Datum<'a>> {
982    fn unwrap_element_type(container: &SqlScalarType) -> &SqlScalarType {
983        container.unwrap_list_element_type()
984    }
985    fn wrap_element_type(element: SqlScalarType) -> SqlScalarType {
986        SqlScalarType::List {
987            element_type: Box::new(element),
988            custom_id: None,
989        }
990    }
991}
992
993impl<'a> crate::scalar::SqlContainerType for DatumMap<'a, Datum<'a>> {
994    fn unwrap_element_type(container: &SqlScalarType) -> &SqlScalarType {
995        container.unwrap_map_value_type()
996    }
997    fn wrap_element_type(element: SqlScalarType) -> SqlScalarType {
998        SqlScalarType::Map {
999            value_type: Box::new(element),
1000            custom_id: None,
1001        }
1002    }
1003}
1004
/// Represents a single `Datum`, appropriate to be nested inside other
/// `Datum`s.
///
/// Note that the derived `Eq`, `PartialEq` and `Hash` operate on the encoded
/// bytes in `val`, whereas the hand-written `Ord` compares the decoded datums.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    /// The encoded bytes of exactly one datum, starting with its tag byte, as
    /// consumed by `read_datum`.
    val: &'a [u8],
}
1011
1012impl<'a> std::fmt::Display for DatumNested<'a> {
1013    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1014        std::fmt::Display::fmt(&self.datum(), f)
1015    }
1016}
1017
1018impl<'a> std::fmt::Debug for DatumNested<'a> {
1019    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1020        f.debug_struct("DatumNested")
1021            .field("val", &self.datum())
1022            .finish()
1023    }
1024}
1025
1026impl<'a> DatumNested<'a> {
1027    // Figure out which bytes `read_datum` returns (e.g. including the tag),
1028    // and then store a reference to those bytes, so we can "replay" this same
1029    // call later on without storing the datum itself.
1030    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
1031        let prev = *data;
1032        let _ = unsafe { read_datum(data) };
1033        DatumNested {
1034            val: &prev[..(prev.len() - data.len())],
1035        }
1036    }
1037
1038    /// Returns the datum `self` contains.
1039    pub fn datum(&self) -> Datum<'a> {
1040        let mut temp = self.val;
1041        unsafe { read_datum(&mut temp) }
1042    }
1043}
1044
1045impl<'a> Ord for DatumNested<'a> {
1046    fn cmp(&self, other: &Self) -> Ordering {
1047        self.datum().cmp(&other.datum())
1048    }
1049}
1050
1051impl<'a> PartialOrd for DatumNested<'a> {
1052    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1053        Some(self.cmp(other))
1054    }
1055}
1056
// Prefer adding new tags to the end of the enum. Certain behavior, like row ordering and EXPLAIN
// PHYSICAL PLAN, relies on the ordering of this enum. Neither of these is a breaking change, but
// it's annoying when they change.
/// One-byte discriminant that precedes every encoded datum.
///
/// `read_datum` reads this byte first (via `try_from_primitive`) and uses it to
/// decide how many further bytes to consume and which `Datum` variant to build.
/// Because the enum is `#[repr(u8)]` with implicit discriminants, the position
/// of a variant in this list is its encoded value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    Timestamp,
    TimestampTz,
    Interval,
    // `Tiny`/`Short`/`Long`/`Huge` select a 1-, 2-, 4- or 8-byte length prefix
    // (see `read_lengthed_datum`).
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // A timestamp stored as a single i64 count of nanoseconds.
    // Covers everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestamp,
    // Same compact encoding as `CheapTimestamp`, for timestamps with time zone.
    // Covers everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestampTz,
    // The next several tags are for variable-length signed integer encoding.
    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
    // NegativeIntN_K with negative values.
    //
    // The order of these tags matters, because we want to be able to choose the
    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
    // stack of `if` statements.
    //
    // Separate tags for non-negative and negative numbers are used to avoid having to
    // waste one bit in the actual data space to encode the sign.
    NonNegativeInt16_0, // i.e., 0
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, // i.e., -1
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    // These are like the ones above, but for unsigned types. The
    // situation is slightly simpler as we don't have negatives.
    UInt8_0, // i.e., 0
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
1186
1187impl Tag {
1188    fn actual_int_length(self) -> Option<usize> {
1189        use Tag::*;
1190        let val = match self {
1191            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
1192            | UInt32_0 | UInt64_0 => 0,
1193            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
1194            | UInt32_8 | UInt64_8 => 1,
1195            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
1196            | UInt32_16 | UInt64_16 => 2,
1197            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
1198            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
1199            NonNegativeInt64_40 | UInt64_40 => 5,
1200            NonNegativeInt64_48 | UInt64_48 => 6,
1201            NonNegativeInt64_56 | UInt64_56 => 7,
1202            NonNegativeInt64_64 | UInt64_64 => 8,
1203            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
1204            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
1205            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
1206            NegativeInt32_24 | NegativeInt64_24 => 3,
1207            NegativeInt32_32 | NegativeInt64_32 => 4,
1208            NegativeInt64_40 => 5,
1209            NegativeInt64_48 => 6,
1210            NegativeInt64_56 => 7,
1211            NegativeInt64_64 => 8,
1212
1213            _ => return None,
1214        };
1215        Some(val)
1216    }
1217}
1218
1219// --------------------------------------------------------------------------------
1220// reading data
1221
1222/// Read a byte slice starting at byte `offset`.
1223///
1224/// Updates `offset` to point to the first byte after the end of the read region.
1225fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
1226    let len = u64::from_le_bytes(read_byte_array(data));
1227    let len = usize::cast_from(len);
1228    let (bytes, next) = data.split_at(len);
1229    *data = next;
1230    bytes
1231}
1232
1233/// Read a data whose length is encoded in the row before its contents.
1234///
1235/// Updates `offset` to point to the first byte after the end of the read region.
1236///
1237/// # Safety
1238///
1239/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
1240/// and it was only written with a `String` tag if it was indeed UTF-8.
1241unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
1242    let len = match tag {
1243        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
1244        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1245            usize::from(u16::from_le_bytes(read_byte_array(data)))
1246        }
1247        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1248            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
1249        }
1250        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1251            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
1252        }
1253        _ => unreachable!(),
1254    };
1255    let (bytes, next) = data.split_at(len);
1256    *data = next;
1257    match tag {
1258        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
1259        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
1260            Datum::String(str::from_utf8_unchecked(bytes))
1261        }
1262        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
1263            Datum::List(DatumList::new(bytes))
1264        }
1265        _ => unreachable!(),
1266    }
1267}
1268
/// Pops the first byte off `data` and returns it.
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let current: &[u8] = *data;
    let first = current[0];
    *data = &current[1..];
    first
}
1274
/// Reads `length` bytes from the front of `data`, advancing `data` past them,
/// and widens them to an array of `N` bytes by placing `FILL` in the k most
/// significant (trailing, little-endian) bytes, where k = N - length.
///
/// # Panics
///
/// Panics unless both of the following hold:
///   * length <= N
///   * length <= data.len()
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let (payload, rest) = data.split_at(length);
    // Start from an array made entirely of the fill byte, then overwrite the
    // low-order bytes with what was actually stored.
    let mut widened = [FILL; N];
    widened[..payload.len()].copy_from_slice(payload);
    *data = rest;
    widened
}
1292/// Read `length` bytes from `data` at `offset`, updating the
1293/// latter. Extend the resulting buffer to a negative `N`-byte
1294/// twos complement integer by filling the remaining bits with 1.
1295///
1296/// SAFETY:
1297///   * length <= N
1298///   * offset + length <= data.len()
1299fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1300    read_byte_array_sign_extending::<N, 255>(data, length)
1301}
1302
1303/// Read `length` bytes from `data` at `offset`, updating the
1304/// latter. Extend the resulting buffer to a positive or zero `N`-byte
1305/// twos complement integer by filling the remaining bits with 0.
1306///
1307/// SAFETY:
1308///   * length <= N
1309///   * offset + length <= data.len()
1310fn read_byte_array_extending_nonnegative<const N: usize>(
1311    data: &mut &[u8],
1312    length: usize,
1313) -> [u8; N] {
1314    read_byte_array_sign_extending::<N, 0>(data, length)
1315}
1316
1317pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1318    let (prev, next) = data.split_first_chunk().unwrap();
1319    *data = next;
1320    *prev
1321}
1322
1323pub(super) fn read_date(data: &mut &[u8]) -> Date {
1324    let days = i32::from_le_bytes(read_byte_array(data));
1325    Date::from_pg_epoch(days).expect("unexpected date")
1326}
1327
1328pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1329    let year = i32::from_le_bytes(read_byte_array(data));
1330    let ordinal = u32::from_le_bytes(read_byte_array(data));
1331    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1332}
1333
1334pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1335    let secs = u32::from_le_bytes(read_byte_array(data));
1336    let nanos = u32::from_le_bytes(read_byte_array(data));
1337    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1338}
1339
/// Reads one datum from the front of `data`.
///
/// The first byte is decoded as a [`Tag`]; the tag determines how many further
/// bytes are consumed and which `Datum` variant is produced. On return, `data`
/// points to the first byte after the end of the read region.
///
/// # Panics
///
/// Panics if the tag byte is not a known [`Tag`], if `data` is shorter than the
/// encoding announced by the tag, or if a decoded component (date, time,
/// timestamp, range flags, ACL item) is rejected by its constructor.
///
/// # Safety
///
/// This function is safe if a `Datum` was previously written at the front of `data` by
/// `push_datum`. Otherwise it could return invalid values, which is Undefined Behavior.
pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
    match tag {
        Tag::Null => Datum::Null,
        Tag::False => Datum::False,
        Tag::True => Datum::True,
        // Variable-length integers: the tag says how many payload bytes follow;
        // the missing high-order bytes are filled with zeros (non-negative and
        // unsigned tags) or ones (negative tags).
        Tag::UInt8_0 | Tag::UInt8_8 => {
            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt8(i)
        }
        Tag::Int16 => {
            let i = i16::from_le_bytes(read_byte_array(data));
            Datum::Int16(i)
        }
        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
            // NOTE: `tag.actual_int_length()` is at most 2 bytes (16 bits) for these tags,
            // and `data` is big enough because it was encoded validly. The helper uses
            // checked slice operations, so a violation panics.
            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt16(i)
        }
        Tag::Int32 => {
            let i = i32::from_le_bytes(read_byte_array(data));
            Datum::Int32(i)
        }
        Tag::NonNegativeInt32_0
        | Tag::NonNegativeInt32_32
        | Tag::NonNegativeInt32_8
        | Tag::NonNegativeInt32_16
        | Tag::NonNegativeInt32_24 => {
            // NOTE: `tag.actual_int_length()` is at most 4 bytes (32 bits) for these tags,
            // and `data` is big enough because it was encoded validly. The helper uses
            // checked slice operations, so a violation panics.
            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt32(i)
        }
        Tag::Int64 => {
            let i = i64::from_le_bytes(read_byte_array(data));
            Datum::Int64(i)
        }
        Tag::NonNegativeInt64_0
        | Tag::NonNegativeInt64_64
        | Tag::NonNegativeInt64_8
        | Tag::NonNegativeInt64_16
        | Tag::NonNegativeInt64_24
        | Tag::NonNegativeInt64_32
        | Tag::NonNegativeInt64_40
        | Tag::NonNegativeInt64_48
        | Tag::NonNegativeInt64_56 => {
            // NOTE: `tag.actual_int_length()` is at most 8 bytes (64 bits) for these tags,
            // and `data` is big enough because it was encoded validly. The helper uses
            // checked slice operations, so a violation panics.

            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }
        Tag::UInt64_0
        | Tag::UInt64_8
        | Tag::UInt64_16
        | Tag::UInt64_24
        | Tag::UInt64_32
        | Tag::UInt64_40
        | Tag::UInt64_48
        | Tag::UInt64_56
        | Tag::UInt64_64 => {
            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt64(i)
        }
        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
            // NOTE: `tag.actual_int_length()` is at most 2 bytes (16 bits) for these tags,
            // and `data` is big enough because it was encoded validly. The helper uses
            // checked slice operations, so a violation panics.
            let i = i16::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::NegativeInt32_0
        | Tag::NegativeInt32_32
        | Tag::NegativeInt32_8
        | Tag::NegativeInt32_16
        | Tag::NegativeInt32_24 => {
            // NOTE: `tag.actual_int_length()` is at most 4 bytes (32 bits) for these tags,
            // and `data` is big enough because it was encoded validly. The helper uses
            // checked slice operations, so a violation panics.
            let i = i32::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::NegativeInt64_0
        | Tag::NegativeInt64_64
        | Tag::NegativeInt64_8
        | Tag::NegativeInt64_16
        | Tag::NegativeInt64_24
        | Tag::NegativeInt64_32
        | Tag::NegativeInt64_40
        | Tag::NegativeInt64_48
        | Tag::NegativeInt64_56 => {
            // NOTE: `tag.actual_int_length()` is at most 8 bytes (64 bits) for these tags,
            // and `data` is big enough because the row was encoded validly. The helper uses
            // checked slice operations, so a violation panics.
            let i = i64::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }

        // Fixed-width unsigned integers.
        Tag::UInt8 => {
            let i = u8::from_le_bytes(read_byte_array(data));
            Datum::UInt8(i)
        }
        Tag::UInt16 => {
            let i = u16::from_le_bytes(read_byte_array(data));
            Datum::UInt16(i)
        }
        Tag::UInt32 => {
            let i = u32::from_le_bytes(read_byte_array(data));
            Datum::UInt32(i)
        }
        Tag::UInt64 => {
            let i = u64::from_le_bytes(read_byte_array(data));
            Datum::UInt64(i)
        }
        // Floats are stored as the little-endian bytes of their bit pattern.
        Tag::Float32 => {
            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
            Datum::Float32(OrderedFloat::from(f))
        }
        Tag::Float64 => {
            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
            Datum::Float64(OrderedFloat::from(f))
        }
        Tag::Date => Datum::Date(read_date(data)),
        Tag::Time => Datum::Time(read_time(data)),
        Tag::CheapTimestamp => {
            // A single i64 of nanoseconds; Euclidean division keeps the
            // sub-second part non-negative for instants before the epoch.
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let ndt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps")
                .naive_utc();
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
            )
        }
        Tag::CheapTimestampTz => {
            // Same layout as `CheapTimestamp`, but the result keeps its UTC zone.
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let dt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps");
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
            )
        }
        Tag::Timestamp => {
            // Full-width encoding: a naive date followed by a time of day.
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(date.and_time(time))
                    .expect("unexpected timestamp"),
            )
        }
        Tag::TimestampTz => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                    date.and_time(time),
                    Utc,
                ))
                .expect("unexpected timestamptz"),
            )
        }
        Tag::Interval => {
            // Field order on the wire: months (i32), days (i32), micros (i64).
            let months = i32::from_le_bytes(read_byte_array(data));
            let days = i32::from_le_bytes(read_byte_array(data));
            let micros = i64::from_le_bytes(read_byte_array(data));
            Datum::Interval(Interval {
                months,
                days,
                micros,
            })
        }
        Tag::BytesTiny
        | Tag::BytesShort
        | Tag::BytesLong
        | Tag::BytesHuge
        | Tag::StringTiny
        | Tag::StringShort
        | Tag::StringLong
        | Tag::StringHuge
        | Tag::ListTiny
        | Tag::ListShort
        | Tag::ListLong
        | Tag::ListHuge => read_lengthed_datum(data, tag),
        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
        Tag::Array => {
            // See the comment in `Row::push_array` for details on the encoding
            // of arrays.
            let ndims = read_byte(data);
            // Each dimension occupies two `u64`-sized fields.
            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
            let (dims, next) = data.split_at(dims_size);
            *data = next;
            let bytes = read_untagged_bytes(data);
            Datum::Array(Array {
                dims: ArrayDimensions { data: dims },
                elements: DatumList::new(bytes),
            })
        }
        Tag::Dict => {
            let bytes = read_untagged_bytes(data);
            Datum::Map(DatumMap::new(bytes))
        }
        Tag::JsonNull => Datum::JsonNull,
        Tag::Dummy => Datum::Dummy,
        Tag::Numeric => {
            // Header: digit count, exponent (a signed byte), and a bits byte.
            let digits = read_byte(data).into();
            let exponent = i8::reinterpret_cast(read_byte(data));
            let bits = read_byte(data);

            // Only the coefficient units implied by `digits` are stored, each
            // as a little-endian u16.
            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
            let lsu_u8_len = lsu_u16_len * 2;
            let (lsu_u8, next) = data.split_at(lsu_u8_len);
            *data = next;

            // TODO: if we refactor the decimal library to accept the owned
            // array as a parameter to `from_raw_parts` below, we could likely
            // avoid a copy because it is exactly the value we want
            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
            for (i, c) in lsu_u8.chunks(2).enumerate() {
                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
            }

            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
            Datum::from(d)
        }
        Tag::MzTimestamp => {
            let t = Timestamp::decode(read_byte_array(data));
            Datum::MzTimestamp(t)
        }
        Tag::Range => {
            // See notes on `push_range_with` for details about encoding.
            let flag_byte = read_byte(data);
            let flags = range::InternalFlags::from_bits(flag_byte)
                .expect("range flags must be encoded validly");

            // An empty range is just the flag byte; no bounds follow.
            if flags.contains(range::InternalFlags::EMPTY) {
                assert!(
                    flags == range::InternalFlags::EMPTY,
                    "empty ranges contain only RANGE_EMPTY flag"
                );

                return Datum::Range(Range { inner: None });
            }

            // A finite bound is stored as a nested datum; an infinite bound
            // stores nothing. The lower bound comes first.
            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let lower = RangeBound {
                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
                bound: lower_bound,
            };

            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let upper = RangeBound {
                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
                bound: upper_bound,
            };

            Datum::Range(Range {
                inner: Some(RangeInner { lower, upper }),
            })
        }
        Tag::MzAclItem => {
            const N: usize = MzAclItem::binary_size();
            let mz_acl_item =
                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
            Datum::MzAclItem(mz_acl_item)
        }
        Tag::AclItem => {
            const N: usize = AclItem::binary_size();
            let acl_item =
                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
            Datum::AclItem(acl_item)
        }
    }
}
1686
1687// --------------------------------------------------------------------------------
1688// writing data
1689
1690fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1691where
1692    D: Vector<u8>,
1693{
1694    let len = u64::cast_from(bytes.len());
1695    data.extend_from_slice(&len.to_le_bytes());
1696    data.extend_from_slice(bytes);
1697}
1698
1699fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1700where
1701    D: Vector<u8>,
1702{
1703    match tag {
1704        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1705            let len = bytes.len().to_le_bytes();
1706            data.push(len[0]);
1707        }
1708        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1709            let len = bytes.len().to_le_bytes();
1710            data.extend_from_slice(&len[0..2]);
1711        }
1712        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1713            let len = bytes.len().to_le_bytes();
1714            data.extend_from_slice(&len[0..4]);
1715        }
1716        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1717            let len = bytes.len().to_le_bytes();
1718            data.extend_from_slice(&len);
1719        }
1720        _ => unreachable!(),
1721    }
1722    data.extend_from_slice(bytes);
1723}
1724
1725pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1726    i32::to_le_bytes(date.pg_epoch_days())
1727}
1728
1729fn push_date<D>(data: &mut D, date: Date)
1730where
1731    D: Vector<u8>,
1732{
1733    data.extend_from_slice(&date_to_array(date));
1734}
1735
1736pub(super) fn naive_date_to_arrays(
1737    date: NaiveDate,
1738) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1739    (
1740        i32::to_le_bytes(date.year()),
1741        u32::to_le_bytes(date.ordinal()),
1742    )
1743}
1744
1745fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1746where
1747    D: Vector<u8>,
1748{
1749    let (ds1, ds2) = naive_date_to_arrays(date);
1750    data.extend_from_slice(&ds1);
1751    data.extend_from_slice(&ds2);
1752}
1753
1754pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1755    (
1756        u32::to_le_bytes(time.num_seconds_from_midnight()),
1757        u32::to_le_bytes(time.nanosecond()),
1758    )
1759}
1760
1761fn push_time<D>(data: &mut D, time: NaiveTime)
1762where
1763    D: Vector<u8>,
1764{
1765    let (ts1, ts2) = time_to_arrays(time);
1766    data.extend_from_slice(&ts1);
1767    data.extend_from_slice(&ts2);
1768}
1769
1770/// Returns an i64 representing a `NaiveDateTime`, if
1771/// said i64 can be round-tripped back to a `NaiveDateTime`.
1772///
1773/// The only exotic NDTs for which this can't happen are those that
1774/// are hundreds of years in the future or past, or those that
1775/// represent a leap second. (Note that Materialize does not support
1776/// leap seconds, but this module does).
1777// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1778// with extra checking.
1779fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1780    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1781    if subsec_nanos >= 1_000_000_000 {
1782        return None;
1783    }
1784    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1785    as_ns.checked_add(i64::from(subsec_nanos))
1786}
1787
// Returns how many low-order bytes of `i` must be stored so that the value can
// be rebuilt by extending with its sign bits. Zero and -1 need no bytes at all.
//
// This sits on an extremely hot path, so the bit counts are narrowed with
// plain `as` instead of paying for `try_into` followed by `unwrap`.
// `leading_ones` and `leading_zeros` of an `i64` never exceed 64, so the
// conversion to `u8` cannot truncate.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    // The run of leading sign bits carries no information; only what lies
    // below it has to fit in n * 8 bits.
    let sign_run = if value < 0 {
        value.leading_ones()
    } else {
        value.leading_zeros()
    };
    let payload_bits = 64 - sign_run as u8;

    // Round the bit count up to whole bytes.
    (payload_bits + 7) / 8
}
1812
// Returns how many low-order bytes of `i` must be stored so that the value can
// be rebuilt by zero-extension. Zero needs no bytes at all.
//
// `min_bytes_signed` could in principle cover this case too if it accepted
// `T: Into<i128>` rather than `Into<i64>`, but LLVM does not seem to recognise
// that such a version is equivalent to this one and generates worse code, so
// the unsigned variant is kept separate.
//
// The `as` cast is justified exactly as in `min_bytes_signed`.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    // Leading zeros carry no information; only the bits below them are kept.
    let payload_bits = 64 - value.leading_zeros() as u8;

    // Round the bit count up to whole bytes.
    (payload_bits + 7) / 8
}
1832
/// Payload lengths strictly below this bound fit in a one-byte length prefix.
const TINY: usize = 1 << u8::BITS;
/// Payload lengths strictly below this bound fit in a two-byte length prefix.
const SHORT: usize = 1 << u16::BITS;
/// Payload lengths strictly below this bound fit in a four-byte length prefix.
const LONG: usize = 1 << u32::BITS;
1836
1837fn push_datum<D>(data: &mut D, datum: Datum)
1838where
1839    D: Vector<u8>,
1840{
1841    match datum {
1842        Datum::Null => data.push(Tag::Null.into()),
1843        Datum::False => data.push(Tag::False.into()),
1844        Datum::True => data.push(Tag::True.into()),
1845        Datum::Int16(i) => {
1846            let mbs = min_bytes_signed(i);
1847            let tag = u8::from(if i.is_negative() {
1848                Tag::NegativeInt16_0
1849            } else {
1850                Tag::NonNegativeInt16_0
1851            }) + mbs;
1852
1853            data.push(tag);
1854            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1855        }
1856        Datum::Int32(i) => {
1857            let mbs = min_bytes_signed(i);
1858            let tag = u8::from(if i.is_negative() {
1859                Tag::NegativeInt32_0
1860            } else {
1861                Tag::NonNegativeInt32_0
1862            }) + mbs;
1863
1864            data.push(tag);
1865            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1866        }
1867        Datum::Int64(i) => {
1868            let mbs = min_bytes_signed(i);
1869            let tag = u8::from(if i.is_negative() {
1870                Tag::NegativeInt64_0
1871            } else {
1872                Tag::NonNegativeInt64_0
1873            }) + mbs;
1874
1875            data.push(tag);
1876            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1877        }
1878        Datum::UInt8(i) => {
1879            let mbu = min_bytes_unsigned(i);
1880            let tag = u8::from(Tag::UInt8_0) + mbu;
1881            data.push(tag);
1882            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1883        }
1884        Datum::UInt16(i) => {
1885            let mbu = min_bytes_unsigned(i);
1886            let tag = u8::from(Tag::UInt16_0) + mbu;
1887            data.push(tag);
1888            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1889        }
1890        Datum::UInt32(i) => {
1891            let mbu = min_bytes_unsigned(i);
1892            let tag = u8::from(Tag::UInt32_0) + mbu;
1893            data.push(tag);
1894            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1895        }
1896        Datum::UInt64(i) => {
1897            let mbu = min_bytes_unsigned(i);
1898            let tag = u8::from(Tag::UInt64_0) + mbu;
1899            data.push(tag);
1900            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1901        }
1902        Datum::Float32(f) => {
1903            data.push(Tag::Float32.into());
1904            data.extend_from_slice(&f.to_bits().to_le_bytes());
1905        }
1906        Datum::Float64(f) => {
1907            data.push(Tag::Float64.into());
1908            data.extend_from_slice(&f.to_bits().to_le_bytes());
1909        }
1910        Datum::Date(d) => {
1911            data.push(Tag::Date.into());
1912            push_date(data, d);
1913        }
1914        Datum::Time(t) => {
1915            data.push(Tag::Time.into());
1916            push_time(data, t);
1917        }
1918        Datum::Timestamp(t) => {
1919            let datetime = t.to_naive();
1920            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1921                data.push(Tag::CheapTimestamp.into());
1922                data.extend_from_slice(&nanos.to_le_bytes());
1923            } else {
1924                data.push(Tag::Timestamp.into());
1925                push_naive_date(data, datetime.date());
1926                push_time(data, datetime.time());
1927            }
1928        }
1929        Datum::TimestampTz(t) => {
1930            let datetime = t.to_naive();
1931            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1932                data.push(Tag::CheapTimestampTz.into());
1933                data.extend_from_slice(&nanos.to_le_bytes());
1934            } else {
1935                data.push(Tag::TimestampTz.into());
1936                push_naive_date(data, datetime.date());
1937                push_time(data, datetime.time());
1938            }
1939        }
1940        Datum::Interval(i) => {
1941            data.push(Tag::Interval.into());
1942            data.extend_from_slice(&i.months.to_le_bytes());
1943            data.extend_from_slice(&i.days.to_le_bytes());
1944            data.extend_from_slice(&i.micros.to_le_bytes());
1945        }
1946        Datum::Bytes(bytes) => {
1947            let tag = match bytes.len() {
1948                0..TINY => Tag::BytesTiny,
1949                TINY..SHORT => Tag::BytesShort,
1950                SHORT..LONG => Tag::BytesLong,
1951                _ => Tag::BytesHuge,
1952            };
1953            data.push(tag.into());
1954            push_lengthed_bytes(data, bytes, tag);
1955        }
1956        Datum::String(string) => {
1957            let tag = match string.len() {
1958                0..TINY => Tag::StringTiny,
1959                TINY..SHORT => Tag::StringShort,
1960                SHORT..LONG => Tag::StringLong,
1961                _ => Tag::StringHuge,
1962            };
1963            data.push(tag.into());
1964            push_lengthed_bytes(data, string.as_bytes(), tag);
1965        }
1966        Datum::List(list) => {
1967            let tag = match list.data.len() {
1968                0..TINY => Tag::ListTiny,
1969                TINY..SHORT => Tag::ListShort,
1970                SHORT..LONG => Tag::ListLong,
1971                _ => Tag::ListHuge,
1972            };
1973            data.push(tag.into());
1974            push_lengthed_bytes(data, list.data, tag);
1975        }
1976        Datum::Uuid(u) => {
1977            data.push(Tag::Uuid.into());
1978            data.extend_from_slice(u.as_bytes());
1979        }
1980        Datum::Array(array) => {
1981            // See the comment in `Row::push_array` for details on the encoding
1982            // of arrays.
1983            data.push(Tag::Array.into());
1984            data.push(array.dims.ndims());
1985            data.extend_from_slice(array.dims.data);
1986            push_untagged_bytes(data, array.elements.data);
1987        }
1988        Datum::Map(dict) => {
1989            data.push(Tag::Dict.into());
1990            push_untagged_bytes(data, dict.data);
1991        }
1992        Datum::JsonNull => data.push(Tag::JsonNull.into()),
1993        Datum::MzTimestamp(t) => {
1994            data.push(Tag::MzTimestamp.into());
1995            data.extend_from_slice(&t.encode());
1996        }
1997        Datum::Dummy => data.push(Tag::Dummy.into()),
1998        Datum::Numeric(mut n) => {
1999            // Pseudo-canonical representation of decimal values with
2000            // insignificant zeroes trimmed. This compresses the number further
2001            // than `Numeric::trim` by removing all zeroes, and not only those in
2002            // the fractional component.
2003            numeric::cx_datum().reduce(&mut n.0);
2004            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
2005            data.push(Tag::Numeric.into());
2006            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
2007            data.push(
2008                i8::try_from(exponent)
2009                    .expect("exponent to fit within i8; should not exceed +/- 39")
2010                    .to_le_bytes()[0],
2011            );
2012            data.push(bits);
2013
2014            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
2015
2016            // Little endian machines can take the lsu directly from u16 to u8.
2017            if cfg!(target_endian = "little") {
2018                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
2019                // each element can safely be transmuted into two `u8`s.
2020                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
2021                // The `u8` aligned version of the `lsu` should have twice as many
2022                // elements as we expect for the `u16` version.
2023                soft_assert_no_log!(
2024                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
2025                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
2026                    Numeric::digits_to_lsu_elements_len(digits) * 2,
2027                    lsu_bytes.len()
2028                );
2029                // There should be no unaligned elements in the prefix or suffix.
2030                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
2031                data.extend_from_slice(lsu_bytes);
2032            } else {
2033                for u in lsu {
2034                    data.extend_from_slice(&u.to_le_bytes());
2035                }
2036            }
2037        }
2038        Datum::Range(range) => {
2039            // See notes on `push_range_with` for details about encoding.
2040            data.push(Tag::Range.into());
2041            data.push(range.internal_flag_bits());
2042
2043            if let Some(RangeInner { lower, upper }) = range.inner {
2044                for bound in [lower.bound, upper.bound] {
2045                    if let Some(bound) = bound {
2046                        match bound.datum() {
2047                            Datum::Null => panic!("cannot push Datum::Null into range"),
2048                            d => push_datum::<D>(data, d),
2049                        }
2050                    }
2051                }
2052            }
2053        }
2054        Datum::MzAclItem(mz_acl_item) => {
2055            data.push(Tag::MzAclItem.into());
2056            data.extend_from_slice(&mz_acl_item.encode_binary());
2057        }
2058        Datum::AclItem(acl_item) => {
2059            data.push(Tag::AclItem.into());
2060            data.extend_from_slice(&acl_item.encode_binary());
2061        }
2062    }
2063}
2064
2065/// Return the number of bytes these Datums would use if packed as a Row.
2066pub fn row_size<'a, I>(a: I) -> usize
2067where
2068    I: IntoIterator<Item = Datum<'a>>,
2069{
2070    // Using datums_size instead of a.data().len() here is safer because it will
2071    // return the size of the datums if they were packed into a Row. Although
2072    // a.data().len() happens to give the correct answer (and is faster), data()
2073    // is documented as for debugging only.
2074    let sz = datums_size::<_, _>(a);
2075    let size_of_row = std::mem::size_of::<Row>();
2076    // The Row struct attempts to inline data until it can't fit in the
2077    // preallocated size. Otherwise it spills to heap, and uses the Row to point
2078    // to that.
2079    if sz > Row::SIZE {
2080        sz + size_of_row
2081    } else {
2082        size_of_row
2083    }
2084}
2085
2086/// Number of bytes required by the datum.
2087/// This is used to optimistically pre-allocate buffers for packing rows.
2088pub fn datum_size(datum: &Datum) -> usize {
2089    match datum {
2090        Datum::Null => 1,
2091        Datum::False => 1,
2092        Datum::True => 1,
2093        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
2094        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
2095        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
2096        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2097        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2098        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2099        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2100        Datum::Float32(_) => 1 + size_of::<f32>(),
2101        Datum::Float64(_) => 1 + size_of::<f64>(),
2102        Datum::Date(_) => 1 + size_of::<i32>(),
2103        Datum::Time(_) => 1 + 8,
2104        Datum::Timestamp(t) => {
2105            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
2106                8
2107            } else {
2108                16
2109            }
2110        }
2111        Datum::TimestampTz(t) => {
2112            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
2113                8
2114            } else {
2115                16
2116            }
2117        }
2118        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
2119        Datum::Bytes(bytes) => {
2120            // We use a variable length representation of slice length.
2121            let bytes_for_length = match bytes.len() {
2122                0..TINY => 1,
2123                TINY..SHORT => 2,
2124                SHORT..LONG => 4,
2125                _ => 8,
2126            };
2127            1 + bytes_for_length + bytes.len()
2128        }
2129        Datum::String(string) => {
2130            // We use a variable length representation of slice length.
2131            let bytes_for_length = match string.len() {
2132                0..TINY => 1,
2133                TINY..SHORT => 2,
2134                SHORT..LONG => 4,
2135                _ => 8,
2136            };
2137            1 + bytes_for_length + string.len()
2138        }
2139        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
2140        Datum::Array(array) => {
2141            1 + size_of::<u8>()
2142                + array.dims.data.len()
2143                + size_of::<u64>()
2144                + array.elements.data.len()
2145        }
2146        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
2147        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
2148        Datum::JsonNull => 1,
2149        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
2150        Datum::Dummy => 1,
2151        Datum::Numeric(d) => {
2152            let mut d = d.0.clone();
2153            // Values must be reduced to determine appropriate number of
2154            // coefficient units.
2155            numeric::cx_datum().reduce(&mut d);
2156            // 4 = 1 bit each for tag, digits, exponent, bits
2157            4 + (d.coefficient_units().len() * 2)
2158        }
2159        Datum::Range(Range { inner }) => {
2160            // Tag + flags
2161            2 + match inner {
2162                None => 0,
2163                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
2164                    .iter()
2165                    .map(|bound| match bound {
2166                        None => 0,
2167                        Some(bound) => bound.val.len(),
2168                    })
2169                    .sum(),
2170            }
2171        }
2172        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
2173        Datum::AclItem(_) => 1 + AclItem::binary_size(),
2174    }
2175}
2176
2177/// Number of bytes required by a sequence of datums.
2178///
2179/// This method can be used to right-size the allocation for a `Row`
2180/// before calling [`RowPacker::extend`].
2181pub fn datums_size<'a, I, D>(iter: I) -> usize
2182where
2183    I: IntoIterator<Item = D>,
2184    D: Borrow<Datum<'a>>,
2185{
2186    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
2187}
2188
2189/// Number of bytes required by a list of datums. This computes the size that would be required if
2190/// the given datums were packed into a list.
2191///
2192/// This is used to optimistically pre-allocate buffers for packing rows.
2193pub fn datum_list_size<'a, I, D>(iter: I) -> usize
2194where
2195    I: IntoIterator<Item = D>,
2196    D: Borrow<Datum<'a>>,
2197{
2198    1 + size_of::<u64>() + datums_size(iter)
2199}
2200
2201impl RowPacker<'_> {
2202    /// Constructs a row packer that will pack additional datums into the
2203    /// provided row.
2204    ///
2205    /// This function is intentionally somewhat inconvenient to call. You
2206    /// usually want to call [`Row::packer`] instead to start packing from
2207    /// scratch.
2208    pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
2209        RowPacker { row }
2210    }
2211
2212    /// Extend an existing `Row` with a `Datum`.
2213    #[inline]
2214    pub fn push<'a, D>(&mut self, datum: D)
2215    where
2216        D: Borrow<Datum<'a>>,
2217    {
2218        push_datum(&mut self.row.data, *datum.borrow());
2219    }
2220
2221    /// Extend an existing `Row` with additional `Datum`s.
2222    #[inline]
2223    pub fn extend<'a, I, D>(&mut self, iter: I)
2224    where
2225        I: IntoIterator<Item = D>,
2226        D: Borrow<Datum<'a>>,
2227    {
2228        for datum in iter {
2229            push_datum(&mut self.row.data, *datum.borrow())
2230        }
2231    }
2232
2233    /// Extend an existing `Row` with additional `Datum`s.
2234    ///
2235    /// In the case the iterator produces an error, the pushing of
2236    /// datums in terminated and the error returned. The `Row` will
2237    /// be incomplete, but it will be safe to read datums from it.
2238    #[inline]
2239    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
2240    where
2241        I: IntoIterator<Item = Result<D, E>>,
2242        D: Borrow<Datum<'a>>,
2243    {
2244        for datum in iter {
2245            push_datum(&mut self.row.data, *datum?.borrow());
2246        }
2247        Ok(())
2248    }
2249
2250    /// Appends the datums of an entire `Row`.
2251    pub fn extend_by_row(&mut self, row: &Row) {
2252        self.row.data.extend_from_slice(row.data.as_slice());
2253    }
2254
2255    /// Appends the datums of an entire `Row`.
2256    pub fn extend_by_row_ref(&mut self, row: &RowRef) {
2257        self.row.data.extend_from_slice(row.data());
2258    }
2259
2260    /// Appends the slice of data representing an entire `Row`. The data is not validated.
2261    ///
2262    /// # Safety
2263    ///
2264    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
2265    /// This method relies on `data` being an appropriate row encoding, and can
2266    /// result in unsafety if this is not the case.
2267    #[inline]
2268    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
2269        self.row.data.extend_from_slice(data)
2270    }
2271
2272    /// Pushes a [`DatumList`] that is built from a closure.
2273    ///
2274    /// The supplied closure will be invoked once with a `Row` that can be used
2275    /// to populate the list. It is valid to call any method on the
2276    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
2277    /// or [`RowPacker::truncate_datums`].
2278    ///
2279    /// Returns the value returned by the closure, if any.
2280    ///
2281    /// ```
2282    /// # use mz_repr::{Row, Datum};
2283    /// let mut row = Row::default();
2284    /// row.packer().push_list_with(|row| {
2285    ///     row.push(Datum::String("age"));
2286    ///     row.push(Datum::Int64(42));
2287    /// });
2288    /// assert_eq!(
2289    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
2290    ///     vec![Datum::String("age"), Datum::Int64(42)],
2291    /// );
2292    /// ```
2293    #[inline]
2294    pub fn push_list_with<F, R>(&mut self, f: F) -> R
2295    where
2296        F: FnOnce(&mut RowPacker) -> R,
2297    {
2298        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
2299        // 1 byte. If not, we'll fix it up later.
2300        let start = self.row.data.len();
2301        self.row.data.push(Tag::ListTiny.into());
2302        // Write a dummy len, will fix it up later.
2303        self.row.data.push(0);
2304
2305        let out = f(self);
2306
2307        // The `- 1 - 1` is for the tag and the len.
2308        let len = self.row.data.len() - start - 1 - 1;
2309        // We now know the real len.
2310        if len < TINY {
2311            // If the len fits in 1 byte, we just need to fix up the len.
2312            self.row.data[start + 1] = len.to_le_bytes()[0];
2313        } else {
2314            // Note: We move this code path into its own function, so that the common case can be
2315            // inlined.
2316            long_list(&mut self.row.data, start, len);
2317        }
2318
2319        /// 1. Fix up the tag.
2320        /// 2. Move the actual data a bit (for which we also need to make room at the end).
2321        /// 3. Fix up the len.
2322        /// `data`: The row's backing data.
2323        /// `start`: where `push_list_with` started writing in `data`.
2324        /// `len`: the length of the data, excluding the tag and the length.
2325        #[cold]
2326        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
2327            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
2328            // elsewhere.) The other parameters are the same as for `long_list`.
2329            let long_list_inner = |data: &mut CompactBytes, len_len| {
2330                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
2331                // The `- 1` is because the old length was 1 byte.
2332                const ZEROS: [u8; 8] = [0; 8];
2333                data.extend_from_slice(&ZEROS[0..len_len - 1]);
2334                // Move the data to the end of the `CompactBytes`, to make space for the new length.
2335                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
2336                // start after the 1-byte tag and the len_len-byte length.
2337                //
2338                // Note that this is the only operation in `long_list` whose cost is proportional
2339                // to `len`. Since `len` is at least 256 here, the other operations' cost are
2340                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
2341                // Datum than a Datum encoding in the `f` closure.
2342                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
2343                // Write the new length.
2344                data[start + 1..start + 1 + len_len]
2345                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
2346            };
2347            match len {
2348                0..TINY => {
2349                    unreachable!()
2350                }
2351                TINY..SHORT => {
2352                    data[start] = Tag::ListShort.into();
2353                    long_list_inner(data, 2);
2354                }
2355                SHORT..LONG => {
2356                    data[start] = Tag::ListLong.into();
2357                    long_list_inner(data, 4);
2358                }
2359                _ => {
2360                    data[start] = Tag::ListHuge.into();
2361                    long_list_inner(data, 8);
2362                }
2363            };
2364        }
2365
2366        out
2367    }
2368
2369    /// Pushes a [`DatumMap`] that is built from a closure.
2370    ///
2371    /// The supplied closure will be invoked once with a `Row` that can be used
2372    /// to populate the dict.
2373    ///
2374    /// The closure **must** alternate pushing string keys and arbitrary values,
2375    /// otherwise reading the dict will cause a panic.
2376    ///
2377    /// The closure **must** push keys in ascending order, otherwise equality
2378    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2379    /// MODE will cause a panic.
2380    ///
2381    /// The closure **must not** call [`RowPacker::clear`],
2382    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2383    ///
2384    /// # Example
2385    ///
2386    /// ```
2387    /// # use mz_repr::{Row, Datum};
2388    /// let mut row = Row::default();
2389    /// row.packer().push_dict_with(|row| {
2390    ///
2391    ///     // key
2392    ///     row.push(Datum::String("age"));
2393    ///     // value
2394    ///     row.push(Datum::Int64(42));
2395    ///
2396    ///     // key
2397    ///     row.push(Datum::String("name"));
2398    ///     // value
2399    ///     row.push(Datum::String("bob"));
2400    /// });
2401    /// assert_eq!(
2402    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2403    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2404    /// );
2405    /// ```
2406    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2407    where
2408        F: FnOnce(&mut RowPacker) -> R,
2409    {
2410        self.row.data.push(Tag::Dict.into());
2411        let start = self.row.data.len();
2412        // write a dummy len, will fix it up later
2413        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2414
2415        let res = f(self);
2416
2417        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2418        // fix up the len
2419        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2420
2421        res
2422    }
2423
2424    /// Like [`RowPacker::push_dict_with`], but accepts a fallible closure.
2425    pub fn try_push_dict_with<F, E>(&mut self, f: F) -> Result<(), E>
2426    where
2427        F: FnOnce(&mut RowPacker) -> Result<(), E>,
2428    {
2429        self.push_dict_with(f)
2430    }
2431
2432    /// Convenience function to construct an array from an iter of `Datum`s.
2433    ///
2434    /// Returns an error if the number of elements in `iter` does not match
2435    /// the cardinality of the array as described by `dims`, or if the
2436    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2437    /// occurs, the packer's state will be unchanged.
2438    pub fn try_push_array<'a, I, D>(
2439        &mut self,
2440        dims: &[ArrayDimension],
2441        iter: I,
2442    ) -> Result<(), InvalidArrayError>
2443    where
2444        I: IntoIterator<Item = D>,
2445        D: Borrow<Datum<'a>>,
2446    {
2447        // SAFETY: The function returns the exact number of elements pushed into the array.
2448        unsafe {
2449            self.push_array_with_unchecked(dims, |packer| {
2450                let mut nelements = 0;
2451                for datum in iter {
2452                    packer.push(datum);
2453                    nelements += 1;
2454                }
2455                Ok::<_, InvalidArrayError>(nelements)
2456            })
2457        }
2458    }
2459
2460    /// Like [`RowPacker::try_push_array`], but accepts a fallible iterator of
2461    /// elements.
2462    pub fn try_push_array_fallible<'a, I, D, E>(
2463        &mut self,
2464        dims: &[ArrayDimension],
2465        iter: I,
2466    ) -> Result<Result<(), E>, InvalidArrayError>
2467    where
2468        I: IntoIterator<Item = Result<D, E>>,
2469        D: Borrow<Datum<'a>>,
2470    {
2471        enum Error<E> {
2472            Usage(InvalidArrayError),
2473            Inner(E),
2474        }
2475
2476        impl<E> From<InvalidArrayError> for Error<E> {
2477            fn from(e: InvalidArrayError) -> Self {
2478                Self::Usage(e)
2479            }
2480        }
2481
2482        // SAFETY: The function returns the exact number of elements pushed into the array.
2483        let result = unsafe {
2484            self.push_array_with_unchecked(dims, |packer| {
2485                let mut nelements = 0;
2486                for datum in iter {
2487                    packer.push(datum.map_err(Error::Inner)?);
2488                    nelements += 1;
2489                }
2490                Ok(nelements)
2491            })
2492        };
2493        match result {
2494            Ok(()) => Ok(Ok(())),
2495            Err(Error::Usage(e)) => Err(e),
2496            Err(Error::Inner(e)) => Ok(Err(e)),
2497        }
2498    }
2499
2500    /// Convenience function to construct an array from a function. The function must return the
2501    /// number of elements it pushed into the array. It is undefined behavior if the function returns
2502    /// a number different to the number of elements it pushed.
2503    ///
2504    /// Returns an error if the number of elements pushed by `f` does not match
2505    /// the cardinality of the array as described by `dims`, or if the
2506    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
2507    /// occurs, the packer's state will be unchanged.
2508    pub unsafe fn push_array_with_unchecked<F, E>(
2509        &mut self,
2510        dims: &[ArrayDimension],
2511        f: F,
2512    ) -> Result<(), E>
2513    where
2514        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
2515        E: From<InvalidArrayError>,
2516    {
2517        // Arrays are encoded as follows.
2518        //
2519        // u8    ndims
2520        // u64   dim_0 lower bound
2521        // u64   dim_0 length
2522        // ...
2523        // u64   dim_n lower bound
2524        // u64   dim_n length
2525        // u64   element data size in bytes
2526        // u8    element data, where elements are encoded in row-major order
2527
2528        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
2529            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
2530        }
2531
2532        let start = self.row.data.len();
2533        self.row.data.push(Tag::Array.into());
2534
2535        // Write dimension information.
2536        self.row
2537            .data
2538            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
2539        for dim in dims {
2540            self.row
2541                .data
2542                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2543            self.row
2544                .data
2545                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2546        }
2547
2548        // Write elements.
2549        let off = self.row.data.len();
2550        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2551        let nelements = match f(self) {
2552            Ok(nelements) => nelements,
2553            Err(e) => {
2554                self.row.data.truncate(start);
2555                return Err(e);
2556            }
2557        };
2558        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2559        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2560
2561        // Check that the number of elements written matches the dimension
2562        // information.
2563        let cardinality = match dims {
2564            [] => 0,
2565            // Saturate the product: a cardinality that overflows `usize` is
2566            // impossibly large (no array can hold that many elements), so it can
2567            // never equal the actual `nelements` and the check below rejects it as
2568            // `WrongCardinality`. A plain `product()` would panic under overflow
2569            // checks (debug/fuzz) and silently wrap in release — and a wrapped
2570            // value could even spuriously match `nelements`, accepting a corrupt
2571            // array (e.g. dims claiming `[2^32, 2^32]` wrap to 0 elements).
2572            dims => dims
2573                .iter()
2574                .map(|d| d.length)
2575                .fold(1usize, usize::saturating_mul),
2576        };
2577        if nelements != cardinality {
2578            self.row.data.truncate(start);
2579            return Err(InvalidArrayError::WrongCardinality {
2580                actual: nelements,
2581                expected: cardinality,
2582            }
2583            .into());
2584        }
2585
2586        Ok(())
2587    }
2588
2589    /// Pushes an [`Array`] that is built from a closure.
2590    ///
2591    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2592    /// should prefer [`RowPacker::try_push_array`] when possible.
2593    ///
2594    /// Returns an error if the number of elements pushed does not match
2595    /// the cardinality of the array as described by `dims`, or if the
2596    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2597    /// occurs, the packer's state will be unchanged.
2598    pub fn push_array_with_row_major<F, I>(
2599        &mut self,
2600        dims: I,
2601        f: F,
2602    ) -> Result<(), InvalidArrayError>
2603    where
2604        I: IntoIterator<Item = ArrayDimension>,
2605        F: FnOnce(&mut RowPacker) -> usize,
2606    {
2607        let start = self.row.data.len();
2608        self.row.data.push(Tag::Array.into());
2609
2610        // Write dummy dimension length for now, we'll fix it up.
2611        let dims_start = self.row.data.len();
2612        self.row.data.push(42);
2613
2614        let mut num_dims: u8 = 0;
2615        let mut cardinality: usize = 1;
2616        for dim in dims {
2617            num_dims += 1;
2618            // Saturate: an overflowing cardinality is impossibly large and is
2619            // rejected by the `nelements` check below. See the matching note in
2620            // `push_array_with_unchecked`.
2621            cardinality = cardinality.saturating_mul(dim.length);
2622
2623            self.row
2624                .data
2625                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2626            self.row
2627                .data
2628                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2629        }
2630
2631        if num_dims > MAX_ARRAY_DIMENSIONS {
2632            // Reset the packer state so we don't have invalid data.
2633            self.row.data.truncate(start);
2634            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2635        }
2636        // Fix up our dimension length.
2637        self.row.data[dims_start..dims_start + size_of::<u8>()]
2638            .copy_from_slice(&num_dims.to_le_bytes());
2639
2640        // Write elements.
2641        let off = self.row.data.len();
2642        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2643
2644        let nelements = f(self);
2645
2646        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2647        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2648
2649        // Check that the number of elements written matches the dimension
2650        // information.
2651        let cardinality = match num_dims {
2652            0 => 0,
2653            _ => cardinality,
2654        };
2655        if nelements != cardinality {
2656            self.row.data.truncate(start);
2657            return Err(InvalidArrayError::WrongCardinality {
2658                actual: nelements,
2659                expected: cardinality,
2660            });
2661        }
2662
2663        Ok(())
2664    }
2665
2666    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2667    ///
2668    /// See [`RowPacker::push_dict_with`] if you need to be able to handle errors
2669    pub fn push_list<'a, I, D>(&mut self, iter: I)
2670    where
2671        I: IntoIterator<Item = D>,
2672        D: Borrow<Datum<'a>>,
2673    {
2674        self.push_list_with(|packer| {
2675            for elem in iter {
2676                packer.push(*elem.borrow())
2677            }
2678        });
2679    }
2680
2681    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2682    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2683    where
2684        I: IntoIterator<Item = (&'a str, D)>,
2685        D: Borrow<Datum<'a>>,
2686    {
2687        self.push_dict_with(|packer| {
2688            for (k, v) in iter {
2689                packer.push(Datum::String(k));
2690                packer.push(*v.borrow())
2691            }
2692        })
2693    }
2694
2695    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>`.
2696    ///
2697    /// # Panics
2698    /// - If lower and upper express finite values and they are datums of
2699    ///   different types.
2700    /// - If lower or upper express finite values and are equal to
2701    ///   `Datum::Null`. To handle `Datum::Null` properly, use
2702    ///   [`RangeBound::new`].
2703    ///
2704    /// # Notes
2705    /// - This function canonicalizes the range before pushing it to the row.
2706    /// - Prefer this function over `push_range_with` because of its
2707    ///   canonicaliztion.
2708    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
2709    ///   handles `Datum::Null` in a SQL-friendly way.
2710    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2711        range.canonicalize()?;
2712        match range.inner {
2713            None => {
2714                self.row.data.push(Tag::Range.into());
2715                // Untagged bytes only contains the `RANGE_EMPTY` flag value.
2716                self.row.data.push(range::InternalFlags::EMPTY.bits());
2717                Ok(())
2718            }
2719            Some(inner) => self.push_range_with(
2720                RangeLowerBound {
2721                    inclusive: inner.lower.inclusive,
2722                    bound: inner
2723                        .lower
2724                        .bound
2725                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2726                },
2727                RangeUpperBound {
2728                    inclusive: inner.upper.inclusive,
2729                    bound: inner
2730                        .upper
2731                        .bound
2732                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2733                },
2734            ),
2735        }
2736    }
2737
2738    /// Pushes a `DatumRange` built from the specified arguments.
2739    ///
2740    /// # Warning
2741    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2742    /// inputs. Consequentially, this means it's possible to generate ranges
2743    /// that will not reflect the proper ordering and equality.
2744    ///
2745    /// # Panics
2746    /// - If lower or upper expresses a finite value and does not push exactly
2747    ///   one value into the `RowPacker`.
2748    /// - If lower and upper express finite values and they are datums of
2749    ///   different types.
2750    /// - If lower or upper express finite values and push `Datum::Null`.
2751    ///
2752    /// # Notes
2753    /// - Prefer `push_range_with` over this function. This function should be
2754    ///   used only when you are not pushing `Datum`s to the inner row.
2755    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2756    ///   and `upper` are optional, contingent on the flag value expressing an
2757    ///   empty range (where neither will be present) or infinite bounds (where
2758    ///   each infinite bound will be absent).
2759    /// - To push an emtpy range, use `push_range` using `Range { inner: None }`.
2760    pub fn push_range_with<L, U, E>(
2761        &mut self,
2762        lower: RangeLowerBound<L>,
2763        upper: RangeUpperBound<U>,
2764    ) -> Result<(), E>
2765    where
2766        L: FnOnce(&mut RowPacker) -> Result<(), E>,
2767        U: FnOnce(&mut RowPacker) -> Result<(), E>,
2768        E: From<InvalidRangeError>,
2769    {
2770        let start = self.row.data.len();
2771        self.row.data.push(Tag::Range.into());
2772
2773        let mut flags = range::InternalFlags::empty();
2774
2775        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2776        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2777        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2778        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2779
2780        let mut expected_datums = 0;
2781
2782        self.row.data.push(flags.bits());
2783
2784        let datum_check = self.row.data.len();
2785
2786        if let Some(value) = lower.bound {
2787            let start = self.row.data.len();
2788            value(self)?;
2789            assert!(
2790                start < self.row.data.len(),
2791                "finite values must each push exactly one value; expected 1 but got 0"
2792            );
2793            expected_datums += 1;
2794        }
2795
2796        if let Some(value) = upper.bound {
2797            let start = self.row.data.len();
2798            value(self)?;
2799            assert!(
2800                start < self.row.data.len(),
2801                "finite values must each push exactly one value; expected 1 but got 0"
2802            );
2803            expected_datums += 1;
2804        }
2805
2806        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
2807        // and if two are pushed then the second is not less than the first. Panic in
2808        // some cases and error in others.
2809        let mut actual_datums = 0;
2810        let mut seen = None;
2811        let mut dataz = &self.row.data[datum_check..];
2812        while !dataz.is_empty() {
2813            let d = unsafe { read_datum(&mut dataz) };
2814            // These checks only fail when decoding untrusted/corrupted bytes;
2815            // valid callers always push consistent, non-null bounds. Return an
2816            // error rather than asserting so a crafted proto doesn't panic.
2817            if d == Datum::Null {
2818                self.row.data.truncate(start);
2819                return Err(InvalidRangeError::InvalidRangeData.into());
2820            }
2821
2822            match seen {
2823                None => seen = Some(d),
2824                Some(seen) => {
2825                    let seen_kind = DatumKind::from(seen);
2826                    let d_kind = DatumKind::from(d);
2827                    if seen_kind != d_kind {
2828                        self.row.data.truncate(start);
2829                        return Err(InvalidRangeError::InvalidRangeData.into());
2830                    }
2831
2832                    if seen > d {
2833                        self.row.data.truncate(start);
2834                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
2835                    }
2836                }
2837            }
2838            actual_datums += 1;
2839        }
2840
2841        if actual_datums != expected_datums {
2842            self.row.data.truncate(start);
2843            return Err(InvalidRangeError::InvalidRangeData.into());
2844        }
2845
2846        Ok(())
2847    }
2848
2849    /// Clears the contents of the packer without de-allocating its backing memory.
2850    pub fn clear(&mut self) {
2851        self.row.data.clear();
2852    }
2853
2854    /// Truncates the underlying storage to the specified byte position.
2855    ///
2856    /// # Safety
2857    ///
2858    /// `pos` MUST specify a byte offset that lies on a datum boundary.
2859    /// If `pos` specifies a byte offset that is *within* a datum, the row
2860    /// packer will produce an invalid row, the unpacking of which may
2861    /// trigger undefined behavior!
2862    ///
2863    /// To find the byte offset of a datum boundary, inspect the packer's
2864    /// byte length by calling `packer.data().len()` after pushing the desired
2865    /// number of datums onto the packer.
2866    pub unsafe fn truncate(&mut self, pos: usize) {
2867        self.row.data.truncate(pos)
2868    }
2869
2870    /// Truncates the underlying row to contain at most the first `n` datums.
2871    pub fn truncate_datums(&mut self, n: usize) {
2872        let prev_len = self.row.data.len();
2873        let mut iter = self.row.iter();
2874        for _ in iter.by_ref().take(n) {}
2875        let next_len = iter.data.len();
2876        // SAFETY: iterator offsets always lie on a datum boundary.
2877        unsafe { self.truncate(prev_len - next_len) }
2878    }
2879
2880    /// Returns the total amount of bytes used by the underlying row.
2881    pub fn byte_len(&self) -> usize {
2882        self.row.byte_len()
2883    }
2884}
2885
2886impl<'a> IntoIterator for &'a Row {
2887    type Item = Datum<'a>;
2888    type IntoIter = DatumListIter<'a>;
2889    fn into_iter(self) -> DatumListIter<'a> {
2890        self.iter()
2891    }
2892}
2893
2894impl fmt::Debug for Row {
2895    /// Debug representation using the internal datums
2896    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2897        f.write_str("Row{")?;
2898        f.debug_list().entries(self.iter()).finish()?;
2899        f.write_str("}")
2900    }
2901}
2902
2903impl fmt::Display for Row {
2904    /// Display representation using the internal datums
2905    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2906        f.write_str("(")?;
2907        for (i, datum) in self.iter().enumerate() {
2908            if i != 0 {
2909                f.write_str(", ")?;
2910            }
2911            write!(f, "{}", datum)?;
2912        }
2913        f.write_str(")")
2914    }
2915}
2916
2917impl<'a, T> DatumList<'a, T> {
2918    pub fn iter(&self) -> DatumListIter<'a> {
2919        DatumListIter { data: self.data }
2920    }
2921
2922    /// Iterate elements as typed `T` values rather than raw `Datum`s.
2923    ///
2924    /// Each datum is decoded and converted via [`FromDatum`]. Since generic
2925    /// type parameters in `#[sqlfunc]` are erased to `Datum<'a>` before code
2926    /// generation, this is monomorphized to an identity conversion at runtime.
2927    pub fn typed_iter(&self) -> DatumListTypedIter<'a, T>
2928    where
2929        T: FromDatum<'a>,
2930    {
2931        DatumListTypedIter {
2932            inner: self.iter(),
2933            _phantom: PhantomData,
2934        }
2935    }
2936
2937    /// For debugging only
2938    pub fn data(&self) -> &'a [u8] {
2939        self.data
2940    }
2941}
2942
2943impl<T> DatumList<'static, T> {
2944    pub fn empty() -> Self {
2945        DatumList::new(&[])
2946    }
2947}
2948
2949impl<'a> IntoIterator for DatumList<'a> {
2950    type Item = Datum<'a>;
2951    type IntoIter = DatumListIter<'a>;
2952    fn into_iter(self) -> DatumListIter<'a> {
2953        self.iter()
2954    }
2955}
2956
2957impl<'a> Iterator for DatumListIter<'a> {
2958    type Item = Datum<'a>;
2959    fn next(&mut self) -> Option<Self::Item> {
2960        if self.data.is_empty() {
2961            None
2962        } else {
2963            Some(unsafe { read_datum(&mut self.data) })
2964        }
2965    }
2966}
2967
2968impl<'a, T: FromDatum<'a>> Iterator for DatumListTypedIter<'a, T> {
2969    type Item = T;
2970    fn next(&mut self) -> Option<Self::Item> {
2971        self.inner.next().map(T::from_datum)
2972    }
2973}
2974
2975impl<'a, T> DatumMap<'a, T> {
2976    pub fn iter(&self) -> DatumDictIter<'a> {
2977        DatumDictIter {
2978            data: self.data,
2979            prev_key: None,
2980        }
2981    }
2982
2983    /// Iterate entries as `(&str, T)` pairs rather than `(&str, Datum)`.
2984    ///
2985    /// Each value datum is converted via [`FromDatum`]. Since generic type
2986    /// parameters in `#[sqlfunc]` are erased to `Datum<'a>` before code
2987    /// generation, this is monomorphized to an identity conversion at runtime.
2988    pub fn typed_iter(&self) -> DatumDictTypedIter<'a, T>
2989    where
2990        T: FromDatum<'a>,
2991    {
2992        DatumDictTypedIter {
2993            inner: self.iter(),
2994            _phantom: PhantomData,
2995        }
2996    }
2997
2998    /// For debugging only
2999    pub fn data(&self) -> &'a [u8] {
3000        self.data
3001    }
3002}
3003
3004impl<T> DatumMap<'static, T> {
3005    pub fn empty() -> Self {
3006        DatumMap::new(&[])
3007    }
3008}
3009
3010impl<'a, T> Debug for DatumMap<'a, T> {
3011    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3012        f.debug_map().entries(self.iter()).finish()
3013    }
3014}
3015
3016impl<'a> IntoIterator for &'a DatumMap<'a> {
3017    type Item = (&'a str, Datum<'a>);
3018    type IntoIter = DatumDictIter<'a>;
3019    fn into_iter(self) -> DatumDictIter<'a> {
3020        self.iter()
3021    }
3022}
3023
3024impl<'a> Iterator for DatumDictIter<'a> {
3025    type Item = (&'a str, Datum<'a>);
3026    fn next(&mut self) -> Option<Self::Item> {
3027        if self.data.is_empty() {
3028            None
3029        } else {
3030            let key_tag =
3031                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
3032            assert!(
3033                key_tag == Tag::StringTiny
3034                    || key_tag == Tag::StringShort
3035                    || key_tag == Tag::StringLong
3036                    || key_tag == Tag::StringHuge,
3037                "Dict keys must be strings, got {:?}",
3038                key_tag
3039            );
3040            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
3041            let val = unsafe { read_datum(&mut self.data) };
3042
3043            // if in debug mode, sanity check keys
3044            if cfg!(debug_assertions) {
3045                if let Some(prev_key) = self.prev_key {
3046                    debug_assert!(
3047                        prev_key < key,
3048                        "Dict keys must be unique and given in ascending order: {} came before {}",
3049                        prev_key,
3050                        key
3051                    );
3052                }
3053                self.prev_key = Some(key);
3054            }
3055
3056            Some((key, val))
3057        }
3058    }
3059}
3060
3061impl<'a, T: FromDatum<'a>> Iterator for DatumDictTypedIter<'a, T> {
3062    type Item = (&'a str, T);
3063    fn next(&mut self) -> Option<Self::Item> {
3064        self.inner.next().map(|(k, v)| (k, T::from_datum(v)))
3065    }
3066}
3067
3068impl RowArena {
3069    pub fn new() -> Self {
3070        RowArena {
3071            inner: RefCell::new(vec![]),
3072            scratch: RefCell::new(None),
3073        }
3074    }
3075
3076    /// Creates a `RowArena` with an initial region sized to hold `capacity` bytes, to avoid
3077    /// reallocations as the first datums are created in the arena.
3078    pub fn with_capacity(capacity: usize) -> Self {
3079        let mut inner = Vec::new();
3080        if capacity > 0 {
3081            inner.push(Vec::with_capacity(capacity));
3082        }
3083        RowArena {
3084            inner: RefCell::new(inner),
3085            scratch: RefCell::new(None),
3086        }
3087    }
3088
3089    /// Ensures the active region can hold at least `additional` more bytes without allocating a
3090    /// new region. Call this when you expect to push roughly `additional` bytes next.
3091    pub fn reserve(&self, additional: usize) {
3092        if additional == 0 {
3093            return;
3094        }
3095        let mut inner = self.inner.borrow_mut();
3096        match inner.last_mut() {
3097            // The active region is empty, so nothing references it yet and it is safe to grow it
3098            // in place (a reallocation cannot dangle a live reference).
3099            Some(active) if active.is_empty() => {
3100                if active.capacity() < additional {
3101                    active.reserve_exact(additional);
3102                }
3103            }
3104            // The active region holds live data; we cannot grow it without moving those bytes, so
3105            // stage a fresh region. Size it like `push_bytes` does (at least double the current
3106            // region) so a sequence of small `reserve`s still yields at most log-many regions
3107            // rather than many small ones.
3108            Some(active) => {
3109                let new_cap = std::cmp::max(additional, active.capacity().saturating_mul(2));
3110                inner.push(Vec::with_capacity(new_cap));
3111            }
3112            None => inner.push(Vec::with_capacity(additional)),
3113        }
3114    }
3115
3116    /// Copies `bytes` into the arena and returns a reference valid for its lifetime.
3117    ///
3118    /// Accepts anything that derefs to `[u8]` (e.g. `Vec<u8>`, `&[u8]`); the bytes are copied, so
3119    /// the caller's allocation is not retained.
3120    #[allow(clippy::transmute_ptr_to_ptr)]
3121    pub fn push_bytes<'a, B: Deref<Target = [u8]>>(&'a self, bytes: B) -> &'a [u8] {
3122        let bytes: &[u8] = &bytes;
3123        let need = bytes.len();
3124        if need == 0 {
3125            return &[];
3126        }
3127        let mut inner = self.inner.borrow_mut();
3128
3129        // Find or create a region with spare capacity for `need` bytes, never growing a region
3130        // that already holds data (see the type-level comment for why this preserves references).
3131        let has_room = inner
3132            .last()
3133            .map_or(false, |region| region.capacity() - region.len() >= need);
3134        if !has_room {
3135            let last_cap = inner.last().map_or(0, |region| region.capacity());
3136            let new_cap = std::cmp::max(need, last_cap.saturating_mul(2));
3137            inner.push(Vec::with_capacity(new_cap));
3138        }
3139
3140        let region = inner.last_mut().expect("region present");
3141        let start = region.len();
3142        region.extend_from_slice(bytes);
3143        let copied = &region[start..];
3144        unsafe {
3145            // This is safe because:
3146            //   * `copied` references bytes inside `region`'s heap buffer, which we just sized to
3147            //     fit without reallocating; that buffer is never resized again while it holds data
3148            //     (we allocate a new region instead), so the reference stays valid.
3149            //   * The buffer lives as long as the arena: regions are only dropped by `clear`/`drop`,
3150            //     both of which take `&mut`/ownership, so no `&'a self`-tied reference can outlive
3151            //     them.
3152            //   * Pushing further regions may reallocate `self.inner`, but that moves only the
3153            //     `Vec<u8>` headers, not the heap buffers they own.
3154            transmute::<&[u8], &'a [u8]>(copied)
3155        }
3156    }
3157
3158    /// Copies `string` into the arena and returns a reference valid for its lifetime.
3159    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
3160        let copied = self.push_bytes(string.as_bytes());
3161        unsafe {
3162            // This is safe because we just copied the bytes of a valid `String`.
3163            std::str::from_utf8_unchecked(copied)
3164        }
3165    }
3166
3167    /// Returns a growable, writeable byte buffer for assembling a value incrementally.
3168    ///
3169    /// Write into it with [`RowArenaBuf::push`], [`RowArenaBuf::extend_from_slice`], or
3170    /// [`std::io::Write`], then call [`RowArenaBuf::finish`] to copy the result into the arena and
3171    /// obtain a reference valid for the arena's lifetime. The backing buffer is a single scratch
3172    /// allocation reused across writers, so this lets a producer that builds bytes piecewise (e.g.
3173    /// decoding a row) avoid managing its own scratch.
3174    ///
3175    /// Nested writers are sound but not free: a writer obtained while another is still live can't
3176    /// reuse the (in-use) scratch, so it allocates its own buffer. Steady-state, non-nested use
3177    /// stays allocation-free.
3178    pub fn writer(&self) -> RowArenaBuf<'_> {
3179        // Take the recycled buffer if one is available, else allocate a fresh one. The cell is
3180        // borrowed only for this `take`, never for the writer's lifetime, so a nested `writer` call
3181        // doesn't double-borrow: it simply finds the slot empty and allocates its own buffer.
3182        let mut buf = self.scratch.borrow_mut().take().unwrap_or_default();
3183        buf.clear();
3184        RowArenaBuf { arena: self, buf }
3185    }
3186
3187    /// Take ownership of `row` for the lifetime of the arena, returning a
3188    /// reference to the first datum in the row.
3189    ///
3190    /// If we had an owned datum type, this method would be much clearer, and
3191    /// would be called `push_owned_datum`.
3192    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
3193        let copied = self.push_bytes(row.data());
3194        unsafe {
3195            // This is safe because `copied` is a valid encoding of a single datum (we just packed
3196            // it into `row`), backed by the arena for the lifetime `'a`. Copying the bytes also
3197            // sidesteps the `Row`'s inline (`SmallVec`) storage entirely.
3198            let datum = read_datum(&mut &copied[..]);
3199            transmute::<Datum<'_>, Datum<'a>>(datum)
3200        }
3201    }
3202
3203    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
3204    /// `Datum`.
3205    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
3206        let copied = self.push_bytes(row.data());
3207        unsafe {
3208            // Safe for the same reasons as `push_unary_row`.
3209            let nested = DatumNested::extract(&mut &copied[..]);
3210            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
3211        }
3212    }
3213
3214    /// Convenience function to make a new `Row` containing a single datum, and
3215    /// take ownership of it for the lifetime of the arena
3216    ///
3217    /// ```
3218    /// # use mz_repr::{RowArena, Datum};
3219    /// let arena = RowArena::new();
3220    /// let datum = arena.make_datum(|packer| {
3221    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
3222    /// });
3223    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
3224    /// ```
3225    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
3226    where
3227        F: FnOnce(&mut RowPacker),
3228    {
3229        let mut row = Row::default();
3230        f(&mut row.packer());
3231        self.push_unary_row(row)
3232    }
3233
3234    /// Convenience function to build a list datum from an iterator of typed
3235    /// elements and return it as a `DatumList<'a, T>`.
3236    ///
3237    /// By accepting an iterator of `T: Borrow<Datum>` instead of a raw
3238    /// `RowPacker` closure, this guarantees that only elements of type `T`
3239    /// are pushed.
3240    pub fn make_datum_list<'a, T: std::borrow::Borrow<Datum<'a>>>(
3241        &'a self,
3242        iter: impl IntoIterator<Item = T>,
3243    ) -> DatumList<'a, T> {
3244        let datum = self.make_datum(|packer| {
3245            packer.push_list_with(|packer| {
3246                for elem in iter {
3247                    packer.push(*elem.borrow());
3248                }
3249            });
3250        });
3251        DatumList::new(datum.unwrap_list().data())
3252    }
3253
3254    /// Convenience function identical to `make_datum` but instead returns a
3255    /// `DatumNested`.
3256    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
3257    where
3258        F: FnOnce(&mut RowPacker),
3259    {
3260        let mut row = Row::default();
3261        f(&mut row.packer());
3262        self.push_unary_row_datum_nested(row)
3263    }
3264
3265    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
3266    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
3267    where
3268        F: FnOnce(&mut RowPacker) -> Result<(), E>,
3269    {
3270        let mut row = Row::default();
3271        f(&mut row.packer())?;
3272        Ok(self.push_unary_row(row))
3273    }
3274
3275    /// Clear the contents of the arena.
3276    ///
3277    /// Retains the single largest region (emptied) so the arena can be reused without
3278    /// reallocating; a workload that clears between uses of similar size becomes allocation-free.
3279    pub fn clear(&mut self) {
3280        let inner = self.inner.get_mut();
3281        // Keep only the largest-capacity region, reset to empty, and drop the rest. Because region
3282        // capacities only ever grow (each new region at least doubles the previous), the largest is
3283        // normally the last; we scan for it defensively, which is cheap given log-many regions.
3284        if let Some(largest) = (0..inner.len()).max_by_key(|&i| inner[i].capacity()) {
3285            inner.swap(0, largest);
3286            inner.truncate(1);
3287            inner[0].clear();
3288        }
3289    }
3290}
3291
3292impl Default for RowArena {
3293    fn default() -> RowArena {
3294        RowArena::new()
3295    }
3296}
3297
3298/// A growable, writeable byte buffer that builds a value into a [`RowArena`].
3299///
3300/// Obtained from [`RowArena::writer`]. Behaves like a writeable byte slice (push/extend bytes,
3301/// read back as `&[u8]`); [`RowArenaBuf::finish`] copies the assembled bytes into the arena and
3302/// returns a reference valid for the arena's lifetime. The buffer is owned for the writer's
3303/// lifetime and, on drop, returned to the arena to be reused by the next writer.
3304#[derive(Debug)]
3305pub struct RowArenaBuf<'a> {
3306    arena: &'a RowArena,
3307    buf: Vec<u8>,
3308}
3309
3310impl<'a> RowArenaBuf<'a> {
3311    /// Appends a single byte.
3312    pub fn push(&mut self, byte: u8) {
3313        self.buf.push(byte);
3314    }
3315
3316    /// Appends a slice of bytes.
3317    pub fn extend_from_slice(&mut self, bytes: &[u8]) {
3318        self.buf.extend_from_slice(bytes);
3319    }
3320
3321    /// The bytes written so far.
3322    pub fn as_slice(&self) -> &[u8] {
3323        &self.buf
3324    }
3325
3326    /// The number of bytes written so far.
3327    pub fn len(&self) -> usize {
3328        self.buf.len()
3329    }
3330
3331    /// Whether no bytes have been written.
3332    pub fn is_empty(&self) -> bool {
3333        self.buf.is_empty()
3334    }
3335
3336    /// Copies the written bytes into the arena, returning a reference valid for its lifetime.
3337    pub fn finish(self) -> &'a [u8] {
3338        // `self` is dropped at the end of this call, returning `buf` to the arena for reuse; the
3339        // returned reference points into a committed region, not `buf`, so it stays valid.
3340        self.arena.push_bytes(self.buf.as_slice())
3341    }
3342
3343    /// Like [`RowArenaBuf::finish`], but returns the bytes as a `&str`.
3344    ///
3345    /// Intended for buffers written via [`std::fmt::Write`] (e.g. `write!`), whose contents are
3346    /// valid UTF-8. Panics if the bytes are not valid UTF-8.
3347    pub fn finish_str(self) -> &'a str {
3348        let bytes = self.arena.push_bytes(self.buf.as_slice());
3349        std::str::from_utf8(bytes).expect("RowArenaBuf::finish_str on non-UTF-8 contents")
3350    }
3351}
3352
3353impl<'a> Drop for RowArenaBuf<'a> {
3354    fn drop(&mut self) {
3355        // Return the buffer to the arena so the next writer can reuse its allocation. We keep only
3356        // one buffer: if the slot is already occupied — an outer writer is still live, or a nested
3357        // writer beat us to it — we drop ours rather than growing an unbounded pool. The borrow is
3358        // transient and never overlaps a live writer's, so this can't double-borrow.
3359        let mut slot = self.arena.scratch.borrow_mut();
3360        if slot.is_none() {
3361            *slot = Some(std::mem::take(&mut self.buf));
3362        }
3363    }
3364}
3365
3366impl<'a> std::ops::Deref for RowArenaBuf<'a> {
3367    type Target = [u8];
3368    fn deref(&self) -> &[u8] {
3369        &self.buf
3370    }
3371}
3372
3373impl<'a> std::io::Write for RowArenaBuf<'a> {
3374    fn write(&mut self, bytes: &[u8]) -> std::io::Result<usize> {
3375        self.buf.extend_from_slice(bytes);
3376        Ok(bytes.len())
3377    }
3378
3379    fn flush(&mut self) -> std::io::Result<()> {
3380        Ok(())
3381    }
3382}
3383
3384impl<'a> std::fmt::Write for RowArenaBuf<'a> {
3385    fn write_str(&mut self, s: &str) -> std::fmt::Result {
3386        self.buf.extend_from_slice(s.as_bytes());
3387        Ok(())
3388    }
3389}
3390
/// A thread-local row, which can be borrowed and returned.
///
/// # Examples
///
/// Use this type instead of creating a new row:
/// ```
/// use mz_repr::SharedRow;
///
/// let mut row_builder = SharedRow::get();
/// ```
///
/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
/// an allocation locally. Additionally, we can observe the size of the local row in a central
/// place and potentially reallocate to reduce memory needs.
///
/// Dropping a `SharedRow` puts its row back into the thread-local slot, making it available to
/// the next caller of [`SharedRow::get`] on the same thread.
///
/// # Panics
///
/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
#[derive(Debug)]
pub struct SharedRow(Row);
3410
3411impl SharedRow {
3412    thread_local! {
3413        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
3414        /// There can be at most one active user of this Row, which is tracked by the state of the
3415        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for using. When it
3416        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
3417        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
3418    }
3419
3420    /// Get the shared row.
3421    ///
3422    /// The row's contents are cleared before returning it.
3423    ///
3424    /// # Panic
3425    ///
3426    /// Panics when the row is already borrowed elsewhere.
3427    pub fn get() -> Self {
3428        let mut row = Self::SHARED_ROW
3429            .take()
3430            .expect("attempted to borrow already borrowed SharedRow");
3431        // Clear row
3432        row.packer();
3433        Self(row)
3434    }
3435
3436    /// Gets the shared row and uses it to pack `iter`.
3437    pub fn pack<'a, I, D>(iter: I) -> Row
3438    where
3439        I: IntoIterator<Item = D>,
3440        D: Borrow<Datum<'a>>,
3441    {
3442        let mut row_builder = Self::get();
3443        let mut row_packer = row_builder.packer();
3444        row_packer.extend(iter);
3445        row_builder.clone()
3446    }
3447}
3448
3449impl std::ops::Deref for SharedRow {
3450    type Target = Row;
3451
3452    fn deref(&self) -> &Self::Target {
3453        &self.0
3454    }
3455}
3456
3457impl std::ops::DerefMut for SharedRow {
3458    fn deref_mut(&mut self) -> &mut Self::Target {
3459        &mut self.0
3460    }
3461}
3462
3463impl Drop for SharedRow {
3464    fn drop(&mut self) {
3465        // Take the Row allocation from this instance and put it back in the thread local slot for
3466        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
3467        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
3468    }
3469}
3470
3471#[cfg(test)]
3472mod tests {
3473    use std::cmp::Ordering;
3474    use std::collections::hash_map::DefaultHasher;
3475    use std::hash::{Hash, Hasher};
3476
3477    use chrono::{DateTime, NaiveDate};
3478    use itertools::Itertools;
3479    use mz_ore::{assert_err, assert_none};
3480    use ordered_float::OrderedFloat;
3481
3482    use crate::SqlScalarType;
3483
3484    use super::*;
3485
3486    fn hash<T: Hash>(t: &T) -> u64 {
3487        let mut hasher = DefaultHasher::new();
3488        t.hash(&mut hasher);
3489        hasher.finish()
3490    }
3491
3492    #[mz_ore::test]
3493    fn test_assumptions() {
3494        assert_eq!(size_of::<Tag>(), 1);
3495        #[cfg(target_endian = "big")]
3496        {
3497            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
3498            assert!(false);
3499        }
3500    }
3501
3502    #[mz_ore::test]
3503    fn miri_test_arena() {
3504        let arena = RowArena::new();
3505
3506        assert_eq!(arena.push_string("".to_owned()), "");
3507        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
3508
3509        let empty: &[u8] = &[];
3510        assert_eq!(arena.push_bytes(vec![]), empty);
3511        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
3512
3513        let mut row = Row::default();
3514        let mut packer = row.packer();
3515        packer.push_dict_with(|row| {
3516            row.push(Datum::String("a"));
3517            row.push_list_with(|row| {
3518                row.push(Datum::String("one"));
3519                row.push(Datum::String("two"));
3520                row.push(Datum::String("three"));
3521            });
3522            row.push(Datum::String("b"));
3523            row.push(Datum::String("c"));
3524        });
3525        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
3526    }
3527
3528    #[mz_ore::test]
3529    fn miri_test_arena_growth_keeps_references() {
3530        // References returned by `push_bytes` must stay valid as later pushes allocate new
3531        // regions; this exercises the "never resize a region that holds data" invariant.
3532        let arena = RowArena::new();
3533        let chunks: Vec<Vec<u8>> = (0..128u16)
3534            .map(|i| vec![u8::try_from(i % 256).unwrap(); usize::from(i % 13) + 1])
3535            .collect();
3536        let refs: Vec<&[u8]> = chunks
3537            .iter()
3538            .map(|c| arena.push_bytes(c.as_slice()))
3539            .collect();
3540        for (i, r) in refs.iter().enumerate() {
3541            assert_eq!(*r, chunks[i].as_slice());
3542        }
3543    }
3544
3545    #[mz_ore::test]
3546    fn miri_test_arena_unary_row_at_offset() {
3547        // A row pushed after other bytes lands at a non-zero offset within a region; reading it
3548        // back must not depend on the row starting at offset zero or on any alignment.
3549        let arena = RowArena::new();
3550        arena.reserve(4096);
3551        let _pad = arena.push_bytes(vec![0xAB; 5]);
3552        let row = Row::pack_slice(&[Datum::String("hello"), Datum::Int64(42), Datum::True]);
3553        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
3554    }
3555
3556    #[mz_ore::test]
3557    fn miri_test_arena_clear_reuse() {
3558        // After `clear` the arena retains a region and remains usable across cycles.
3559        let mut arena = RowArena::new();
3560        for i in 0..100u8 {
3561            let _ = arena.push_bytes(vec![i; 16]);
3562        }
3563        arena.clear();
3564        assert_eq!(arena.push_bytes(vec![7u8; 8]), &[7u8; 8]);
3565        assert_eq!(arena.push_string("after clear".to_owned()), "after clear");
3566        arena.clear();
3567        let empty: &[u8] = &[];
3568        assert_eq!(arena.push_bytes(Vec::<u8>::new()), empty);
3569    }
3570
3571    #[mz_ore::test]
3572    fn miri_test_arena_writer() {
3573        use std::io::Write;
3574
3575        let arena = RowArena::new();
3576
3577        // Build a value incrementally and commit it.
3578        let mut w = arena.writer();
3579        let mut expected = Vec::new();
3580        for i in 0..1000u16 {
3581            let byte = u8::try_from(i % 256).unwrap();
3582            w.push(byte);
3583            expected.push(byte);
3584            w.extend_from_slice(&[byte, byte]);
3585            expected.extend_from_slice(&[byte, byte]);
3586        }
3587        assert_eq!(w.as_slice(), expected.as_slice());
3588        assert_eq!(w.len(), expected.len());
3589        let first = w.finish();
3590        assert_eq!(first, expected.as_slice());
3591
3592        // A second writer reuses the scratch; its result is independent of the first, which stays
3593        // valid because `finish` copied it into the arena.
3594        let mut w2 = arena.writer();
3595        write!(w2, "hello").unwrap();
3596        let second = w2.finish();
3597        assert_eq!(second, b"hello");
3598        assert_eq!(first, expected.as_slice());
3599
3600        // An empty writer commits to an empty slice.
3601        let empty: &[u8] = &[];
3602        assert_eq!(arena.writer().finish(), empty);
3603
3604        // Abandoning a writer without finishing is fine; the next writer starts empty.
3605        {
3606            let mut w3 = arena.writer();
3607            w3.extend_from_slice(b"discarded");
3608        }
3609        assert_eq!(arena.writer().as_slice(), empty);
3610    }
3611
3612    #[mz_ore::test]
3613    fn miri_test_arena_writer_nested() {
3614        // Reentrancy: a writer obtained while another is still live must not panic (no `RefCell`
3615        // double-borrow) and must not disturb the outer writer. The nested writer just gets its own
3616        // buffer; the outer one keeps building independently.
3617        let arena = RowArena::new();
3618
3619        let mut outer = arena.writer();
3620        outer.extend_from_slice(b"outer-before-");
3621
3622        // Take a second writer while `outer` is still live -- the case that double-borrowed before.
3623        let inner_bytes = {
3624            let mut inner = arena.writer();
3625            inner.extend_from_slice(b"inner");
3626            // The outer writer is unaffected by the nested one.
3627            assert_eq!(outer.as_slice(), b"outer-before-");
3628            inner.finish()
3629        };
3630        assert_eq!(inner_bytes, b"inner");
3631
3632        // `outer` is intact and still writable after the nested writer committed.
3633        outer.extend_from_slice(b"after");
3634        let outer_bytes = outer.finish();
3635        assert_eq!(outer_bytes, b"outer-before-after");
3636        // Both committed slices stay valid and independent.
3637        assert_eq!(inner_bytes, b"inner");
3638
3639        // Once all writers have dropped, the recycled buffer is reusable (and cleared on acquire).
3640        let mut again = arena.writer();
3641        again.extend_from_slice(b"reused");
3642        assert_eq!(again.finish(), b"reused");
3643    }
3644
3645    #[mz_ore::test]
3646    fn miri_test_arena_writer_fmt() {
3647        use std::fmt::Write;
3648
3649        // Format text into the writer (e.g. building a cast-to-string result) and commit as `&str`.
3650        let arena = RowArena::new();
3651        let mut w = arena.writer();
3652        for i in 0..5 {
3653            write!(w, "{i},").unwrap();
3654        }
3655        assert_eq!(w.finish_str(), "0,1,2,3,4,");
3656    }
3657
3658    #[mz_ore::test]
3659    fn miri_test_round_trip() {
3660        fn round_trip(datums: Vec<Datum>) {
3661            let row = Row::pack(datums.clone());
3662
3663            // When run under miri this catches undefined bytes written to data
3664            // eg by calling push_copy! on a type which contains undefined padding values
3665            println!("{:?}", row.data());
3666
3667            let datums2 = row.iter().collect::<Vec<_>>();
3668            let datums3 = row.unpack();
3669            assert_eq!(datums, datums2);
3670            assert_eq!(datums, datums3);
3671        }
3672
3673        round_trip(vec![]);
3674        round_trip(
3675            SqlScalarType::enumerate()
3676                .iter()
3677                .flat_map(|r#type| r#type.interesting_datums())
3678                .collect(),
3679        );
3680        round_trip(vec![
3681            Datum::Null,
3682            Datum::Null,
3683            Datum::False,
3684            Datum::True,
3685            Datum::Int16(-21),
3686            Datum::Int32(-42),
3687            Datum::Int64(-2_147_483_648 - 42),
3688            Datum::UInt8(0),
3689            Datum::UInt8(1),
3690            Datum::UInt16(0),
3691            Datum::UInt16(1),
3692            Datum::UInt16(1 << 8),
3693            Datum::UInt32(0),
3694            Datum::UInt32(1),
3695            Datum::UInt32(1 << 8),
3696            Datum::UInt32(1 << 16),
3697            Datum::UInt32(1 << 24),
3698            Datum::UInt64(0),
3699            Datum::UInt64(1),
3700            Datum::UInt64(1 << 8),
3701            Datum::UInt64(1 << 16),
3702            Datum::UInt64(1 << 24),
3703            Datum::UInt64(1 << 32),
3704            Datum::UInt64(1 << 40),
3705            Datum::UInt64(1 << 48),
3706            Datum::UInt64(1 << 56),
3707            Datum::Float32(OrderedFloat::from(-42.12)),
3708            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
3709            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
3710            Datum::Timestamp(
3711                CheckedTimestamp::from_timestamplike(
3712                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
3713                        .unwrap()
3714                        .and_hms_opt(14, 32, 11)
3715                        .unwrap(),
3716                )
3717                .unwrap(),
3718            ),
3719            Datum::TimestampTz(
3720                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3721                    .unwrap(),
3722            ),
3723            Datum::Interval(Interval {
3724                months: 312,
3725                ..Default::default()
3726            }),
3727            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3728            Datum::Bytes(&[]),
3729            Datum::Bytes(&[0, 2, 1, 255]),
3730            Datum::String(""),
3731            Datum::String("العَرَبِيَّة"),
3732        ]);
3733    }
3734
3735    #[mz_ore::test]
3736    fn test_array() {
3737        // Construct an array using `Row::push_array` and verify that it unpacks
3738        // correctly.
3739        const DIM: ArrayDimension = ArrayDimension {
3740            lower_bound: 2,
3741            length: 2,
3742        };
3743        let mut row = Row::default();
3744        let mut packer = row.packer();
3745        packer
3746            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3747            .unwrap();
3748        let arr1 = row.unpack_first().unwrap_array();
3749        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3750        assert_eq!(
3751            arr1.elements().into_iter().collect::<Vec<_>>(),
3752            vec![Datum::Int32(1), Datum::Int32(2)]
3753        );
3754
3755        // Pack a previously-constructed `Datum::Array` and verify that it
3756        // unpacks correctly.
3757        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3758        let arr2 = row.unpack_first().unwrap_array();
3759        assert_eq!(arr1, arr2);
3760    }
3761
3762    #[mz_ore::test]
3763    fn test_multidimensional_array() {
3764        let datums = vec![
3765            Datum::Int32(1),
3766            Datum::Int32(2),
3767            Datum::Int32(3),
3768            Datum::Int32(4),
3769            Datum::Int32(5),
3770            Datum::Int32(6),
3771            Datum::Int32(7),
3772            Datum::Int32(8),
3773        ];
3774
3775        let mut row = Row::default();
3776        let mut packer = row.packer();
3777        packer
3778            .try_push_array(
3779                &[
3780                    ArrayDimension {
3781                        lower_bound: 1,
3782                        length: 1,
3783                    },
3784                    ArrayDimension {
3785                        lower_bound: 1,
3786                        length: 4,
3787                    },
3788                    ArrayDimension {
3789                        lower_bound: 1,
3790                        length: 2,
3791                    },
3792                ],
3793                &datums,
3794            )
3795            .unwrap();
3796        let array = row.unpack_first().unwrap_array();
3797        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3798    }
3799
3800    #[mz_ore::test]
3801    fn test_array_max_dimensions() {
3802        let mut row = Row::default();
3803        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3804
3805        // An array with one too many dimensions should be rejected.
3806        let res = row.packer().try_push_array(
3807            &vec![
3808                ArrayDimension {
3809                    lower_bound: 1,
3810                    length: 1
3811                };
3812                max_dims + 1
3813            ],
3814            vec![Datum::Int32(4)],
3815        );
3816        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3817        assert!(row.data.is_empty());
3818
3819        // An array with exactly the maximum allowable dimensions should be
3820        // accepted.
3821        row.packer()
3822            .try_push_array(
3823                &vec![
3824                    ArrayDimension {
3825                        lower_bound: 1,
3826                        length: 1
3827                    };
3828                    max_dims
3829                ],
3830                vec![Datum::Int32(4)],
3831            )
3832            .unwrap();
3833    }
3834
3835    #[mz_ore::test]
3836    fn test_array_wrong_cardinality() {
3837        let mut row = Row::default();
3838        let res = row.packer().try_push_array(
3839            &[
3840                ArrayDimension {
3841                    lower_bound: 1,
3842                    length: 2,
3843                },
3844                ArrayDimension {
3845                    lower_bound: 1,
3846                    length: 3,
3847                },
3848            ],
3849            vec![Datum::Int32(1), Datum::Int32(2)],
3850        );
3851        assert_eq!(
3852            res,
3853            Err(InvalidArrayError::WrongCardinality {
3854                actual: 2,
3855                expected: 6,
3856            })
3857        );
3858        assert!(row.data.is_empty());
3859    }
3860
3861    #[mz_ore::test]
3862    fn test_array_cardinality_overflow() {
3863        // Dimension lengths whose product overflows `usize` must be rejected as
3864        // a `WrongCardinality` error, not panic (under overflow checks) or wrap
3865        // (in release, which could spuriously accept a corrupt array). The
3866        // product saturates to `usize::MAX`, which no real element count matches.
3867        let mut row = Row::default();
3868        let res = row.packer().try_push_array(
3869            &[
3870                ArrayDimension {
3871                    lower_bound: 1,
3872                    length: usize::MAX,
3873                },
3874                ArrayDimension {
3875                    lower_bound: 1,
3876                    length: 2,
3877                },
3878            ],
3879            vec![Datum::Int32(1), Datum::Int32(2)],
3880        );
3881        assert_eq!(
3882            res,
3883            Err(InvalidArrayError::WrongCardinality {
3884                actual: 2,
3885                expected: usize::MAX,
3886            })
3887        );
3888        assert!(row.data.is_empty());
3889    }
3890
3891    #[mz_ore::test]
3892    fn test_nesting() {
3893        let mut row = Row::default();
3894        row.packer().push_dict_with(|row| {
3895            row.push(Datum::String("favourites"));
3896            row.push_list_with(|row| {
3897                row.push(Datum::String("ice cream"));
3898                row.push(Datum::String("oreos"));
3899                row.push(Datum::String("cheesecake"));
3900            });
3901            row.push(Datum::String("name"));
3902            row.push(Datum::String("bob"));
3903        });
3904
3905        let mut iter = row.unpack_first().unwrap_map().iter();
3906
3907        let (k, v) = iter.next().unwrap();
3908        assert_eq!(k, "favourites");
3909        assert_eq!(
3910            v.unwrap_list().iter().collect::<Vec<_>>(),
3911            vec![
3912                Datum::String("ice cream"),
3913                Datum::String("oreos"),
3914                Datum::String("cheesecake"),
3915            ]
3916        );
3917
3918        let (k, v) = iter.next().unwrap();
3919        assert_eq!(k, "name");
3920        assert_eq!(v, Datum::String("bob"));
3921    }
3922
3923    #[mz_ore::test]
3924    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3925        let pack = |ok| {
3926            let mut row = Row::default();
3927            row.packer().push_dict_with(|row| {
3928                if ok {
3929                    row.push(Datum::String("key"));
3930                    row.push(Datum::Int32(42));
3931                    Ok(7)
3932                } else {
3933                    Err("fail")
3934                }
3935            })?;
3936            Ok(row)
3937        };
3938
3939        assert_eq!(pack(false), Err("fail"));
3940
3941        let row = pack(true)?;
3942        let mut dict = row.unpack_first().unwrap_map().iter();
3943        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3944        assert_eq!(dict.next(), None);
3945
3946        Ok(())
3947    }
3948
3949    #[mz_ore::test]
3950    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
3951    fn test_datum_sizes() {
3952        let arena = RowArena::new();
3953
3954        // Test the claims about various datum sizes.
3955        let values_of_interest = vec![
3956            Datum::Null,
3957            Datum::False,
3958            Datum::Int16(0),
3959            Datum::Int32(0),
3960            Datum::Int64(0),
3961            Datum::UInt8(0),
3962            Datum::UInt8(1),
3963            Datum::UInt16(0),
3964            Datum::UInt16(1),
3965            Datum::UInt16(1 << 8),
3966            Datum::UInt32(0),
3967            Datum::UInt32(1),
3968            Datum::UInt32(1 << 8),
3969            Datum::UInt32(1 << 16),
3970            Datum::UInt32(1 << 24),
3971            Datum::UInt64(0),
3972            Datum::UInt64(1),
3973            Datum::UInt64(1 << 8),
3974            Datum::UInt64(1 << 16),
3975            Datum::UInt64(1 << 24),
3976            Datum::UInt64(1 << 32),
3977            Datum::UInt64(1 << 40),
3978            Datum::UInt64(1 << 48),
3979            Datum::UInt64(1 << 56),
3980            Datum::Float32(OrderedFloat(0.0)),
3981            Datum::Float64(OrderedFloat(0.0)),
3982            Datum::from(numeric::Numeric::from(0)),
3983            Datum::from(numeric::Numeric::from(1000)),
3984            Datum::from(numeric::Numeric::from(9999)),
3985            Datum::Date(
3986                NaiveDate::from_ymd_opt(1, 1, 1)
3987                    .unwrap()
3988                    .try_into()
3989                    .unwrap(),
3990            ),
3991            Datum::Timestamp(
3992                CheckedTimestamp::from_timestamplike(
3993                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3994                )
3995                .unwrap(),
3996            ),
3997            Datum::TimestampTz(
3998                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3999                    .unwrap(),
4000            ),
4001            Datum::Interval(Interval::default()),
4002            Datum::Bytes(&[]),
4003            Datum::String(""),
4004            Datum::JsonNull,
4005            Datum::Range(Range { inner: None }),
4006            arena.make_datum(|packer| {
4007                packer
4008                    .push_range(Range::new(Some((
4009                        RangeLowerBound::new(Datum::Int32(-1), true),
4010                        RangeUpperBound::new(Datum::Int32(1), true),
4011                    ))))
4012                    .unwrap();
4013            }),
4014        ];
4015        for value in values_of_interest {
4016            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
4017                panic!("Disparity in claimed size for {:?}", value);
4018            }
4019        }
4020    }
4021
4022    #[mz_ore::test]
4023    fn test_range_errors() {
4024        fn test_range_errors_inner<'a>(
4025            datums: Vec<Vec<Datum<'a>>>,
4026        ) -> Result<(), InvalidRangeError> {
4027            let mut row = Row::default();
4028            let row_len = row.byte_len();
4029            let mut packer = row.packer();
4030            let r = packer.push_range_with(
4031                RangeLowerBound {
4032                    inclusive: true,
4033                    bound: Some(|row: &mut RowPacker| {
4034                        for d in &datums[0] {
4035                            row.push(d);
4036                        }
4037                        Ok(())
4038                    }),
4039                },
4040                RangeUpperBound {
4041                    inclusive: true,
4042                    bound: Some(|row: &mut RowPacker| {
4043                        for d in &datums[1] {
4044                            row.push(d);
4045                        }
4046                        Ok(())
4047                    }),
4048                },
4049            );
4050
4051            assert_eq!(row_len, row.byte_len());
4052
4053            r
4054        }
4055
4056        // A finite bound whose closure pushes zero values violates the
4057        // `push_range_with` caller contract and still panics. This is
4058        // unreachable when decoding a `ProtoRow`: each decoded bound pushes
4059        // exactly one datum (or fails), so only an in-process caller can hit it.
4060        for panicking_case in [
4061            vec![vec![Datum::Int32(1)], vec![]],
4062            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
4063        ] {
4064            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
4065            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
4066            assert_err!(result);
4067        }
4068
4069        // Inconsistent bound counts, mismatched datum kinds, and Null bounds are
4070        // all reachable from a crafted/corrupted `ProtoRow`, so they return an
4071        // error instead of panicking.
4072        for error_case in [
4073            vec![
4074                vec![Datum::Int32(1), Datum::Int32(2)],
4075                vec![Datum::Int32(3)],
4076            ],
4077            vec![
4078                vec![Datum::Int32(1)],
4079                vec![Datum::Int32(2), Datum::Int32(3)],
4080            ],
4081            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
4082            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
4083            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
4084        ] {
4085            assert_eq!(
4086                test_range_errors_inner(error_case),
4087                Err(InvalidRangeError::InvalidRangeData)
4088            );
4089        }
4090
4091        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
4092        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
4093    }
4094
4095    /// Lists have a variable-length encoding for their lengths. We test each case here.
4096    #[mz_ore::test]
4097    #[cfg_attr(miri, ignore)] // slow
4098    fn test_list_encoding() {
4099        fn test_list_encoding_inner(len: usize) {
4100            let list_elem = |i: usize| {
4101                if i % 2 == 0 {
4102                    Datum::False
4103                } else {
4104                    Datum::True
4105                }
4106            };
4107            let mut row = Row::default();
4108            {
4109                // Push some stuff.
4110                let mut packer = row.packer();
4111                packer.push(Datum::String("start"));
4112                packer.push_list_with(|packer| {
4113                    for i in 0..len {
4114                        packer.push(list_elem(i));
4115                    }
4116                });
4117                packer.push(Datum::String("end"));
4118            }
4119            // Check that we read back exactly what we pushed.
4120            let mut row_it = row.iter();
4121            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
4122            match row_it.next().unwrap() {
4123                Datum::List(list) => {
4124                    let mut list_it = list.iter();
4125                    for i in 0..len {
4126                        assert_eq!(list_it.next().unwrap(), list_elem(i));
4127                    }
4128                    assert_none!(list_it.next());
4129                }
4130                _ => panic!("expected Datum::List"),
4131            }
4132            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
4133            assert_none!(row_it.next());
4134        }
4135
4136        test_list_encoding_inner(0);
4137        test_list_encoding_inner(1);
4138        test_list_encoding_inner(10);
4139        test_list_encoding_inner(TINY - 1); // tiny
4140        test_list_encoding_inner(TINY + 1); // short
4141        test_list_encoding_inner(SHORT + 1); // long
4142
4143        // The biggest one takes 40 s on my laptop, probably not worth it.
4144        //test_list_encoding_inner(LONG + 1); // huge
4145    }
4146
4147    /// Demonstrates that DatumList's Eq (bytewise) and Ord (datum-by-datum) are now consistent.
4148    /// A list containing -0.0 and one containing +0.0 have different byte representations
4149    /// (IEEE 754 distinguishes them), originally Eq says they are not equal. But after
4150    /// using the new Datum::cmp, Eq says they are equal, which matches what Ord
4151    /// compares via iter().cmp(other.iter()), and them as equal.
4152    #[mz_ore::test]
4153    fn test_datum_list_eq_ord_consistency() {
4154        // Build list containing +0.0
4155        let mut row_pos = Row::default();
4156        row_pos.packer().push_list_with(|p| {
4157            p.push(Datum::Float64(OrderedFloat::from(0.0)));
4158        });
4159        let list_pos = row_pos.unpack_first().unwrap_list();
4160
4161        // Build list containing -0.0 (distinct bit pattern from +0.0)
4162        let mut row_neg = Row::default();
4163        row_neg.packer().push_list_with(|p| {
4164            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
4165        });
4166        let list_neg = row_neg.unpack_first().unwrap_list();
4167
4168        // Eq is bytewise: different encodings => not equal
4169        // This was a bug in the past, so we test it.
4170        assert_eq!(
4171            list_pos, list_neg,
4172            "Eq should see different encodings as equal"
4173        );
4174
4175        // Ord is datum-by-datum: -0.0 and +0.0 compare equal as Datums
4176        assert_eq!(
4177            list_pos.cmp(&list_neg),
4178            Ordering::Equal,
4179            "Ord (datum-by-datum) should see -0.0 and +0.0 as equal"
4180        );
4181    }
4182
4183    /// Demonstrates that DatumMap's derived Eq (bytewise) can make maps with equal keys and
4184    /// values compare equal when values have different encodings (e.g. -0.0 vs +0.0).
4185    #[mz_ore::test]
4186    fn test_datum_map_eq_bytewise_consistency() {
4187        // Build map {"k": +0.0}
4188        let mut row_pos = Row::default();
4189        row_pos.packer().push_dict_with(|p| {
4190            p.push(Datum::String("k"));
4191            p.push(Datum::Float64(OrderedFloat::from(0.0)));
4192        });
4193        let map_pos = row_pos.unpack_first().unwrap_map();
4194
4195        // Build map {"k": -0.0}
4196        let mut row_neg = Row::default();
4197        row_neg.packer().push_dict_with(|p| {
4198            p.push(Datum::String("k"));
4199            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
4200        });
4201        let map_neg = row_neg.unpack_first().unwrap_map();
4202
4203        // Same keys and semantically equal values, but Eq (bytewise) says not equal
4204        assert_eq!(
4205            map_pos, map_neg,
4206            "DatumMap Eq is semantic; -0.0 and +0.0 have different encodings but are equal"
4207        );
4208        // Verify they have the same logical content
4209        let entries_pos: Vec<_> = map_pos.iter().collect();
4210        let entries_neg: Vec<_> = map_neg.iter().collect();
4211        assert_eq!(entries_pos.len(), entries_neg.len());
4212        for ((k1, v1), (k2, v2)) in entries_pos.iter().zip_eq(entries_neg.iter()) {
4213            assert_eq!(k1, k2);
4214            assert_eq!(
4215                v1, v2,
4216                "Datum-level comparison treats -0.0 and +0.0 as equal"
4217            );
4218        }
4219    }
4220
4221    /// Hash must agree with Eq: equal lists must have the same hash.
4222    #[mz_ore::test]
4223    fn test_datum_list_hash_consistency() {
4224        // Equal lists (including -0.0 vs +0.0) must hash the same
4225        let mut row_pos = Row::default();
4226        row_pos.packer().push_list_with(|p| {
4227            p.push(Datum::Float64(OrderedFloat::from(0.0)));
4228        });
4229        let list_pos = row_pos.unpack_first().unwrap_list();
4230
4231        let mut row_neg = Row::default();
4232        row_neg.packer().push_list_with(|p| {
4233            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
4234        });
4235        let list_neg = row_neg.unpack_first().unwrap_list();
4236
4237        assert_eq!(list_pos, list_neg);
4238        assert_eq!(
4239            hash(&list_pos),
4240            hash(&list_neg),
4241            "equal lists must have same hash"
4242        );
4243
4244        // Unequal lists should have different hashes (with asymptotic probability 1)
4245        let mut row_a = Row::default();
4246        row_a.packer().push_list_with(|p| {
4247            p.push(Datum::Int32(1));
4248            p.push(Datum::Int32(2));
4249        });
4250        let list_a = row_a.unpack_first().unwrap_list();
4251
4252        let mut row_b = Row::default();
4253        row_b.packer().push_list_with(|p| {
4254            p.push(Datum::Int32(1));
4255            p.push(Datum::Int32(3));
4256        });
4257        let list_b = row_b.unpack_first().unwrap_list();
4258
4259        assert_ne!(list_a, list_b);
4260        assert_ne!(
4261            hash(&list_a),
4262            hash(&list_b),
4263            "unequal lists must have different hashes"
4264        );
4265    }
4266
4267    /// Ord/PartialOrd for DatumList: less, equal, greater.
4268    #[mz_ore::test]
4269    fn test_datum_list_ordering() {
4270        let mut row_12 = Row::default();
4271        row_12.packer().push_list_with(|p| {
4272            p.push(Datum::Int32(1));
4273            p.push(Datum::Int32(2));
4274        });
4275        let list_12 = row_12.unpack_first().unwrap_list();
4276
4277        let mut row_13 = Row::default();
4278        row_13.packer().push_list_with(|p| {
4279            p.push(Datum::Int32(1));
4280            p.push(Datum::Int32(3));
4281        });
4282        let list_13 = row_13.unpack_first().unwrap_list();
4283
4284        let mut row_123 = Row::default();
4285        row_123.packer().push_list_with(|p| {
4286            p.push(Datum::Int32(1));
4287            p.push(Datum::Int32(2));
4288            p.push(Datum::Int32(3));
4289        });
4290        let list_123 = row_123.unpack_first().unwrap_list();
4291
4292        // [1, 2] < [1, 3] due to the second element being different
4293        assert_eq!(list_12.cmp(&list_13), Ordering::Less);
4294        assert_eq!(list_13.cmp(&list_12), Ordering::Greater);
4295        assert_eq!(list_12.cmp(&list_12), Ordering::Equal);
4296        // shorter prefix compares less
4297        assert_eq!(list_12.cmp(&list_123), Ordering::Less);
4298    }
4299
4300    /// Hash must agree with Eq: equal maps must have the same hash.
4301    #[mz_ore::test]
4302    fn test_datum_map_hash_consistency() {
4303        let mut row_pos = Row::default();
4304        row_pos.packer().push_dict_with(|p| {
4305            p.push(Datum::String("x"));
4306            p.push(Datum::Float64(OrderedFloat::from(0.0)));
4307        });
4308        let map_pos = row_pos.unpack_first().unwrap_map();
4309
4310        let mut row_neg = Row::default();
4311        row_neg.packer().push_dict_with(|p| {
4312            p.push(Datum::String("x"));
4313            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
4314        });
4315        let map_neg = row_neg.unpack_first().unwrap_map();
4316
4317        assert_eq!(map_pos, map_neg);
4318        assert_eq!(
4319            hash(&map_pos),
4320            hash(&map_neg),
4321            "equal maps must have same hash"
4322        );
4323
4324        let mut row_a = Row::default();
4325        row_a.packer().push_dict_with(|p| {
4326            p.push(Datum::String("a"));
4327            p.push(Datum::Int32(1));
4328        });
4329        let map_a = row_a.unpack_first().unwrap_map();
4330
4331        let mut row_b = Row::default();
4332        row_b.packer().push_dict_with(|p| {
4333            p.push(Datum::String("a"));
4334            p.push(Datum::Int32(2));
4335        });
4336        let map_b = row_b.unpack_first().unwrap_map();
4337
4338        assert_ne!(map_a, map_b);
4339        assert_ne!(
4340            hash(&map_a),
4341            hash(&map_b),
4342            "unequal maps must have different hashes"
4343        );
4344    }
4345
4346    /// Ord/PartialOrd for DatumMap: less, equal, greater (by key then value).
4347    #[mz_ore::test]
4348    fn test_datum_map_ordering() {
4349        let mut row_a1 = Row::default();
4350        row_a1.packer().push_dict_with(|p| {
4351            p.push(Datum::String("a"));
4352            p.push(Datum::Int32(1));
4353        });
4354        let map_a1 = row_a1.unpack_first().unwrap_map();
4355
4356        let mut row_a2 = Row::default();
4357        row_a2.packer().push_dict_with(|p| {
4358            p.push(Datum::String("a"));
4359            p.push(Datum::Int32(2));
4360        });
4361        let map_a2 = row_a2.unpack_first().unwrap_map();
4362
4363        let mut row_b1 = Row::default();
4364        row_b1.packer().push_dict_with(|p| {
4365            p.push(Datum::String("b"));
4366            p.push(Datum::Int32(1));
4367        });
4368        let map_b1 = row_b1.unpack_first().unwrap_map();
4369
4370        assert_eq!(map_a1.cmp(&map_a2), Ordering::Less);
4371        assert_eq!(map_a2.cmp(&map_a1), Ordering::Greater);
4372        assert_eq!(map_a1.cmp(&map_a1), Ordering::Equal);
4373        assert_eq!(map_a1.cmp(&map_b1), Ordering::Less); // "a" < "b"
4374    }
4375
4376    /// Datum puts Null last in the enum so that nulls sort last (PostgreSQL default).
4377    /// This ordering is used when comparing DatumList/DatumMap (e.g. jsonb_agg tiebreaker).
4378    #[mz_ore::test]
4379    fn test_datum_list_and_map_null_sorts_last() {
4380        // DatumList: [1] < [null] so non-null sorts before null
4381        let mut row_list_1 = Row::default();
4382        row_list_1
4383            .packer()
4384            .push_list_with(|p| p.push(Datum::Int32(1)));
4385        let list_1 = row_list_1.unpack_first().unwrap_list();
4386
4387        let mut row_list_null = Row::default();
4388        row_list_null
4389            .packer()
4390            .push_list_with(|p| p.push(Datum::Null));
4391        let list_null = row_list_null.unpack_first().unwrap_list();
4392
4393        assert_eq!(list_1.cmp(&list_null), Ordering::Less);
4394        assert_eq!(list_null.cmp(&list_1), Ordering::Greater);
4395
4396        // DatumMap: {"k": 1} < {"k": null} so non-null sorts before null (same as jsonb_agg)
4397        let mut row_map_1 = Row::default();
4398        row_map_1.packer().push_dict_with(|p| {
4399            p.push(Datum::String("k"));
4400            p.push(Datum::Int32(1));
4401        });
4402        let map_1 = row_map_1.unpack_first().unwrap_map();
4403
4404        let mut row_map_null = Row::default();
4405        row_map_null.packer().push_dict_with(|p| {
4406            p.push(Datum::String("k"));
4407            p.push(Datum::Null);
4408        });
4409        let map_null = row_map_null.unpack_first().unwrap_map();
4410
4411        assert_eq!(map_1.cmp(&map_null), Ordering::Less);
4412        assert_eq!(map_null.cmp(&map_1), Ordering::Greater);
4413    }
4414}